1
0

user.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. from sql import db
  2. from sql.base import DBBit
  3. from sql.cache import (get_user_from_cache, write_user_to_cache, delete_user_from_cache,
  4. get_user_email_from_cache, write_user_email_to_cache, delete_user_email_from_cache,
  5. get_role_name_from_cache, write_role_name_to_cache, delete_role_name_from_cache,
  6. get_role_operate_from_cache, write_role_operate_to_cache, delete_role_operate_from_cache)
  7. import object.user
  8. from typing import List
  # Names of the permission flags a role can hold.  The functions in this module
  # interpolate these names into SQL as column names of the `role` table, so
  # this list doubles as the whitelist of column names that may be interpolated.
  role_authority = [
      # write / create permissions
      "WriteBlog",
      "WriteComment",
      "WriteMsg",
      "CreateUser",
      # read permissions
      "ReadBlog",
      "ReadComment",
      "ReadMsg",
      "ReadSecretMsg",
      "ReadUserInfo",
      # delete permissions
      "DeleteBlog",
      "DeleteComment",
      "DeleteMsg",
      "DeleteUser",
      # system permissions
      "ConfigureSystem",
      "ReadSystem",
  ]
  def read_user(email: str):
      """Read a user's record by e-mail address.

      The cache is consulted first.  On a cache miss the ``user`` table is
      queried and the fetched row is written back to the cache.

      :param email: e-mail address identifying the user.
      :return: the cached value when one exists; otherwise the fetched
          ``(PasswdHash, Role, ID)`` row.  ``["", -1, -1]`` is returned when
          ``db.search`` yields ``None`` or the query does not match exactly
          one row.
      """
      cached = get_user_from_cache(email)
      if cached is not None:
          return cached

      cursor = db.search("SELECT PasswdHash, Role, ID FROM user WHERE Email=%s", email)
      if cursor is not None and cursor.rowcount == 1:
          row = cursor.fetchone()
          write_user_to_cache(email, *row)
          return row

      # Placeholder record for "no such user".
      return ["", -1, -1]
  def create_user(email: str, passwd: str):
      """Create a user.

      When the ``user`` table holds no rows (or its row count cannot be read),
      the new user is inserted with ``Role`` 1, which the original comment
      describes as the administrator role.  Otherwise ``Role`` is left to the
      column default.

      :param email: e-mail address of the new user; an empty string is rejected.
      :param passwd: plain-text password; it is hashed before being stored.
      :return: ``lastrowid`` of the inserted row, or ``None`` when the e-mail
          is empty or the insert did not affect exactly one row.
      """
      if len(email) == 0:
          return None

      count_cursor = db.search("SELECT COUNT(*) FROM user")
      passwd_hash = object.user.User.get_passwd_hash(passwd)

      table_is_empty = (
          count_cursor is None
          or count_cursor.rowcount == 0
          or count_cursor.fetchone()[0] == 0
      )
      if table_is_empty:
          # Create as administrator user.
          insert_cursor = db.insert(
              "INSERT INTO user(Email, PasswdHash, Role) VALUES (%s, %s, %s)",
              email, passwd_hash, 1)
      else:
          insert_cursor = db.insert(
              "INSERT INTO user(Email, PasswdHash) VALUES (%s, %s)",
              email, passwd_hash)

      if insert_cursor is not None and insert_cursor.rowcount == 1:
          return insert_cursor.lastrowid
      return None
  def delete_user(user_id: int):
      """Delete a user together with the rows they authored.

      The user's cache entries are dropped first.  Then the user's rows in
      ``message``, ``comment`` and ``blog`` are deleted, in that order, and
      finally the ``user`` row itself.

      :param user_id: ID of the user to delete.
      :return: ``True`` when the ``user`` row was deleted; ``False`` when any
          ``db.delete`` call returns ``None`` or the final delete affects no
          row.  Statements that already ran are not undone here.
      """
      email = get_user_email(user_id)
      delete_user_from_cache(email)
      delete_user_email_from_cache(user_id)

      authored_content_deletes = (
          "DELETE FROM message WHERE Auth=%s",
          "DELETE FROM comment WHERE Auth=%s",
          "DELETE FROM blog WHERE Auth=%s",
      )
      for statement in authored_content_deletes:
          if db.delete(statement, user_id) is None:
              return False

      user_cursor = db.delete("DELETE FROM user WHERE ID=%s", user_id)
      return user_cursor is not None and user_cursor.rowcount != 0
  def change_passwd_hash(user_email: str, passwd_hash: str):
      """Store a new password hash for the user with the given e-mail.

      The user's cache entry is dropped before the update is issued.

      :param user_email: e-mail address identifying the user.
      :param passwd_hash: the already-hashed password to store.
      :return: ``True`` when the update affected at least one row, ``False``
          when ``db.update`` returns ``None`` or no row was affected.
      """
      delete_user_from_cache(user_email)
      cursor = db.update(
          "UPDATE user SET PasswdHash=%s WHERE Email=%s",
          passwd_hash, user_email)
      return cursor is not None and cursor.rowcount != 0
  def get_user_email(user_id):
      """Get the e-mail address of a user.

      The cache is consulted first; on a miss the ``user`` table is queried
      and the result is written back to the cache.

      :param user_id: ID of the user.
      :return: the e-mail address, or ``None`` when ``db.search`` returns
          ``None`` or no row matches.
      """
      cached_email = get_user_email_from_cache(user_id)
      if cached_email is not None:
          return cached_email

      cursor = db.search("SELECT Email FROM user WHERE ID=%s", user_id)
      if cursor is not None and cursor.rowcount != 0:
          email = cursor.fetchone()[0]
          write_user_email_to_cache(user_id, email)
          return email
      return None
  def __authority_to_sql(authority):
      """Convert an authority mapping into the SET clause of an UPDATE statement.

      Keys are interpolated verbatim as column names; their validity is NOT
      checked here, so callers must pass trusted names only.

      :param authority: mapping of column name -> value to assign.
      :return: ``(clause, args)`` where ``clause`` looks like ``"A=%s,B=%s"``
          and ``args`` is the list of values in the same order as the keys.
      """
      clause = ",".join(f"{column}=%s" for column in authority)
      values = [authority[column] for column in authority]
      return clause, values
  def create_role(name: str, authority: List[str]):
      """Create a role and set its permission flags.

      The role row is inserted first; afterwards every flag listed in
      ``role_authority`` is set to 1 when it appears in ``authority`` and to 0
      otherwise.

      :param name: name of the new role.
      :param authority: names of the permissions the role is granted.
      :return: ``True`` when both the insert and the update report at least
          one affected row, ``False`` otherwise.
      """
      inserted = db.insert("INSERT INTO role(RoleName) VALUES (%s)", name)
      if inserted is None or inserted.rowcount == 0:
          return False

      flags = {}
      for flag in role_authority:
          flags[flag] = 1 if flag in authority else 0
      set_clause, values = __authority_to_sql(flags)

      # NOTE(review): if the driver reports only *changed* rows, an update that
      # leaves every flag at its existing value would yield rowcount 0 and make
      # this return False although the role exists - confirm against the driver.
      updated = db.update(
          f"UPDATE role SET {set_clause} WHERE RoleName=%s",
          *values, name)
      return updated is not None and updated.rowcount != 0
  def delete_role(role_id: int):
      """Delete a role.

      The role's cached name and cached permission results are dropped before
      the row is removed from the ``role`` table.

      :param role_id: ID of the role to delete.
      :return: ``True`` when a row was deleted, ``False`` when ``db.delete``
          returns ``None`` or no row was affected.
      """
      delete_role_name_from_cache(role_id)
      delete_role_operate_from_cache(role_id)

      cursor = db.delete("DELETE FROM role WHERE RoleID=%s", role_id)
      return cursor is not None and cursor.rowcount != 0
  def set_user_role(role_id: int, user_id: str):
      """Assign a role to a user.

      ``read_user`` caches the user's ``(PasswdHash, Role, ID)`` record keyed by
      e-mail, so the cached record is dropped before the role is changed;
      otherwise later reads would keep returning the old role.  This mirrors
      how ``change_passwd_hash`` and ``delete_user`` invalidate the same entry.

      :param role_id: ID of the role to assign.
      :param user_id: ID of the user to update (annotated ``str`` in the
          original signature; it is matched against ``user.ID``).
      :return: ``True`` when the update affected at least one row, ``False``
          when ``db.update`` returns ``None`` or no row was affected.
      """
      # Invalidate the cached user record that carries the (about to be stale)
      # role.  An unknown user has no e-mail and therefore nothing to drop.
      email = get_user_email(user_id)
      if email is not None:
          delete_user_from_cache(email)

      cur = db.update("UPDATE user "
                      "SET Role=%s "
                      "WHERE ID=%s", role_id, user_id)
      if cur is None or cur.rowcount == 0:
          return False
      return True
  def get_role_name(role: int):
      """Get the name of a role.

      The cache is consulted first; on a miss the ``role`` table is queried
      and the result is written back to the cache.

      :param role: ID of the role.
      :return: the role name, or ``None`` when ``db.search`` returns ``None``
          or no row matches.
      """
      cached_name = get_role_name_from_cache(role)
      if cached_name is not None:
          return cached_name

      cursor = db.search("SELECT RoleName FROM role WHERE RoleID=%s", role)
      if cursor is not None and cursor.rowcount != 0:
          role_name = cursor.fetchone()[0]
          write_role_name_to_cache(role, role_name)
          return role_name
      return None
  def __check_operate(operate):
      """Tell whether ``operate`` is one of the names in ``role_authority``.

      :param operate: permission name to validate.
      :return: ``True`` when the name is listed, ``False`` otherwise.
      """
      is_known_permission = operate in role_authority
      return is_known_permission
  def check_role(role: int, operate: str):
      """Check whether a role (given by ID) holds a permission.

      :param role: ID of the role.
      :param operate: name of the permission; it must be listed in
          ``role_authority``.
      :return: the cached result when one exists; otherwise whether the stored
          flag equals ``DBBit.BIT_1``.  ``False`` for an unknown permission
          name, when ``db.search`` returns ``None``, or when no row matches.
      """
      # The permission name is interpolated into the SQL text as a column name,
      # so it is validated against the whitelist first to prevent SQL injection.
      if not __check_operate(operate):
          return False

      cached = get_role_operate_from_cache(role, operate)
      if cached is not None:
          return cached

      cursor = db.search(f"SELECT {operate} FROM role WHERE RoleID=%s", role)
      if cursor is None or cursor.rowcount == 0:
          # A failed or empty lookup is reported as "not allowed" and not cached.
          return False

      allowed = cursor.fetchone()[0] == DBBit.BIT_1
      write_role_operate_to_cache(role, operate, allowed)
      return allowed
  def get_role_list():
      """Get the list of all roles.

      :return: the ``(RoleID, RoleName)`` rows returned by ``fetchall()``, or
          an empty list when ``db.search`` returns ``None`` or there are no
          rows.
      """
      cursor = db.search("SELECT RoleID, RoleName FROM role")
      has_rows = cursor is not None and cursor.rowcount != 0
      return cursor.fetchall() if has_rows else []