auth.py 10 KB


  1. from flask import Blueprint, render_template, redirect, url_for, request, current_app, flash, abort
  2. from flask_wtf import FlaskForm
  3. from wtforms import (EmailField,
  4. PasswordField,
  5. BooleanField,
  6. SubmitField,
  7. ValidationError)
  8. from wtforms.validators import DataRequired, Length, Regexp, EqualTo
  9. from flask_login import current_user, login_user, logout_user, login_required
  10. from urllib.parse import urljoin
  11. from .db import db, User, Role
  12. from .logger import Logger
  13. from .mail import send_msg
  14. auth = Blueprint("auth", __name__)
  15. class AuthField(FlaskForm):
  16. @staticmethod
  17. def email_field(name: str, description: str):
  18. """ 提前定义 email 字段的生成函数,供下文调用 """
  19. return EmailField(name, description=description,
  20. validators=[
  21. DataRequired(f"必须填写{name}"),
  22. Length(1, 32, message=f"{name}长度1-32个字符"),
  23. Regexp(r"^[a-zA-Z0-9_\.\-]+@[a-zA-Z0-9_\-]+(\.[a-zA-Z0-9_\.]+)+$",
  24. message=f"{name}不满足正则表达式")])
  25. @staticmethod
  26. def passwd_field(name: str, description: str):
  27. """ 提前定义 passwd 字段的生成函数,供下文调用 """
  28. return PasswordField(name, description=description,
  29. validators=[
  30. DataRequired(f"必须填写{name}"),
  31. Length(8, 32, message=f"{name}长度为8-32位")])
  32. @staticmethod
  33. def passwd_again_field(name: str, description: str, passwd: str = "passwd"):
  34. """ 提前定义 passwd again 字段的生成函数,供下文调用 """
  35. return PasswordField(f"重复{name}", description=description,
  36. validators=[
  37. DataRequired(message=f"必须再次填写{name}"),
  38. EqualTo(passwd, message=f"两次输入的{name}不相同")])
  39. class EmailPasswd(AuthField):
  40. email = AuthField.email_field("邮箱", "用户邮箱")
  41. passwd = AuthField.passwd_field("密码", "用户密码")
  42. class PasswdLoginForm(EmailPasswd):
  43. remember = BooleanField("记住我")
  44. submit = SubmitField("登录")
  45. class EmailLoginForm(AuthField):
  46. email = AuthField.email_field("邮箱", "用户邮箱")
  47. remember = BooleanField("记住我")
  48. submit = SubmitField("登录")
  49. class RegisterForm(EmailPasswd):
  50. passwd_again = AuthField.passwd_again_field("密码", "用户密码")
  51. submit = SubmitField("注册")
  52. def validate_email(self, field):
  53. """ 检验email是否合法 """
  54. if User.query.filter_by(email=field.data).first():
  55. raise ValidationError("邮箱已被注册")
  56. class ChangePasswdForm(AuthField):
  57. old_passwd = AuthField.passwd_field("旧密码", "用户原密码")
  58. passwd = AuthField.passwd_field("新密码", "用户新密码")
  59. passwd_again = AuthField.passwd_again_field("新密码", "用户新密码")
  60. submit = SubmitField("修改密码")
  61. def validate_passwd(self, field):
  62. """ 检验新旧密码是否相同 """
  63. if field.data == self.old_passwd.data:
  64. raise ValidationError("新旧密码不能相同")
  65. def __load_login_page(passwd_login_form=None, email_login_form=None, register_form=None,
  66. on_passwd_login=True, on_email_login=False, on_register=False):
  67. if not passwd_login_form:
  68. passwd_login_form = PasswdLoginForm()
  69. if not email_login_form:
  70. email_login_form = EmailLoginForm()
  71. if not register_form:
  72. register_form = RegisterForm()
  73. return render_template("auth/login.html",
  74. passwd_login_form=passwd_login_form,
  75. email_login_form=email_login_form,
  76. register_form=register_form,
  77. on_passwd_login=on_passwd_login,
  78. on_email_login=on_email_login,
  79. on_register=on_register)
  80. @auth.route("/")
  81. def auth_page():
  82. if current_user.is_authenticated: # 用户已经成功登陆
  83. return render_template("auth/yours.html")
  84. return __load_login_page()
  85. @auth.route('/login/passwd', methods=["GET", "POST"])
  86. def passwd_login_page():
  87. if current_user.is_authenticated: # 用户已经成功登陆
  88. Logger.print_user_not_allow_opt_log("passwd-login.txt")
  89. return redirect(url_for("auth.auth_page"))
  90. form = PasswdLoginForm()
  91. if form.validate_on_submit():
  92. user = User.query.filter_by(email=form.email.data).first()
  93. if user and user.check_passwd(form.passwd.data):
  94. login_user(user, form.remember.data)
  95. next_page = request.args.get("next")
  96. if next_page is None or not next_page.startswith('/'):
  97. next_page = url_for('base.index_page')
  98. flash("登陆成功")
  99. Logger.print_user_opt_success_log(f"passwd login.txt {form.email.data}")
  100. return redirect(next_page)
  101. flash("账号或密码错误")
  102. Logger.print_user_opt_fail_log(f"passwd login.txt {form.email.data}")
  103. return redirect(url_for("auth.passwd_login_page"))
  104. return __load_login_page(passwd_login_form=form, on_passwd_login=True)
  105. @auth.route('/login/email', methods=["GET", "POST"])
  106. def email_login_page():
  107. if current_user.is_authenticated: # 用户已经成功登陆
  108. Logger.print_user_not_allow_opt_log("email-login.txt")
  109. return redirect(url_for("auth.auth_page"))
  110. form = EmailLoginForm()
  111. if form.validate_on_submit():
  112. user = User.query.filter_by(email=form.email.data).first()
  113. if user:
  114. token = user.login_creat_token(form.remember.data)
  115. login_url = urljoin(request.host_url, url_for("auth.email_login_confirm_page", token=token))
  116. send_msg("登录确认", user.email, "login", login_url=login_url)
  117. flash("登录确认邮件已发送至邮箱")
  118. Logger.print_user_opt_success_log(f"email login.txt {form.email.data}")
  119. return redirect(url_for("base.index_page"))
  120. flash("账号不存在")
  121. Logger.print_user_opt_fail_log(f"email login.txt {form.email.data}")
  122. return redirect(url_for("auth.passwd_login_page"))
  123. return __load_login_page(passwd_login_form=form, on_passwd_login=True)
  124. @auth.route('/register', methods=["GET", "POST"])
  125. def register_page():
  126. if current_user.is_authenticated:
  127. Logger.print_user_not_allow_opt_log("register.txt")
  128. return redirect(url_for("auth.auth_page"))
  129. form = RegisterForm()
  130. if form.validate_on_submit():
  131. token = User.register_creat_token(form.email.data, form.passwd.data)
  132. register_url = urljoin(request.host_url, url_for("auth.register_confirm_page", token=token))
  133. send_msg("注册确认", form.email.data, "register", register_url=register_url)
  134. flash("注册提交成功, 请进入邮箱点击确认注册链接")
  135. Logger.print_import_user_opt_success_log(f"register.txt {form.email.data}")
  136. return redirect(url_for("base.index_page"))
  137. return __load_login_page(register_form=form, on_register=True, on_passwd_login=False)
  138. @auth.route('/confirm/register')
  139. def register_confirm_page():
  140. token = request.args.get("token", None)
  141. if token is None:
  142. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  143. return abort(404)
  144. token = User.register_load_token(token)
  145. if token is None:
  146. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  147. return abort(404)
  148. if User.query.filter_by(email=token[0]).first():
  149. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  150. return abort(404)
  151. if User.query.limit(1).first(): # 不是第一个用户
  152. new_user = User(email=token[0], passwd_hash=User.get_passwd_hash(token[1]))
  153. else:
  154. admin = Role.query.filter_by(name="admin").first()
  155. if admin is None:
  156. Logger.print_user_opt_fail_log(f"Role admin not found")
  157. return abort(500)
  158. new_user = User(email=token[0], passwd_hash=User.get_passwd_hash(token[1]), role=admin)
  159. db.session.add(new_user)
  160. db.session.commit()
  161. Logger.print_import_user_opt_success_log(f"confirm {token[0]} success")
  162. flash(f"用户{token[0]}认证完成")
  163. return redirect(url_for("base.index_page"))
  164. @auth.route('/confirm/login')
  165. def email_login_confirm_page():
  166. token = request.args.get("token", None)
  167. if token is None:
  168. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  169. return abort(404)
  170. token = User.login_load_token(token)
  171. if token is None:
  172. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  173. return abort(404)
  174. user = User.query.filter_by(email=token[0]).first()
  175. if not user:
  176. Logger.print_user_opt_fail_log(f"Confirm (bad token)")
  177. return abort(404)
  178. login_user(user, token[1])
  179. flash("登陆成功")
  180. Logger.print_user_opt_success_log(f"passwd login.txt {user.email}")
  181. return redirect(url_for("base.index_page"))
  182. @auth.route('/set/passwd', methods=['GET', 'POST'])
  183. @login_required
  184. def change_passwd_page():
  185. form = ChangePasswdForm()
  186. if form.validate_on_submit():
  187. if not current_user.check_passwd(form.old_passwd.data):
  188. Logger.print_user_opt_error_log(f"change passwd")
  189. flash("旧密码错误")
  190. else:
  191. current_user.passwd = form.passwd.data
  192. db.session.commit()
  193. Logger.print_user_opt_success_log(f"change passwd")
  194. flash("密码修改成功")
  195. logout_user()
  196. return redirect(url_for("auth.passwd_login_page"))
  197. return redirect(url_for("auth.change_passwd_page"))
  198. Logger.print_load_page_log("user change passwd")
  199. return render_template("auth/change_passwd.html", form=form)
  200. @auth.route('/logout')
  201. @login_required
  202. def logout_page():
  203. Logger.print_import_user_opt_success_log(f"logout")
  204. logout_user()
  205. flash("退出登录成功")
  206. return redirect(url_for("base.index_page"))
  207. @auth.route("/user")
  208. def user_page():
  209. user_id = request.args.get("user", -1, type=int)
  210. if user_id == -1:
  211. return abort(404)
  212. elif current_user.is_authenticated and current_user.id == user_id:
  213. return redirect(url_for("auth.auth_page"))
  214. user = User.query.filter_by(id=user_id).first()
  215. if not user:
  216. return abort(404)
  217. return render_template("auth/user.html", user=user)