user.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. from sql import db
  2. from sql.base import DBBit
  3. import core.user
  4. def get_user_email(user_id):
  5. cur = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
  6. if cur is None or cur.rowcount == 0:
  7. return None
  8. return cur.fetchone()[0]
  9. def read_user(email: str):
  10. cur = db.search(columns=["PasswdHash", "Role", "ID"], table="user", where=f"Email='{email}'")
  11. if cur is None or cur.rowcount == 0:
  12. return []
  13. assert cur.rowcount == 1
  14. return cur.fetchone()
  15. def add_user(email: str, passwd: str):
  16. cur = db.search(columns=["count(Email)"], table="user") # 统计个数
  17. passwd = core.user.User.get_passwd_hash(passwd)
  18. if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
  19. db.insert(table='user', columns=['Email', 'PasswdHash', 'Role'], values=f"'{email}', '{passwd}', 1") # 创建为管理员用户
  20. else:
  21. db.insert(table='user', columns=['Email', 'PasswdHash'], values=f"'{email}', '{passwd}'")
  22. def get_role_name(role: int):
  23. cur = db.search(columns=["RoleName"], table="role", where=f"RoleID={role}")
  24. if cur is None or cur.rowcount == 0:
  25. return None
  26. return cur.fetchone()[0]
  27. def check_role(role: int, operate: str):
  28. cur = db.search(columns=[operate], table="role", where=f"RoleID={role}")
  29. if cur is None or cur.rowcount == 0:
  30. return False
  31. return cur.fetchone()[0] == DBBit.BIT_1
  32. def check_role_by_name(role: str, operate: str):
  33. cur = db.search(columns=[operate], table="role", where=f"RoleName='{role}")
  34. if cur is None or cur.rowcount == 0:
  35. return False
  36. return cur.fetchone()[0] == DBBit.BIT_1