blog.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from sql import db
  2. from sql.archive import add_blog_to_archive
  3. from sql.cache import (write_blog_to_cache, get_blog_from_cache, delete_blog_from_cache,
  4. write_blog_count_to_cache, get_blog_count_from_cache, delete_blog_count_from_cache,
  5. write_archive_blog_count_to_cache, get_archive_blog_count_from_cache,
  6. delete_all_archive_blog_count_from_cache, delete_archive_blog_count_from_cache,
  7. write_user_blog_count_to_cache, get_user_blog_count_from_cache,
  8. delete_all_user_blog_count_from_cache, delete_user_blog_count_from_cache,
  9. delete_blog_archive_from_cache)
  10. import object.archive
  11. from typing import Optional, List
  12. def create_blog(auth_id: int, title: str, subtitle: str, content: str,
  13. archive_list: List[object.archive.Archive]) -> bool:
  14. """ 写入新的blog """
  15. delete_blog_count_from_cache()
  16. delete_user_blog_count_from_cache(auth_id)
  17. # archive cache 在下面循环删除
  18. cur = db.insert("INSERT INTO blog(Auth, Title, SubTitle, Content) "
  19. "VALUES (%s, %s, %s, %s)", auth_id, title, subtitle, content)
  20. if cur is None or cur.rowcount == 0:
  21. return False
  22. blog_id = cur.lastrowid
  23. for archive in archive_list:
  24. delete_archive_blog_count_from_cache(archive.id)
  25. if not add_blog_to_archive(blog_id, archive.id):
  26. return False
  27. return True
  28. def update_blog(blog_id: int, content: str) -> bool:
  29. """ 更新博客文章 """
  30. delete_blog_from_cache(blog_id)
  31. cur = db.update("Update blog "
  32. "SET UpdateTime=CURRENT_TIMESTAMP(), Content=%s "
  33. "WHERE ID=%s", content, blog_id)
  34. if cur is None or cur.rowcount != 1:
  35. return False
  36. return True
  37. def read_blog(blog_id: int) -> list:
  38. """ 读取blog内容 """
  39. res = get_blog_from_cache(blog_id)
  40. if res is not None:
  41. return res
  42. cur = db.search("SELECT Auth, Title, SubTitle, Content, UpdateTime, CreateTime, Top "
  43. "FROM blog "
  44. "WHERE ID=%s", blog_id)
  45. if cur is None or cur.rowcount == 0:
  46. return [-1, "", "", "", 0, -1, False]
  47. res = cur.fetchone()
  48. write_blog_to_cache(blog_id, *res)
  49. return res
  50. def delete_blog(blog_id: int):
  51. delete_blog_count_from_cache()
  52. delete_all_archive_blog_count_from_cache()
  53. delete_all_user_blog_count_from_cache()
  54. delete_blog_from_cache(blog_id)
  55. delete_blog_archive_from_cache(blog_id)
  56. cur = db.delete("DELETE FROM blog_archive WHERE BlogID=%s", blog_id)
  57. if cur is None:
  58. return False
  59. cur = db.delete("DELETE FROM comment WHERE BlogID=%s", blog_id)
  60. if cur is None:
  61. return False
  62. cur = db.delete("DELETE FROM blog WHERE ID=%s", blog_id)
  63. if cur is None or cur.rowcount == 0:
  64. return False
  65. return True
  66. def set_blog_top(blog_id: int, top: bool = True):
  67. delete_blog_from_cache(blog_id)
  68. cur = db.update("UPDATE blog "
  69. "SET Top=%s "
  70. "WHERE ID=%s", 1 if top else 0, blog_id)
  71. if cur is None or cur.rowcount != 1:
  72. return False
  73. return True
  74. def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  75. """ 获得 blog 列表 """
  76. if limit is not None and offset is not None:
  77. cur = db.search("SELECT ID "
  78. "FROM blog "
  79. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle "
  80. "LIMIT %s OFFSET %s", limit, offset)
  81. else:
  82. cur = db.search("SELECT ID "
  83. "FROM blog "
  84. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
  85. if cur is None or cur.rowcount == 0:
  86. return []
  87. return [i[0] for i in cur.fetchall()]
  88. def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  89. """ 获得blog列表 忽略置顶 """
  90. if limit is not None and offset is not None:
  91. cur = db.search("SELECT ID "
  92. "FROM blog "
  93. "ORDER BY CreateTime DESC, Title, SubTitle "
  94. "LIMIT %s OFFSET %s", limit, offset)
  95. else:
  96. cur = db.search("SELECT ID "
  97. "FROM blog "
  98. "ORDER BY CreateTime DESC, Title, SubTitle")
  99. if cur is None or cur.rowcount == 0:
  100. return []
  101. return [i[0] for i in cur.fetchall()]
  102. def get_archive_blog_list(archive_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  103. """ 获得指定归档的 blog 列表 """
  104. if limit is not None and offset is not None:
  105. cur = db.search("SELECT BlogID "
  106. "FROM blog_with_archive "
  107. "WHERE ArchiveID=%s "
  108. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle "
  109. "LIMIT %s OFFSET %s", archive_id, limit, offset)
  110. else:
  111. cur = db.search("SELECT BlogID "
  112. "FROM blog_with_archive "
  113. "WHERE ArchiveID=%s "
  114. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
  115. if cur is None or cur.rowcount == 0:
  116. return []
  117. return [i[0] for i in cur.fetchall()]
  118. def get_blog_count() -> int:
  119. """ 统计 blog 个数 """
  120. res = get_blog_count_from_cache()
  121. if res is not None:
  122. return res
  123. cur = db.search("SELECT COUNT(*) FROM blog")
  124. if cur is None or cur.rowcount == 0:
  125. return 0
  126. res = cur.fetchone()[0]
  127. write_blog_count_to_cache(res)
  128. return res
  129. def get_archive_blog_count(archive_id) -> int:
  130. """ 统计指定归档的 blog 个数 """
  131. res = get_archive_blog_count_from_cache(archive_id)
  132. if res is not None:
  133. return res
  134. cur = db.search("SELECT COUNT(*) FROM blog_with_archive WHERE ArchiveID=%s", archive_id)
  135. if cur is None or cur.rowcount == 0:
  136. return 0
  137. res = cur.fetchone()[0]
  138. write_archive_blog_count_to_cache(archive_id, res)
  139. return res
  140. def get_user_blog_count(user_id: int) -> int:
  141. """ 获得指定用户的 blog 个数 """
  142. res = get_user_blog_count_from_cache(user_id)
  143. if res is not None:
  144. return res
  145. cur = db.search("SELECT COUNT(*) FROM blog WHERE Auth=%s", user_id)
  146. if cur is None or cur.rowcount == 0:
  147. return 0
  148. res = cur.fetchone()[0]
  149. write_user_blog_count_to_cache(user_id, res)
  150. return res