msg.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. from sql import db
  2. from typing import Optional
  3. def create_msg(auth: int, context: str, secret: bool = False):
  4. cur = db.insert(table="message",
  5. columns=["Auth", "Context", "Secret"],
  6. values=f"{auth}, '{context}', {1 if secret else 0}")
  7. return cur is not None and cur.rowcount == 1
  8. def read_msg(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
  9. if show_secret:
  10. where = None
  11. else:
  12. where = "Secret=0"
  13. cur = db.search(columns=["MsgID", "Auth", "Email", "Context", "UpdateTime", "Secret"], table="message_user",
  14. limit=limit,
  15. where=where,
  16. offset=offset)
  17. if cur is None or cur.rowcount == 0:
  18. return []
  19. return cur.fetchall()
  20. def delete_msg(msg_id: int):
  21. cur = db.delete(table="message", where=f"ID={msg_id}")
  22. if cur is None or cur.rowcount == 0:
  23. return False
  24. return True
  25. def get_msg_count():
  26. cur = db.search(columns=["count(ID)"], table="message")
  27. if cur is None or cur.rowcount == 0:
  28. return 0
  29. return cur.fetchone()[0]
  30. def get_user_msg_count(user_id: int):
  31. cur = db.search(columns=["count(ID)"], table="message",
  32. where=f"Auth={user_id}")
  33. if cur is None or cur.rowcount == 0:
  34. return 0
  35. return cur.fetchone()[0]