blog.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. from sql import db
  2. from typing import Optional, List
  3. import object.archive
  4. def create_blog(auth_id: int, title: str, subtitle:str, context: str, archive_list: List[object.archive.Archive]) -> bool:
  5. """写入新的blog"""
  6. cur = db.insert(table="blog", columns=["Auth", "Title", "SubTitle", "Context"],
  7. values=f"{auth_id}, '{title}', '{subtitle}', '{context}'")
  8. if cur is None or cur.rowcount == 0:
  9. return False
  10. blog_id = cur.lastrowid
  11. for archive in archive_list:
  12. cur = db.insert(table="blog_archive", columns=["BlogID", "ArchiveID"],
  13. values=f"{blog_id}, {archive.archive_id}")
  14. if cur is None or cur.rowcount == 0:
  15. return False
  16. return True
  17. def read_blog(blog_id: int) -> list:
  18. """读取blog内容"""
  19. cur = db.search(columns=["Auth", "Title", "SubTitle", "Context", "Quote", "Spider", "UpdateTime", "Top"],
  20. table="blog",
  21. where=f"ID={blog_id}")
  22. if cur is None or cur.rowcount == 0:
  23. return []
  24. return cur.fetchone()
  25. def delete_blog(blog_id: int):
  26. cur = db.delete(table="blog_archive", where=f"BlogID={blog_id}")
  27. if cur is None:
  28. return False
  29. cur = db.delete(table="comment", where=f"BlogID={blog_id}")
  30. if cur is None:
  31. return False
  32. cur = db.delete(table="blog", where=f"ID={blog_id}")
  33. if cur is None or cur.rowcount == 0:
  34. return False
  35. return True
  36. def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  37. """获得 blog 列表"""
  38. cur = db.search(columns=["ID", "Title", "SubTitle", "UpdateTime", "Top"], table="blog_with_top",
  39. limit=limit,
  40. offset=offset)
  41. if cur is None or cur.rowcount == 0:
  42. return []
  43. return cur.fetchall()
  44. def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  45. """ 获得blog列表 忽略置顶 """
  46. cur = db.search(columns=["ID", "Title", "SubTitle", "UpdateTime"], table="blog",
  47. order_by=[("UpdateTime", "DESC")],
  48. limit=limit,
  49. offset=offset)
  50. if cur is None or cur.rowcount == 0:
  51. return []
  52. return cur.fetchall()
  53. def get_blog_count() -> int:
  54. """ 统计 blog 个数 """
  55. cur = db.search(columns=["count(ID)"], table="blog")
  56. if cur is None or cur.rowcount == 0:
  57. return 0
  58. return cur.fetchone()[0]
  59. def get_archive_blog_list(archive_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  60. """ 获得指定归档的 blog 列表 """
  61. cur = db.search(columns=["BlogID", "Title", "SubTitle", "UpdateTime", "Top"], table="blog_with_archive",
  62. where=f"ArchiveID={archive_id}",
  63. limit=limit,
  64. offset=offset)
  65. if cur is None or cur.rowcount == 0:
  66. return []
  67. return cur.fetchall()
  68. def get_archive_blog_count(archive_id) -> int:
  69. """ 统计指定归档的 blog 个数 """
  70. cur = db.search(columns=["count(ID)"], table="blog_with_archive",
  71. where=f"ArchiveID={archive_id}")
  72. if cur is None or cur.rowcount == 0:
  73. return 0
  74. return cur.fetchone()[0]
  75. def get_user_user_count(user_id: int) -> int:
  76. """ 获得指定用户的 blog 个数 """
  77. cur = db.search(columns=["count(ID)"], table="blog",
  78. where=f"Auth={user_id}")
  79. if cur is None or cur.rowcount == 0:
  80. return 0
  81. return cur.fetchone()[0]