auth.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from flask import Flask, Blueprint, render_template, redirect, flash, url_for, request, abort
  2. from flask_login import login_required, login_user, current_user, logout_user
  3. from flask_mail import Mail
  4. from wtforms import StringField, PasswordField, BooleanField, SubmitField, ValidationError
  5. from wtforms.validators import DataRequired, Length, EqualTo
  6. from typing import Optional
  7. from view.base import App
  8. from core.user import User, load_user_by_email
  9. from flask_wtf import FlaskForm
  10. from send_email import send_msg
  11. auth = Blueprint("auth", __name__)
  12. app: Optional[Flask] = None
  13. mail: Optional[Mail] = None
  14. class LoginForm(FlaskForm):
  15. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  16. passwd = PasswordField("密码", validators=[DataRequired(), Length(8, 32)])
  17. remember = BooleanField("记住我")
  18. submit = SubmitField("登录")
  19. class RegisterForm(FlaskForm):
  20. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  21. passwd = PasswordField("密码", validators=[DataRequired(),
  22. EqualTo("passwd_again", message="两次输入密码不相同"),
  23. Length(8, 32)])
  24. passwd_again = PasswordField("重复密码", validators=[DataRequired()])
  25. submit = SubmitField("注册")
  26. def validate_email(self, field):
  27. if load_user_by_email(field.data) is not None:
  28. raise ValidationError("Email already register")
  29. class ChangePasswdForm(FlaskForm):
  30. old_passwd = PasswordField("旧密码", validators=[DataRequired()])
  31. passwd = PasswordField("新密码", validators=[DataRequired(),
  32. EqualTo("passwd_again", message="两次输入密码不相同"),
  33. Length(8, 32)])
  34. passwd_again = PasswordField("重复密码", validators=[DataRequired()])
  35. submit = SubmitField("修改密码")
  36. @auth.route('/yours')
  37. @login_required
  38. def yours_page():
  39. msg_count, comment_count, blog_count = current_user.count_info()
  40. return render_template("auth/yours.html", msg_count=msg_count, comment_count=comment_count, blog_count=blog_count)
  41. @auth.route('/login', methods=["GET", "POST"])
  42. def login_page():
  43. if current_user.is_authenticated:
  44. return redirect(url_for("auth.yours_page"))
  45. form = LoginForm()
  46. if form.validate_on_submit():
  47. user = load_user_by_email(form.email.data)
  48. if user is not None and user.check_passwd(form.passwd.data):
  49. login_user(user, form.remember.data)
  50. next_page = request.args.get("next")
  51. if next_page is None or not next_page.startswith('/'):
  52. next_page = url_for('base.index_page')
  53. flash("登陆成功")
  54. return redirect(next_page)
  55. flash("账号或密码错误")
  56. return redirect(url_for("auth.login_page"))
  57. return render_template("auth/login.html", form=form)
  58. @auth.route('/register', methods=["GET", "POST"])
  59. def register_page():
  60. if current_user.is_authenticated:
  61. return redirect(url_for("auth.yours_page"))
  62. form = RegisterForm()
  63. if form.validate_on_submit():
  64. token = User.creat_token(form.email.data, form.passwd.data)
  65. register_url = url_for("auth.confirm_page", token=token, _external=True)
  66. send_msg("注册确认", mail, form.email.data, "register", register_url=register_url)
  67. flash("注册提交成功, 请进入邮箱点击确认注册链接")
  68. return redirect(url_for("base.index_page"))
  69. return render_template("auth/register.html", RegisterForm=form)
  70. @auth.route('/confirm')
  71. def confirm_page():
  72. token = request.args.get("token", None)
  73. if token is None:
  74. abort(404)
  75. return
  76. token = User.load_token(token)
  77. if token is None:
  78. abort(404)
  79. return
  80. if load_user_by_email(token[0]) is not None:
  81. abort(404)
  82. return
  83. User(token[0], token[1], None, None).create()
  84. flash(f"用户{token[0]}认证完成")
  85. return redirect(url_for("base.index_page"))
  86. @auth.route('/logout')
  87. def logout_page():
  88. logout_user()
  89. flash("退出登录成功")
  90. return redirect(url_for("base.index_page"))
  91. @auth.route('/passwd', methods=['GET', 'POST'])
  92. @login_required
  93. def change_passwd_page():
  94. form = ChangePasswdForm()
  95. if form.validate_on_submit():
  96. if not current_user.check_passwd(form.old_passwd.data):
  97. flash("旧密码错误")
  98. return redirect(url_for("auth.change_passwd_page"))
  99. if current_user.change_passwd(form.passwd.data):
  100. flash("密码修改成功")
  101. else:
  102. flash("密码修改失败")
  103. return redirect(url_for("auth.yours_page"))
  104. return render_template("auth/passwd.html", ChangePasswdForm=form)
  105. @auth.context_processor
  106. def inject_base():
  107. return {"top_nav": ["", "", "", "", "", "active"]}
  108. class AuthApp(App):
  109. def __init__(self, import_name):
  110. super(AuthApp, self).__init__(import_name)
  111. global app, mail
  112. app = self._app
  113. mail = self.mail
  114. app.register_blueprint(auth, url_prefix="/auth")
  115. self.login_manager.login_view = "auth.login_page"
  116. @self.login_manager.user_loader
  117. def user_loader(email: str):
  118. return load_user_by_email(email)