blog.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. from sql import db
  2. from sql.archive import add_blog_to_archive
  3. from typing import Optional, List
  4. import object.archive
  5. def create_blog(auth_id: int, title: str, subtitle: str, content: str,
  6. archive_list: List[object.archive.Archive]) -> bool:
  7. """ 写入新的blog """
  8. title = title.replace("'", "''")
  9. subtitle = subtitle.replace("'", "''")
  10. content = content.replace("'", "''")
  11. cur = db.insert(table="blog", columns=["Auth", "Title", "SubTitle", "Content"],
  12. values=f"{auth_id}, '{title}', '{subtitle}', '{content}'")
  13. if cur is None or cur.rowcount == 0:
  14. return False
  15. blog_id = cur.lastrowid
  16. for archive in archive_list:
  17. if not add_blog_to_archive(blog_id, archive.id):
  18. return False
  19. return True
  20. def update_blog(blog_id: int, content: str) -> bool:
  21. """ 更新博客文章 """
  22. content = content.replace("'", "''")
  23. cur = db.update(table="blog",
  24. kw={"UpdateTime": "CURRENT_TIMESTAMP()", "Content": f"'{content}'"},
  25. where=f"ID={blog_id}")
  26. if cur is None or cur.rowcount != 1:
  27. return False
  28. return True
  29. def read_blog(blog_id: int) -> list:
  30. """ 读取blog内容 """
  31. cur = db.search("SELECT Auth, Title, SubTitle, Content, UpdateTime, CreateTime, Top "
  32. "FROM blog "
  33. "WHERE ID=%s", blog_id)
  34. if cur is None or cur.rowcount == 0:
  35. return [-1, "", "", "", 0, -1, False]
  36. return cur.fetchone()
  37. def delete_blog(blog_id: int):
  38. cur = db.delete(table="blog_archive", where=f"BlogID={blog_id}")
  39. if cur is None:
  40. return False
  41. cur = db.delete(table="comment", where=f"BlogID={blog_id}")
  42. if cur is None:
  43. return False
  44. cur = db.delete(table="blog", where=f"ID={blog_id}")
  45. if cur is None or cur.rowcount == 0:
  46. return False
  47. return True
  48. def set_blog_top(blog_id: int, top: bool = True):
  49. cur = db.update(table="blog", kw={"Top": "1" if top else "0"}, where=f"ID={blog_id}")
  50. if cur is None or cur.rowcount != 1:
  51. return False
  52. return True
  53. def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  54. """ 获得 blog 列表 """
  55. if limit is not None and offset is not None:
  56. cur = db.search("SELECT ID, Title, SubTitle, UpdateTime, CreateTime, Top "
  57. "FROM blog_with_top " # TODO: 去除blog_with_top
  58. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle "
  59. "LIMIT %s OFFSET %s", limit, offset)
  60. else:
  61. cur = db.search("SELECT ID, Title, SubTitle, UpdateTime, CreateTime, Top "
  62. "FROM blog_with_top " # TODO: 去除blog_with_top
  63. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
  64. if cur is None or cur.rowcount == 0:
  65. return []
  66. return cur.fetchall()
  67. def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  68. """ 获得blog列表 忽略置顶 """
  69. if limit is not None and offset is not None:
  70. cur = db.search("SELECT ID, Title, SubTitle, UpdateTime, CreateTime "
  71. "FROM blog "
  72. "ORDER BY CreateTime DESC, Title, SubTitle "
  73. "LIMIT %s OFFSET %s", limit, offset)
  74. else:
  75. cur = db.search("SELECT ID, Title, SubTitle, UpdateTime, CreateTime "
  76. "FROM blog "
  77. "ORDER BY CreateTime DESC, Title, SubTitle")
  78. if cur is None or cur.rowcount == 0:
  79. return []
  80. return cur.fetchall()
  81. def get_blog_count() -> int:
  82. """ 统计 blog 个数 """
  83. cur = db.search("SELECT COUNT(*) FROM blog")
  84. if cur is None or cur.rowcount == 0:
  85. return 0
  86. return cur.fetchone()[0]
  87. def get_archive_blog_list(archive_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  88. """ 获得指定归档的 blog 列表 """
  89. if limit is not None and offset is not None:
  90. cur = db.search("SELECT BlogID, Title, SubTitle, UpdateTime, CreateTime, Top "
  91. "FROM blog_with_archive "
  92. "WHERE ArchiveID=%s "
  93. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle "
  94. "LIMIT %s OFFSET %s", archive_id, limit, offset)
  95. else:
  96. cur = db.search("SELECT BlogID, Title, SubTitle, UpdateTime, CreateTime, Top "
  97. "FROM blog_with_archive "
  98. "WHERE ArchiveID=%s "
  99. "ORDER BY Top DESC, CreateTime DESC, Title, SubTitle")
  100. if cur is None or cur.rowcount == 0:
  101. return []
  102. return cur.fetchall()
  103. def get_archive_blog_count(archive_id) -> int:
  104. """ 统计指定归档的 blog 个数 """
  105. cur = db.search("SELECT COUNT(*) FROM blog_with_archive WHERE ArchiveID=%s", archive_id)
  106. if cur is None or cur.rowcount == 0:
  107. return 0
  108. return cur.fetchone()[0]
  109. def get_user_user_count(user_id: int) -> int:
  110. """ 获得指定用户的 blog 个数 """
  111. cur = db.search("SELECT COUNT(*) FROM blog WHERE Auth=%s", user_id)
  112. if cur is None or cur.rowcount == 0:
  113. return 0
  114. return cur.fetchone()[0]