user.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import URLSafeTimedSerializer as Serializer
  4. from itsdangerous.exc import BadData
  5. from typing import Optional
  6. import sql.user
  7. from configure import conf
  8. from sql.user import (read_user,
  9. check_role,
  10. get_user_email,
  11. create_user,
  12. get_role_name,
  13. delete_user,
  14. change_passwd_hash,
  15. create_role,
  16. delete_role,
  17. set_user_role,
  18. get_role_list)
  19. import object.blog
  20. import object.comment
  21. import object.msg
  22. class AnonymousUser(AnonymousUserMixin):
  23. def __init__(self):
  24. super(AnonymousUser, self).__init__()
  25. self.role = 4 # 默认角色
  26. self.email = "" # 无邮箱
  27. self.passwd_hash = "" # 无密码
  28. def check_role(self, operate: str):
  29. return check_role(self.role, operate)
  30. @staticmethod
  31. def get_user_id():
  32. return 0
  33. def load_user_by_email(email: str) -> "Optional[User]":
  34. user = read_user(email)
  35. if len(user) == 0:
  36. return None
  37. passwd_hash = user[0]
  38. role = user[1]
  39. user_id = user[2]
  40. return User(email, passwd_hash, role, user_id)
  41. def load_user_by_id(user_id):
  42. email = get_user_email(user_id)
  43. if email is None:
  44. return None
  45. return load_user_by_email(email)
  46. class User(UserMixin):
  47. RoleAuthorize = sql.user.role_authority
  48. def __init__(self, email, passwd_hash, role, user_id):
  49. self.email = email
  50. self.passwd_hash = passwd_hash
  51. self.role = role
  52. if role is not None:
  53. self.role_name = get_role_name(role)
  54. else:
  55. self.role_name = None
  56. self.user_id = user_id
  57. def count_info(self):
  58. msg = object.msg.Message.get_msg_count(self)
  59. comment = object.comment.Comment.get_user_comment_count(self)
  60. blog = object.blog.BlogArticle.get_blog_count(None, self)
  61. return msg, comment, blog
  62. @property
  63. def s_email(self):
  64. if len(self.email) <= 4:
  65. return f"{self.email[0]}****"
  66. else:
  67. email = f"{self.email[0]}****{self.email[5:]}"
  68. return email
  69. @property
  70. def comment_count(self):
  71. return 0
  72. @property
  73. def blog_count(self):
  74. return 0
  75. @property
  76. def msg_count(self):
  77. return 0
  78. @property
  79. def is_active(self):
  80. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  81. return True
  82. @property
  83. def is_authenticated(self):
  84. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  85. return True
  86. def get_id(self):
  87. """Flask要求的方法"""
  88. return self.email
  89. def get_user_id(self):
  90. return self.user_id
  91. @staticmethod
  92. def creat_token(email: str, passwd_hash: str):
  93. s = Serializer(conf["SECRET_KEY"])
  94. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  95. @staticmethod
  96. def load_token(token: str):
  97. s = Serializer(conf["SECRET_KEY"])
  98. try:
  99. token = s.loads(token, max_age=3600)
  100. return token['email'], token['passwd_hash']
  101. except BadData:
  102. return None
  103. @staticmethod
  104. def get_passwd_hash(passwd: str):
  105. return generate_password_hash(passwd)
  106. def check_passwd(self, passwd: str):
  107. return check_password_hash(self.passwd_hash, passwd)
  108. def check_role(self, operate: str):
  109. return check_role(self.role, operate)
  110. def create(self):
  111. return create_user(self.email, self.passwd_hash)
  112. def delete(self):
  113. return delete_user(self.user_id)
  114. def change_passwd(self, passwd):
  115. return change_passwd_hash(self.user_id, self.get_passwd_hash(passwd))
  116. @staticmethod
  117. def create_role(name: str, authority):
  118. return create_role(name, authority)
  119. @staticmethod
  120. def delete_role(role_id: int):
  121. return delete_role(role_id)
  122. def set_user_role(self, role_id: int):
  123. return set_user_role(role_id, self.user_id)
  124. @staticmethod
  125. def get_role_list():
  126. return get_role_list()