user.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import URLSafeTimedSerializer as Serializer
  4. from itsdangerous.exc import BadData
  5. from collections import namedtuple
  6. from configure import conf
  7. from sql.user import (read_user,
  8. check_role,
  9. create_user,
  10. get_role_name,
  11. delete_user,
  12. change_passwd_hash,
  13. create_role,
  14. delete_role,
  15. set_user_role,
  16. get_role_list,
  17. role_authority,
  18. get_user_email)
  19. import object.blog
  20. import object.comment
  21. import object.msg
  22. class AnonymousUser(AnonymousUserMixin):
  23. def __init__(self):
  24. super(AnonymousUser, self).__init__()
  25. self.role = 4 # 默认角色
  26. self.email = "" # 无邮箱
  27. self.passwd_hash = "" # 无密码
  28. def check_role(self, operate: str):
  29. return check_role(self.role, operate)
  30. @property
  31. def id(self):
  32. return 0
  33. class _User(UserMixin):
  34. user_tuple = namedtuple("User", "passwd role id")
  35. @staticmethod
  36. def create(email, passwd_hash):
  37. if create_user(email, passwd_hash) is not None:
  38. return User(email)
  39. return None
  40. @staticmethod
  41. def creat_token(email: str, passwd_hash: str):
  42. s = Serializer(conf["SECRET_KEY"])
  43. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  44. @staticmethod
  45. def load_token(token: str):
  46. s = Serializer(conf["SECRET_KEY"])
  47. try:
  48. token = s.loads(token, max_age=3600)
  49. return token['email'], token['passwd_hash']
  50. except BadData:
  51. return None
  52. @staticmethod
  53. def get_passwd_hash(passwd: str):
  54. return generate_password_hash(passwd)
  55. @staticmethod
  56. def create_role(name: str, authority):
  57. return create_role(name, authority)
  58. @staticmethod
  59. def delete_role(role_id: int):
  60. return delete_role(role_id)
  61. @staticmethod
  62. def get_role_list():
  63. return get_role_list()
  64. class User(_User):
  65. RoleAuthorize = role_authority
  66. def __init__(self, email, is_id=False):
  67. if is_id:
  68. self.email = get_user_email(email)
  69. else:
  70. self.email = email
  71. def get_id(self):
  72. """Flask要求的方法"""
  73. return self.email
  74. @property
  75. def is_active(self):
  76. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  77. return self.id != -1
  78. @property
  79. def is_authenticated(self):
  80. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  81. return self.is_active
  82. @property
  83. def star_email(self):
  84. if len(self.email) <= 4:
  85. return f"{self.email[0]}****"
  86. else:
  87. email = f"{self.email[0]}****{self.email[5:]}"
  88. return email
  89. @property
  90. def info(self):
  91. return User.user_tuple(*read_user(self.email))
  92. @property
  93. def passwd_hash(self):
  94. return self.info.passwd
  95. @property
  96. def role(self):
  97. return self.info.role
  98. @property
  99. def role_name(self):
  100. return get_role_name(self.info.role)
  101. @property
  102. def id(self):
  103. return self.info.id
  104. @property
  105. def count(self):
  106. msg = object.msg.Message.get_msg_count(self)
  107. comment = object.comment.Comment.get_user_comment_count(self)
  108. blog = object.blog.BlogArticle.get_blog_count(None, self)
  109. return msg, comment, blog
  110. def check_passwd(self, passwd: str):
  111. return check_password_hash(self.passwd_hash, passwd)
  112. def check_role(self, operate: str):
  113. return check_role(self.role, operate)
  114. def delete(self):
  115. return delete_user(self.id)
  116. def change_passwd(self, passwd):
  117. return change_passwd_hash(self.email, self.get_passwd_hash(passwd))
  118. def set_user_role(self, role_id: int):
  119. return set_user_role(role_id, self.id)