|
@@ -1,5 +1,9 @@
|
|
from sql import db
|
|
from sql import db
|
|
from sql.base import DBBit
|
|
from sql.base import DBBit
|
|
|
|
+from sql.cache import (read_user_from_cache, write_user_to_cache, delete_user_from_cache,
|
|
|
|
+ get_user_email_from_cache, write_user_email_to_cache, delete_user_email_from_cache,
|
|
|
|
+ get_role_name_from_cache, write_role_name_to_cache, delete_role_name_from_cache,
|
|
|
|
+ get_role_operate_from_cache, write_role_operate_to_cache, delete_role_operate_from_cache)
|
|
import object.user
|
|
import object.user
|
|
|
|
|
|
from typing import List
|
|
from typing import List
|
|
@@ -12,10 +16,17 @@ role_authority = ["WriteBlog", "WriteComment", "WriteMsg", "CreateUser",
|
|
|
|
|
|
def read_user(email: str):
|
|
def read_user(email: str):
|
|
""" 读取用户 """
|
|
""" 读取用户 """
|
|
|
|
+ res = read_user_from_cache(email)
|
|
|
|
+ if res is not None:
|
|
|
|
+ return res
|
|
|
|
+
|
|
cur = db.search("SELECT PasswdHash, Role, ID FROM user WHERE Email=%s", email)
|
|
cur = db.search("SELECT PasswdHash, Role, ID FROM user WHERE Email=%s", email)
|
|
if cur is None or cur.rowcount != 1:
|
|
if cur is None or cur.rowcount != 1:
|
|
return ["", -1, -1]
|
|
return ["", -1, -1]
|
|
- return cur.fetchone()
|
|
|
|
|
|
+
|
|
|
|
+ res = cur.fetchone()
|
|
|
|
+ write_user_to_cache(email, *res)
|
|
|
|
+ return res
|
|
|
|
|
|
|
|
|
|
def create_user(email: str, passwd: str):
|
|
def create_user(email: str, passwd: str):
|
|
@@ -40,6 +51,9 @@ def create_user(email: str, passwd: str):
|
|
|
|
|
|
def delete_user(user_id: int):
|
|
def delete_user(user_id: int):
|
|
""" 删除用户 """
|
|
""" 删除用户 """
|
|
|
|
+ delete_user_from_cache(get_user_email(user_id))
|
|
|
|
+ delete_user_email_from_cache(user_id)
|
|
|
|
+
|
|
cur = db.delete("DELETE FROM message WHERE Auth=%s", user_id)
|
|
cur = db.delete("DELETE FROM message WHERE Auth=%s", user_id)
|
|
if cur is None:
|
|
if cur is None:
|
|
return False
|
|
return False
|
|
@@ -55,6 +69,31 @@ def delete_user(user_id: int):
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
+def change_passwd_hash(user_email: str, passwd_hash: str):
|
|
|
|
+ delete_user_from_cache(user_email)
|
|
|
|
+ cur = db.update("UPDATE user "
|
|
|
|
+ "SET PasswdHash=%s "
|
|
|
|
+ "WHERE Email=%s", passwd_hash, user_email)
|
|
|
|
+ if cur is None or cur.rowcount == 0:
|
|
|
|
+ return False
|
|
|
|
+ return True
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def get_user_email(user_id):
|
|
|
|
+ """ 获取用户邮箱 """
|
|
|
|
+ res = get_user_email_from_cache(user_id)
|
|
|
|
+ if res is not None:
|
|
|
|
+ return res
|
|
|
|
+
|
|
|
|
+ cur = db.search("SELECT Email FROM user WHERE ID=%s", user_id)
|
|
|
|
+ if cur is None or cur.rowcount == 0:
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ res = cur.fetchone()[0]
|
|
|
|
+ write_user_email_to_cache(res)
|
|
|
|
+ return res
|
|
|
|
+
|
|
|
|
+
|
|
def __authority_to_sql(authority):
|
|
def __authority_to_sql(authority):
|
|
""" authority 转换为 Update语句, 不检查合法性 """
|
|
""" authority 转换为 Update语句, 不检查合法性 """
|
|
sql = []
|
|
sql = []
|
|
@@ -81,6 +120,9 @@ def create_role(name: str, authority: List[str]):
|
|
|
|
|
|
|
|
|
|
def delete_role(role_id: int):
|
|
def delete_role(role_id: int):
|
|
|
|
+ delete_role_name_from_cache(role_id)
|
|
|
|
+ delete_role_operate_from_cache(role_id)
|
|
|
|
+
|
|
cur = db.delete("DELETE FROM role WHERE RoleID=%s", role_id)
|
|
cur = db.delete("DELETE FROM role WHERE RoleID=%s", role_id)
|
|
if cur is None or cur.rowcount == 0:
|
|
if cur is None or cur.rowcount == 0:
|
|
return False
|
|
return False
|
|
@@ -96,29 +138,19 @@ def set_user_role(role_id: int, user_id: str):
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
|
|
-def change_passwd_hash(user_id: int, passwd_hash: str):
|
|
|
|
- cur = db.update("UPDATE user "
|
|
|
|
- "SET PasswdHash=%s "
|
|
|
|
- "WHERE ID=%s", passwd_hash, user_id)
|
|
|
|
- if cur is None or cur.rowcount == 0:
|
|
|
|
- return False
|
|
|
|
- return True
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-def get_user_email(user_id):
|
|
|
|
- """ 获取用户邮箱 """
|
|
|
|
- cur = db.search("SELECT Email FROM user WHERE ID=%s", user_id)
|
|
|
|
- if cur is None or cur.rowcount == 0:
|
|
|
|
- return None
|
|
|
|
- return cur.fetchone()[0]
|
|
|
|
-
|
|
|
|
-
|
|
|
|
def get_role_name(role: int):
|
|
def get_role_name(role: int):
|
|
""" 获取用户角色名称 """
|
|
""" 获取用户角色名称 """
|
|
|
|
+ res = get_role_name_from_cache(role)
|
|
|
|
+ if res is not None:
|
|
|
|
+ return res
|
|
|
|
+
|
|
cur = db.search("SELECT RoleName FROM role WHERE RoleID=%s", role)
|
|
cur = db.search("SELECT RoleName FROM role WHERE RoleID=%s", role)
|
|
if cur is None or cur.rowcount == 0:
|
|
if cur is None or cur.rowcount == 0:
|
|
return None
|
|
return None
|
|
- return cur.fetchone()[0]
|
|
|
|
|
|
+
|
|
|
|
+ res = cur.fetchone()[0]
|
|
|
|
+ write_role_name_to_cache(role, res)
|
|
|
|
+ return res
|
|
|
|
|
|
|
|
|
|
def __check_operate(operate):
|
|
def __check_operate(operate):
|
|
@@ -129,30 +161,18 @@ def check_role(role: int, operate: str):
|
|
""" 检查角色权限(通过角色ID) """
|
|
""" 检查角色权限(通过角色ID) """
|
|
if not __check_operate(operate): # 检查, 防止SQL注入
|
|
if not __check_operate(operate): # 检查, 防止SQL注入
|
|
return False
|
|
return False
|
|
- cur = db.search(f"SELECT {operate} FROM role WHERE RoleID=%s", role)
|
|
|
|
- if cur is None or cur.rowcount == 0:
|
|
|
|
- return False
|
|
|
|
- return cur.fetchone()[0] == DBBit.BIT_1
|
|
|
|
|
|
|
|
|
|
+ res = get_role_operate_from_cache(role, operate)
|
|
|
|
+ if res is not None:
|
|
|
|
+ return res
|
|
|
|
|
|
-def check_role_by_name(role: str, operate: str):
|
|
|
|
- """ 检查角色权限(通过角色名) """
|
|
|
|
- if not __check_operate(operate): # 检查, 防止SQL注入
|
|
|
|
- return False
|
|
|
|
- role = role.replace("'", "''")
|
|
|
|
- cur = db.search(f"SELECT {operate} FROM role WHERE RoleName=%s", role)
|
|
|
|
|
|
+ cur = db.search(f"SELECT {operate} FROM role WHERE RoleID=%s", role)
|
|
if cur is None or cur.rowcount == 0:
|
|
if cur is None or cur.rowcount == 0:
|
|
return False
|
|
return False
|
|
- return cur.fetchone()[0] == DBBit.BIT_1
|
|
|
|
-
|
|
|
|
|
|
|
|
-def get_role_id_by_name(role: str):
|
|
|
|
- """ 检查角色权限(通过角色名) """
|
|
|
|
- role = role.replace("'", "''")
|
|
|
|
- cur = db.search("SELECT RoleID FROM role WHERE RoleName=%s", role)
|
|
|
|
- if cur is None or cur.rowcount == 0:
|
|
|
|
- return None
|
|
|
|
- return cur.fetchone()[0]
|
|
|
|
|
|
+ res = cur.fetchone()[0] == DBBit.BIT_1
|
|
|
|
+ write_role_operate_to_cache(role, operate, res)
|
|
|
|
+ return res
|
|
|
|
|
|
|
|
|
|
def get_role_list():
|
|
def get_role_list():
|