blog.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. from sql import db
  2. from sql.archive import add_blog_to_archive
  3. from sql.cache import (write_blog_to_cache, read_blog_from_cache, delete_blog_from_cache,
  4. write_blog_count_to_cache, get_blog_count_from_cache, delete_blog_count_from_cache,
  5. write_archive_blog_count_to_cache, get_archive_blog_count_from_cache,
  6. delete_all_archive_blog_count_from_cache, delete_archive_blog_count_from_cache,
  7. write_user_blog_count_to_cache, get_user_blog_count_from_cache,
  8. delete_all_user_blog_count_from_cache, delete_user_blog_count_from_cache,
  9. delete_blog_archive_from_cache)
  10. import object.archive
  11. from typing import Optional, List
  12. def create_blog(auth_id: int, title: str, subtitle: str, content: str,
  13. archive_list: List[object.archive.Archive]) -> bool:
  14. """ 写入新的blog """
  15. delete_blog_count_from_cache()
  16. delete_user_blog_count_from_cache(auth_id)
  17. # archive cache 在下面循环删除
  18. title = title.replace("'", "''")
  19. subtitle = subtitle.replace("'", "''")
  20. content = content.replace("'", "''")
  21. cur = db.insert("INSERT INTO blog(Auth, Title, SubTitle, Content) "
  22. "VALUES (%s, %s, %s, %s)", auth_id, title, subtitle, content)
  23. if cur is None or cur.rowcount == 0:
  24. return False
  25. blog_id = cur.lastrowid
  26. for archive in archive_list:
  27. delete_archive_blog_count_from_cache(archive.id)
  28. if not add_blog_to_archive(blog_id, archive.id):
  29. return False
  30. return True
  31. def update_blog(blog_id: int, content: str) -> bool:
  32. """ 更新博客文章 """
  33. delete_blog_from_cache(blog_id)
  34. content = content.replace("'", "''")
  35. cur = db.update("Update blog "
  36. "SET UpdateTime=CURRENT_TIMESTAMP(), Content=%s "
  37. "WHERE ID=%s", content, blog_id)
  38. if cur is None or cur.rowcount != 1:
  39. return False
  40. return True
  41. def read_blog(blog_id: int) -> list:
  42. """ 读取blog内容 """
  43. res = read_blog_from_cache(blog_id)
  44. if res is not None:
  45. return res
  46. cur = db.search("SELECT Auth, Title, SubTitle, Content, UpdateTime, CreateTime, Top "
  47. "FROM blog "
  48. "WHERE ID=%s", blog_id)
  49. if cur is None or cur.rowcount == 0:
  50. return [-1, "", "", "", 0, -1, False]
  51. res = cur.fetchone()
  52. write_blog_to_cache(blog_id, *res)
  53. return res
  54. def delete_blog(blog_id: int):
  55. delete_blog_count_from_cache()
  56. delete_all_archive_blog_count_from_cache()
  57. delete_all_user_blog_count_from_cache()
  58. delete_blog_from_cache(blog_id)
  59. delete_blog_archive_from_cache(blog_id)
  60. cur = db.delete("DELETE FROM blog_archive WHERE BlogID=%s", blog_id)
  61. if cur is None:
  62. return False
  63. cur = db.delete("DELETE FROM comment WHERE BlogID=%s", blog_id)
  64. if cur is None:
  65. return False
  66. cur = db.delete("DELETE FROM blog WHERE ID=%s", blog_id)
  67. if cur is None or cur.rowcount == 0:
  68. return False
  69. return True
  70. def set_blog_top(blog_id: int, top: bool = True):
  71. delete_blog_from_cache(blog_id)
  72. cur = db.update("UPDATE blog "
  73. "SET Top=%s "
  74. "WHERE ID=%s", 1 if top else 0, blog_id)
  75. if cur is None or cur.rowcount != 1:
  76. return False
  77. return True
  78. def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  79. """ 获得 blog 列表 """
  80. if limit is not None and offset is not None:
  81. cur = db.search("SELECT ID "
  82. "FROM blog "
  83. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle "
  84. "LIMIT %s OFFSET %s", limit, offset)
  85. else:
  86. cur = db.search("SELECT ID "
  87. "FROM blog "
  88. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
  89. if cur is None or cur.rowcount == 0:
  90. return []
  91. return [i[0] for i in cur.fetchall()]
  92. def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  93. """ 获得blog列表 忽略置顶 """
  94. if limit is not None and offset is not None:
  95. cur = db.search("SELECT ID "
  96. "FROM blog "
  97. "ORDER BY CreateTime DESC, Title, SubTitle "
  98. "LIMIT %s OFFSET %s", limit, offset)
  99. else:
  100. cur = db.search("SELECT ID "
  101. "FROM blog "
  102. "ORDER BY CreateTime DESC, Title, SubTitle")
  103. if cur is None or cur.rowcount == 0:
  104. return []
  105. return [i[0] for i in cur.fetchall()]
  106. def get_archive_blog_list(archive_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  107. """ 获得指定归档的 blog 列表 """
  108. if limit is not None and offset is not None:
  109. cur = db.search("SELECT BlogID "
  110. "FROM blog_with_archive "
  111. "WHERE ArchiveID=%s "
  112. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle "
  113. "LIMIT %s OFFSET %s", archive_id, limit, offset)
  114. else:
  115. cur = db.search("SELECT BlogID "
  116. "FROM blog_with_archive "
  117. "WHERE ArchiveID=%s "
  118. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
  119. if cur is None or cur.rowcount == 0:
  120. return []
  121. return [i[0] for i in cur.fetchall()]
  122. def get_blog_count() -> int:
  123. """ 统计 blog 个数 """
  124. res = get_blog_count_from_cache()
  125. if res is not None:
  126. return res
  127. cur = db.search("SELECT COUNT(*) FROM blog")
  128. if cur is None or cur.rowcount == 0:
  129. return 0
  130. res = cur.fetchone()[0]
  131. write_blog_count_to_cache(res)
  132. return res
  133. def get_archive_blog_count(archive_id) -> int:
  134. """ 统计指定归档的 blog 个数 """
  135. res = get_archive_blog_count_from_cache(archive_id)
  136. if res is not None:
  137. return res
  138. cur = db.search("SELECT COUNT(*) FROM blog_with_archive WHERE ArchiveID=%s", archive_id)
  139. if cur is None or cur.rowcount == 0:
  140. return 0
  141. res = cur.fetchone()[0]
  142. write_archive_blog_count_to_cache(archive_id, res)
  143. return res
  144. def get_user_blog_count(user_id: int) -> int:
  145. """ 获得指定用户的 blog 个数 """
  146. res = get_user_blog_count_from_cache(user_id)
  147. if res is not None:
  148. return res
  149. cur = db.search("SELECT COUNT(*) FROM blog WHERE Auth=%s", user_id)
  150. if cur is None or cur.rowcount == 0:
  151. return 0
  152. res = cur.fetchone()[0]
  153. write_user_blog_count_to_cache(user_id, res)
  154. return res