blog.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. from sql import db
  2. from typing import Optional, List
  3. import object.archive
  def create_blog(auth_id: int, title: str, subtitle: str, content: str,
                  archive_list: List[object.archive.Archive]) -> bool:
      """Insert a new blog row and link it to every archive in ``archive_list``.

      Args:
          auth_id: Value stored in the ``Auth`` column of the new blog row.
          title: Blog title.
          subtitle: Blog subtitle.
          content: Blog body text.
          archive_list: Archives to associate with the new blog; each item's
              ``archive_id`` is written to ``blog_archive``.

      Returns:
          ``True`` when the blog insert and every archive-link insert reported
          at least one affected row. ``False`` as soon as any insert yields no
          cursor or zero affected rows.
      """
      # Double each single quote so the text can sit inside a quoted SQL literal.
      # NOTE(review): only single quotes are escaped; whether that is sufficient
      # depends on the database's string-literal rules -- confirm.
      safe_title, safe_subtitle, safe_content = (
          text.replace("'", "''") for text in (title, subtitle, content)
      )
      blog_values = f"{auth_id}, '{safe_title}', '{safe_subtitle}', '{safe_content}'"

      blog_cur = db.insert(
          table="blog",
          columns=["Auth", "Title", "SubTitle", "Content"],
          values=blog_values,
      )
      if blog_cur is None or blog_cur.rowcount == 0:
          return False

      # The generated blog ID is needed for the link rows below.
      new_blog_id = blog_cur.lastrowid
      for archive in archive_list:
          link_cur = db.insert(
              table="blog_archive",
              columns=["BlogID", "ArchiveID"],
              values=f"{new_blog_id}, {archive.archive_id}",
          )
          if link_cur is None or link_cur.rowcount == 0:
              # NOTE(review): no rollback is issued in this function, so rows
              # inserted before this failure appear to stay in place -- confirm.
              return False
      return True
  def update_blog(blog_id: int, content: str) -> bool:
      """Replace the content of a blog post and refresh its ``UpdateTime``.

      Args:
          blog_id: ID of the blog row to update.
          content: New body text; single quotes are doubled before it is
              embedded in the SQL literal.

      Returns:
          ``True`` only when the update reports exactly one affected row;
          ``False`` when no cursor is returned or any other row count is seen.
      """
      escaped = content.replace("'", "''")
      new_values = {
          "UpdateTime": "CURRENT_TIMESTAMP()",
          "Content": f"'{escaped}'",
      }
      result = db.update(table="blog", kw=new_values, where=f"ID={blog_id}")
      return result is not None and result.rowcount == 1
  def read_blog(blog_id: int) -> list:
      """Fetch a single blog row by ID.

      Args:
          blog_id: ID of the blog row to read.

      Returns:
          The first row from the cursor, with columns in the order ``Auth``,
          ``Title``, ``SubTitle``, ``Content``, ``UpdateTime``, ``CreateTime``,
          ``Top``. An empty list when the search yields no cursor or no rows.
      """
      wanted = ["Auth", "Title", "SubTitle", "Content", "UpdateTime", "CreateTime", "Top"]
      result = db.search(columns=wanted, table="blog", where=f"ID={blog_id}")
      if result is not None and result.rowcount != 0:
          return result.fetchone()
      return []
  def delete_blog(blog_id: int):
      """Delete a blog together with its archive links and comments.

      Args:
          blog_id: ID of the blog to remove.

      Returns:
          ``False`` if either dependent-table delete returns no cursor, or if
          the final delete on ``blog`` returns no cursor or affects zero rows;
          ``True`` otherwise.
      """
      # Rows that reference the blog are removed first, the blog row last.
      for dependent_table in ("blog_archive", "comment"):
          if db.delete(table=dependent_table, where=f"BlogID={blog_id}") is None:
              return False

      blog_cur = db.delete(table="blog", where=f"ID={blog_id}")
      return blog_cur is not None and blog_cur.rowcount != 0
  def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
      """Return blog rows from ``blog_with_top``, newest first.

      Rows are ordered by ``CreateTime`` descending, then ``Title`` and
      ``SubTitle`` ascending. Each row carries ``ID``, ``Title``, ``SubTitle``,
      ``UpdateTime``, ``CreateTime`` and ``Top``.

      Args:
          limit: Passed through to the search as its ``limit``.
          offset: Passed through to the search as its ``offset``.

      Returns:
          All fetched rows, or an empty list when the search yields no cursor
          or no rows.
      """
      result = db.search(
          columns=["ID", "Title", "SubTitle", "UpdateTime", "CreateTime", "Top"],
          table="blog_with_top",
          order_by=[("CreateTime", "DESC"), ("Title", "ASC"), ("SubTitle", "ASC")],
          limit=limit,
          offset=offset,
      )
      has_rows = result is not None and result.rowcount != 0
      return result.fetchall() if has_rows else []
  def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
      """Return blog rows from ``blog`` without the ``Top`` column.

      Rows are ordered by ``CreateTime`` descending, then ``Title`` and
      ``SubTitle`` ascending. Each row carries ``ID``, ``Title``, ``SubTitle``,
      ``UpdateTime`` and ``CreateTime``.

      Args:
          limit: Passed through to the search as its ``limit``.
          offset: Passed through to the search as its ``offset``.

      Returns:
          All fetched rows, or an empty list when the search yields no cursor
          or no rows.
      """
      selected = ["ID", "Title", "SubTitle", "UpdateTime", "CreateTime"]
      ordering = [("CreateTime", "DESC"), ("Title", "ASC"), ("SubTitle", "ASC")]
      result = db.search(columns=selected, table="blog", order_by=ordering,
                         limit=limit, offset=offset)
      if result is not None and result.rowcount != 0:
          return result.fetchall()
      return []
  def get_blog_count() -> int:
      """Count the rows in ``blog``.

      Returns:
          The first column of the ``count(ID)`` result row, or 0 when the
          search yields no cursor or no rows.
      """
      result = db.search(columns=["count(ID)"], table="blog")
      if result is not None and result.rowcount != 0:
          first_row = result.fetchone()
          return first_row[0]
      return 0
  def get_archive_blog_list(archive_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
      """Return the blog rows that belong to one archive, newest first.

      Rows come from ``blog_with_archive`` and are ordered by ``CreateTime``
      descending, then ``Title`` and ``SubTitle`` ascending. Each row carries
      ``BlogID``, ``Title``, ``SubTitle``, ``UpdateTime``, ``CreateTime`` and
      ``Top``.

      Args:
          archive_id: Archive identifier; anything ``int()`` accepts.
          limit: Passed through to the search as its ``limit``.
          offset: Passed through to the search as its ``offset``.

      Returns:
          All fetched rows, or an empty list when ``archive_id`` is not an
          integer value, or the search yields no cursor or no rows.
      """
      # archive_id is interpolated straight into the WHERE clause, so force it
      # to a plain integer first: arbitrary text must never reach the SQL string.
      try:
          archive_key = int(archive_id)
      except (TypeError, ValueError, OverflowError):
          # A non-integer ID cannot match any archive; report "no rows" the
          # same way a failed search does.
          return []

      cur = db.search(
          columns=["BlogID", "Title", "SubTitle", "UpdateTime", "CreateTime", "Top"],
          table="blog_with_archive",
          order_by=[("CreateTime", "DESC"), ("Title", "ASC"), ("SubTitle", "ASC")],
          where=f"ArchiveID={archive_key}",
          limit=limit,
          offset=offset,
      )
      if cur is None or cur.rowcount == 0:
          return []
      return cur.fetchall()
  def get_archive_blog_count(archive_id) -> int:
      """Count the blogs that belong to one archive.

      Args:
          archive_id: Archive identifier; anything ``int()`` accepts.

      Returns:
          The first column of the ``count(ID)`` result row from
          ``blog_with_archive``, or 0 when ``archive_id`` is not an integer
          value, or the search yields no cursor or no rows.
      """
      # archive_id is interpolated straight into the WHERE clause, so force it
      # to a plain integer first: arbitrary text must never reach the SQL string.
      try:
          archive_key = int(archive_id)
      except (TypeError, ValueError, OverflowError):
          # A non-integer ID cannot match any archive, so the count is zero.
          return 0

      # NOTE(review): get_archive_blog_list selects ``BlogID`` from this same
      # table while this query counts ``ID`` -- confirm the column exists.
      cur = db.search(columns=["count(ID)"], table="blog_with_archive",
                      where=f"ArchiveID={archive_key}")
      if cur is None or cur.rowcount == 0:
          return 0
      return cur.fetchone()[0]
  def get_user_user_count(user_id: int) -> int:
      """Count the rows in ``blog`` whose ``Auth`` column equals ``user_id``.

      NOTE(review): the name says "user_user" but the query counts blog rows;
      it may have been meant as ``get_user_blog_count`` -- confirm with callers
      before renaming.

      Args:
          user_id: Value matched against the ``Auth`` column.

      Returns:
          The first column of the ``count(ID)`` result row, or 0 when the
          search yields no cursor or no rows.
      """
      result = db.search(columns=["count(ID)"], table="blog", where=f"Auth={user_id}")
      if result is not None and result.rowcount != 0:
          first_row = result.fetchone()
          return first_row[0]
      return 0