user.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. from sql import db
  2. from sql.base import DBBit
  3. import object.user
  4. from typing import List
  5. role_authority = ["WriteBlog", "WriteComment", "WriteMsg", "CreateUser",
  6. "ReadBlog", "ReadComment", "ReadMsg", "ReadSecretMsg", "ReadUserInfo",
  7. "DeleteBlog", "DeleteComment", "DeleteMsg", "DeleteUser",
  8. "ConfigureSystem", "ReadSystem"]
  9. def read_user(email: str):
  10. """ 读取用户 """
  11. cur = db.search(columns=["PasswdHash", "Role", "ID"], table="user", where=f"Email='{email}'")
  12. if cur is None or cur.rowcount == 0:
  13. return []
  14. assert cur.rowcount == 1
  15. return cur.fetchone()
  16. def create_user(email: str, passwd: str):
  17. """ 创建用户 """
  18. email = email.replace("'", "''")
  19. cur = db.search(columns=["count(Email)"], table="user") # 统计个数
  20. passwd = object.user.User.get_passwd_hash(passwd)
  21. if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
  22. db.insert(table='user', columns=['Email', 'PasswdHash', 'Role'], values=f"'{email}', '{passwd}', 1") # 创建为管理员用户
  23. else:
  24. db.insert(table='user', columns=['Email', 'PasswdHash'], values=f"'{email}', '{passwd}'")
  25. def delete_user(user_id: int):
  26. """ 删除用户 """
  27. cur = db.delete(table="message", where=f"Auth={user_id}")
  28. if cur is None:
  29. return False
  30. cur = db.delete(table="comment", where=f"Auth={user_id}")
  31. if cur is None:
  32. return False
  33. cur = db.delete(table="blog", where=f"Auth={user_id}")
  34. if cur is None:
  35. return False
  36. cur = db.delete(table="user", where=f"ID={user_id}")
  37. if cur is None or cur.rowcount == 0:
  38. return False
  39. return True
  40. def create_role(name: str, authority: List[str]):
  41. name = name.replace("'", "''")
  42. cur = db.insert(table="role", columns=["RoleName"], values=f"'{name}'", not_commit=True)
  43. if cur is None or cur.rowcount == 0:
  44. return False
  45. kw = {}
  46. for i in role_authority:
  47. kw[i] = '0'
  48. for i in authority:
  49. if i in role_authority:
  50. kw[i] = '1'
  51. cur = db.update(table='role', kw=kw, where=f"RoleName='{name}'")
  52. if cur is None or cur.rowcount == 0:
  53. return False
  54. return True
  55. def delete_role(role_id: int):
  56. cur = db.delete(table="role", where=f"RoleID={role_id}")
  57. if cur is None or cur.rowcount == 0:
  58. return False
  59. return True
  60. def set_user_role(role_id: int, user_id: str):
  61. cur = db.update(table="user", kw={"Role": f"{role_id}"}, where=f"ID={user_id}")
  62. if cur is None or cur.rowcount == 0:
  63. return False
  64. return True
  65. def change_passwd_hash(user_id: int, passwd_hash: str):
  66. cur = db.update(table='user', kw={'PasswdHash': f"'{passwd_hash}'"}, where=f'ID={user_id}')
  67. if cur is None or cur.rowcount == 0:
  68. return False
  69. return True
  70. def get_user_email(user_id):
  71. """ 获取用户邮箱 """
  72. cur = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
  73. if cur is None or cur.rowcount == 0:
  74. return None
  75. return cur.fetchone()[0]
  76. def get_role_name(role: int):
  77. """ 获取用户角色名称 """
  78. cur = db.search(columns=["RoleName"], table="role", where=f"RoleID={role}")
  79. if cur is None or cur.rowcount == 0:
  80. return None
  81. return cur.fetchone()[0]
  82. def check_role(role: int, operate: str):
  83. """ 检查角色权限(通过角色ID) """
  84. cur = db.search(columns=[operate], table="role", where=f"RoleID={role}")
  85. if cur is None or cur.rowcount == 0:
  86. return False
  87. return cur.fetchone()[0] == DBBit.BIT_1
  88. def check_role_by_name(role: str, operate: str):
  89. """ 检查角色权限(通过角色名) """
  90. role = role.replace("'", "''")
  91. cur = db.search(columns=[operate], table="role", where=f"RoleName='{role}'")
  92. if cur is None or cur.rowcount == 0:
  93. return False
  94. return cur.fetchone()[0] == DBBit.BIT_1
  95. def get_role_id_by_name(role: str):
  96. """ 检查角色权限(通过角色名) """
  97. role = role.replace("'", "''")
  98. cur = db.search(columns=["RoleID"], table="role", where=f"RoleName='{role}'")
  99. if cur is None or cur.rowcount == 0:
  100. return None
  101. return cur.fetchone()[0]
  102. def get_role_list():
  103. """ 获取归档列表 """
  104. cur = db.search(columns=["RoleID", "RoleName"], table="role")
  105. if cur is None or cur.rowcount == 0:
  106. return []
  107. return cur.fetchall()