1
0

archive.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from sql import db
  2. from typing import Optional
  3. def create_archive(name: str, describe: str):
  4. """ 创建新归档 """
  5. name = name.replace("'", "''")
  6. describe = describe.replace("'", "''")
  7. cur = db.insert(table="archive",
  8. columns=["Name", "DescribeText"],
  9. values=f"'{name}', '{describe}'")
  10. if cur is None or cur.rowcount == 0:
  11. return None
  12. return cur.lastrowid
  13. def read_archive(archive_id: int):
  14. """ 获取归档 ID """
  15. cur = db.search(columns=["Name", "DescribeText"],
  16. table="archive",
  17. where=f"ID={archive_id}")
  18. if cur is None or cur.rowcount == 0:
  19. return ["", ""]
  20. return cur.fetchone()
  21. def get_blog_archive(blog_id: int):
  22. """ 获取文章的归档 """
  23. cur = db.search(columns=["ArchiveID"],
  24. table="blog_archive_with_name",
  25. where=f"BlogID={blog_id}",
  26. order_by=[("ArchiveName", "ASC")])
  27. if cur is None or cur.rowcount == 0:
  28. return []
  29. return [i[0] for i in cur.fetchall()]
  30. def delete_archive(archive_id: int):
  31. cur = db.delete(table="blog_archive", where=f"ArchiveID={archive_id}")
  32. if cur is None:
  33. return False
  34. cur = db.delete(table="archive", where=f"ID={archive_id}")
  35. if cur is None or cur.rowcount == 0:
  36. return False
  37. return True
  38. def add_blog_to_archive(blog_id: int, archive_id: int):
  39. cur = db.search(columns=["BlogID"], table="blog_archive", where=f"BlogID={blog_id} AND ArchiveID={archive_id}")
  40. if cur is None:
  41. return False
  42. if cur.rowcount > 0:
  43. return True
  44. cur = db.insert(table="blog_archive", columns=["BlogID", "ArchiveID"], values=f"{blog_id}, {archive_id}")
  45. if cur is None or cur.rowcount != 1:
  46. return False
  47. return True
  48. def sub_blog_from_archive(blog_id: int, archive_id: int):
  49. cur = db.delete(table="blog_archive", where=f"BlogID={blog_id} AND ArchiveID={archive_id}")
  50. if cur is None:
  51. return False
  52. return True
  53. def get_archive_list(limit: Optional[int] = None, offset: Optional[int] = None):
  54. """ 获取归档列表 """
  55. cur = db.search(columns=["ID", "Name", "DescribeText", "Count"], table="archive_with_count",
  56. limit=limit,
  57. offset=offset,
  58. order_by=[("Count", "DESC"), ("Name", "ASC")])
  59. if cur is None or cur.rowcount == 0:
  60. return []
  61. return cur.fetchall()