1
0

user.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
  4. from typing import Optional
  5. from configure import conf
  6. from sql.user import (read_user,
  7. check_role,
  8. get_user_email,
  9. create_user,
  10. get_role_name,
  11. delete_user,
  12. change_passwd_hash,
  13. create_role,
  14. delete_role,
  15. set_user_role)
  16. import core.blog
  17. import core.comment
  18. import core.msg
  19. class AnonymousUser(AnonymousUserMixin):
  20. def __init__(self):
  21. super(AnonymousUser, self).__init__()
  22. self.role = 4 # 默认角色
  23. self.email = "" # 无邮箱
  24. self.passwd_hash = "" # 无密码
  25. def check_role(self, operate: str):
  26. return check_role(self.role, operate)
  27. @staticmethod
  28. def get_user_id():
  29. return 0
  30. def load_user_by_email(email: str) -> "Optional[User]":
  31. user = read_user(email)
  32. if len(user) == 0:
  33. return None
  34. passwd_hash = user[0]
  35. role = user[1]
  36. user_id = user[2]
  37. return User(email, passwd_hash, role, user_id)
  38. def load_user_by_id(user_id):
  39. email = get_user_email(user_id)
  40. if email is None:
  41. return None
  42. return load_user_by_email(email)
  43. class User(UserMixin):
  44. def __init__(self, email, passwd_hash, role, user_id):
  45. self.email = email
  46. self.passwd_hash = passwd_hash
  47. self.role = role
  48. if role is not None:
  49. self.role_name = get_role_name(role)
  50. else:
  51. self.role_name = None
  52. self.user_id = user_id
  53. def count_info(self):
  54. msg = core.msg.Message.get_msg_count(self)
  55. comment = core.comment.Comment.get_user_comment_count(self)
  56. blog = core.blog.BlogArticle.get_blog_count(None, self)
  57. return msg, comment, blog
  58. @property
  59. def s_email(self):
  60. if len(self.email) <= 4:
  61. return f"{self.email[0]}****"
  62. else:
  63. email = f"{self.email[0]}****{self.email[5:]}"
  64. return email
  65. @property
  66. def comment_count(self):
  67. return 0
  68. @property
  69. def blog_count(self):
  70. return 0
  71. @property
  72. def msg_count(self):
  73. return 0
  74. @property
  75. def is_active(self):
  76. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  77. return True
  78. @property
  79. def is_authenticated(self):
  80. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  81. return True
  82. def get_id(self):
  83. """Flask要求的方法"""
  84. return self.email
  85. def get_user_id(self):
  86. return self.user_id
  87. @staticmethod
  88. def creat_token(email: str, passwd_hash: str):
  89. s = Serializer(conf["secret-key"], expires_in=3600)
  90. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  91. @staticmethod
  92. def load_token(token: str):
  93. s = Serializer(conf["secret-key"], expires_in=3600)
  94. try:
  95. token = s.loads(token)
  96. return token['email'], token['passwd_hash']
  97. except Exception:
  98. return None
  99. @staticmethod
  100. def get_passwd_hash(passwd: str):
  101. return generate_password_hash(passwd)
  102. def check_passwd(self, passwd: str):
  103. return check_password_hash(self.passwd_hash, passwd)
  104. def check_role(self, operate: str):
  105. return check_role(self.role, operate)
  106. def create(self):
  107. return create_user(self.email, self.passwd_hash)
  108. def delete(self):
  109. return delete_user(self.user_id)
  110. def change_passwd(self, passwd):
  111. return change_passwd_hash(self.user_id, self.get_passwd_hash(passwd))
  112. @staticmethod
  113. def create_role(name: str, authority):
  114. return create_role(name, authority)
  115. @staticmethod
  116. def delete_role(name: str):
  117. return delete_role(name)
  118. def set_user_role(self, name: str):
  119. return set_user_role(name, self.user_id)