news.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. import datetime
  2. from sql.db import DB
  3. from tool.typing import *
  4. from tool.string import mysql_str
  5. def write_news(text, uid: uid_t, db: DB):
  6. text = mysql_str(text)
  7. cur = db.insert(table="context",
  8. columns=["Context", "Author"],
  9. values=f"'{text}', '{uid}'")
  10. if cur is None:
  11. return False
  12. assert cur.rowcount == 1
  13. return True
  14. def get_news(db: DB, limit: Optional[int] = None, offset: Optional[int] = None):
  15. cur = db.search(columns=["ContextID", "Context", "Name", "Time"],
  16. table="context_user",
  17. limit=limit,
  18. offset=offset,
  19. order_by=[("Time", "DESC")])
  20. if cur is None:
  21. return False, None
  22. res = []
  23. for i in range(cur.rowcount):
  24. re = cur.fetchone()
  25. time: datetime.datetime = re[3]
  26. res.append((re[0], re[1], re[2], time.strftime("%Y-%m-%d %H:%M")))
  27. return True, res
  28. def get_news_count(db: DB):
  29. cur = db.search(columns=["count(ContextID)"], table="context_user")
  30. if cur is None:
  31. return 0
  32. assert cur.rowcount == 1
  33. return int(cur.fetchone()[0])
  34. def delete_news(context_id: str, db: DB):
  35. cur = db.delete(table="context",
  36. where=f"ContextID={context_id}")
  37. if cur is None or cur.rowcount == 0:
  38. return False
  39. assert cur.rowcount == 1
  40. return True