user.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import URLSafeTimedSerializer as Serializer
  4. from itsdangerous.exc import BadData
  5. from configure import conf
  6. from sql.user import (read_user,
  7. check_role,
  8. create_user,
  9. get_role_name,
  10. delete_user,
  11. change_passwd_hash,
  12. create_role,
  13. delete_role,
  14. set_user_role,
  15. get_role_list,
  16. role_authority)
  17. import object.blog
  18. import object.comment
  19. import object.msg
  20. class AnonymousUser(AnonymousUserMixin):
  21. def __init__(self):
  22. super(AnonymousUser, self).__init__()
  23. self.role = 4 # 默认角色
  24. self.email = "" # 无邮箱
  25. self.passwd_hash = "" # 无密码
  26. def check_role(self, operate: str):
  27. return check_role(self.role, operate)
  28. @property
  29. def id(self):
  30. return 0
  31. class _User(UserMixin):
  32. @staticmethod
  33. def create(email, passwd_hash):
  34. if create_user(email, passwd_hash) is not None:
  35. return User(email)
  36. return None
  37. @staticmethod
  38. def creat_token(email: str, passwd_hash: str):
  39. s = Serializer(conf["SECRET_KEY"])
  40. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  41. @staticmethod
  42. def load_token(token: str):
  43. s = Serializer(conf["SECRET_KEY"])
  44. try:
  45. token = s.loads(token, max_age=3600)
  46. return token['email'], token['passwd_hash']
  47. except BadData:
  48. return None
  49. @staticmethod
  50. def get_passwd_hash(passwd: str):
  51. return generate_password_hash(passwd)
  52. @staticmethod
  53. def create_role(name: str, authority):
  54. return create_role(name, authority)
  55. @staticmethod
  56. def delete_role(role_id: int):
  57. return delete_role(role_id)
  58. @staticmethod
  59. def get_role_list():
  60. return get_role_list()
  61. class User(_User):
  62. RoleAuthorize = role_authority
  63. def __init__(self, email):
  64. self.email = email
  65. def get_id(self):
  66. """Flask要求的方法"""
  67. return self.email
  68. @property
  69. def is_active(self):
  70. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  71. return self.id != -1
  72. @property
  73. def is_authenticated(self):
  74. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  75. return self.is_active
  76. @property
  77. def star_email(self):
  78. if len(self.email) <= 4:
  79. return f"{self.email[0]}****"
  80. else:
  81. email = f"{self.email[0]}****{self.email[5:]}"
  82. return email
  83. @property
  84. def info(self):
  85. return read_user(self.email)
  86. @property
  87. def passwd_hash(self):
  88. return self.info[0]
  89. @property
  90. def role(self):
  91. return self.info[1]
  92. @property
  93. def role_name(self):
  94. return get_role_name(self.info[1])
  95. @property
  96. def id(self):
  97. return self.info[2]
  98. @property
  99. def count(self):
  100. msg = object.msg.Message.get_msg_count(self)
  101. comment = object.comment.Comment.get_user_comment_count(self)
  102. blog = object.blog.BlogArticle.get_blog_count(None, self)
  103. return msg, comment, blog
  104. def check_passwd(self, passwd: str):
  105. return check_password_hash(self.passwd_hash, passwd)
  106. def check_role(self, operate: str):
  107. return check_role(self.role, operate)
  108. def delete(self):
  109. return delete_user(self.id)
  110. def change_passwd(self, passwd):
  111. return change_passwd_hash(self.id, self.get_passwd_hash(passwd))
  112. def set_user_role(self, role_id: int):
  113. return set_user_role(role_id, self.id)