1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- from sql import db
- from sql.base import DBBit
- import core.user
def read_user(email: str):
    """Look up a user's login record by e-mail address.

    Queries the ``user`` table for the ``PasswdHash``, ``Role`` and ``ID``
    columns of the row whose ``Email`` equals *email*.

    Args:
        email: E-mail address to search for.

    Returns:
        The row returned by ``fetchone()`` for the matching user, or an
        empty list when the search yields no cursor or no rows.

    Raises:
        AssertionError: If the search reports a row count other than one
            (only while assertions are enabled).
    """
    # NOTE(review): ``email`` is interpolated directly into the WHERE clause;
    # if it can come from untrusted input this is open to SQL injection.
    # Confirm whether ``db.search`` offers a parameterized form.
    condition = f"Email='{email}'"
    cursor = db.search(
        columns=["PasswdHash", "Role", "ID"],
        table="user",
        where=condition,
    )
    if cursor is not None and cursor.rowcount != 0:
        # An e-mail address is expected to identify at most one user.
        assert cursor.rowcount == 1
        return cursor.fetchone()
    return []
def create_user(email: str, passwd: str):
    """Create a new user row, making the very first user an administrator.

    The number of existing users is counted first. The password is hashed
    with ``core.user.User.get_passwd_hash`` and the new row is inserted into
    the ``user`` table. When no user exists yet, the row is inserted with
    ``Role`` set to 1 (administrator); otherwise ``Role`` is left out of the
    insert.

    Args:
        email: E-mail address of the new user.
        passwd: Plain password; only its hash is stored.
    """
    # Count the existing users.
    counter = db.search(columns=["count(Email)"], table="user")
    passwd_hash = core.user.User.get_passwd_hash(passwd)

    # NOTE(review): a failed count query (``None`` cursor) also takes the
    # administrator branch below -- confirm that this is intended.
    # NOTE(review): values are interpolated into the SQL text unescaped.
    is_first_user = (
        counter is None
        or counter.rowcount == 0
        or counter.fetchone()[0] == 0
    )
    if is_first_user:
        # Create as an administrator user.
        columns = ['Email', 'PasswdHash', 'Role']
        values = f"'{email}', '{passwd_hash}', 1"
    else:
        columns = ['Email', 'PasswdHash']
        values = f"'{email}', '{passwd_hash}'"
    db.insert(table='user', columns=columns, values=values)
def delete_user(user_id: int):
    """Delete a user together with the rows that reference the user.

    Rows whose ``Auth`` column equals *user_id* are removed from the
    ``message``, ``comment`` and ``blog`` tables (in that order) before the
    ``user`` row itself is deleted. Processing stops at the first delete
    that returns ``None``.

    Args:
        user_id: ID of the user to delete.

    Returns:
        ``True`` when the final delete on ``user`` reports a non-zero row
        count; ``False`` when any delete returns ``None`` or the ``user``
        delete reports zero rows.
    """
    # Dependent rows go first, in the same order as the user's content tables.
    for dependent_table in ("message", "comment", "blog"):
        if db.delete(table=dependent_table, where=f"Auth={user_id}") is None:
            return False

    outcome = db.delete(table="user", where=f"ID={user_id}")
    return not (outcome is None or outcome.rowcount == 0)
def get_user_email(user_id):
    """Return the e-mail address stored for a user ID.

    Args:
        user_id: ID of the user to look up in the ``user`` table.

    Returns:
        The first column (``Email``) of the fetched row, or ``None`` when
        the search yields no cursor or no rows.
    """
    # NOTE(review): ``user_id`` is interpolated into the WHERE clause as a
    # quoted literal; confirm it cannot carry untrusted text.
    cursor = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
    if cursor is not None and cursor.rowcount != 0:
        row = cursor.fetchone()
        return row[0]
    return None
def get_role_name(role: int):
    """Return the name of a role given its ID.

    Args:
        role: Value matched against ``RoleID`` in the ``role`` table.

    Returns:
        The first column (``RoleName``) of the fetched row, or ``None`` when
        the search yields no cursor or no rows.
    """
    cursor = db.search(columns=["RoleName"], table="role", where=f"RoleID={role}")
    if cursor is not None and cursor.rowcount != 0:
        row = cursor.fetchone()
        return row[0]
    return None
def check_role(role: int, operate: str):
    """Check whether a role (identified by ID) has a given permission.

    Args:
        role: Value matched against ``RoleID`` in the ``role`` table.
        operate: Name of the ``role`` table column to read for the
            permission being checked.

    Returns:
        ``True`` when the fetched column value equals ``DBBit.BIT_1``;
        ``False`` otherwise, including when the search yields no cursor or
        no rows.
    """
    # NOTE(review): ``operate`` is used verbatim as a column name; callers
    # should only pass known permission column names.
    cursor = db.search(columns=[operate], table="role", where=f"RoleID={role}")
    if cursor is not None and cursor.rowcount != 0:
        flag = cursor.fetchone()[0]
        return flag == DBBit.BIT_1
    return False
def check_role_by_name(role: str, operate: str):
    """Check whether a role (identified by name) has a given permission.

    Args:
        role: Value matched against ``RoleName`` in the ``role`` table.
        operate: Name of the ``role`` table column to read for the
            permission being checked.

    Returns:
        ``True`` when the fetched column value equals ``DBBit.BIT_1``;
        ``False`` otherwise, including when the search yields no cursor or
        no rows.
    """
    # The role name must be wrapped in a *closed* pair of single quotes;
    # the previous clause omitted the closing quote, which produced an
    # unterminated string literal in the generated SQL.
    # NOTE(review): ``role`` and ``operate`` are interpolated unescaped;
    # confirm they never carry untrusted text.
    cur = db.search(columns=[operate], table="role", where=f"RoleName='{role}'")
    if cur is None or cur.rowcount == 0:
        return False
    return cur.fetchone()[0] == DBBit.BIT_1
|