archive.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from sql import db
  2. from typing import Optional
  3. def create_archive(name: str, describe: str):
  4. """ 创建新归档 """
  5. name = name.replace("'", "''")
  6. describe = describe.replace("'", "''")
  7. cur = db.insert(table="archive",
  8. columns=["Name", "DescribeText"],
  9. values=f"'{name}', '{describe}'")
  10. if cur is None or cur.rowcount == 0:
  11. return None
  12. return cur.lastrowid
  13. def read_archive(archive_id: int):
  14. """ 获取归档 ID """
  15. cur = db.search("SELECT Name, DescribeText "
  16. "FROM archive "
  17. "WHERE ID=%s", archive_id)
  18. if cur is None or cur.rowcount == 0:
  19. return ["", ""]
  20. return cur.fetchone()
  21. def get_blog_archive(blog_id: int):
  22. """ 获取文章的归档 """
  23. cur = db.search("SELECT ArchiveID FROM blog_archive_with_name "
  24. "WHERE BlogID=%s "
  25. "ORDER BY ArchiveName", blog_id)
  26. if cur is None or cur.rowcount == 0:
  27. return []
  28. return [i[0] for i in cur.fetchall()]
  29. def delete_archive(archive_id: int):
  30. cur = db.delete(table="blog_archive", where=f"ArchiveID={archive_id}")
  31. if cur is None:
  32. return False
  33. cur = db.delete(table="archive", where=f"ID={archive_id}")
  34. if cur is None or cur.rowcount == 0:
  35. return False
  36. return True
  37. def add_blog_to_archive(blog_id: int, archive_id: int):
  38. cur = db.search("SELECT BlogID FROM blog_archive WHERE BlogID=%s AND ArchiveID=%s", blog_id, archive_id)
  39. if cur is None:
  40. return False
  41. if cur.rowcount > 0:
  42. return True
  43. cur = db.insert(table="blog_archive", columns=["BlogID", "ArchiveID"], values=f"{blog_id}, {archive_id}")
  44. if cur is None or cur.rowcount != 1:
  45. return False
  46. return True
  47. def sub_blog_from_archive(blog_id: int, archive_id: int):
  48. cur = db.delete(table="blog_archive", where=f"BlogID={blog_id} AND ArchiveID={archive_id}")
  49. if cur is None:
  50. return False
  51. return True
  52. def get_archive_list(limit: Optional[int] = None, offset: Optional[int] = None):
  53. """ 获取归档列表 """
  54. if limit is not None and offset is not None:
  55. cur = db.search("SELECT ID, Name, DescribeText, Count "
  56. "FROM archive_with_count "
  57. "ORDER BY Count DESC , Name "
  58. "LIMIT %s "
  59. "OFFSET %s ", limit, offset)
  60. else:
  61. cur = db.search("SELECT ID, Name, DescribeText, Count "
  62. "FROM archive_with_count "
  63. "ORDER BY Count DESC , Name")
  64. if cur is None or cur.rowcount == 0:
  65. return []
  66. return cur.fetchall()