auth.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. from flask import Blueprint, render_template, redirect, url_for, request, current_app, flash, abort
  2. from flask_wtf import FlaskForm
  3. from wtforms import (EmailField,
  4. PasswordField,
  5. BooleanField,
  6. SubmitField,
  7. ValidationError)
  8. from wtforms.validators import DataRequired, Length, Regexp, EqualTo
  9. from flask_login import current_user, login_user, logout_user, login_required
  10. from urllib.parse import urljoin
  11. from sqlalchemy.exc import IntegrityError
  12. from .db import db, User, Role, Follow
  13. from .logger import Logger
  14. from .mail import send_msg
# Blueprint on which every route in this module is registered.
auth = Blueprint("auth", __name__)
  16. class AuthField(FlaskForm):
  17. @staticmethod
  18. def email_field(name: str, description: str):
  19. """ 提前定义 email 字段的生成函数,供下文调用 """
  20. return EmailField(name, description=description,
  21. validators=[
  22. DataRequired(f"必须填写{name}"),
  23. Length(1, 32, message=f"{name}长度1-32个字符"),
  24. Regexp(r"^[a-zA-Z0-9_\.\-]+@[a-zA-Z0-9_\-]+(\.[a-zA-Z0-9_\.]+)+$",
  25. message=f"{name}不满足正则表达式")])
  26. @staticmethod
  27. def passwd_field(name: str, description: str):
  28. """ 提前定义 passwd 字段的生成函数,供下文调用 """
  29. return PasswordField(name, description=description,
  30. validators=[
  31. DataRequired(f"必须填写{name}"),
  32. Length(8, 32, message=f"{name}长度为8-32位")])
  33. @staticmethod
  34. def passwd_again_field(name: str, description: str, passwd: str = "passwd"):
  35. """ 提前定义 passwd again 字段的生成函数,供下文调用 """
  36. return PasswordField(f"重复{name}", description=description,
  37. validators=[
  38. DataRequired(message=f"必须再次填写{name}"),
  39. EqualTo(passwd, message=f"两次输入的{name}不相同")])
  40. class EmailPasswd(AuthField):
  41. email = AuthField.email_field("邮箱", "用户邮箱")
  42. passwd = AuthField.passwd_field("密码", "用户密码")
  43. class PasswdLoginForm(EmailPasswd):
  44. remember = BooleanField("记住我")
  45. submit = SubmitField("登录")
  46. class EmailLoginForm(AuthField):
  47. email = AuthField.email_field("邮箱", "用户邮箱")
  48. remember = BooleanField("记住我")
  49. submit = SubmitField("登录")
  50. class RegisterForm(EmailPasswd):
  51. passwd_again = AuthField.passwd_again_field("密码", "用户密码")
  52. submit = SubmitField("注册")
  53. def validate_email(self, field):
  54. """ 检验email是否合法 """
  55. if User.query.filter_by(email=field.data).first():
  56. raise ValidationError("邮箱已被注册")
  57. class ChangePasswdForm(AuthField):
  58. old_passwd = AuthField.passwd_field("旧密码", "用户原密码")
  59. passwd = AuthField.passwd_field("新密码", "用户新密码")
  60. passwd_again = AuthField.passwd_again_field("新密码", "用户新密码")
  61. submit = SubmitField("修改密码")
  62. def validate_passwd(self, field):
  63. """ 检验新旧密码是否相同 """
  64. if field.data == self.old_passwd.data:
  65. raise ValidationError("新旧密码不能相同")
  66. def __load_login_page(passwd_login_form=None, email_login_form=None, register_form=None,
  67. on_passwd_login=True, on_email_login=False, on_register=False):
  68. if not passwd_login_form:
  69. passwd_login_form = PasswdLoginForm()
  70. if not email_login_form:
  71. email_login_form = EmailLoginForm()
  72. if not register_form:
  73. register_form = RegisterForm()
  74. return render_template("auth/login.html",
  75. passwd_login_form=passwd_login_form,
  76. email_login_form=email_login_form,
  77. register_form=register_form,
  78. on_passwd_login=on_passwd_login,
  79. on_email_login=on_email_login,
  80. on_register=on_register)
  81. @auth.route("/")
  82. def auth_page():
  83. if current_user.is_authenticated: # 用户已经成功登陆
  84. return render_template("auth/yours.html")
  85. return __load_login_page()
  86. @auth.route('/login/passwd', methods=["GET", "POST"])
  87. def passwd_login_page():
  88. if current_user.is_authenticated: # 用户已经成功登陆
  89. Logger.print_user_not_allow_opt_log("passwd-login.txt")
  90. return redirect(url_for("auth.auth_page"))
  91. form = PasswdLoginForm()
  92. if form.validate_on_submit():
  93. user = User.query.filter_by(email=form.email.data).first()
  94. if user and user.check_passwd(form.passwd.data) and user.role.has_permission(Role.USABLE):
  95. login_user(user, form.remember.data)
  96. next_page = request.args.get("next")
  97. if next_page is None or not next_page.startswith('/'):
  98. next_page = url_for('base.index_page')
  99. flash("登陆成功")
  100. Logger.print_user_opt_success_log(f"passwd login.txt {form.email.data}")
  101. return redirect(next_page)
  102. flash("账号或密码错误")
  103. Logger.print_user_opt_fail_log(f"passwd login.txt {form.email.data}")
  104. return redirect(url_for("auth.passwd_login_page"))
  105. return __load_login_page(passwd_login_form=form, on_passwd_login=True)
  106. @auth.route('/login/email', methods=["GET", "POST"])
  107. def email_login_page():
  108. if current_user.is_authenticated: # 用户已经成功登陆
  109. Logger.print_user_not_allow_opt_log("email-login.txt")
  110. return redirect(url_for("auth.auth_page"))
  111. form = EmailLoginForm()
  112. if form.validate_on_submit():
  113. user = User.query.filter_by(email=form.email.data).first()
  114. if user and user.role.has_permission(Role.USABLE):
  115. token = user.login_creat_token(form.remember.data)
  116. login_url = urljoin(request.host_url, url_for("auth.email_login_confirm_page", token=token))
  117. send_msg("登录确认", user.email, "login", login_url=login_url)
  118. flash("登录确认邮件已发送至邮箱")
  119. Logger.print_user_opt_success_log(f"email login.txt {form.email.data}")
  120. return redirect(url_for("base.index_page"))
  121. flash("账号不存在")
  122. Logger.print_user_opt_fail_log(f"email login.txt {form.email.data}")
  123. return redirect(url_for("auth.passwd_login_page"))
  124. return __load_login_page(passwd_login_form=form, on_passwd_login=True)
  125. @auth.route('/register', methods=["GET", "POST"])
  126. def register_page():
  127. if current_user.is_authenticated:
  128. Logger.print_user_not_allow_opt_log("register.txt")
  129. return redirect(url_for("auth.auth_page"))
  130. form = RegisterForm()
  131. if form.validate_on_submit():
  132. token = User.register_creat_token(form.email.data, form.passwd.data)
  133. register_url = urljoin(request.host_url, url_for("auth.register_confirm_page", token=token))
  134. send_msg("注册确认", form.email.data, "register", register_url=register_url)
  135. flash("注册提交成功, 请进入邮箱点击确认注册链接")
  136. Logger.print_import_user_opt_success_log(f"register.txt {form.email.data}")
  137. return redirect(url_for("base.index_page"))
  138. return __load_login_page(register_form=form, on_register=True, on_passwd_login=False)
  139. @auth.route('/confirm/register')
  140. def register_confirm_page():
  141. token = request.args.get("token", None)
  142. if token is None:
  143. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  144. return abort(404)
  145. token = User.register_load_token(token)
  146. if token is None:
  147. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  148. return abort(404)
  149. if User.query.filter_by(email=token[0]).first():
  150. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  151. return abort(404)
  152. if User.query.limit(1).first(): # 不是第一个用户
  153. new_user = User(email=token[0], passwd_hash=User.get_passwd_hash(token[1]))
  154. else:
  155. admin = Role.query.filter_by(name="admin").first()
  156. if admin is None:
  157. Logger.print_user_opt_fail_log(f"Role admin not found")
  158. return abort(500)
  159. new_user = User(email=token[0], passwd_hash=User.get_passwd_hash(token[1]), role=admin)
  160. db.session.add(new_user)
  161. db.session.commit()
  162. Logger.print_import_user_opt_success_log(f"confirm {token[0]} success")
  163. flash(f"用户{token[0]}认证完成")
  164. return redirect(url_for("base.index_page"))
  165. @auth.route('/confirm/login')
  166. def email_login_confirm_page():
  167. token = request.args.get("token", None)
  168. if token is None:
  169. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  170. return abort(404)
  171. token = User.login_load_token(token)
  172. if token is None:
  173. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  174. return abort(404)
  175. user = User.query.filter_by(email=token[0]).first()
  176. if not user:
  177. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  178. return abort(404)
  179. login_user(user, token[1])
  180. flash("登陆成功")
  181. Logger.print_user_opt_success_log(f"passwd login.txt {user.email}")
  182. return redirect(url_for("base.index_page"))
  183. @auth.route('/set/passwd', methods=['GET', 'POST'])
  184. @login_required
  185. def change_passwd_page():
  186. form = ChangePasswdForm()
  187. if form.validate_on_submit():
  188. if not current_user.check_passwd(form.old_passwd.data):
  189. Logger.print_user_opt_error_log(f"change passwd")
  190. flash("旧密码错误")
  191. else:
  192. current_user.passwd = form.passwd.data
  193. db.session.commit()
  194. Logger.print_user_opt_success_log(f"change passwd")
  195. flash("密码修改成功")
  196. logout_user()
  197. return redirect(url_for("auth.passwd_login_page"))
  198. return redirect(url_for("auth.change_passwd_page"))
  199. Logger.print_load_page_log("user change passwd")
  200. return render_template("auth/change_passwd.html", form=form)
  201. @auth.route('/logout')
  202. @login_required
  203. def logout_page():
  204. Logger.print_import_user_opt_success_log(f"logout")
  205. logout_user()
  206. flash("退出登录成功")
  207. return redirect(url_for("base.index_page"))
  208. @auth.route("/user")
  209. def user_page():
  210. user_id = request.args.get("user", -1, type=int)
  211. if user_id == -1:
  212. return abort(404)
  213. elif current_user.is_authenticated and current_user.id == user_id:
  214. return redirect(url_for("auth.auth_page"))
  215. user = User.query.filter_by(id=user_id).first()
  216. if not user:
  217. return abort(404)
  218. return render_template("auth/user.html", user=user)
  219. @auth.route("/follower/list")
  220. def follower_page():
  221. if current_user.follower_count == 0:
  222. return render_template("auth/no_follow.html", title="粉丝", msg="你暂时一个粉丝都没有哦。")
  223. page = request.args.get("page", 1, type=int)
  224. pagination = current_user.follower.paginate(page=page, per_page=8, error_out=False)
  225. return render_template("auth/follow.html",
  226. items=[i.follower for i in pagination.items],
  227. pagination=pagination,
  228. title="粉丝")
  229. @auth.route("/followed/list")
  230. def followed_page():
  231. if current_user.followed_count == 0:
  232. return render_template("auth/no_follow.html", title="关注", msg="你暂时未关注任何人。")
  233. page = request.args.get("page", 1, type=int)
  234. pagination = current_user.followed.paginate(page=page, per_page=8, error_out=False)
  235. return render_template("auth/follow.html",
  236. items=[i.followed for i in pagination.items],
  237. pagination=pagination,
  238. title="关注")
  239. @auth.route("/followed/follow")
  240. def set_follow_page():
  241. user_id = request.args.get("user", 1, type=int)
  242. if user_id == current_user.id:
  243. return abort(404)
  244. user = User.query.filter_by(id=user_id).first()
  245. if not user:
  246. return abort(404)
  247. try:
  248. db.session.add(Follow(follower=current_user, followed=user))
  249. db.session.commit()
  250. except IntegrityError:
  251. flash("不能重复关注用户")
  252. else:
  253. flash("关注用户成功")
  254. return redirect(url_for("auth.user_page", user=user_id))
  255. @auth.route("/followed/unfollow")
  256. def set_unfollow_page():
  257. user_id = request.args.get("user", 1, type=int)
  258. if user_id == current_user.id:
  259. return abort(404)
  260. user = User.query.filter_by(id=user_id).first()
  261. if not user:
  262. return abort(404)
  263. Follow.query.filter_by(follower=current_user, followed=user).delete()
  264. flash("取消关注用户成功")
  265. return redirect(url_for("auth.user_page", user=user_id))
  266. @auth.route("/block")
  267. def set_block_page():
  268. user_id = request.args.get("user", 1, type=int)
  269. if user_id == current_user.id:
  270. return abort(404)
  271. user = User.query.filter_by(id=user_id).first()
  272. if not user:
  273. return abort(404)
  274. block = Role.query.filter_by(name="block").first()
  275. if not block:
  276. return abort(500)
  277. user.role = block
  278. db.session.commit()
  279. return redirect(url_for("auth.user_page", user=user_id))