msg.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. from sql import db
  2. from typing import Optional
  3. def create_msg(auth: int, context: str, secret: bool = False):
  4. context = context.replace("'", "''")
  5. cur = db.insert(table="message",
  6. columns=["Auth", "Context", "Secret"],
  7. values=f"{auth}, '{context}', {1 if secret else 0}")
  8. return cur is not None and cur.rowcount == 1
  9. def read_msg(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
  10. if show_secret:
  11. where = None
  12. else:
  13. where = "Secret=0"
  14. cur = db.search(columns=["MsgID", "Auth", "Email", "Context", "UpdateTime", "Secret"], table="message_user",
  15. limit=limit,
  16. where=where,
  17. offset=offset)
  18. if cur is None or cur.rowcount == 0:
  19. return []
  20. return cur.fetchall()
  21. def delete_msg(msg_id: int):
  22. cur = db.delete(table="message", where=f"ID={msg_id}")
  23. if cur is None or cur.rowcount == 0:
  24. return False
  25. return True
  26. def get_msg_count():
  27. cur = db.search(columns=["count(ID)"], table="message")
  28. if cur is None or cur.rowcount == 0:
  29. return 0
  30. return cur.fetchone()[0]
  31. def get_user_msg_count(user_id: int):
  32. cur = db.search(columns=["count(ID)"], table="message",
  33. where=f"Auth={user_id}")
  34. if cur is None or cur.rowcount == 0:
  35. return 0
  36. return cur.fetchone()[0]