user.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
  4. from typing import Optional
  5. from configure import conf
  6. from sql.user import read_user, check_role, get_user_email, create_user, get_role_name, delete_user
  7. import core.blog
  8. import core.comment
  9. import core.msg
  10. class AnonymousUser(AnonymousUserMixin):
  11. def __init__(self):
  12. super(AnonymousUser, self).__init__()
  13. self.role = 3 # 默认角色
  14. self.email = "" # 无邮箱
  15. self.passwd_hash = "" # 无密码
  16. def check_role(self, operate: str):
  17. return check_role(self.role, operate)
  18. @staticmethod
  19. def get_user_id():
  20. return 0
  21. def load_user_by_email(email: str) -> "Optional[User]":
  22. user = read_user(email)
  23. if len(user) == 0:
  24. return None
  25. passwd_hash = user[0]
  26. role = user[1]
  27. user_id = user[2]
  28. return User(email, passwd_hash, role, user_id)
  29. def load_user_by_id(user_id):
  30. email = get_user_email(user_id)
  31. if email is None:
  32. return None
  33. return load_user_by_email(email)
  34. class User(UserMixin):
  35. def __init__(self, email, passwd_hash, role, user_id):
  36. self.email = email
  37. self.passwd_hash = passwd_hash
  38. self.role = role
  39. if role is not None:
  40. self.role_name = get_role_name(role)
  41. else:
  42. self.role_name = None
  43. self.id = user_id
  44. def count_info(self):
  45. msg = core.msg.Message.get_msg_count(self)
  46. comment = core.comment.Comment.get_user_comment_count(self)
  47. blog = core.blog.BlogArticle.get_blog_count(None, self)
  48. return msg, comment, blog
  49. @property
  50. def s_email(self):
  51. if len(self.email) <= 4:
  52. return f"{self.email[0]}****"
  53. else:
  54. email = f"{self.email[0]}****{self.email[5:]}"
  55. return email
  56. @property
  57. def comment_count(self):
  58. return 0
  59. @property
  60. def blog_count(self):
  61. return 0
  62. @property
  63. def msg_count(self):
  64. return 0
  65. @property
  66. def is_active(self):
  67. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  68. return True
  69. @property
  70. def is_authenticated(self):
  71. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  72. return True
  73. def get_id(self):
  74. """Flask要求的方法"""
  75. return self.email
  76. def get_user_id(self):
  77. return self.id
  78. @staticmethod
  79. def creat_token(email: str, passwd_hash: str):
  80. s = Serializer(conf["secret-key"], expires_in=3600)
  81. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  82. @staticmethod
  83. def load_token(token: str):
  84. s = Serializer(conf["secret-key"], expires_in=3600)
  85. try:
  86. token = s.loads(token)
  87. return token['email'], token['passwd_hash']
  88. except Exception:
  89. return None
  90. @staticmethod
  91. def get_passwd_hash(passwd: str):
  92. return generate_password_hash(passwd)
  93. def check_passwd(self, passwd: str):
  94. return check_password_hash(self.passwd_hash, passwd)
  95. def check_role(self, operate: str):
  96. return check_role(self.role, operate)
  97. def create(self):
  98. return create_user(self.email, self.passwd_hash)
  99. def delete(self):
  100. return delete_user(self.id)