user.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. from sql import db
  2. from sql.base import DBBit
  3. import core.user
  4. def read_user(email: str):
  5. """ 读取用户 """
  6. cur = db.search(columns=["PasswdHash", "Role", "ID"], table="user", where=f"Email='{email}'")
  7. if cur is None or cur.rowcount == 0:
  8. return []
  9. assert cur.rowcount == 1
  10. return cur.fetchone()
  11. def create_user(email: str, passwd: str):
  12. """ 创建用户 """
  13. cur = db.search(columns=["count(Email)"], table="user") # 统计个数
  14. passwd = core.user.User.get_passwd_hash(passwd)
  15. if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
  16. db.insert(table='user', columns=['Email', 'PasswdHash', 'Role'], values=f"'{email}', '{passwd}', 1") # 创建为管理员用户
  17. else:
  18. db.insert(table='user', columns=['Email', 'PasswdHash'], values=f"'{email}', '{passwd}'")
  19. def delete_user(user_id: int):
  20. """ 删除用户 """
  21. cur = db.delete(table="message", where=f"Auth={user_id}")
  22. if cur is None:
  23. return False
  24. cur = db.delete(table="comment", where=f"Auth={user_id}")
  25. if cur is None:
  26. return False
  27. cur = db.delete(table="blog", where=f"Auth={user_id}")
  28. if cur is None:
  29. return False
  30. cur = db.delete(table="user", where=f"ID={user_id}")
  31. if cur is None or cur.rowcount == 0:
  32. return False
  33. return True
  34. def get_user_email(user_id):
  35. """ 获取用户邮箱 """
  36. cur = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
  37. if cur is None or cur.rowcount == 0:
  38. return None
  39. return cur.fetchone()[0]
  40. def get_role_name(role: int):
  41. """ 获取用户角色名称 """
  42. cur = db.search(columns=["RoleName"], table="role", where=f"RoleID={role}")
  43. if cur is None or cur.rowcount == 0:
  44. return None
  45. return cur.fetchone()[0]
  46. def check_role(role: int, operate: str):
  47. """ 检查角色权限(通过角色ID) """
  48. cur = db.search(columns=[operate], table="role", where=f"RoleID={role}")
  49. if cur is None or cur.rowcount == 0:
  50. return False
  51. return cur.fetchone()[0] == DBBit.BIT_1
  52. def check_role_by_name(role: str, operate: str):
  53. """ 检查角色权限(通过角色名) """
  54. cur = db.search(columns=[operate], table="role", where=f"RoleName='{role}")
  55. if cur is None or cur.rowcount == 0:
  56. return False
  57. return cur.fetchone()[0] == DBBit.BIT_1