1
0

msg.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. from sql import db
  2. from typing import Optional
  def read_msg_list(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
      """Return the ``MsgID`` values found in ``message_user``.

      Rows are requested ordered by ``UpdateTime`` descending. ``limit`` and
      ``offset`` are forwarded unchanged to ``db.search``.

      Args:
          limit: Forwarded to ``db.search`` as ``limit``.
          offset: Forwarded to ``db.search`` as ``offset``.
          show_secret: When false, the search is restricted with the
              condition ``Secret=0``; when true, no condition is passed.

      Returns:
          A list holding the first column of every fetched row, or an empty
          list when ``db.search`` returns ``None`` or reports zero rows.
      """
      secret_filter = None if show_secret else "Secret=0"
      cursor = db.search(
          columns=["MsgID"],
          table="message_user",
          limit=limit,
          where=secret_filter,
          offset=offset,
          order_by=[("UpdateTime", "DESC")],
      )
      if cursor is not None and cursor.rowcount != 0:
          return [row[0] for row in cursor.fetchall()]
      return []
  def create_msg(auth: int, content: str, secret: bool = False):
      """Insert one row into the ``message`` table.

      Args:
          auth: Value stored in the ``Auth`` column.
          content: Text stored in the ``Content`` column; every single quote
              is doubled before the text is embedded in the VALUES string.
          secret: Stored in the ``Secret`` column as ``1`` when truthy,
              otherwise ``0``.

      Returns:
          The cursor's ``lastrowid`` when exactly one row was affected,
          otherwise ``None``.
      """
      # NOTE(review): the VALUES clause is assembled by string interpolation
      # and quote-doubling is the only escaping applied. Confirm whether
      # ``db.insert`` can take bound parameters instead.
      escaped = content.replace("'", "''")
      secret_flag = 1 if secret else 0
      cursor = db.insert(
          table="message",
          columns=["Auth", "Content", "Secret"],
          values=f"{auth}, '{escaped}', {secret_flag}",
      )
      if cursor is not None and cursor.rowcount == 1:
          return cursor.lastrowid
      return None
  def read_msg(msg_id: int):
      """Look up a single message in ``message_user`` by its ``MsgID``.

      Args:
          msg_id: Value compared against the ``MsgID`` column.

      Returns:
          The result of ``fetchone()`` for the columns ``Email``, ``Content``,
          ``UpdateTime`` and ``Secret``; or the placeholder list
          ``["", "", 0, False]`` when ``db.search`` returns ``None`` or
          reports zero rows.
      """
      # NOTE(review): msg_id is interpolated directly into the WHERE string;
      # confirm callers always pass an int.
      cursor = db.search(
          columns=["Email", "Content", "UpdateTime", "Secret"],
          table="message_user",
          where=f"MsgID={msg_id}",
      )
      found = cursor is not None and cursor.rowcount != 0
      return cursor.fetchone() if found else ["", "", 0, False]
  def delete_msg(msg_id: int):
      """Delete the row of ``message`` whose ``ID`` equals *msg_id*.

      Args:
          msg_id: Value compared against the ``ID`` column.

      Returns:
          ``True`` when ``db.delete`` returned a cursor reporting a non-zero
          row count, ``False`` otherwise.
      """
      # NOTE(review): msg_id is interpolated directly into the WHERE string;
      # confirm callers always pass an int.
      cursor = db.delete(table="message", where=f"ID={msg_id}")
      return not (cursor is None or cursor.rowcount == 0)
  def get_msg_count():
      """Return ``count(ID)`` over the ``message`` table.

      Returns:
          The first column of the fetched row, or ``0`` when ``db.search``
          returns ``None`` or reports zero rows.
      """
      cursor = db.search(columns=["count(ID)"], table="message")
      if cursor is not None and cursor.rowcount != 0:
          count_row = cursor.fetchone()
          return count_row[0]
      return 0
  def get_user_msg_count(user_id: int):
      """Return ``count(ID)`` over the rows of ``message`` with ``Auth`` equal to *user_id*.

      Args:
          user_id: Value compared against the ``Auth`` column.

      Returns:
          The first column of the fetched row, or ``0`` when ``db.search``
          returns ``None`` or reports zero rows.
      """
      # NOTE(review): user_id is interpolated directly into the WHERE string;
      # confirm callers always pass an int.
      cursor = db.search(
          columns=["count(ID)"],
          table="message",
          where=f"Auth={user_id}",
      )
      if cursor is not None and cursor.rowcount != 0:
          count_row = cursor.fetchone()
          return count_row[0]
      return 0