user.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. from flask_login import UserMixin, AnonymousUserMixin
  2. from werkzeug.security import generate_password_hash, check_password_hash
  3. from itsdangerous import URLSafeTimedSerializer as Serializer
  4. from itsdangerous.exc import BadData
  5. from typing import Optional
  6. from configure import conf
  7. from sql.user import (read_user,
  8. check_role,
  9. get_user_email,
  10. create_user,
  11. get_role_name,
  12. delete_user,
  13. change_passwd_hash,
  14. create_role,
  15. delete_role,
  16. set_user_role)
  17. import object.blog
  18. import object.comment
  19. import object.msg
  20. class AnonymousUser(AnonymousUserMixin):
  21. def __init__(self):
  22. super(AnonymousUser, self).__init__()
  23. self.role = 4 # 默认角色
  24. self.email = "" # 无邮箱
  25. self.passwd_hash = "" # 无密码
  26. def check_role(self, operate: str):
  27. return check_role(self.role, operate)
  28. @staticmethod
  29. def get_user_id():
  30. return 0
  31. def load_user_by_email(email: str) -> "Optional[User]":
  32. user = read_user(email)
  33. if len(user) == 0:
  34. return None
  35. passwd_hash = user[0]
  36. role = user[1]
  37. user_id = user[2]
  38. return User(email, passwd_hash, role, user_id)
  39. def load_user_by_id(user_id):
  40. email = get_user_email(user_id)
  41. if email is None:
  42. return None
  43. return load_user_by_email(email)
  44. class User(UserMixin):
  45. def __init__(self, email, passwd_hash, role, user_id):
  46. self.email = email
  47. self.passwd_hash = passwd_hash
  48. self.role = role
  49. if role is not None:
  50. self.role_name = get_role_name(role)
  51. else:
  52. self.role_name = None
  53. self.user_id = user_id
  54. def count_info(self):
  55. msg = object.msg.Message.get_msg_count(self)
  56. comment = object.comment.Comment.get_user_comment_count(self)
  57. blog = object.blog.BlogArticle.get_blog_count(None, self)
  58. return msg, comment, blog
  59. @property
  60. def s_email(self):
  61. if len(self.email) <= 4:
  62. return f"{self.email[0]}****"
  63. else:
  64. email = f"{self.email[0]}****{self.email[5:]}"
  65. return email
  66. @property
  67. def comment_count(self):
  68. return 0
  69. @property
  70. def blog_count(self):
  71. return 0
  72. @property
  73. def msg_count(self):
  74. return 0
  75. @property
  76. def is_active(self):
  77. """Flask要求的属性, 表示用户是否激活(可登录), HGSSystem没有封禁用户系统, 所有用户都是被激活的"""
  78. return True
  79. @property
  80. def is_authenticated(self):
  81. """Flask要求的属性, 表示登录的凭据是否正确, 这里检查是否能 load_user_by_id"""
  82. return True
  83. def get_id(self):
  84. """Flask要求的方法"""
  85. return self.email
  86. def get_user_id(self):
  87. return self.user_id
  88. @staticmethod
  89. def creat_token(email: str, passwd_hash: str):
  90. s = Serializer(conf["secret-key"])
  91. return s.dumps({"email": email, "passwd_hash": passwd_hash})
  92. @staticmethod
  93. def load_token(token: str):
  94. s = Serializer(conf["secret-key"])
  95. try:
  96. token = s.loads(token, max_age=3600)
  97. return token['email'], token['passwd_hash']
  98. except BadData:
  99. return None
  100. @staticmethod
  101. def get_passwd_hash(passwd: str):
  102. return generate_password_hash(passwd)
  103. def check_passwd(self, passwd: str):
  104. return check_password_hash(self.passwd_hash, passwd)
  105. def check_role(self, operate: str):
  106. return check_role(self.role, operate)
  107. def create(self):
  108. return create_user(self.email, self.passwd_hash)
  109. def delete(self):
  110. return delete_user(self.user_id)
  111. def change_passwd(self, passwd):
  112. return change_passwd_hash(self.user_id, self.get_passwd_hash(passwd))
  113. @staticmethod
  114. def create_role(name: str, authority):
  115. return create_role(name, authority)
  116. @staticmethod
  117. def delete_role(name: str):
  118. return delete_role(name)
  119. def set_user_role(self, name: str):
  120. return set_user_role(name, self.user_id)