db.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. from flask import abort
  2. from flask_sqlalchemy import SQLAlchemy
  3. from flask_login import UserMixin, AnonymousUserMixin
  4. from datetime import datetime
  5. from itsdangerous import URLSafeTimedSerializer as Serializer
  6. from itsdangerous.exc import BadData
  7. from werkzeug.security import generate_password_hash, check_password_hash
  8. from configure import conf
  # Flask-SQLAlchemy extension object; every model and table in this module is declared through it.
  db = SQLAlchemy()
  class Follow(db.Model):
      """Association row recording that one user follows another.

      ``(follower_id, followed_id)`` is the composite primary key, so an ordered
      pair of users can appear at most once.
      """

      # Defaults to ``datetime.utcnow`` when no value is supplied.
      time = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)

      # Primary-key columns are declared NOT NULL: a follow row with a missing
      # side is meaningless, and ``nullable=True`` contradicted ``primary_key=True``.
      follower_id = db.Column(
          db.Integer, db.ForeignKey("user.id"), primary_key=True, nullable=False
      )  # the user who follows
      followed_id = db.Column(
          db.Integer, db.ForeignKey("user.id"), primary_key=True, nullable=False
      )  # the user being followed

      # Each side is paired with the dynamic collection of the opposite name on User:
      # Follow.follower <-> User.followed, Follow.followed <-> User.follower.
      follower = db.relationship(
          "User", primaryjoin="Follow.follower_id==User.id", back_populates="followed"
      )
      followed = db.relationship(
          "User", primaryjoin="Follow.followed_id==User.id", back_populates="follower"
      )
  class AnonymousUser(AnonymousUserMixin):
      """User object for visitors who are not logged in."""

      @property
      def email(self):
          """Anonymous visitors have no e-mail address; always ``None``."""
          return None

      @property
      def role(self):
          """Return the ``Role`` row whose name is ``"anonymous"``.

          When no such row exists, ``abort(500)`` is called instead.
          """
          role = Role.query.filter_by(name="anonymous").first()
          if role:
              return role
          return abort(500)
  class User(db.Model, UserMixin):
      """Account stored in the ``user`` table."""

      __tablename__ = "user"

      id = db.Column(db.Integer, autoincrement=True, primary_key=True, nullable=False)
      email = db.Column(db.String(32), nullable=False, unique=True)
      passwd_hash = db.Column(db.String(128), nullable=False)
      # NOTE(review): 3 is presumably the id of the "default" role seeded by create_all() - confirm.
      role_id = db.Column(db.Integer, db.ForeignKey("role.id"), default=3)

      role = db.relationship("Role", back_populates="user")
      comment = db.relationship("Comment", back_populates="auth", lazy="dynamic")
      # Follow rows in which this user is the follower, i.e. the people this user follows.
      followed = db.relationship(
          "Follow",
          primaryjoin="Follow.follower_id==User.id",
          back_populates="follower",
          lazy="dynamic",
      )
      # Follow rows in which this user is followed, i.e. the people following this user.
      follower = db.relationship(
          "Follow",
          primaryjoin="Follow.followed_id==User.id",
          back_populates="followed",
          lazy="dynamic",
      )

      @property
      def comment_count(self):
          """Number of comments written by this user."""
          return self.comment.count()

      @property
      def follower_count(self):
          """Number of Follow rows in which this user is the one being followed."""
          return self.follower.count()

      @property
      def followed_count(self):
          """Number of Follow rows in which this user is the follower."""
          return self.followed.count()

      def in_followed(self, user):
          """Return True when this user follows ``user``, otherwise False."""
          match = Follow.query.filter_by(followed_id=user.id, follower_id=self.id).first()
          return match is not None

      @staticmethod
      def register_creat_token(email: str, passwd_hash: str):
          """Serialize ``email`` and ``passwd_hash`` into a registration token.

          NOTE(review): the serializer signs the payload rather than encrypting it,
          so the password hash is presumably readable from the token - confirm
          this is acceptable.
          """
          serializer = Serializer(conf["SECRET_KEY"])
          payload = {"email": email, "passwd_hash": passwd_hash}
          return serializer.dumps(payload)

      @staticmethod
      def register_load_token(token: str):
          """Decode a registration token (loaded with ``max_age=3600``).

          Returns ``(email, passwd_hash)``, or ``None`` when loading raises
          ``BadData`` or an expected key is missing.
          """
          serializer = Serializer(conf["SECRET_KEY"])
          try:
              payload = serializer.loads(token, max_age=3600)
              return payload["email"], payload["passwd_hash"]
          except (BadData, KeyError):
              return None

      def login_creat_token(self, remember_me=False):
          """Serialize this user's e-mail and the ``remember_me`` flag into a login token."""
          serializer = Serializer(conf["SECRET_KEY"])
          payload = {"email": self.email, "remember_me": remember_me}
          return serializer.dumps(payload)

      @staticmethod
      def login_load_token(token: str):
          """Decode a login token (loaded with ``max_age=3600``).

          Returns ``(email, remember_me)``, or ``None`` when loading raises
          ``BadData`` or an expected key is missing.
          """
          serializer = Serializer(conf["SECRET_KEY"])
          try:
              payload = serializer.loads(token, max_age=3600)
              return payload["email"], payload["remember_me"]
          except (BadData, KeyError):
              return None

      @staticmethod
      def get_passwd_hash(passwd: str):
          """Hash a plain-text password with ``generate_password_hash``."""
          return generate_password_hash(passwd)

      def check_passwd(self, passwd: str):
          """Check ``passwd`` against the stored hash via ``check_password_hash``."""
          return check_password_hash(self.passwd_hash, passwd)

      @property
      def passwd(self):
          """Write-only attribute: reading it always yields ``None``."""
          return None

      @passwd.setter
      def passwd(self, passwd):
          # Only the hash is kept; the plain-text value is never stored on the instance.
          self.passwd_hash = self.get_passwd_hash(passwd)
  class Role(db.Model):
      """Named set of permissions, stored as a bit mask in the ``role`` table."""

      __tablename__ = "role"

      # Permission flags; each one occupies a single bit of ``permission``.
      USABLE = 1  # the account may be used
      CHECK_COMMENT = 2
      CHECK_ARCHIVE = 4
      CHECK_FOLLOW = 8
      CREATE_COMMENT = 16
      CREATE_ARCHIVE = 32  # system permission
      FOLLOW = 64
      BLOCK_USER = 128  # system permission
      SYSTEM = 256  # system permission

      id = db.Column(db.Integer, autoincrement=True, primary_key=True, nullable=False)
      name = db.Column(db.String(32), nullable=False, unique=True)
      # 95 == USABLE | CHECK_COMMENT | CHECK_ARCHIVE | CHECK_FOLLOW | CREATE_COMMENT | FOLLOW,
      # i.e. every flag that is not marked as a system permission.
      permission = db.Column(db.Integer, nullable=False, default=95)
      user = db.relationship("User", back_populates="role")

      def has_permission(self, permission):
          """Return True when every bit of ``permission`` is set on this role."""
          return self.permission & permission == permission

      def add_permission(self, permission):
          """Set every bit of ``permission`` on this role.

          Uses a bitwise OR so that a multi-bit mask whose bits are only partly
          present is merged correctly; arithmetic addition would carry into
          unrelated bits in that case.
          """
          self.permission |= permission

      def remove_permission(self, permission):
          """Clear every bit of ``permission`` from this role.

          Uses a bitwise AND with the complement so that bits which are present
          are cleared even when other bits of the mask were never set.
          """
          self.permission &= ~permission
  # Association table linking archives and comments; the two foreign keys
  # together form its primary key.
  ArchiveComment = db.Table(
      "archive_comment",
      db.Column(
          "archive_id",
          db.Integer,
          db.ForeignKey("archive.id"),
          nullable=False,
          primary_key=True,
      ),
      db.Column(
          "comment_id",
          db.Integer,
          db.ForeignKey("comment.id"),
          nullable=False,
          primary_key=True,
      ),
  )
  class Comment(db.Model):
      """Comment stored in the ``comment`` table; may reference a parent comment."""

      __tablename__ = "comment"

      id = db.Column(db.Integer, autoincrement=True, primary_key=True, nullable=False)
      title = db.Column(db.String(32), nullable=True)  # may be left empty
      content = db.Column(db.Text, nullable=False)
      create_time = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
      update_time = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
      auth_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
      father_id = db.Column(db.Integer, db.ForeignKey("comment.id"))

      # The User referenced by ``auth_id``.
      auth = db.relationship("User", back_populates="comment")
      # Self-referential pair: ``father`` is the comment referenced by
      # ``father_id``; ``son`` is the dynamic collection of comments whose
      # ``father_id`` references this one.
      father = db.relationship(
          "Comment",
          foreign_keys="[Comment.father_id]",
          remote_side="[Comment.id]",
          back_populates="son",
      )
      son = db.relationship(
          "Comment",
          foreign_keys="[Comment.father_id]",
          remote_side="[Comment.father_id]",
          back_populates="father",
          lazy="dynamic",
      )
      # Archives this comment is linked to through ``archive_comment``.
      archive = db.relationship(
          "Archive", back_populates="comment", secondary="archive_comment"
      )

      @property
      def son_count(self):
          """Number of comments whose ``father`` is this comment."""
          children = self.son
          return children.count()
  class Archive(db.Model):
      """Named collection of comments, stored in the ``archive`` table."""

      __tablename__ = "archive"

      id = db.Column(db.Integer, autoincrement=True, primary_key=True, nullable=False)
      name = db.Column(db.String(32), nullable=False, unique=True)
      describe = db.Column(db.String(100), nullable=False)
      # Comments linked to this archive through ``archive_comment``.
      comment = db.relationship(
          "Comment",
          back_populates="archive",
          secondary="archive_comment",
          lazy="dynamic",
      )

      @property
      def comment_count(self):
          """Count linked comments that have a title and no ``father_id``."""
          has_title = Comment.title.isnot(None)
          is_top_level = Comment.father_id.is_(None)
          return self.comment.filter(has_title).filter(is_top_level).count()
  def create_all():
      """Create the tables and seed the built-in roles.

      Table creation stays best-effort: a failure is logged instead of being
      silently discarded, and seeding is still attempted afterwards.

      Roles are seeded in a fixed order (admin, coordinator, default, block,
      anonymous). A role whose name already exists is skipped, so calling this
      function again does not violate the unique constraint on ``Role.name``.

      Raises:
          Exception: whatever the commit raises; the session is rolled back first.
      """
      import logging

      try:
          db.create_all()
      except Exception:
          logging.getLogger(__name__).exception("db.create_all() failed")

      # (name, permission); None means "use the column default".
      seed_roles = (
          ("admin", 511),
          ("coordinator", 255),
          ("default", None),
          ("block", 0),
          ("anonymous", 15),
      )

      missing = []
      for name, permission in seed_roles:
          if Role.query.filter_by(name=name).first() is not None:
              continue
          if permission is None:
              missing.append(Role(name=name))
          else:
              missing.append(Role(name=name, permission=permission))

      if not missing:
          return

      db.session.add_all(missing)
      try:
          db.session.commit()
      except Exception:
          # Leave the session usable for the caller before propagating.
          db.session.rollback()
          raise
  def create_faker_user():
      """Insert 100 fake users, each with the password "passwd" and role_id 3.

      A commit that raises ``IntegrityError`` is rolled back and does not count
      towards the 100.
      """
      from faker import Faker
      from sqlalchemy.exc import IntegrityError

      fake = Faker("zh_CN")
      created = 0
      while created < 100:
          db.session.add(
              User(
                  email=fake.email(),
                  passwd_hash=User.get_passwd_hash("passwd"),
                  role_id=3,
              )
          )
          try:
              db.session.commit()
          except IntegrityError:
              db.session.rollback()
              continue
          created += 1
  def create_faker_comment(auth_max=100):
      """Insert 100 fake comments.

      About half of them get a title. Once more than 10 comments have been
      created, about half of the following ones get a ``father_id`` drawn from
      ``1..created``. ``auth_id`` is drawn from ``1..auth_max``.

      A commit that raises ``IntegrityError`` is rolled back and does not count
      towards the 100.
      """
      from random import randint, random
      from faker import Faker
      from sqlalchemy.exc import IntegrityError

      fake = Faker("zh_CN")
      created = 0
      while created < 100:
          # The order of the random/faker calls below mirrors the order in which
          # the values are consumed, so seeded runs stay reproducible.
          heading = "加人" + fake.company() if random() < 0.5 else None

          if created > 10 and random() < 0.5:
              parent_id = randint(1, created)
          else:
              parent_id = None

          stamp = fake.past_datetime()
          body = fake.text()
          author_id = randint(1, auth_max)

          db.session.add(
              Comment(
                  title=heading,
                  content=body,
                  update_time=stamp,
                  create_time=stamp,
                  father_id=parent_id,
                  auth_id=author_id,
              )
          )
          try:
              db.session.commit()
          except IntegrityError:
              db.session.rollback()
              continue
          created += 1
  def create_faker_archive():
      """Insert 20 fake archives named after generated company names.

      A commit that raises ``IntegrityError`` is rolled back and does not count
      towards the 20.
      """
      from faker import Faker
      from sqlalchemy.exc import IntegrityError

      fake = Faker("zh_CN")
      created = 0
      while created < 20:
          name = fake.company()
          db.session.add(Archive(name=name, describe=f"加人{name}"))
          try:
              db.session.commit()
          except IntegrityError:
              db.session.rollback()
              continue
          created += 1
  def create_fake_archive_comment():
      """Link randomly chosen comments to randomly chosen archives, 20 times.

      Does nothing when either table is empty. A commit that raises
      ``IntegrityError`` is rolled back and does not count towards the 20.

      NOTE: the loop runs until 20 commits succeed, so it does not terminate if
      fewer than 20 new (archive, comment) pairs can still be inserted.
      """
      from random import randrange
      from sqlalchemy.exc import IntegrityError

      comment_count = Comment.query.count()
      archive_count = Archive.query.count()
      if comment_count == 0 or archive_count == 0:
          return  # nothing to link

      linked = 0
      while linked < 20:
          # randrange excludes the upper bound, so the offset always addresses an
          # existing row; ordering by id makes the offset deterministic.
          comment = Comment.query.order_by(Comment.id).offset(randrange(comment_count)).limit(1).first()
          archive = Archive.query.order_by(Archive.id).offset(randrange(archive_count)).limit(1).first()
          # The relationship on Archive is named ``comment``.
          archive.comment.append(comment)
          try:
              db.session.commit()
          except IntegrityError:
              db.session.rollback()
          else:
              linked += 1
  def create_fake_follow():
      """Create 20 Follow rows between randomly chosen, distinct users.

      Does nothing when fewer than two users exist. A commit that raises
      ``IntegrityError`` is rolled back and does not count towards the 20.

      NOTE: the loop runs until 20 commits succeed, so it does not terminate if
      fewer than 20 new (follower, followed) pairs can still be inserted.
      """
      from random import sample
      from sqlalchemy.exc import IntegrityError

      user_count = User.query.count()
      if user_count < 2:
          return  # a follow needs two different users

      created = 0
      while created < 20:
          # Two distinct offsets, both inside [0, user_count), so each lookup
          # addresses an existing row and the two rows differ.
          follower_offset, followed_offset = sample(range(user_count), 2)
          follower = User.query.order_by(User.id).offset(follower_offset).limit(1).first()
          followed = User.query.order_by(User.id).offset(followed_offset).limit(1).first()
          follow = Follow(followed=followed, follower=follower)
          db.session.add(follow)
          try:
              db.session.commit()
          except IntegrityError:
              db.session.rollback()
          else:
              created += 1