user.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import URLSafeTimedSerializer as Serializer
  4. from itsdangerous.exc import BadData
  5. from collections import namedtuple
  6. from configure import conf
  7. from sql.user import (read_user,
  8. check_role,
  9. create_user,
  10. get_role_name,
  11. delete_user,
  12. change_passwd_hash,
  13. create_role,
  14. delete_role,
  15. set_user_role,
  16. get_role_list,
  17. role_authority)
  18. import object.blog
  19. import object.comment
  20. import object.msg
  21. class AnonymousUser(AnonymousUserMixin):
  22. def __init__(self):
  23. super(AnonymousUser, self).__init__()
  24. self.role = 4 # 默认角色
  25. self.email = "" # 无邮箱
  26. self.passwd_hash = "" # 无密码
  27. def check_role(self, operate: str):
  28. return check_role(self.role, operate)
  29. @property
  30. def id(self):
  31. return 0
  32. class _User(UserMixin):
  33. user_tuple = namedtuple("User", "passwd role id")
  34. @staticmethod
  35. def create(email, passwd_hash):
  36. if create_user(email, passwd_hash) is not None:
  37. return User(email)
  38. return None
  39. @staticmethod
  40. def creat_token(email: str, passwd_hash: str):
  41. s = Serializer(conf["SECRET_KEY"])
  42. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  43. @staticmethod
  44. def load_token(token: str):
  45. s = Serializer(conf["SECRET_KEY"])
  46. try:
  47. token = s.loads(token, max_age=3600)
  48. return token['email'], token['passwd_hash']
  49. except BadData:
  50. return None
  51. @staticmethod
  52. def get_passwd_hash(passwd: str):
  53. return generate_password_hash(passwd)
  54. @staticmethod
  55. def create_role(name: str, authority):
  56. return create_role(name, authority)
  57. @staticmethod
  58. def delete_role(role_id: int):
  59. return delete_role(role_id)
  60. @staticmethod
  61. def get_role_list():
  62. return get_role_list()
  63. class User(_User):
  64. RoleAuthorize = role_authority
  65. def __init__(self, email):
  66. self.email = email
  67. def get_id(self):
  68. """Flask要求的方法"""
  69. return self.email
  70. @property
  71. def is_active(self):
  72. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  73. return self.id != -1
  74. @property
  75. def is_authenticated(self):
  76. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  77. return self.is_active
  78. @property
  79. def star_email(self):
  80. if len(self.email) <= 4:
  81. return f"{self.email[0]}****"
  82. else:
  83. email = f"{self.email[0]}****{self.email[5:]}"
  84. return email
  85. @property
  86. def info(self):
  87. return User.user_tuple(*read_user(self.email))
  88. @property
  89. def passwd_hash(self):
  90. return self.info.passwd
  91. @property
  92. def role(self):
  93. return self.info.role
  94. @property
  95. def role_name(self):
  96. return get_role_name(self.info.role)
  97. @property
  98. def id(self):
  99. return self.info.id
  100. @property
  101. def count(self):
  102. msg = object.msg.Message.get_msg_count(self)
  103. comment = object.comment.Comment.get_user_comment_count(self)
  104. blog = object.blog.BlogArticle.get_blog_count(None, self)
  105. return msg, comment, blog
  106. def check_passwd(self, passwd: str):
  107. return check_password_hash(self.passwd_hash, passwd)
  108. def check_role(self, operate: str):
  109. return check_role(self.role, operate)
  110. def delete(self):
  111. return delete_user(self.id)
  112. def change_passwd(self, passwd):
  113. return change_passwd_hash(self.id, self.get_passwd_hash(passwd))
  114. def set_user_role(self, role_id: int):
  115. return set_user_role(role_id, self.id)