archive.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from sql import db
  2. from typing import Optional
  3. def create_archive(name: str, describe: str):
  4. """ 创建新归档 """
  5. name = name.replace("'", "''")
  6. describe = describe.replace("'", "''")
  7. cur = db.insert(table="archive",
  8. columns=["Name", "DescribeText"],
  9. values=f"'{name}', '{describe}'")
  10. if cur is None or cur.rowcount == 0:
  11. return None
  12. return cur.lastrowid
  13. def read_archive(archive_id: int):
  14. """ 获取归档 ID """
  15. cur = db.search(columns=["Name", "DescribeText"], table="archive",
  16. where=f"ID={archive_id}")
  17. if cur is None or cur.rowcount == 0:
  18. return ["", ""]
  19. return cur.fetchone()
  20. def get_blog_archive(blog_id: int):
  21. """ 获取文章的归档 """
  22. cur = db.search(columns=["ArchiveID", "ArchiveName", "DescribeText"], table="blog_archive_with_name",
  23. where=f"BlogID={blog_id}")
  24. if cur is None or cur.rowcount == 0:
  25. return []
  26. return cur.fetchall()
  27. def delete_archive(archive_id: int):
  28. cur = db.delete(table="blog_archive", where=f"ArchiveID={archive_id}")
  29. if cur is None:
  30. return False
  31. cur = db.delete(table="archive", where=f"ID={archive_id}")
  32. if cur is None or cur.rowcount == 0:
  33. return False
  34. return True
  35. def add_blog_to_archive(blog_id: int, archive_id: int):
  36. cur = db.search(columns=["BlogID"], table="blog_archive", where=f"BlogID={blog_id} AND ArchiveID={archive_id}")
  37. if cur is None:
  38. print("H1")
  39. return False
  40. if cur.rowcount > 0:
  41. print("H2")
  42. return True
  43. cur = db.insert(table="blog_archive", columns=["BlogID", "ArchiveID"], values=f"{blog_id}, {archive_id}")
  44. if cur is None or cur.rowcount != 1:
  45. print("H3")
  46. return False
  47. return True
  48. def sub_blog_from_archive(blog_id: int, archive_id: int):
  49. cur = db.delete(table="blog_archive", where=f"BlogID={blog_id} AND ArchiveID={archive_id}")
  50. if cur is None:
  51. return False
  52. return True
  53. def get_archive_list(limit: Optional[int] = None, offset: Optional[int] = None):
  54. """ 获取归档列表 """
  55. cur = db.search(columns=["ID", "Name", "DescribeText", "Count"], table="archive_with_count",
  56. limit=limit,
  57. offset=offset,
  58. order_by=[("Count", "DESC"), ("Name", "ASC")])
  59. if cur is None or cur.rowcount == 0:
  60. return []
  61. return cur.fetchall()