auth.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. from flask import Flask, Blueprint, render_template, redirect, flash, url_for, request, abort, current_app
  2. from flask_login import login_required, login_user, current_user, logout_user
  3. from flask_mail import Mail
  4. from flask_wtf import FlaskForm
  5. from wtforms import StringField, PasswordField, BooleanField, SubmitField, ValidationError
  6. from wtforms.validators import DataRequired, Length, EqualTo
  7. from typing import Optional
  8. from view.base import App
  9. from core.user import User, load_user_by_email
  10. from send_email import send_msg
# Blueprint that every view and the context processor in this module attach to.
auth = Blueprint("auth", __name__)
# Module-level handles; AuthApp.__init__ rebinds both, so they are None until
# an AuthApp instance has been constructed.
app: Optional[Flask] = None
mail: Optional[Mail] = None
  14. class LoginForm(FlaskForm):
  15. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  16. passwd = PasswordField("密码", validators=[DataRequired(), Length(8, 32)])
  17. remember = BooleanField("记住我")
  18. submit = SubmitField("登录")
  19. class RegisterForm(FlaskForm):
  20. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  21. passwd = PasswordField("密码", validators=[DataRequired(),
  22. EqualTo("passwd_again", message="两次输入密码不相同"),
  23. Length(8, 32)])
  24. passwd_again = PasswordField("重复密码", validators=[DataRequired()])
  25. submit = SubmitField("注册")
  26. def validate_email(self, field):
  27. if load_user_by_email(field.data) is not None:
  28. raise ValidationError("邮箱已被注册")
  29. class ChangePasswdForm(FlaskForm):
  30. old_passwd = PasswordField("旧密码", validators=[DataRequired()])
  31. passwd = PasswordField("新密码", validators=[DataRequired(),
  32. EqualTo("passwd_again", message="两次输入密码不相同"),
  33. Length(8, 32)])
  34. passwd_again = PasswordField("重复密码", validators=[DataRequired()])
  35. submit = SubmitField("修改密码")
  36. class DeleteUserForm(FlaskForm):
  37. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  38. submit = SubmitField("删除用户")
  39. def validate_email(self, field):
  40. if load_user_by_email(field.data) is None:
  41. raise ValidationError("邮箱用户不存在")
  42. class CreateRoleForm(FlaskForm):
  43. name = StringField("角色名称", validators=[DataRequired(), Length(1, 20)])
  44. authority = StringField("权限", validators=[Length(0, 100)])
  45. submit = SubmitField("创建角色")
  46. class DeleteRoleForm(FlaskForm):
  47. name = StringField("角色名称", validators=[DataRequired(), Length(1, 20)])
  48. submit = SubmitField("删除角色")
  49. class SetRoleForm(FlaskForm):
  50. email = StringField("邮箱", validators=[DataRequired(), Length(1, 32)])
  51. name = StringField("角色名称", validators=[DataRequired(), Length(1, 20)])
  52. submit = SubmitField("设置角色")
  53. @auth.route('/yours')
  54. @login_required
  55. def yours_page():
  56. msg_count, comment_count, blog_count = current_user.count_info()
  57. AuthApp.print_load_page_log("user info")
  58. return render_template("auth/yours.html", msg_count=msg_count, comment_count=comment_count, blog_count=blog_count)
  59. @auth.route('/login', methods=["GET", "POST"])
  60. def login_page():
  61. if current_user.is_authenticated:
  62. AuthApp.print_user_not_allow_opt_log("login")
  63. return redirect(url_for("auth.yours_page"))
  64. form = LoginForm()
  65. if form.validate_on_submit():
  66. user = load_user_by_email(form.email.data)
  67. if user is not None and user.check_passwd(form.passwd.data):
  68. login_user(user, form.remember.data)
  69. next_page = request.args.get("next")
  70. if next_page is None or not next_page.startswith('/'):
  71. next_page = url_for('base.index_page')
  72. flash("登陆成功")
  73. AuthApp.print_user_opt_success_log(f"login {form.email.data}")
  74. return redirect(next_page)
  75. flash("账号或密码错误")
  76. AuthApp.print_user_opt_fail_log(f"login {form.email.data}")
  77. return redirect(url_for("auth.login_page"))
  78. AuthApp.print_load_page_log("user login")
  79. return render_template("auth/login.html", form=form)
  80. @auth.route('/register', methods=["GET", "POST"])
  81. def register_page():
  82. if current_user.is_authenticated:
  83. AuthApp.print_user_not_allow_opt_log("register")
  84. return redirect(url_for("auth.yours_page"))
  85. form = RegisterForm()
  86. if form.validate_on_submit():
  87. token = User.creat_token(form.email.data, form.passwd.data)
  88. register_url = url_for("auth.confirm_page", token=token, _external=True)
  89. send_msg("注册确认", mail, form.email.data, "register", register_url=register_url)
  90. flash("注册提交成功, 请进入邮箱点击确认注册链接")
  91. AuthApp.print_import_user_opt_success_log(f"register {form.email.data}")
  92. return redirect(url_for("base.index_page"))
  93. AuthApp.print_load_page_log("user register")
  94. return render_template("auth/register.html", RegisterForm=form)
  95. @auth.route('/confirm')
  96. def confirm_page():
  97. token = request.args.get("token", None)
  98. if token is None:
  99. AuthApp.print_user_opt_fail_log(f"Confirm (bad token)")
  100. abort(404)
  101. return
  102. token = User.load_token(token)
  103. if token is None:
  104. AuthApp.print_user_opt_fail_log(f"Confirm (bad token)")
  105. abort(404)
  106. return
  107. if load_user_by_email(token[0]) is not None:
  108. AuthApp.print_user_opt_fail_log(f"Confirm (bad token)")
  109. abort(404)
  110. return
  111. User(token[0], token[1], None, None).create()
  112. current_app.logger.info(f"{token[0]} confirm success")
  113. AuthApp.print_import_user_opt_success_log(f"confirm {token[0]}")
  114. flash(f"用户{token[0]}认证完成")
  115. return redirect(url_for("base.index_page"))
  116. @auth.route('/logout')
  117. @login_required
  118. def logout_page():
  119. AuthApp.print_import_user_opt_success_log(f"logout")
  120. logout_user()
  121. flash("退出登录成功")
  122. return redirect(url_for("base.index_page"))
  123. @auth.route('/passwd', methods=['GET', 'POST'])
  124. @login_required
  125. def change_passwd_page():
  126. form = ChangePasswdForm()
  127. if form.validate_on_submit():
  128. if not current_user.check_passwd(form.old_passwd.data):
  129. AuthApp.print_user_opt_fail_log("change passwd (old passwd error)")
  130. flash("旧密码错误")
  131. return redirect(url_for("auth.change_passwd_page"))
  132. if current_user.change_passwd(form.passwd.data):
  133. AuthApp.print_user_opt_success_log(f"change passwd")
  134. flash("密码修改成功")
  135. else:
  136. AuthApp.print_user_opt_error_log(f"change passwd")
  137. flash("密码修改失败")
  138. return redirect(url_for("auth.yours_page"))
  139. AuthApp.print_load_page_log("user change passwd")
  140. return render_template("auth/passwd.html", ChangePasswdForm=form)
  141. @auth.route('/delete', methods=['GET', 'POST'])
  142. @login_required
  143. def delete_user_page():
  144. if not current_user.check_role("DeleteUser"):
  145. AuthApp.print_user_not_allow_opt_log("delete user")
  146. abort(403)
  147. return
  148. form = DeleteUserForm()
  149. if form.validate_on_submit():
  150. user = load_user_by_email(form.email.data)
  151. if user is None:
  152. AuthApp.print_sys_opt_fail_log(f"delete user {form.email.data}")
  153. abort(404)
  154. return
  155. if user.delete():
  156. AuthApp.print_sys_opt_success_log(f"{current_user.email} delete user {form.email.data} success")
  157. flash("用户删除成功")
  158. else:
  159. AuthApp.print_sys_opt_fail_log(f"{current_user.email} delete user {form.email.data} fail")
  160. flash("用户删除失败")
  161. return redirect(url_for("auth.delete_user_page"))
  162. AuthApp.print_load_page_log("delete user")
  163. return render_template("auth/delete.html", DeleteUserForm=form)
  164. @auth.route('/role', methods=['GET'])
  165. @login_required
  166. def role_page():
  167. if not current_user.check_role("ConfigureSystem"):
  168. AuthApp.print_user_not_allow_opt_log("load role setting")
  169. abort(403)
  170. return
  171. AuthApp.print_load_page_log("role setting")
  172. return render_template("auth/role.html",
  173. CreateRoleForm=CreateRoleForm(),
  174. DeleteRoleForm=DeleteRoleForm(),
  175. SetRoleForm=SetRoleForm())
  176. @auth.route('/role-create', methods=['POST'])
  177. @login_required
  178. def role_create_page():
  179. form = CreateRoleForm()
  180. if form.validate_on_submit():
  181. if not current_user.check_role("ConfigureSystem"):
  182. AuthApp.print_user_not_allow_opt_log("create role")
  183. abort(403)
  184. return
  185. if User.create_role(form.name.data, form.authority.data.replace(" ", "").split(";")):
  186. AuthApp.print_sys_opt_success_log(f"Create role success: {form.name.data}")
  187. flash("角色创建成功")
  188. else:
  189. AuthApp.print_sys_opt_success_log(f"Create role fail: {form.name.data}")
  190. flash("角色创建失败")
  191. return redirect(url_for("auth.role_page"))
  192. abort(404)
  193. return
  194. @auth.route('/role-delete', methods=['POST'])
  195. @login_required
  196. def role_delete_page():
  197. form = DeleteRoleForm()
  198. if form.validate_on_submit():
  199. if not current_user.check_role("ConfigureSystem"):
  200. AuthApp.print_user_not_allow_opt_log("delete role")
  201. abort(403)
  202. return
  203. if User.delete_role(form.name.data):
  204. AuthApp.print_sys_opt_success_log(f"Delete role success: {form.name.data}")
  205. flash("角色删除成功")
  206. else:
  207. AuthApp.print_sys_opt_fail_log(f"Delete role fail: {form.name.data}")
  208. flash("角色删除失败")
  209. return redirect(url_for("auth.role_page"))
  210. abort(404)
  211. return
  212. @auth.route('/role-set', methods=['POST'])
  213. @login_required
  214. def role_set_page():
  215. form = SetRoleForm()
  216. if form.validate_on_submit():
  217. if not current_user.check_role("ConfigureSystem"):
  218. AuthApp.print_user_not_allow_opt_log("assign user a role")
  219. abort(403)
  220. return
  221. user = load_user_by_email(form.email.data)
  222. if user is not None:
  223. if user.set_user_role(form.name.data):
  224. AuthApp.print_sys_opt_success_log(f"Role assign {form.email.data} -> {form.name.data}")
  225. flash("角色设置成功")
  226. else:
  227. AuthApp.print_sys_opt_fail_log(f"Role assign {form.email.data} -> {form.name.data}")
  228. flash("角色设置失败")
  229. else:
  230. AuthApp.print_sys_opt_fail_log(f"Role assign (bad email) {form.email.data} -> {form.name.data}")
  231. flash("邮箱未注册")
  232. return redirect(url_for("auth.role_page"))
  233. abort(404)
  234. return
  235. @auth.context_processor
  236. def inject_base():
  237. return {"top_nav": ["", "", "", "", "", "active"]}
  238. class AuthApp(App):
  239. def __init__(self, import_name):
  240. super(AuthApp, self).__init__(import_name)
  241. global app, mail
  242. app = self._app
  243. mail = self.mail
  244. app.register_blueprint(auth, url_prefix="/auth")
  245. self.login_manager.login_view = "auth.login_page"
  246. @self.login_manager.user_loader
  247. def user_loader(email: str):
  248. return load_user_by_email(email)