user.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
  4. from configure import conf
  5. from sql.user import read_user, check_role, get_user_email, add_user, get_role_name
  6. import core.blog
  7. import core.comment
  8. import core.msg
  9. class LoaderUserError(Exception):
  10. pass
  11. class AnonymousUser(AnonymousUserMixin):
  12. def __init__(self):
  13. super(AnonymousUser, self).__init__()
  14. self.role = 3 # 默认角色
  15. self.email = "" # 无邮箱
  16. self.passwd_hash = "" # 无密码
  17. def check_role(self, operate: str):
  18. return check_role(self.role, operate)
  19. @staticmethod
  20. def get_user_id():
  21. return 0
  22. def load_user_by_email(email: str) -> "User":
  23. user = read_user(email)
  24. if len(user) == 0:
  25. raise LoaderUserError
  26. passwd_hash = user[0]
  27. role = user[1]
  28. user_id = user[2]
  29. return User(email, passwd_hash, role, user_id)
  30. class User(UserMixin):
  31. def __init__(self, email, passwd_hash, role, user_id):
  32. self.email = email
  33. self.passwd_hash = passwd_hash
  34. self.role = role
  35. if role is not None:
  36. self.role_name = get_role_name(role)
  37. else:
  38. self.role_name = None
  39. self.id = user_id
  40. def count_info(self):
  41. msg = core.msg.Message.get_msg_count(self)
  42. comment = core.comment.Comment.get_user_comment_count(self)
  43. blog = core.blog.BlogArticle.get_blog_count(None, self)
  44. return msg, comment, blog
  45. @property
  46. def s_email(self):
  47. if len(self.email) <= 4:
  48. return f"{self.email[0]}****"
  49. else:
  50. email = f"{self.email[0]}****{self.email[5:]}"
  51. return email
  52. @staticmethod
  53. def load_user_by_id(user_id):
  54. email = get_user_email(user_id)
  55. if email is None:
  56. raise LoaderUserError
  57. return load_user_by_email(email)
  58. @property
  59. def comment_count(self):
  60. return 0
  61. @property
  62. def blog_count(self):
  63. return 0
  64. @property
  65. def msg_count(self):
  66. return 0
  67. @property
  68. def is_active(self):
  69. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  70. return True
  71. @property
  72. def is_authenticated(self):
  73. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  74. return True
  75. def get_id(self):
  76. """Flask要求的方法"""
  77. return self.email
  78. def get_user_id(self):
  79. return self.id
  80. @staticmethod
  81. def creat_token(email: str, passwd_hash: str):
  82. s = Serializer(conf["secret-key"], expires_in=3600)
  83. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  84. @staticmethod
  85. def load_token(token: str):
  86. s = Serializer(conf["secret-key"], expires_in=3600)
  87. try:
  88. token = s.loads(token)
  89. return token['email'], token['passwd_hash']
  90. except Exception:
  91. return None
  92. @staticmethod
  93. def get_passwd_hash(passwd: str):
  94. return generate_password_hash(passwd)
  95. def check_passwd(self, passwd: str):
  96. return check_password_hash(self.passwd_hash, passwd)
  97. def check_role(self, operate: str):
  98. return check_role(self.role, operate)
  99. def create_user(self):
  100. return add_user(self.email, self.passwd_hash)