archive.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. from sql import db
  2. from sql.cache import (read_archive_from_cache, write_archive_to_cache, delete_archive_from_cache,
  3. get_blog_archive_from_cache, write_blog_archive_to_cache, delete_blog_archive_from_cache,
  4. delete_all_blog_archive_from_cache)
  5. from typing import Optional
  6. def create_archive(name: str, describe: str):
  7. """ 创建新归档 """
  8. name = name.replace("'", "''")
  9. describe = describe.replace("'", "''")
  10. cur = db.insert("INSERT INTO archive(Name, DescribeText) "
  11. "VALUES (%s, %s)", name, describe)
  12. if cur is None or cur.rowcount == 0:
  13. return None
  14. return cur.lastrowid
  15. def read_archive(archive_id: int):
  16. """ 获取归档 ID """
  17. res = read_archive_from_cache(archive_id)
  18. if res is not None:
  19. return res
  20. cur = db.search("SELECT Name, DescribeText "
  21. "FROM archive "
  22. "WHERE ID=%s", archive_id)
  23. if cur is None or cur.rowcount == 0:
  24. return ["", ""]
  25. res = cur.fetchone()
  26. write_archive_to_cache(archive_id, *res)
  27. return res
  28. def get_blog_archive(blog_id: int):
  29. """ 获取文章的归档 """
  30. res = get_blog_archive_from_cache(blog_id)
  31. if res is not None:
  32. return res
  33. cur = db.search("SELECT ArchiveID FROM blog_archive_with_name "
  34. "WHERE BlogID=%s "
  35. "ORDER BY ArchiveName", blog_id)
  36. if cur is None or cur.rowcount == 0:
  37. return []
  38. res = [i[0] for i in cur.fetchall()]
  39. write_blog_archive_to_cache(blog_id, res)
  40. return res
  41. def delete_archive(archive_id: int):
  42. delete_archive_from_cache(archive_id)
  43. delete_all_blog_archive_from_cache()
  44. cur = db.delete("DELETE FROM blog_archive WHERE ArchiveID=%s", archive_id)
  45. if cur is None:
  46. return False
  47. cur = db.delete("DELETE FROM archive WHERE ID=%s", archive_id)
  48. if cur is None or cur.rowcount == 0:
  49. return False
  50. return True
  51. def add_blog_to_archive(blog_id: int, archive_id: int):
  52. delete_blog_archive_from_cache(blog_id)
  53. cur = db.search("SELECT BlogID FROM blog_archive WHERE BlogID=%s AND ArchiveID=%s", blog_id, archive_id)
  54. if cur is None:
  55. return False
  56. if cur.rowcount > 0:
  57. return True
  58. cur = db.insert("INSERT INTO blog_archive(BlogID, ArchiveID) VALUES (%s, %s)", blog_id, archive_id)
  59. if cur is None or cur.rowcount != 1:
  60. return False
  61. return True
  62. def sub_blog_from_archive(blog_id: int, archive_id: int):
  63. delete_blog_archive_from_cache(blog_id)
  64. cur = db.delete("DELETE FROM blog_archive WHERE BlogID=%s AND ArchiveID=%s", blog_id, archive_id)
  65. if cur is None:
  66. return False
  67. return True
  68. def get_archive_list(limit: Optional[int] = None, offset: Optional[int] = None):
  69. """ 获取归档列表 """
  70. if limit is not None and offset is not None:
  71. cur = db.search("SELECT ID "
  72. "FROM archive "
  73. "ORDER BY Name "
  74. "LIMIT %s "
  75. "OFFSET %s ", limit, offset)
  76. else:
  77. cur = db.search("SELECT ID "
  78. "FROM archive "
  79. "ORDER BY Name")
  80. if cur is None or cur.rowcount == 0:
  81. return []
  82. return [i[0] for i in cur.fetchall()]