123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- from sql import db
- from sql.base import DBBit
- import object.user
- from typing import List
# Names of the permission flags a role can carry. create_role() uses each name
# as a key when updating the "role" table, so these presumably mirror that
# table's column names -- TODO confirm against the schema.
role_authority = [
    # write / create
    "WriteBlog",
    "WriteComment",
    "WriteMsg",
    "CreateUser",
    # read
    "ReadBlog",
    "ReadComment",
    "ReadMsg",
    "ReadSecretMsg",
    "ReadUserInfo",
    # delete
    "DeleteBlog",
    "DeleteComment",
    "DeleteMsg",
    "DeleteUser",
    # system
    "ConfigureSystem",
    "ReadSystem",
]
def read_user(email: str):
    """Read a user record by e-mail address.

    Searches the ``user`` table for the ``PasswdHash``, ``Role`` and ``ID``
    columns of the row whose ``Email`` equals *email*.

    Returns:
        The row produced by ``fetchone()`` when a user matches, or an empty
        list when the search yields no cursor or no rows.

    NOTE(review): *email* is interpolated straight into the WHERE clause.
    If it can come from untrusted input this is an SQL-injection vector;
    confirm whether ``db.search`` offers parameter binding.
    """
    cursor = db.search(
        columns=["PasswdHash", "Role", "ID"],
        table="user",
        where=f"Email='{email}'",
    )
    nothing_found = cursor is None or cursor.rowcount == 0
    if nothing_found:
        return []

    # An e-mail address is expected to match at most one row.
    assert cursor.rowcount == 1
    return cursor.fetchone()
def create_user(email: str, passwd: str):
    """Create a user with the given e-mail address and plain-text password.

    The password is hashed with ``object.user.User.get_passwd_hash`` before
    it is stored. When the ``user`` table holds no rows yet, the new account
    is inserted with ``Role`` 1 (the administrator role, per the original
    comment); otherwise ``Role`` is left out of the insert.

    Returns:
        None. The result of ``db.insert`` is not inspected.

    NOTE(review): a failed count query (``None`` cursor) is treated the same
    as an empty table, so the new user would be inserted with Role 1 --
    confirm this is intended. *email* is also interpolated into the SQL
    values string without escaping.
    """
    count_cur = db.search(columns=["count(Email)"], table="user")  # number of existing users
    passwd = object.user.User.get_passwd_hash(passwd)

    no_users_yet = (
        count_cur is None
        or count_cur.rowcount == 0
        or count_cur.fetchone()[0] == 0
    )
    if no_users_yet:
        # First account: create it as the administrator user.
        columns = ['Email', 'PasswdHash', 'Role']
        values = f"'{email}', '{passwd}', 1"
    else:
        columns = ['Email', 'PasswdHash']
        values = f"'{email}', '{passwd}'"

    db.insert(table='user', columns=columns, values=values)
def delete_user(user_id: int):
    """Delete a user together with the rows they authored.

    Rows whose ``Auth`` equals *user_id* are removed from ``message``,
    ``comment`` and ``blog`` (in that order) before the ``user`` row itself
    is deleted.

    Returns:
        True when the final delete on ``user`` affected at least one row.
        False when any delete returns no cursor, or when no ``user`` row
        was removed.

    NOTE(review): the deletes are issued one after another; nothing visible
    here rolls back earlier deletes when a later one fails.
    """
    # Dependent rows first, then the user record.
    for table in ("message", "comment", "blog"):
        if db.delete(table=table, where=f"Auth={user_id}") is None:
            return False

    removed = db.delete(table="user", where=f"ID={user_id}")
    return removed is not None and removed.rowcount != 0
def create_role(name: str, authority: List[str]):
    """Create a role called *name* and set its permission flags.

    A row is first inserted into ``role`` with only ``RoleName`` set (passing
    ``not_commit=True``). The row is then updated so that every name in
    ``role_authority`` is '1' if it appears in *authority* and '0' otherwise.
    Entries of *authority* that are not in ``role_authority`` are ignored.

    Returns:
        True when both the insert and the update affected a row,
        False otherwise.

    NOTE(review): presumably the update commits the pending insert as well;
    if the update fails the insert is left uncommitted -- TODO confirm how
    ``db`` handles that. *name* is interpolated into the SQL unescaped.
    """
    inserted = db.insert(table="role", columns=["RoleName"], values=f"'{name}'", not_commit=True)
    if inserted is None or inserted.rowcount == 0:
        return False

    # Every known permission defaults to '0'; the requested ones flip to '1'.
    flags = dict.fromkeys(role_authority, '0')
    flags.update({perm: '1' for perm in authority if perm in role_authority})

    updated = db.update(table='role', kw=flags, where=f"RoleName='{name}'")
    return updated is not None and updated.rowcount != 0
def delete_role(name: str):
    """Delete the role whose ``RoleName`` equals *name*.

    Returns:
        True when at least one row was deleted; False when the delete
        returned no cursor or affected no rows.

    NOTE(review): *name* is interpolated into the WHERE clause unescaped.
    """
    outcome = db.delete(table="role", where=f"RoleName='{name}'")
    return outcome is not None and outcome.rowcount != 0
def set_user_role(name: str, user_id: str):
    """Assign the role called *name* to the user with ID *user_id*.

    The role name is resolved to its ID with ``get_role_id_by_name``; the
    user's ``Role`` column is then updated to that ID.

    Returns:
        True when the update affected a row. False when the role name
        cannot be resolved, the update returns no cursor, or no row changed.

    NOTE(review): *user_id* is annotated ``str`` here but ``int`` in the
    sibling functions, and is used unquoted in the WHERE clause.
    """
    resolved_id = get_role_id_by_name(name)
    if resolved_id is None:
        return False

    updated = db.update(
        table="user",
        kw={"Role": f"{resolved_id}"},
        where=f"ID={user_id}",
    )
    return updated is not None and updated.rowcount != 0
def change_passwd_hash(user_id: int, passwd_hash: str):
    """Store a new password hash for the user with ID *user_id*.

    *passwd_hash* is written as given, wrapped in single quotes for the SQL
    statement; no hashing happens here.

    Returns:
        True when the update affected a row; False when it returned no
        cursor or changed no rows.
    """
    new_values = {'PasswdHash': f"'{passwd_hash}'"}
    updated = db.update(table='user', kw=new_values, where=f'ID={user_id}')
    return updated is not None and updated.rowcount != 0
def get_user_email(user_id):
    """Get the e-mail address of the user with ID *user_id*.

    Returns:
        The first column of the matching row (``Email``), or None when the
        search returns no cursor or no rows.

    NOTE(review): the ID is quoted in the WHERE clause here, unlike the
    other ID lookups in this module.
    """
    cursor = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
    has_row = cursor is not None and cursor.rowcount != 0
    return cursor.fetchone()[0] if has_row else None
def get_role_name(role: int):
    """Get the name of the role whose ``RoleID`` equals *role*.

    Returns:
        The first column of the matching row (``RoleName``), or None when
        the search returns no cursor or no rows.
    """
    cursor = db.search(columns=["RoleName"], table="role", where=f"RoleID={role}")
    has_row = cursor is not None and cursor.rowcount != 0
    return cursor.fetchone()[0] if has_row else None
def check_role(role: int, operate: str):
    """Check a role's permission, looking the role up by ID.

    Reads the column named *operate* from the ``role`` row whose ``RoleID``
    equals *role* and compares it with ``DBBit.BIT_1``.

    Returns:
        True when the stored value equals ``DBBit.BIT_1``; False when it
        does not, or when the search returns no cursor or no rows.

    NOTE(review): *operate* is used verbatim as a column name; presumably
    callers only pass names from ``role_authority`` -- verify.
    """
    cursor = db.search(columns=[operate], table="role", where=f"RoleID={role}")
    if cursor is not None and cursor.rowcount != 0:
        flag = cursor.fetchone()[0]
        return flag == DBBit.BIT_1
    return False
def check_role_by_name(role: str, operate: str):
    """Check a role's permission, looking the role up by name.

    Reads the column named *operate* from the ``role`` row whose ``RoleName``
    equals *role* and compares it with ``DBBit.BIT_1``.

    Returns:
        True when the stored value equals ``DBBit.BIT_1``; False when it
        does not, or when the search returns no cursor or no rows.

    NOTE(review): both *role* and *operate* are interpolated into the query
    unescaped; confirm they never carry untrusted input.
    """
    cursor = db.search(columns=[operate], table="role", where=f"RoleName='{role}'")
    if cursor is not None and cursor.rowcount != 0:
        flag = cursor.fetchone()[0]
        return flag == DBBit.BIT_1
    return False
def get_role_id_by_name(role: str):
    """Get the ID of the role whose ``RoleName`` equals *role*.

    Returns:
        The first column of the matching row (``RoleID``), or None when the
        search returns no cursor or no rows.

    NOTE(review): *role* is interpolated into the WHERE clause unescaped.
    """
    cursor = db.search(columns=["RoleID"], table="role", where=f"RoleName='{role}'")
    has_row = cursor is not None and cursor.rowcount != 0
    return cursor.fetchone()[0] if has_row else None
|