archive.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. from sql import db
  2. from typing import Optional
  3. def create_archive(name: str, describe: str):
  4. """ 创建新归档 """
  5. name = name.replace("'", "''")
  6. describe = describe.replace("'", "''")
  7. cur = db.insert(table="archive",
  8. columns=["Name", "DescribeText"],
  9. values=f"'{name}', '{describe}'")
  10. if cur is None or cur.rowcount == 0:
  11. return False
  12. return True
  13. def read_archive(blog_id: int):
  14. """ 获取文章的归档 """
  15. cur = db.search(columns=["ArchiveID", "ArchiveName", "DescribeText"], table="blog_archive_with_name",
  16. where=f"BlogID={blog_id}")
  17. if cur is None or cur.rowcount == 0:
  18. return []
  19. return cur.fetchall()
  20. def delete_archive(archive_id: int):
  21. cur = db.delete(table="blog_archive", where=f"ArchiveID={archive_id}")
  22. if cur is None:
  23. return False
  24. cur = db.delete(table="archive", where=f"ID={archive_id}")
  25. if cur is None or cur.rowcount == 0:
  26. return False
  27. return True
  28. def add_blog_to_archive(blog_id: int, archive_id: int):
  29. cur = db.search(columns=["BlogID"], table="blog_archive", where=f"BlogID={blog_id} AND ArchiveID={archive_id}")
  30. if cur is None:
  31. print("H1")
  32. return False
  33. if cur.rowcount > 0:
  34. print("H2")
  35. return True
  36. cur = db.insert(table="blog_archive", columns=["BlogID", "ArchiveID"], values=f"{blog_id}, {archive_id}")
  37. if cur is None or cur.rowcount != 1:
  38. print("H3")
  39. return False
  40. return True
  41. def sub_blog_from_archive(blog_id: int, archive_id: int):
  42. cur = db.delete(table="blog_archive", where=f"BlogID={blog_id} AND ArchiveID={archive_id}")
  43. if cur is None:
  44. return False
  45. return True
  46. def get_archive_list(limit: Optional[int] = None, offset: Optional[int] = None):
  47. """ 获取归档列表 """
  48. cur = db.search(columns=["ID", "Name", "DescribeText", "Count"], table="archive_with_count",
  49. limit=limit,
  50. offset=offset)
  51. if cur is None or cur.rowcount == 0:
  52. return []
  53. return cur.fetchall()
  54. def get_archive_name_by_id(archive_id: int):
  55. """ 获取归档 ID """
  56. cur = db.search(columns=["Name", "DescribeText"], table="archive",
  57. where=f"ID={archive_id}")
  58. if cur is None or cur.rowcount == 0:
  59. return None, None
  60. return cur.fetchone()