blog.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. from sql import db, DB
  2. from sql.base import DBBit
  3. from sql.archive import add_blog_to_archive
  4. from sql.cache import (write_blog_to_cache, get_blog_from_cache, delete_blog_from_cache,
  5. write_blog_count_to_cache, get_blog_count_from_cache, delete_blog_count_from_cache,
  6. write_archive_blog_count_to_cache, get_archive_blog_count_from_cache,
  7. delete_all_archive_blog_count_from_cache, delete_archive_blog_count_from_cache,
  8. write_user_blog_count_to_cache, get_user_blog_count_from_cache,
  9. delete_all_user_blog_count_from_cache, delete_user_blog_count_from_cache,
  10. delete_blog_archive_from_cache)
  11. import object.archive
  12. from typing import Optional, List
  def create_blog(auth_id: int, title: str, subtitle: str, content: str,
                  archive_list: List[object.archive.Archive], mysql: DB = db) -> bool:
      """Insert a new blog row and link it to every archive in ``archive_list``.

      The global and per-author blog-count caches are dropped before the INSERT
      is attempted. Each archive's blog-count cache is dropped right after the
      new blog has been linked to that archive.

      :param auth_id: value written to the ``Auth`` column.
      :param title: value written to the ``Title`` column.
      :param subtitle: value written to the ``SubTitle`` column.
      :param content: value written to the ``Content`` column.
      :param archive_list: archives to link; only the ``id`` attribute of each
          element is read.
      :param mysql: database handle used for the INSERT and the final re-read.
      :return: ``False`` when the INSERT yields no cursor or reports zero rows,
          or as soon as linking one archive fails (nothing done earlier is
          undone by this function); ``True`` otherwise.
      """
      delete_blog_count_from_cache()
      delete_user_blog_count_from_cache(auth_id)

      insert_sql = ("INSERT INTO blog(Auth, Title, SubTitle, Content) "
                    "VALUES (%s, %s, %s, %s)")
      cursor = mysql.insert(insert_sql, auth_id, title, subtitle, content)
      inserted = cursor is not None and cursor.rowcount != 0
      if not inserted:
          return False

      new_id = cursor.lastrowid
      for entry in archive_list:
          linked = add_blog_to_archive(new_id, entry.id)
          if not linked:
              return False
          # Archive count caches are invalidated here, one archive per link.
          delete_archive_blog_count_from_cache(entry.id)

      read_blog(new_id, mysql)  # refresh the cache
      return True
  def update_blog(blog_id: int, content: str, mysql: DB = db) -> bool:
      """Overwrite a blog's content and set its ``UpdateTime`` to the current time.

      The cached copy of the blog is dropped before the UPDATE is issued.

      :param blog_id: ``ID`` of the blog row to change.
      :param content: new value for the ``Content`` column.
      :param mysql: database handle used for the UPDATE and the re-read.
      :return: ``True`` only when a cursor comes back reporting exactly one
          affected row; ``False`` in every other case.
      """
      delete_blog_from_cache(blog_id)

      update_sql = ("Update blog "
                    "SET UpdateTime=CURRENT_TIMESTAMP(), Content=%s "
                    "WHERE ID=%s")
      cursor = mysql.update(update_sql, content, blog_id)

      if cursor is not None and cursor.rowcount == 1:
          read_blog(blog_id, mysql)  # refresh the cache
          return True
      return False
  def read_blog(blog_id: int, mysql: DB = db, not_cache=False) -> list:
      """Fetch one blog, consulting the cache first unless ``not_cache`` is set.

      :param blog_id: ``ID`` of the blog to read.
      :param mysql: database handle used when the cache is skipped or misses.
      :param not_cache: when truthy, the cache lookup is skipped (the result of
          a successful database read is still written to the cache).
      :return: on a cache hit, whatever ``get_blog_from_cache`` returned.
          On a database hit, ``[Auth, Title, SubTitle, Content, UpdateTime,
          CreateTime, top]`` where ``top`` is a bool derived from the ``Top``
          column. When the query yields nothing, the placeholder
          ``[-1, "", "", "", 0, -1, False]``.
      """
      use_cache = not not_cache
      if use_cache:
          cached = get_blog_from_cache(blog_id)
          if cached is not None:
              return cached

      select_sql = ("SELECT Auth, Title, SubTitle, Content, UpdateTime, CreateTime, Top "
                    "FROM blog "
                    "WHERE ID=%s")
      cursor = mysql.search(select_sql, blog_id)

      if cursor is not None and cursor.rowcount != 0:
          row = cursor.fetchone()
          # The raw row (Top still in its database bit form) goes to the cache.
          write_blog_to_cache(blog_id, *row, is_db_bit=True)
          is_top = row[-1] == DBBit.BIT_1
          return [*row[:6], is_top]

      # Placeholder returned when no row could be read.
      return [-1, "", "", "", 0, -1, False]
  def delete_blog(blog_id: int, mysql: DB = db) -> bool:
      """Delete a blog together with its archive links and its comments.

      All blog-count caches (global, per-archive, per-user) and the cached
      blog / blog-archive entries for ``blog_id`` are dropped first. The three
      DELETE statements then run on one dedicated connection and are committed
      together; on any failure the connection is rolled back instead.

      :param blog_id: ``ID`` of the blog to remove.
      :param mysql: database handle providing the connection and ``delete``.
      :return: ``True`` once all three statements succeeded and the commit went
          through; ``False`` if any statement yields no cursor, or if the final
          DELETE on ``blog`` reports zero affected rows.
      """
      delete_blog_count_from_cache()
      delete_all_archive_blog_count_from_cache()
      delete_all_user_blog_count_from_cache()
      delete_blog_from_cache(blog_id)
      delete_blog_archive_from_cache(blog_id)

      # (statement, whether zero affected rows counts as a failure).
      # Only the blog row itself must exist; links and comments may be absent.
      steps = (
          ("DELETE FROM blog_archive WHERE BlogID=%s", False),
          ("DELETE FROM comment WHERE BlogID=%s", False),
          ("DELETE FROM blog WHERE ID=%s", True),
      )

      conn = mysql.get_connection()
      committed = False
      try:
          for sql, must_affect_rows in steps:
              cur = mysql.delete(sql, blog_id, connection=conn)
              if cur is None or (must_affect_rows and cur.rowcount == 0):
                  return False
          conn.commit()
          committed = True
          return True
      finally:
          # Runs on every exit path, including exceptions, so the connection
          # is always rolled back (unless committed) and always closed.
          try:
              if not committed:
                  conn.rollback()
          finally:
              conn.close()
  def set_blog_top(blog_id: int, top: bool = True, mysql: DB = db):
      """Set or clear the ``Top`` flag of a blog.

      The cached copy of the blog is dropped before the UPDATE is issued.

      :param blog_id: ``ID`` of the blog row to change.
      :param top: truthy stores 1 in ``Top``, falsy stores 0.
      :param mysql: database handle used for the UPDATE and the re-read.
      :return: ``True`` only when a cursor comes back reporting exactly one
          affected row; ``False`` in every other case.
      """
      delete_blog_from_cache(blog_id)

      top_value = int(bool(top))
      update_sql = ("UPDATE blog "
                    "SET Top=%s "
                    "WHERE ID=%s")
      cursor = mysql.update(update_sql, top_value, blog_id)

      if cursor is not None and cursor.rowcount == 1:
          read_blog(blog_id, mysql)  # refresh the cache
          return True
      return False
  def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None, mysql: DB = db) -> list:
      """Return blog IDs ordered by ``Top DESC, CreateTime DESC, Title, SubTitle``.

      :param limit: page size; only applied when ``offset`` is also given.
      :param offset: rows to skip; only applied when ``limit`` is also given.
      :param mysql: database handle used for the query.
      :return: list of the first column of each row (the blog ``ID``), or an
          empty list when the query yields no cursor or reports zero rows.
          If either ``limit`` or ``offset`` is ``None`` the query is unpaged.
      """
      sql = ("SELECT ID "
             "FROM blog "
             "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
      params = ()
      if limit is not None and offset is not None:
          sql += " LIMIT %s OFFSET %s"
          params = (limit, offset)

      cursor = mysql.search(sql, *params)
      if cursor is not None and cursor.rowcount != 0:
          return [row[0] for row in cursor.fetchall()]
      return []
  def get_blog_list_iter(mysql: DB = db):
      """Return the raw cursor over all blog IDs instead of a materialised list.

      Rows are ordered by ``Top DESC, CreateTime DESC, Title, SubTitle``.

      :param mysql: database handle used for the query.
      :return: the cursor returned by ``mysql.search``, or an empty list when
          there is no cursor or it reports zero rows.
      """
      sql = ("SELECT ID "
             "FROM blog "
             "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
      cursor = mysql.search(sql)
      has_rows = cursor is not None and cursor.rowcount != 0
      return cursor if has_rows else []
  def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None, mysql: DB = db) -> list:
      """Return blog IDs ordered by ``CreateTime DESC, Title, SubTitle``.

      Unlike the ``Top``-aware listing, the ``Top`` column plays no part in the
      ordering here.

      :param limit: page size; only applied when ``offset`` is also given.
      :param offset: rows to skip; only applied when ``limit`` is also given.
      :param mysql: database handle used for the query.
      :return: list of the first column of each row (the blog ``ID``), or an
          empty list when the query yields no cursor or reports zero rows.
          If either ``limit`` or ``offset`` is ``None`` the query is unpaged.
      """
      sql = ("SELECT ID "
             "FROM blog "
             "ORDER BY CreateTime DESC, Title, SubTitle")
      params = ()
      if limit is not None and offset is not None:
          sql += " LIMIT %s OFFSET %s"
          params = (limit, offset)

      cursor = mysql.search(sql, *params)
      if cursor is not None and cursor.rowcount != 0:
          return [row[0] for row in cursor.fetchall()]
      return []
  def get_archive_blog_list(archive_id, limit: Optional[int] = None,
                            offset: Optional[int] = None,
                            mysql: DB = db) -> list:
      """Return the blog IDs filed under one archive.

      Rows come from ``blog_with_archive`` filtered on ``ArchiveID`` and are
      ordered by ``Top DESC, CreateTime DESC, Title, SubTitle``.

      :param archive_id: value matched against ``ArchiveID``.
      :param limit: page size; only applied when ``offset`` is also given.
      :param offset: rows to skip; only applied when ``limit`` is also given.
      :param mysql: database handle used for the query.
      :return: list of the first column of each row (``BlogID``), or an empty
          list when the query yields no cursor or reports zero rows.
          If either ``limit`` or ``offset`` is ``None`` the query is unpaged.
      """
      sql = ("SELECT BlogID "
             "FROM blog_with_archive "
             "WHERE ArchiveID=%s "
             "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
      params = (archive_id,)
      if limit is not None and offset is not None:
          sql += " LIMIT %s OFFSET %s"
          params += (limit, offset)

      cursor = mysql.search(sql, *params)
      if cursor is not None and cursor.rowcount != 0:
          return [row[0] for row in cursor.fetchall()]
      return []
  def get_blog_count(mysql: DB = db, not_cache=False) -> int:
      """Count all blogs, consulting the cache first unless ``not_cache`` is set.

      :param mysql: database handle used when the cache is skipped or misses.
      :param not_cache: when truthy, the cache lookup is skipped (a count read
          from the database is still written to the cache).
      :return: the cached value on a cache hit; otherwise the ``COUNT(*)``
          result, or 0 when the query yields no cursor or reports zero rows
          (that 0 is not cached).
      """
      use_cache = not not_cache
      if use_cache:
          cached = get_blog_count_from_cache()
          if cached is not None:
              return cached

      cursor = mysql.search("SELECT COUNT(*) FROM blog")
      if cursor is not None and cursor.rowcount != 0:
          total = cursor.fetchone()[0]
          write_blog_count_to_cache(total)
          return total
      return 0
  def get_archive_blog_count(archive_id, mysql: DB = db, not_cache=False) -> int:
      """Count the blogs filed under one archive, cache first unless ``not_cache``.

      :param archive_id: value matched against ``ArchiveID`` in
          ``blog_with_archive``; also the key used for the cache calls.
      :param mysql: database handle used when the cache is skipped or misses.
      :param not_cache: when truthy, the cache lookup is skipped (a count read
          from the database is still written to the cache).
      :return: the cached value on a cache hit; otherwise the ``COUNT(*)``
          result, or 0 when the query yields no cursor or reports zero rows
          (that 0 is not cached).
      """
      use_cache = not not_cache
      if use_cache:
          cached = get_archive_blog_count_from_cache(archive_id)
          if cached is not None:
              return cached

      count_sql = "SELECT COUNT(*) FROM blog_with_archive WHERE ArchiveID=%s"
      cursor = mysql.search(count_sql, archive_id)
      if cursor is not None and cursor.rowcount != 0:
          total = cursor.fetchone()[0]
          write_archive_blog_count_to_cache(archive_id, total)
          return total
      return 0
  def get_user_blog_count(user_id: int, mysql: DB = db, not_cache=False) -> int:
      """Count the blogs written by one user, cache first unless ``not_cache``.

      :param user_id: value matched against the ``Auth`` column of ``blog``;
          also the key used for the cache calls.
      :param mysql: database handle used when the cache is skipped or misses.
      :param not_cache: when truthy, the cache lookup is skipped (a count read
          from the database is still written to the cache).
      :return: the cached value on a cache hit; otherwise the ``COUNT(*)``
          result, or 0 when the query yields no cursor or reports zero rows
          (that 0 is not cached).
      """
      use_cache = not not_cache
      if use_cache:
          cached = get_user_blog_count_from_cache(user_id)
          if cached is not None:
              return cached

      count_sql = "SELECT COUNT(*) FROM blog WHERE Auth=%s"
      cursor = mysql.search(count_sql, user_id)
      if cursor is not None and cursor.rowcount != 0:
          total = cursor.fetchone()[0]
          write_user_blog_count_to_cache(user_id, total)
          return total
      return 0
  177. return res