msg.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. from sql import db
  2. from typing import Optional
  3. def get_msg_list(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
  4. if show_secret:
  5. where = None
  6. else:
  7. where = "Secret=0"
  8. cur = db.search(columns=["MsgID", "Auth", "Email", "Context", "UpdateTime", "Secret"], table="message_user",
  9. limit=limit,
  10. where=where,
  11. offset=offset)
  12. if cur is None or cur.rowcount == 0:
  13. return []
  14. return cur.fetchall()
  15. def get_msg_count():
  16. cur = db.search(columns=["count(ID)"], table="message")
  17. if cur is None or cur.rowcount == 0:
  18. return 0
  19. return cur.fetchone()[0]
  20. def get_user_msg_count(user_id: int):
  21. cur = db.search(columns=["count(ID)"], table="message",
  22. where=f"Auth={user_id}")
  23. if cur is None or cur.rowcount == 0:
  24. return 0
  25. return cur.fetchone()[0]
  26. def write_msg(auth: int, context: str, secret: bool = False):
  27. cur = db.insert(table="message",
  28. columns=["Auth", "Context", "Secret"],
  29. values=f"{auth}, '{context}', {1 if secret else 0}")
  30. return cur is not None and cur.rowcount == 1