blog.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. from sql import db
  2. from sql.archive import add_blog_to_archive
  3. from typing import Optional, List
  4. import object.archive
  5. def create_blog(auth_id: int, title: str, subtitle: str, content: str,
  6. archive_list: List[object.archive.Archive]) -> bool:
  7. """ 写入新的blog """
  8. title = title.replace("'", "''")
  9. subtitle = subtitle.replace("'", "''")
  10. content = content.replace("'", "''")
  11. cur = db.insert(table="blog", columns=["Auth", "Title", "SubTitle", "Content"],
  12. values=f"{auth_id}, '{title}', '{subtitle}', '{content}'")
  13. if cur is None or cur.rowcount == 0:
  14. return False
  15. blog_id = cur.lastrowid
  16. for archive in archive_list:
  17. if not add_blog_to_archive(blog_id, archive.id):
  18. return False
  19. return True
  20. def update_blog(blog_id: int, content: str) -> bool:
  21. """ 更新博客文章 """
  22. content = content.replace("'", "''")
  23. cur = db.update(table="blog",
  24. kw={"UpdateTime": "CURRENT_TIMESTAMP()", "Content": f"'{content}'"},
  25. where=f"ID={blog_id}")
  26. if cur is None or cur.rowcount != 1:
  27. return False
  28. return True
  29. def read_blog(blog_id: int) -> list:
  30. """ 读取blog内容 """
  31. cur = db.search(columns=["Auth", "Title", "SubTitle", "Content", "UpdateTime", "CreateTime", "Top"],
  32. table="blog",
  33. where=f"ID={blog_id}")
  34. if cur is None or cur.rowcount == 0:
  35. return [-1, "", "", "", 0, -1, False]
  36. return cur.fetchone()
  37. def delete_blog(blog_id: int):
  38. cur = db.delete(table="blog_archive", where=f"BlogID={blog_id}")
  39. if cur is None:
  40. return False
  41. cur = db.delete(table="comment", where=f"BlogID={blog_id}")
  42. if cur is None:
  43. return False
  44. cur = db.delete(table="blog", where=f"ID={blog_id}")
  45. if cur is None or cur.rowcount == 0:
  46. return False
  47. return True
  48. def set_blog_top(blog_id: int, top: bool = True):
  49. cur = db.update(table="blog", kw={"Top": "1" if top else "0"}, where=f"ID={blog_id}")
  50. if cur is None or cur.rowcount != 1:
  51. return False
  52. return True
  53. def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  54. """ 获得 blog 列表 """
  55. cur = db.search(columns=["ID", "Title", "SubTitle", "UpdateTime", "CreateTime", "Top"], table="blog_with_top",
  56. order_by=[("CreateTime", "DESC"), ("Title", "ASC"), ("SubTitle", "ASC")],
  57. limit=limit,
  58. offset=offset)
  59. if cur is None or cur.rowcount == 0:
  60. return []
  61. return cur.fetchall()
  62. def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  63. """ 获得blog列表 忽略置顶 """
  64. cur = db.search(columns=["ID", "Title", "SubTitle", "UpdateTime", "CreateTime"], table="blog",
  65. order_by=[("CreateTime", "DESC"), ("Title", "ASC"), ("SubTitle", "ASC")],
  66. limit=limit,
  67. offset=offset)
  68. if cur is None or cur.rowcount == 0:
  69. return []
  70. return cur.fetchall()
  71. def get_blog_count() -> int:
  72. """ 统计 blog 个数 """
  73. cur = db.search(columns=["count(ID)"], table="blog")
  74. if cur is None or cur.rowcount == 0:
  75. return 0
  76. return cur.fetchone()[0]
  77. def get_archive_blog_list(archive_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
  78. """ 获得指定归档的 blog 列表 """
  79. cur = db.search(columns=["BlogID", "Title", "SubTitle", "UpdateTime", "CreateTime", "Top"], table="blog_with_archive",
  80. order_by=[("CreateTime", "DESC"), ("Title", "ASC"), ("SubTitle", "ASC")],
  81. where=f"ArchiveID={archive_id}",
  82. limit=limit,
  83. offset=offset)
  84. if cur is None or cur.rowcount == 0:
  85. return []
  86. return cur.fetchall()
  87. def get_archive_blog_count(archive_id) -> int:
  88. """ 统计指定归档的 blog 个数 """
  89. cur = db.search(columns=["count(ID)"], table="blog_with_archive",
  90. where=f"ArchiveID={archive_id}")
  91. if cur is None or cur.rowcount == 0:
  92. return 0
  93. return cur.fetchone()[0]
  94. def get_user_user_count(user_id: int) -> int:
  95. """ 获得指定用户的 blog 个数 """
  96. cur = db.search(columns=["count(ID)"], table="blog",
  97. where=f"Auth={user_id}")
  98. if cur is None or cur.rowcount == 0:
  99. return 0
  100. return cur.fetchone()[0]