123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- from flask_login import UserMixin, AnonymousUserMixin
- from werkzeug.security import generate_password_hash, check_password_hash
- from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
- from typing import Optional
- from configure import conf
- from sql.user import read_user, check_role, get_user_email, create_user, get_role_name
- import core.blog
- import core.comment
- import core.msg
- class AnonymousUser(AnonymousUserMixin):
- def __init__(self):
- super(AnonymousUser, self).__init__()
- self.role = 3 # 默认角色
- self.email = "" # 无邮箱
- self.passwd_hash = "" # 无密码
- def check_role(self, operate: str):
- return check_role(self.role, operate)
- @staticmethod
- def get_user_id():
- return 0
- def load_user_by_email(email: str) -> "Optional[User]":
- user = read_user(email)
- if len(user) == 0:
- return None
- passwd_hash = user[0]
- role = user[1]
- user_id = user[2]
- return User(email, passwd_hash, role, user_id)
- def load_user_by_id(user_id):
- email = get_user_email(user_id)
- if email is None:
- return None
- return load_user_by_email(email)
- class User(UserMixin):
- def __init__(self, email, passwd_hash, role, user_id):
- self.email = email
- self.passwd_hash = passwd_hash
- self.role = role
- if role is not None:
- self.role_name = get_role_name(role)
- else:
- self.role_name = None
- self.id = user_id
- def count_info(self):
- msg = core.msg.Message.get_msg_count(self)
- comment = core.comment.Comment.get_user_comment_count(self)
- blog = core.blog.BlogArticle.get_blog_count(None, self)
- return msg, comment, blog
- @property
- def s_email(self):
- if len(self.email) <= 4:
- return f"{self.email[0]}****"
- else:
- email = f"{self.email[0]}****{self.email[5:]}"
- return email
- @property
- def comment_count(self):
- return 0
- @property
- def blog_count(self):
- return 0
- @property
- def msg_count(self):
- return 0
- @property
- def is_active(self):
- """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
- return True
- @property
- def is_authenticated(self):
- """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
- return True
- def get_id(self):
- """Flask要求的方法"""
- return self.email
- def get_user_id(self):
- return self.id
- @staticmethod
- def creat_token(email: str, passwd_hash: str):
- s = Serializer(conf["secret-key"], expires_in=3600)
- return s.dumps({"email": email, "passwd_hash": passwd_hash})
- @staticmethod
- def load_token(token: str):
- s = Serializer(conf["secret-key"], expires_in=3600)
- try:
- token = s.loads(token)
- return token['email'], token['passwd_hash']
- except Exception:
- return None
- @staticmethod
- def get_passwd_hash(passwd: str):
- return generate_password_hash(passwd)
- def check_passwd(self, passwd: str):
- return check_password_hash(self.passwd_hash, passwd)
- def check_role(self, operate: str):
- return check_role(self.role, operate)
- def create_user(self):
- return create_user(self.email, self.passwd_hash)
|