auth.py 12 KB


  1. from flask import Blueprint, render_template, redirect, flash, url_for, request, abort, current_app, g
  2. from flask_login import login_required, login_user, current_user, logout_user
  3. from flask_wtf import FlaskForm
  4. from wtforms import (EmailField,
  5. StringField,
  6. PasswordField,
  7. BooleanField,
  8. SelectMultipleField,
  9. SelectField,
  10. SubmitField,
  11. ValidationError)
  12. from wtforms.validators import DataRequired, Length, Regexp, EqualTo
  13. from urllib.parse import urljoin
  14. import app
  15. from object.user import User
  16. from send_email import send_msg
  17. from configure import conf
  18. auth = Blueprint("auth", __name__)
  19. class AuthField(FlaskForm):
  20. @staticmethod
  21. def email_field(name: str, description: str):
  22. """ 提前定义 email 字段的生成函数,供下文调用 """
  23. return EmailField(name, description=description,
  24. validators=[
  25. DataRequired(f"必须填写{name}"),
  26. Length(1, 32, message=f"{name}长度1-32个字符"),
  27. Regexp(r"^[a-zA-Z0-9_\.\-]+@[a-zA-Z0-9_\-]+(\.[a-zA-Z0-9_\.]+)+$",
  28. message=f"{name}不满足正则表达式")])
  29. @staticmethod
  30. def passwd_field(name: str, description: str):
  31. """ 提前定义 passwd 字段的生成函数,供下文调用 """
  32. return PasswordField(name, description=description,
  33. validators=[
  34. DataRequired(f"必须填写{name}"),
  35. Length(8, 32, message=f"{name}长度为8-32位")])
  36. @staticmethod
  37. def passwd_again_field(name: str, description: str, passwd: str = "passwd"):
  38. """ 提前定义 passwd again 字段的生成函数,供下文调用 """
  39. return PasswordField(f"重复{name}", description=description,
  40. validators=[
  41. DataRequired(message=f"必须再次填写{name}"),
  42. EqualTo(passwd, message=f"两次输入的{name}不相同")])
  43. class EmailPasswd(AuthField):
  44. email = AuthField.email_field("邮箱", "用户邮箱")
  45. passwd = AuthField.passwd_field("密码", "用户密码")
  46. class LoginForm(EmailPasswd):
  47. remember = BooleanField("记住我")
  48. submit = SubmitField("登录")
  49. class RegisterForm(EmailPasswd):
  50. passwd_again = AuthField.passwd_again_field("密码", "用户密码")
  51. submit = SubmitField("注册")
  52. def validate_email(self, field):
  53. """ 检验email是否合法 """
  54. if User(field.data).info[2] != -1:
  55. raise ValidationError("邮箱已被注册")
  56. class ChangePasswdForm(AuthField):
  57. old_passwd = AuthField.passwd_field("旧密码", "用户原密码")
  58. passwd = AuthField.passwd_field("新密码", "用户新密码")
  59. passwd_again = AuthField.passwd_again_field("新密码", "用户新密码")
  60. submit = SubmitField("修改密码")
  61. def validate_passwd(self, field):
  62. """ 检验新旧密码是否相同 """
  63. if field.data == self.old_passwd.data:
  64. raise ValidationError("新旧密码不能相同")
  65. class DeleteUserForm(AuthField):
  66. email = AuthField.email_field("邮箱", "用户邮箱")
  67. submit = SubmitField("删除用户")
  68. def __init__(self):
  69. super(DeleteUserForm, self).__init__()
  70. self.email_user = None
  71. def validate_email(self, field):
  72. """ 检验用户是否存在 """
  73. if User(field.data).info[2] == -1:
  74. raise ValidationError("邮箱用户不存在")
  75. class CreateRoleForm(AuthField):
  76. name = StringField("角色名称", validators=[DataRequired()])
  77. authority = SelectMultipleField("权限", coerce=str, choices=User.RoleAuthorize)
  78. submit = SubmitField("创建角色")
  79. class RoleForm(AuthField):
  80. name = SelectField("角色名称", validators=[DataRequired()], coerce=int)
  81. def __init__(self):
  82. super(RoleForm, self).__init__()
  83. self.name_res = []
  84. self.name_choices = []
  85. for i in User.get_role_list():
  86. self.name_res.append(i[0])
  87. self.name_choices.append((i[0], i[1]))
  88. self.name.choices = self.name_choices
  89. def validate_name(self, field):
  90. """ 检验角色是否存在 """
  91. if field.data not in self.name_res:
  92. raise ValidationError("角色不存在")
  93. class DeleteRoleForm(RoleForm):
  94. submit = SubmitField("删除角色")
  95. class SetRoleForm(RoleForm):
  96. email = AuthField.email_field("邮箱", "用户邮箱")
  97. submit = SubmitField("设置角色")
  98. def __init__(self):
  99. super(SetRoleForm, self).__init__()
  100. self.email_user = None
  101. def validate_email(self, field):
  102. if User(field.data).info[2] == -1:
  103. raise ValidationError("邮箱用户不存在")
  104. @auth.route('/user/yours')
  105. @login_required
  106. def yours_page():
  107. msg_count, comment_count, blog_count = current_user.count
  108. app.HBlogFlask.print_load_page_log("user info")
  109. return render_template("auth/yours.html", msg_count=msg_count, comment_count=comment_count, blog_count=blog_count)
  110. @auth.route('/user/login', methods=["GET", "POST"])
  111. def login_page():
  112. if current_user.is_authenticated: # 用户已经成功登陆
  113. app.HBlogFlask.print_user_not_allow_opt_log("login")
  114. return redirect(url_for("auth.yours_page"))
  115. form = LoginForm()
  116. if form.validate_on_submit():
  117. user = User(form.email.data)
  118. if user.info[2] != -1 and user.check_passwd(form.passwd.data):
  119. login_user(user, form.remember.data)
  120. next_page = request.args.get("next", None, type=str)
  121. if next_page is None or not next_page.startswith('/'):
  122. next_page = url_for('base.index_page')
  123. flash("登陆成功")
  124. app.HBlogFlask.print_user_opt_success_log(f"login {form.email.data}")
  125. return redirect(next_page)
  126. flash("账号或密码错误")
  127. app.HBlogFlask.print_user_opt_fail_log(f"login {form.email.data}")
  128. return redirect(url_for("auth.login_page"))
  129. app.HBlogFlask.print_load_page_log("user login")
  130. return render_template("auth/login.html", form=form)
  131. @auth.route('/user/register', methods=["GET", "POST"])
  132. def register_page():
  133. if current_user.is_authenticated:
  134. app.HBlogFlask.print_user_not_allow_opt_log("register")
  135. return redirect(url_for("auth.yours_page"))
  136. form = RegisterForm()
  137. if form.validate_on_submit():
  138. token = User.creat_token(form.email.data, form.passwd.data)
  139. register_url = urljoin(request.host_url, url_for("auth.confirm_page", token=token))
  140. hblog: app.Hblog = current_app
  141. send_msg("注册确认", hblog.mail, form.email.data, "register", register_url=register_url)
  142. flash("注册提交成功, 请进入邮箱点击确认注册链接")
  143. app.HBlogFlask.print_import_user_opt_success_log(f"register {form.email.data}")
  144. return redirect(url_for("base.index_page"))
  145. app.HBlogFlask.print_load_page_log("user register")
  146. return render_template("auth/register.html", RegisterForm=form)
  147. @auth.route('/user/confirm')
  148. def confirm_page():
  149. token = request.args.get("token", None, type=str)
  150. if token is None:
  151. app.HBlogFlask.print_user_opt_fail_log(f"Confirm (bad token)")
  152. abort(404)
  153. return
  154. token = User.load_token(token)
  155. if token is None:
  156. app.HBlogFlask.print_user_opt_fail_log(f"Confirm (bad token)")
  157. abort(404)
  158. return
  159. if User(token[0]).info[2] != -1:
  160. app.HBlogFlask.print_user_opt_fail_log(f"Confirm (bad token)")
  161. abort(404)
  162. return
  163. User.create(token[0], token[1])
  164. current_app.logger.info(f"{token[0]} confirm success")
  165. app.HBlogFlask.print_import_user_opt_success_log(f"confirm {token[0]}")
  166. flash(f"用户{token[0]}认证完成")
  167. return redirect(url_for("base.index_page"))
  168. @auth.route('/user/logout')
  169. @login_required
  170. def logout_page():
  171. app.HBlogFlask.print_import_user_opt_success_log(f"logout")
  172. logout_user()
  173. flash("退出登录成功")
  174. return redirect(url_for("base.index_page"))
  175. @auth.route('/user/set/passwd', methods=['GET', 'POST'])
  176. @login_required
  177. def change_passwd_page():
  178. form = ChangePasswdForm()
  179. if form.validate_on_submit():
  180. if not current_user.check_passwd(form.old_passwd.data):
  181. app.HBlogFlask.print_user_opt_error_log(f"change passwd")
  182. flash("旧密码错误")
  183. elif current_user.change_passwd(form.passwd.data):
  184. app.HBlogFlask.print_user_opt_success_log(f"change passwd")
  185. flash("密码修改成功")
  186. logout_user()
  187. return redirect(url_for("auth.login_page"))
  188. else:
  189. app.HBlogFlask.print_user_opt_error_log(f"change passwd")
  190. flash("密码修改失败")
  191. return redirect(url_for("auth.change_passwd_page"))
  192. app.HBlogFlask.print_load_page_log("user change passwd")
  193. return render_template("auth/passwd.html", ChangePasswdForm=form)
  194. @auth.route('/user/delete', methods=['GET', 'POST'])
  195. @login_required
  196. @app.role_required("DeleteUser", "delete user")
  197. def delete_user_page():
  198. form = DeleteUserForm()
  199. if form.validate_on_submit():
  200. user = form.email_user
  201. if user.delete():
  202. app.HBlogFlask.print_sys_opt_success_log(f"{current_user.email} delete user {form.email.data} success")
  203. flash("用户删除成功")
  204. else:
  205. app.HBlogFlask.print_sys_opt_fail_log(f"{current_user.email} delete user {form.email.data} fail")
  206. flash("用户删除失败")
  207. return redirect(url_for("auth.delete_user_page"))
  208. app.HBlogFlask.print_load_page_log("delete user")
  209. return render_template("auth/delete.html", DeleteUserForm=form)
  210. @auth.route('/role', methods=['GET'])
  211. @login_required
  212. @app.role_required("ConfigureSystem", "load role setting")
  213. def role_page():
  214. app.HBlogFlask.print_load_page_log("role setting")
  215. return render_template("auth/role.html",
  216. CreateRoleForm=CreateRoleForm(),
  217. DeleteRoleForm=DeleteRoleForm(),
  218. SetRoleForm=SetRoleForm())
  219. @auth.route('/role/create', methods=['POST'])
  220. @login_required
  221. @app.form_required(CreateRoleForm, "create role")
  222. @app.role_required("ConfigureSystem", "create role")
  223. def role_create_page():
  224. form: CreateRoleForm = g.form
  225. name = form.name.data
  226. if User.create_role(name, form.authority.data):
  227. app.HBlogFlask.print_sys_opt_success_log(f"Create role success: {name}")
  228. flash("角色创建成功")
  229. else:
  230. app.HBlogFlask.print_sys_opt_success_log(f"Create role fail: {name}")
  231. flash("角色创建失败")
  232. return redirect(url_for("auth.role_page"))
  233. @auth.route('/role/delete', methods=['POST'])
  234. @login_required
  235. @app.form_required(DeleteRoleForm, "delete role")
  236. @app.role_required("ConfigureSystem", "delete role")
  237. def role_delete_page():
  238. form: DeleteRoleForm = g.form
  239. if User.delete_role(form.name.data):
  240. app.HBlogFlask.print_sys_opt_success_log(f"Delete role success: {form.name.data}")
  241. flash("角色删除成功")
  242. else:
  243. app.HBlogFlask.print_sys_opt_fail_log(f"Delete role fail: {form.name.data}")
  244. flash("角色删除失败")
  245. return redirect(url_for("auth.role_page"))
  246. @auth.route('/role/set', methods=['POST'])
  247. @login_required
  248. @app.form_required(SetRoleForm, "assign user a role")
  249. @app.role_required("ConfigureSystem", "assign user a role")
  250. def role_set_page():
  251. form: SetRoleForm = g.form
  252. user = form.email_user
  253. if user.set_user_role(form.name.data):
  254. app.HBlogFlask.print_sys_opt_success_log(f"Role assign {form.email.data} -> {form.name.data}")
  255. flash("角色设置成功")
  256. else:
  257. app.HBlogFlask.print_sys_opt_fail_log(f"Role assign {form.email.data} -> {form.name.data}")
  258. flash("角色设置失败")
  259. return redirect(url_for("auth.role_page"))
  260. @auth.context_processor
  261. @app.cache.cached(timeout=conf["CACHE_EXPIRE"], key_prefix="inject_base:auth")
  262. def inject_base():
  263. """ auth 默认模板变量 """
  264. return {"top_nav": ["", "", "", "", "", "active"]}