user.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. from sql import db, DB
  2. from sql.base import DBBit
  3. from sql.cache import (get_user_from_cache, write_user_to_cache, delete_user_from_cache,
  4. get_user_email_from_cache, write_user_email_to_cache, delete_user_email_from_cache,
  5. get_role_name_from_cache, write_role_name_to_cache, delete_role_name_from_cache,
  6. get_role_operate_from_cache, write_role_operate_to_cache, delete_role_operate_from_cache)
  7. import object.user
  8. from typing import List
  9. role_authority = ["WriteBlog", "WriteComment", "WriteMsg", "CreateUser",
  10. "ReadBlog", "ReadComment", "ReadMsg", "ReadSecretMsg", "ReadUserInfo",
  11. "DeleteBlog", "DeleteComment", "DeleteMsg", "DeleteUser",
  12. "ConfigureSystem", "ReadSystem"]
  13. def read_user(email: str, mysql: DB = db, not_cache=False):
  14. """ 读取用户 """
  15. if not not_cache:
  16. res = get_user_from_cache(email)
  17. if res is not None:
  18. return res
  19. cur = mysql.search("SELECT PasswdHash, Role, ID FROM user WHERE Email=%s", email)
  20. if cur is None or cur.rowcount != 1:
  21. return ["", -1, -1]
  22. res = cur.fetchone()
  23. write_user_to_cache(email, *res)
  24. return res
  25. def create_user(email: str, passwd: str, mysql: DB = db):
  26. """ 创建用户 """
  27. if len(email) == 0:
  28. return None
  29. cur = mysql.search("SELECT COUNT(*) FROM user")
  30. passwd = object.user.User.get_passwd_hash(passwd)
  31. if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
  32. # 创建为管理员用户
  33. cur = mysql.insert("INSERT INTO user(Email, PasswdHash, Role) "
  34. "VALUES (%s, %s, %s)", email, passwd, 1)
  35. else:
  36. cur = mysql.insert("INSERT INTO user(Email, PasswdHash) "
  37. "VALUES (%s, %s)", email, passwd)
  38. if cur is None or cur.rowcount != 1:
  39. return None
  40. read_user(email, mysql) # 刷新缓存
  41. return cur.lastrowid
  42. def delete_user(user_id: int, mysql: DB = db):
  43. """ 删除用户 """
  44. delete_user_from_cache(get_user_email(user_id))
  45. delete_user_email_from_cache(user_id)
  46. conn = mysql.get_connection()
  47. cur = mysql.delete("DELETE FROM message WHERE Auth=%s", user_id, connection=conn)
  48. if cur is None:
  49. conn.rollback()
  50. conn.close()
  51. return False
  52. cur = mysql.delete("DELETE FROM comment WHERE Auth=%s", user_id, connection=conn)
  53. if cur is None:
  54. conn.rollback()
  55. conn.close()
  56. return False
  57. cur = mysql.delete("DELETE FROM blog WHERE Auth=%s", user_id, connection=conn)
  58. if cur is None:
  59. conn.rollback()
  60. conn.close()
  61. return False
  62. cur = mysql.delete("DELETE FROM user WHERE ID=%s", user_id, connection=conn)
  63. if cur is None or cur.rowcount == 0:
  64. conn.rollback()
  65. conn.close()
  66. return False
  67. conn.commit()
  68. conn.close()
  69. return True
  70. def change_passwd_hash(user_email: str, passwd_hash: str, mysql: DB = db):
  71. delete_user_from_cache(user_email)
  72. cur = mysql.update("UPDATE user "
  73. "SET PasswdHash=%s "
  74. "WHERE Email=%s", passwd_hash, user_email)
  75. read_user(user_email, mysql) # 刷新缓存
  76. if cur is None or cur.rowcount == 0:
  77. return False
  78. return True
  79. def get_user_email(user_id, mysql: DB = db, not_cache=False):
  80. """ 获取用户邮箱 """
  81. if not not_cache:
  82. res = get_user_email_from_cache(user_id)
  83. if res is not None:
  84. return res
  85. cur = mysql.search("SELECT Email FROM user WHERE ID=%s", user_id)
  86. if cur is None or cur.rowcount == 0:
  87. return None
  88. res = cur.fetchone()[0]
  89. write_user_email_to_cache(user_id, res)
  90. return res
  91. def __authority_to_sql(authority):
  92. """ authority 转换为 Update语句, 不检查合法性 """
  93. sql = []
  94. args = []
  95. for i in authority:
  96. sql.append(f"{i}=%s")
  97. args.append(authority[i])
  98. return ",".join(sql), args
  99. def create_role(name: str, authority: List[str], mysql: DB = db):
  100. conn = mysql.get_connection()
  101. cur = mysql.insert("INSERT INTO role(RoleName) VALUES (%s)", name, connection=conn)
  102. if cur is None or cur.rowcount == 0:
  103. conn.rollback()
  104. conn.close()
  105. return False
  106. sql, args = __authority_to_sql({i: (1 if i in authority else 0) for i in role_authority})
  107. cur = mysql.update(f"UPDATE role "
  108. f"SET {sql} "
  109. f"WHERE RoleName=%s", *args, name, connection=conn)
  110. if cur is None or cur.rowcount == 0:
  111. conn.rollback()
  112. conn.close()
  113. return False
  114. conn.commit()
  115. conn.close()
  116. return True
  117. def delete_role(role_id: int, mysql: DB = db):
  118. delete_role_name_from_cache(role_id)
  119. delete_role_operate_from_cache(role_id)
  120. cur = mysql.delete("DELETE FROM role WHERE RoleID=%s", role_id)
  121. if cur is None or cur.rowcount == 0:
  122. return False
  123. return True
  124. def set_user_role(role_id: int, user_id: str, mysql: DB = db):
  125. cur = mysql.update("UPDATE user "
  126. "SET Role=%s "
  127. "WHERE ID=%s", role_id, user_id)
  128. if cur is None or cur.rowcount == 0:
  129. return False
  130. return True
  131. def get_role_name(role: int, mysql: DB = db, not_cache=False):
  132. """ 获取用户角色名称 """
  133. if not not_cache:
  134. res = get_role_name_from_cache(role)
  135. if res is not None:
  136. return res
  137. cur = mysql.search("SELECT RoleName FROM role WHERE RoleID=%s", role)
  138. if cur is None or cur.rowcount == 0:
  139. return None
  140. res = cur.fetchone()[0]
  141. write_role_name_to_cache(role, res)
  142. return res
  143. def __check_operate(operate):
  144. return operate in role_authority
  145. def check_role(role: int, operate: str, mysql: DB = db, not_cache=False):
  146. """ 检查角色权限(通过角色ID) """
  147. if not __check_operate(operate): # 检查, 防止SQL注入
  148. return False
  149. if not not_cache:
  150. res = get_role_operate_from_cache(role, operate)
  151. if res is not None:
  152. return res
  153. cur = mysql.search(f"SELECT {operate} FROM role WHERE RoleID=%s", role)
  154. if cur is None or cur.rowcount == 0:
  155. return False
  156. res = cur.fetchone()[0] == DBBit.BIT_1
  157. write_role_operate_to_cache(role, operate, res)
  158. return res
  159. def get_role_list(mysql: DB = db):
  160. """ 获取归档列表 """
  161. cur = mysql.search("SELECT RoleID, RoleName FROM role")
  162. if cur is None or cur.rowcount == 0:
  163. return []
  164. return cur.fetchall()
  165. def get_role_list_iter(mysql: DB = db):
  166. """ 获取归档列表 """
  167. cur = mysql.search("SELECT RoleID, RoleName FROM role")
  168. if cur is None or cur.rowcount == 0:
  169. return []
  170. return cur
  171. def get_user_list_iter(mysql: DB = db):
  172. """ 获取归档列表 """
  173. cur = mysql.search("SELECT ID FROM user")
  174. if cur is None or cur.rowcount == 0:
  175. return []
  176. return cur