# archive.py
from typing import Optional

from sql import db
  3. def create_archive(name: str, describe: str):
  4. """ 创建新归档 """
  5. name = name.replace("'", "''")
  6. describe = describe.replace("'", "''")
  7. cur = db.insert("INSERT INTO archive(Name, DescribeText) "
  8. "VALUES (%s, %s)", name, describe)
  9. if cur is None or cur.rowcount == 0:
  10. return None
  11. return cur.lastrowid
  12. def read_archive(archive_id: int):
  13. """ 获取归档 ID """
  14. cur = db.search("SELECT Name, DescribeText "
  15. "FROM archive "
  16. "WHERE ID=%s", archive_id)
  17. if cur is None or cur.rowcount == 0:
  18. return ["", ""]
  19. return cur.fetchone()
  20. def get_blog_archive(blog_id: int):
  21. """ 获取文章的归档 """
  22. cur = db.search("SELECT ArchiveID FROM blog_archive_with_name "
  23. "WHERE BlogID=%s "
  24. "ORDER BY ArchiveName", blog_id)
  25. if cur is None or cur.rowcount == 0:
  26. return []
  27. return [i[0] for i in cur.fetchall()]
  28. def delete_archive(archive_id: int):
  29. cur = db.delete("DELETE FROM blog_archive WHERE ArchiveID=%s", archive_id)
  30. if cur is None:
  31. return False
  32. cur = db.delete("DELETE FROM archive WHERE ID=%s", archive_id)
  33. if cur is None or cur.rowcount == 0:
  34. return False
  35. return True
  36. def add_blog_to_archive(blog_id: int, archive_id: int):
  37. cur = db.search("SELECT BlogID FROM blog_archive WHERE BlogID=%s AND ArchiveID=%s", blog_id, archive_id)
  38. if cur is None:
  39. return False
  40. if cur.rowcount > 0:
  41. return True
  42. cur = db.insert("INSERT INTO blog_archive(BlogID, ArchiveID) VALUES (%s, %s)", blog_id, archive_id)
  43. if cur is None or cur.rowcount != 1:
  44. return False
  45. return True
  46. def sub_blog_from_archive(blog_id: int, archive_id: int):
  47. cur = db.delete("DELETE FROM blog_archive WHERE BlogID=%s AND ArchiveID=%s", blog_id, archive_id)
  48. if cur is None:
  49. return False
  50. return True
  51. def get_archive_list(limit: Optional[int] = None, offset: Optional[int] = None):
  52. """ 获取归档列表 """
  53. if limit is not None and offset is not None:
  54. cur = db.search("SELECT ID, Name, DescribeText, Count "
  55. "FROM archive_with_count "
  56. "ORDER BY Count DESC , Name "
  57. "LIMIT %s "
  58. "OFFSET %s ", limit, offset)
  59. else:
  60. cur = db.search("SELECT ID, Name, DescribeText, Count "
  61. "FROM archive_with_count "
  62. "ORDER BY Count DESC , Name")
  63. if cur is None or cur.rowcount == 0:
  64. return []
  65. return cur.fetchall()