user.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. from sql import db
  2. from sql.base import DBBit
  3. import object.user
  4. from typing import List
  5. role_authority = ["WriteBlog", "WriteComment", "WriteMsg", "CreateUser",
  6. "ReadBlog", "ReadComment", "ReadMsg", "ReadSecretMsg", "ReadUserInfo",
  7. "DeleteBlog", "DeleteComment", "DeleteMsg", "DeleteUser",
  8. "ConfigureSystem", "ReadSystem"]
  9. def read_user(email: str):
  10. """ 读取用户 """
  11. cur = db.search("SELECT PasswdHash, Role, ID FROM user WHERE Email=%s", email)
  12. if cur is None or cur.rowcount != 1:
  13. return ["", -1, -1]
  14. return cur.fetchone()
  15. def create_user(email: str, passwd: str):
  16. """ 创建用户 """
  17. email = email.replace("'", "''")
  18. if len(email) == 0:
  19. return None
  20. cur = db.search("SELECT COUNT(*) FROM user")
  21. passwd = object.user.User.get_passwd_hash(passwd)
  22. if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
  23. # 创建为管理员用户
  24. cur = db.insert(table='user', columns=['Email', 'PasswdHash', 'Role'], values=f"'{email}', '{passwd}', 1")
  25. else:
  26. cur = db.insert(table='user', columns=['Email', 'PasswdHash'], values=f"'{email}', '{passwd}'")
  27. if cur is None or cur.rowcount != 1:
  28. return None
  29. return cur.lastrowid
  30. def delete_user(user_id: int):
  31. """ 删除用户 """
  32. cur = db.delete(table="message", where=f"Auth={user_id}")
  33. if cur is None:
  34. return False
  35. cur = db.delete(table="comment", where=f"Auth={user_id}")
  36. if cur is None:
  37. return False
  38. cur = db.delete(table="blog", where=f"Auth={user_id}")
  39. if cur is None:
  40. return False
  41. cur = db.delete(table="user", where=f"ID={user_id}")
  42. if cur is None or cur.rowcount == 0:
  43. return False
  44. return True
  45. def create_role(name: str, authority: List[str]):
  46. name = name.replace("'", "''")
  47. cur = db.insert(table="role", columns=["RoleName"], values=f"'{name}'", not_commit=True)
  48. if cur is None or cur.rowcount == 0:
  49. return False
  50. kw = {}
  51. for i in role_authority:
  52. kw[i] = '0'
  53. for i in authority:
  54. if i in role_authority:
  55. kw[i] = '1'
  56. cur = db.update(table='role', kw=kw, where=f"RoleName='{name}'")
  57. if cur is None or cur.rowcount == 0:
  58. return False
  59. return True
  60. def delete_role(role_id: int):
  61. cur = db.delete(table="role", where=f"RoleID={role_id}")
  62. if cur is None or cur.rowcount == 0:
  63. return False
  64. return True
  65. def set_user_role(role_id: int, user_id: str):
  66. cur = db.update(table="user", kw={"Role": f"{role_id}"}, where=f"ID={user_id}")
  67. if cur is None or cur.rowcount == 0:
  68. return False
  69. return True
  70. def change_passwd_hash(user_id: int, passwd_hash: str):
  71. cur = db.update(table='user', kw={'PasswdHash': f"'{passwd_hash}'"}, where=f'ID={user_id}')
  72. if cur is None or cur.rowcount == 0:
  73. return False
  74. return True
  75. def get_user_email(user_id):
  76. """ 获取用户邮箱 """
  77. cur = db.search("SELECT Email FROM user WHERE ID=%s", user_id)
  78. if cur is None or cur.rowcount == 0:
  79. return None
  80. return cur.fetchone()[0]
  81. def get_role_name(role: int):
  82. """ 获取用户角色名称 """
  83. cur = db.search("SELECT RoleName FROM role WHERE RoleID=%s", role)
  84. if cur is None or cur.rowcount == 0:
  85. return None
  86. return cur.fetchone()[0]
  87. def __check_operate(operate):
  88. return operate in ["WriteBlog",
  89. "WriteComment",
  90. "WriteMsg",
  91. "CreateUser",
  92. "ReadBlog",
  93. "ReadComment",
  94. "ReadMsg",
  95. "ReadSecretMsg",
  96. "ReadUserInfo",
  97. "DeleteBlog",
  98. "DeleteComment",
  99. "DeleteMsg",
  100. "DeleteUser",
  101. "ConfigureSystem",
  102. "ReadSystem"]
  103. def check_role(role: int, operate: str):
  104. """ 检查角色权限(通过角色ID) """
  105. if not __check_operate(operate): # 检查, 防止SQL注入
  106. return False
  107. cur = db.search(f"SELECT {operate} FROM role WHERE RoleID=%s", role)
  108. if cur is None or cur.rowcount == 0:
  109. return False
  110. return cur.fetchone()[0] == DBBit.BIT_1
  111. def check_role_by_name(role: str, operate: str):
  112. """ 检查角色权限(通过角色名) """
  113. if not __check_operate(operate): # 检查, 防止SQL注入
  114. return False
  115. role = role.replace("'", "''")
  116. cur = db.search(f"SELECT {operate} FROM role WHERE RoleName=%s", role)
  117. if cur is None or cur.rowcount == 0:
  118. return False
  119. return cur.fetchone()[0] == DBBit.BIT_1
  120. def get_role_id_by_name(role: str):
  121. """ 检查角色权限(通过角色名) """
  122. role = role.replace("'", "''")
  123. cur = db.search("SELECT RoleID FROM role WHERE RoleName=%s", role)
  124. if cur is None or cur.rowcount == 0:
  125. return None
  126. return cur.fetchone()[0]
  127. def get_role_list():
  128. """ 获取归档列表 """
  129. cur = db.search("SELECT RoleID, RoleName FROM role")
  130. if cur is None or cur.rowcount == 0:
  131. return []
  132. return cur.fetchall()