msg.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from sql import db
  2. from typing import Optional
  3. def read_msg_list(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
  4. if show_secret:
  5. if limit is not None and offset is not None:
  6. cur = db.search("SELECT MsgID "
  7. "FROM message_user "
  8. "ORDER BY UpdateTime DESC "
  9. "LIMIT %s "
  10. "OFFSET %s", limit, offset)
  11. else:
  12. cur = db.search("SELECT MsgID "
  13. "FROM message_user "
  14. "ORDER BY UpdateTime DESC")
  15. else:
  16. if limit is not None and offset is not None:
  17. cur = db.search("SELECT MsgID "
  18. "FROM message_user "
  19. "WHERE Secret=0 "
  20. "ORDER BY UpdateTime DESC "
  21. "LIMIT %s "
  22. "OFFSET %s", limit, offset)
  23. else:
  24. cur = db.search("SELECT MsgID "
  25. "FROM message_user "
  26. "WHERE Secret=0 "
  27. "ORDER BY UpdateTime DESC")
  28. if cur is None or cur.rowcount == 0:
  29. return []
  30. return [i[0] for i in cur.fetchall()]
  31. def create_msg(auth: int, content: str, secret: bool = False):
  32. content = content.replace("'", "''")
  33. cur = db.insert(table="message",
  34. columns=["Auth", "Content", "Secret"],
  35. values=f"{auth}, '{content}', {1 if secret else 0}")
  36. if cur is None or cur.rowcount != 1:
  37. return None
  38. return cur.lastrowid
  39. def read_msg(msg_id: int):
  40. cur = db.search("SELECT Email, Content, UpdateTime, Secret "
  41. "FROM message_user "
  42. "WHERE MsgID=%s", msg_id)
  43. if cur is None or cur.rowcount == 0:
  44. return ["", "", 0, False]
  45. return cur.fetchone()
  46. def delete_msg(msg_id: int):
  47. cur = db.delete(table="message", where=f"ID={msg_id}")
  48. if cur is None or cur.rowcount == 0:
  49. return False
  50. return True
  51. def get_msg_count():
  52. cur = db.search("SELECT COUNT(*) FROM message")
  53. if cur is None or cur.rowcount == 0:
  54. return 0
  55. return cur.fetchone()[0]
  56. def get_user_msg_count(user_id: int):
  57. cur = db.search("SELECT COUNT(*) FROM message WHERE Auth=%s", user_id)
  58. if cur is None or cur.rowcount == 0:
  59. return 0
  60. return cur.fetchone()[0]