1
0

auth.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. from flask import Flask, Blueprint, render_template, redirect, flash, url_for, request, abort
  2. from flask_login import login_required, login_user, current_user, logout_user
  3. from flask_mail import Mail
  4. from flask_wtf import FlaskForm
  5. from wtforms import StringField, PasswordField, BooleanField, SubmitField, ValidationError
  6. from wtforms.validators import DataRequired, Length, EqualTo
  7. from typing import Optional
  8. from view.base import App
  9. from core.user import User, load_user_by_email
  10. from send_email import send_msg
  11. auth = Blueprint("auth", __name__)
  12. app: Optional[Flask] = None
  13. mail: Optional[Mail] = None
  14. class LoginForm(FlaskForm):
  15. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  16. passwd = PasswordField("密码", validators=[DataRequired(), Length(8, 32)])
  17. remember = BooleanField("记住我")
  18. submit = SubmitField("登录")
  19. class RegisterForm(FlaskForm):
  20. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  21. passwd = PasswordField("密码", validators=[DataRequired(),
  22. EqualTo("passwd_again", message="两次输入密码不相同"),
  23. Length(8, 32)])
  24. passwd_again = PasswordField("重复密码", validators=[DataRequired()])
  25. submit = SubmitField("注册")
  26. def validate_email(self, field):
  27. if load_user_by_email(field.data) is not None:
  28. raise ValidationError("邮箱已被注册")
  29. class ChangePasswdForm(FlaskForm):
  30. old_passwd = PasswordField("旧密码", validators=[DataRequired()])
  31. passwd = PasswordField("新密码", validators=[DataRequired(),
  32. EqualTo("passwd_again", message="两次输入密码不相同"),
  33. Length(8, 32)])
  34. passwd_again = PasswordField("重复密码", validators=[DataRequired()])
  35. submit = SubmitField("修改密码")
  36. class DeleteUserForm(FlaskForm):
  37. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  38. submit = SubmitField("删除用户")
  39. def validate_email(self, field):
  40. if load_user_by_email(field.data) is None:
  41. raise ValidationError("邮箱用户不存在")
  42. class CreateRoleForm(FlaskForm):
  43. name = StringField("角色名称", validators=[DataRequired(), Length(1, 20)])
  44. authority = StringField("权限", validators=[Length(0, 100)])
  45. submit = SubmitField("创建角色")
  46. class DeleteRoleForm(FlaskForm):
  47. name = StringField("角色名称", validators=[DataRequired(), Length(1, 20)])
  48. submit = SubmitField("删除角色")
  49. class SetRoleForm(FlaskForm):
  50. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  51. name = StringField("角色名称", validators=[DataRequired(), Length(1, 20)])
  52. submit = SubmitField("设置角色")
  53. @auth.route('/yours')
  54. @login_required
  55. def yours_page():
  56. msg_count, comment_count, blog_count = current_user.count_info()
  57. return render_template("auth/yours.html", msg_count=msg_count, comment_count=comment_count, blog_count=blog_count)
  58. @auth.route('/login', methods=["GET", "POST"])
  59. def login_page():
  60. if current_user.is_authenticated:
  61. return redirect(url_for("auth.yours_page"))
  62. form = LoginForm()
  63. if form.validate_on_submit():
  64. user = load_user_by_email(form.email.data)
  65. if user is not None and user.check_passwd(form.passwd.data):
  66. login_user(user, form.remember.data)
  67. next_page = request.args.get("next")
  68. if next_page is None or not next_page.startswith('/'):
  69. next_page = url_for('base.index_page')
  70. flash("登陆成功")
  71. return redirect(next_page)
  72. flash("账号或密码错误")
  73. return redirect(url_for("auth.login_page"))
  74. return render_template("auth/login.html", form=form)
  75. @auth.route('/register', methods=["GET", "POST"])
  76. def register_page():
  77. if current_user.is_authenticated:
  78. return redirect(url_for("auth.yours_page"))
  79. form = RegisterForm()
  80. if form.validate_on_submit():
  81. token = User.creat_token(form.email.data, form.passwd.data)
  82. register_url = url_for("auth.confirm_page", token=token, _external=True)
  83. send_msg("注册确认", mail, form.email.data, "register", register_url=register_url)
  84. flash("注册提交成功, 请进入邮箱点击确认注册链接")
  85. return redirect(url_for("base.index_page"))
  86. return render_template("auth/register.html", RegisterForm=form)
  87. @auth.route('/confirm')
  88. def confirm_page():
  89. token = request.args.get("token", None)
  90. if token is None:
  91. abort(404)
  92. return
  93. token = User.load_token(token)
  94. if token is None:
  95. abort(404)
  96. return
  97. if load_user_by_email(token[0]) is not None:
  98. abort(404)
  99. return
  100. User(token[0], token[1], None, None).create()
  101. flash(f"用户{token[0]}认证完成")
  102. return redirect(url_for("base.index_page"))
  103. @auth.route('/logout')
  104. def logout_page():
  105. logout_user()
  106. flash("退出登录成功")
  107. return redirect(url_for("base.index_page"))
  108. @auth.route('/passwd', methods=['GET', 'POST'])
  109. @login_required
  110. def change_passwd_page():
  111. form = ChangePasswdForm()
  112. if form.validate_on_submit():
  113. if not current_user.check_passwd(form.old_passwd.data):
  114. flash("旧密码错误")
  115. return redirect(url_for("auth.change_passwd_page"))
  116. if current_user.change_passwd(form.passwd.data):
  117. flash("密码修改成功")
  118. else:
  119. flash("密码修改失败")
  120. return redirect(url_for("auth.yours_page"))
  121. return render_template("auth/passwd.html", ChangePasswdForm=form)
  122. @auth.route('/delete', methods=['GET', 'POST'])
  123. @login_required
  124. def delete_user_page():
  125. if not current_user.check_role("DeleteUser"):
  126. abort(403)
  127. return
  128. form = DeleteUserForm()
  129. if form.validate_on_submit():
  130. user = load_user_by_email(form.email.data)
  131. if user is None:
  132. abort(404)
  133. return
  134. if user.delete():
  135. flash("用户删除成功")
  136. else:
  137. flash("用户删除失败")
  138. return redirect(url_for("auth.delete_user_page"))
  139. return render_template("auth/delete.html", DeleteUserForm=form)
  140. @auth.route('/role', methods=['GET'])
  141. @login_required
  142. def role_page():
  143. if not current_user.check_role("ConfigureSystem"):
  144. abort(403)
  145. return
  146. return render_template("auth/role.html",
  147. CreateRoleForm=CreateRoleForm(),
  148. DeleteRoleForm=DeleteRoleForm(),
  149. SetRoleForm=SetRoleForm())
  150. @auth.route('/role-create', methods=['POST'])
  151. @login_required
  152. def role_create_page():
  153. form = CreateRoleForm()
  154. if form.validate_on_submit():
  155. if not current_user.check_role("ConfigureSystem"):
  156. abort(403)
  157. return
  158. if User.create_role(form.name.data, form.authority.data.replace(" ", "").split(";")):
  159. flash("角色创建成功")
  160. else:
  161. flash("角色创建失败")
  162. return redirect(url_for("auth.role_page"))
  163. abort(404)
  164. return
  165. @auth.route('/role-delete', methods=['POST'])
  166. @login_required
  167. def role_delete_page():
  168. form = DeleteRoleForm()
  169. if form.validate_on_submit():
  170. if not current_user.check_role("ConfigureSystem"):
  171. abort(403)
  172. return
  173. if User.delete_role(form.name.data):
  174. flash("角色删除成功")
  175. else:
  176. flash("角色删除失败")
  177. return redirect(url_for("auth.role_page"))
  178. abort(404)
  179. return
  180. @auth.route('/role-set', methods=['POST'])
  181. @login_required
  182. def role_set_page():
  183. form = SetRoleForm()
  184. if form.validate_on_submit():
  185. if not current_user.check_role("ConfigureSystem"):
  186. abort(403)
  187. return
  188. user = load_user_by_email(form.email.data)
  189. if user is not None:
  190. if user.set_user_role(form.name.data):
  191. flash("角色设置成功")
  192. else:
  193. flash("角色设置失败")
  194. else:
  195. flash("邮箱未注册")
  196. return redirect(url_for("auth.role_page"))
  197. abort(404)
  198. return
  199. @auth.context_processor
  200. def inject_base():
  201. return {"top_nav": ["", "", "", "", "", "active"]}
  202. class AuthApp(App):
  203. def __init__(self, import_name):
  204. super(AuthApp, self).__init__(import_name)
  205. global app, mail
  206. app = self._app
  207. mail = self.mail
  208. app.register_blueprint(auth, url_prefix="/auth")
  209. self.login_manager.login_view = "auth.login_page"
  210. @self.login_manager.user_loader
  211. def user_loader(email: str):
  212. return load_user_by_email(email)