|
@@ -28,9 +28,11 @@ def create_user(email: str, passwd: str):
|
|
|
passwd = object.user.User.get_passwd_hash(passwd)
|
|
|
if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
|
|
|
# 创建为管理员用户
|
|
|
- cur = db.insert(table='user', columns=['Email', 'PasswdHash', 'Role'], values=f"'{email}', '{passwd}', 1")
|
|
|
+ cur = db.insert("INSERT INTO user(Email, PasswdHash, Role) "
|
|
|
+ "VALUES (%s, %s, %s)", email, passwd, 1)
|
|
|
else:
|
|
|
- cur = db.insert(table='user', columns=['Email', 'PasswdHash'], values=f"'{email}', '{passwd}'")
|
|
|
+ cur = db.insert("INSERT INTO user(Email, PasswdHash) "
|
|
|
+ "VALUES (%s, %s)", email, passwd)
|
|
|
if cur is None or cur.rowcount != 1:
|
|
|
return None
|
|
|
return cur.lastrowid
|
|
@@ -38,56 +40,66 @@ def create_user(email: str, passwd: str):
|
|
|
|
|
|
def delete_user(user_id: int):
|
|
|
""" 删除用户 """
|
|
|
- cur = db.delete(table="message", where=f"Auth={user_id}")
|
|
|
+ cur = db.delete("DELETE FROM message WHERE Auth=%s", user_id)
|
|
|
if cur is None:
|
|
|
return False
|
|
|
- cur = db.delete(table="comment", where=f"Auth={user_id}")
|
|
|
+ cur = db.delete("DELETE FROM comment WHERE Auth=%s", user_id)
|
|
|
if cur is None:
|
|
|
return False
|
|
|
- cur = db.delete(table="blog", where=f"Auth={user_id}")
|
|
|
+ cur = db.delete("DELETE FROM blog WHERE Auth=%s", user_id)
|
|
|
if cur is None:
|
|
|
return False
|
|
|
- cur = db.delete(table="user", where=f"ID={user_id}")
|
|
|
+ cur = db.delete("DELETE FROM user WHERE ID=%s", user_id)
|
|
|
if cur is None or cur.rowcount == 0:
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
|
|
|
+def __authority_to_sql(authority):
|
|
|
+ """ authority 转换为 Update语句, 不检查合法性 """
|
|
|
+ sql = []
|
|
|
+ args = []
|
|
|
+ for i in authority:
|
|
|
+ sql.append(f"{i}=%s")
|
|
|
+ args.append(authority[i])
|
|
|
+ return ",".join(sql), args
|
|
|
+
|
|
|
+
|
|
|
def create_role(name: str, authority: List[str]):
|
|
|
name = name.replace("'", "''")
|
|
|
- cur = db.insert(table="role", columns=["RoleName"], values=f"'{name}'", not_commit=True)
|
|
|
+ cur = db.insert("INSERT INTO role(RoleName) VALUES (%s)", name)
|
|
|
if cur is None or cur.rowcount == 0:
|
|
|
return False
|
|
|
|
|
|
- kw = {}
|
|
|
- for i in role_authority:
|
|
|
- kw[i] = '0'
|
|
|
- for i in authority:
|
|
|
- if i in role_authority:
|
|
|
- kw[i] = '1'
|
|
|
-
|
|
|
- cur = db.update(table='role', kw=kw, where=f"RoleName='{name}'")
|
|
|
+ sql, args = __authority_to_sql({i: (1 if i in authority else 0) for i in role_authority})
|
|
|
+ cur = db.update(f"UPDATE role "
|
|
|
+ f"SET {sql} "
|
|
|
+ f"WHERE RoleName=%s", *args, name)
|
|
|
if cur is None or cur.rowcount == 0:
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
|
|
|
def delete_role(role_id: int):
|
|
|
- cur = db.delete(table="role", where=f"RoleID={role_id}")
|
|
|
+ cur = db.delete("DELETE FROM role WHERE RoleID=%s", role_id)
|
|
|
if cur is None or cur.rowcount == 0:
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
|
|
|
def set_user_role(role_id: int, user_id: str):
|
|
|
- cur = db.update(table="user", kw={"Role": f"{role_id}"}, where=f"ID={user_id}")
|
|
|
+ cur = db.update("UPDATE user "
|
|
|
+ "SET Role=%s "
|
|
|
+ "WHERE ID=%s", role_id, user_id)
|
|
|
if cur is None or cur.rowcount == 0:
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
|
|
|
def change_passwd_hash(user_id: int, passwd_hash: str):
|
|
|
- cur = db.update(table='user', kw={'PasswdHash': f"'{passwd_hash}'"}, where=f'ID={user_id}')
|
|
|
+ cur = db.update("UPDATE user "
|
|
|
+ "SET PasswdHash=%s "
|
|
|
+ "WHERE ID=%s", passwd_hash, user_id)
|
|
|
if cur is None or cur.rowcount == 0:
|
|
|
return False
|
|
|
return True
|
|
@@ -110,21 +122,7 @@ def get_role_name(role: int):
|
|
|
|
|
|
|
|
|
def __check_operate(operate):
|
|
|
- return operate in ["WriteBlog",
|
|
|
- "WriteComment",
|
|
|
- "WriteMsg",
|
|
|
- "CreateUser",
|
|
|
- "ReadBlog",
|
|
|
- "ReadComment",
|
|
|
- "ReadMsg",
|
|
|
- "ReadSecretMsg",
|
|
|
- "ReadUserInfo",
|
|
|
- "DeleteBlog",
|
|
|
- "DeleteComment",
|
|
|
- "DeleteMsg",
|
|
|
- "DeleteUser",
|
|
|
- "ConfigureSystem",
|
|
|
- "ReadSystem"]
|
|
|
+ return operate in role_authority
|
|
|
|
|
|
|
|
|
def check_role(role: int, operate: str):
|