user.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. from sql import db
  2. from sql.base import DBBit
  3. import object.user
  4. from typing import List
  5. role_authority = ["WriteBlog", "WriteComment", "WriteMsg", "CreateUser",
  6. "ReadBlog", "ReadComment", "ReadMsg", "ReadSecretMsg", "ReadUserInfo",
  7. "DeleteBlog", "DeleteComment", "DeleteMsg", "DeleteUser",
  8. "ConfigureSystem", "ReadSystem"]
  9. def read_user(email: str):
  10. """ 读取用户 """
  11. cur = db.search(columns=["PasswdHash", "Role", "ID"], table="user", where=f"Email='{email}'")
  12. if cur is None or cur.rowcount != 1:
  13. return ["", -1, -1]
  14. return cur.fetchone()
  15. def create_user(email: str, passwd: str):
  16. """ 创建用户 """
  17. email = email.replace("'", "''")
  18. if len(email) == 0:
  19. return None
  20. cur = db.search(columns=["count(Email)"], table="user") # 统计个数
  21. passwd = object.user.User.get_passwd_hash(passwd)
  22. if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
  23. # 创建为管理员用户
  24. cur = db.insert(table='user', columns=['Email', 'PasswdHash', 'Role'], values=f"'{email}', '{passwd}', 1")
  25. else:
  26. cur = db.insert(table='user', columns=['Email', 'PasswdHash'], values=f"'{email}', '{passwd}'")
  27. if cur is None or cur.rowcount != 1:
  28. return None
  29. return cur.lastrowid
  30. def delete_user(user_id: int):
  31. """ 删除用户 """
  32. cur = db.delete(table="message", where=f"Auth={user_id}")
  33. if cur is None:
  34. return False
  35. cur = db.delete(table="comment", where=f"Auth={user_id}")
  36. if cur is None:
  37. return False
  38. cur = db.delete(table="blog", where=f"Auth={user_id}")
  39. if cur is None:
  40. return False
  41. cur = db.delete(table="user", where=f"ID={user_id}")
  42. if cur is None or cur.rowcount == 0:
  43. return False
  44. return True
  45. def create_role(name: str, authority: List[str]):
  46. name = name.replace("'", "''")
  47. cur = db.insert(table="role", columns=["RoleName"], values=f"'{name}'", not_commit=True)
  48. if cur is None or cur.rowcount == 0:
  49. return False
  50. kw = {}
  51. for i in role_authority:
  52. kw[i] = '0'
  53. for i in authority:
  54. if i in role_authority:
  55. kw[i] = '1'
  56. cur = db.update(table='role', kw=kw, where=f"RoleName='{name}'")
  57. if cur is None or cur.rowcount == 0:
  58. return False
  59. return True
  60. def delete_role(role_id: int):
  61. cur = db.delete(table="role", where=f"RoleID={role_id}")
  62. if cur is None or cur.rowcount == 0:
  63. return False
  64. return True
  65. def set_user_role(role_id: int, user_id: str):
  66. cur = db.update(table="user", kw={"Role": f"{role_id}"}, where=f"ID={user_id}")
  67. if cur is None or cur.rowcount == 0:
  68. return False
  69. return True
  70. def change_passwd_hash(user_id: int, passwd_hash: str):
  71. cur = db.update(table='user', kw={'PasswdHash': f"'{passwd_hash}'"}, where=f'ID={user_id}')
  72. if cur is None or cur.rowcount == 0:
  73. return False
  74. return True
  75. def get_user_email(user_id):
  76. """ 获取用户邮箱 """
  77. cur = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
  78. if cur is None or cur.rowcount == 0:
  79. return None
  80. return cur.fetchone()[0]
  81. def get_role_name(role: int):
  82. """ 获取用户角色名称 """
  83. cur = db.search(columns=["RoleName"], table="role", where=f"RoleID={role}")
  84. if cur is None or cur.rowcount == 0:
  85. return None
  86. return cur.fetchone()[0]
  87. def check_role(role: int, operate: str):
  88. """ 检查角色权限(通过角色ID) """
  89. cur = db.search(columns=[operate], table="role", where=f"RoleID={role}")
  90. if cur is None or cur.rowcount == 0:
  91. return False
  92. return cur.fetchone()[0] == DBBit.BIT_1
  93. def check_role_by_name(role: str, operate: str):
  94. """ 检查角色权限(通过角色名) """
  95. role = role.replace("'", "''")
  96. cur = db.search(columns=[operate], table="role", where=f"RoleName='{role}'")
  97. if cur is None or cur.rowcount == 0:
  98. return False
  99. return cur.fetchone()[0] == DBBit.BIT_1
  100. def get_role_id_by_name(role: str):
  101. """ 检查角色权限(通过角色名) """
  102. role = role.replace("'", "''")
  103. cur = db.search(columns=["RoleID"], table="role", where=f"RoleName='{role}'")
  104. if cur is None or cur.rowcount == 0:
  105. return None
  106. return cur.fetchone()[0]
  107. def get_role_list():
  108. """ 获取归档列表 """
  109. cur = db.search(columns=["RoleID", "RoleName"], table="role")
  110. if cur is None or cur.rowcount == 0:
  111. return []
  112. return cur.fetchall()