auth.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. from flask import Blueprint, render_template, redirect, url_for, request, current_app, flash, abort
  2. from flask_wtf import FlaskForm
  3. from wtforms import (EmailField,
  4. PasswordField,
  5. BooleanField,
  6. SubmitField,
  7. ValidationError,
  8. StringField)
  9. from wtforms.validators import DataRequired, Length, Regexp, EqualTo
  10. from flask_login import current_user, login_user, logout_user, login_required
  11. from urllib.parse import urljoin
  12. from sqlalchemy.exc import IntegrityError
  13. from .db import db, User, Role, Follow
  14. from .logger import Logger
  15. from .mail import send_msg
  16. from .login import role_required
  17. auth = Blueprint("auth", __name__)
  18. class AuthField(FlaskForm):
  19. @staticmethod
  20. def email_field(name: str, description: str):
  21. """ 提前定义 email 字段的生成函数,供下文调用 """
  22. return EmailField(name, description=description,
  23. validators=[
  24. DataRequired(f"必须填写{name}"),
  25. Length(1, 32, message=f"{name}长度1-32个字符"),
  26. Regexp(r"^[a-zA-Z0-9_\.\-]+@[a-zA-Z0-9_\-]+(\.[a-zA-Z0-9_\.]+)+$",
  27. message=f"{name}不满足正则表达式")])
  28. @staticmethod
  29. def passwd_field(name: str, description: str):
  30. """ 提前定义 passwd 字段的生成函数,供下文调用 """
  31. return PasswordField(name, description=description,
  32. validators=[
  33. DataRequired(f"必须填写{name}"),
  34. Length(8, 32, message=f"{name}长度为8-32位")])
  35. @staticmethod
  36. def passwd_again_field(name: str, description: str, passwd: str = "passwd"):
  37. """ 提前定义 passwd again 字段的生成函数,供下文调用 """
  38. return PasswordField(f"重复{name}", description=description,
  39. validators=[
  40. DataRequired(message=f"必须再次填写{name}"),
  41. EqualTo(passwd, message=f"两次输入的{name}不相同")])
  42. class EmailPasswd(AuthField):
  43. email = AuthField.email_field("邮箱", "用户邮箱")
  44. passwd = AuthField.passwd_field("密码", "用户密码")
  45. class PasswdLoginForm(EmailPasswd):
  46. remember = BooleanField("记住我")
  47. submit = SubmitField("登录")
  48. class EmailLoginForm(AuthField):
  49. email = AuthField.email_field("邮箱", "用户邮箱")
  50. remember = BooleanField("记住我")
  51. submit = SubmitField("登录")
  52. class RegisterForm(EmailPasswd):
  53. passwd_again = AuthField.passwd_again_field("密码", "用户密码")
  54. submit = SubmitField("注册")
  55. def validate_email(self, field):
  56. """ 检验email是否合法 """
  57. if User.query.filter_by(email=field.data).first():
  58. raise ValidationError("邮箱已被注册")
  59. class ChangePasswdForm(AuthField):
  60. old_passwd = AuthField.passwd_field("旧密码", "用户原密码")
  61. passwd = AuthField.passwd_field("新密码", "用户新密码")
  62. passwd_again = AuthField.passwd_again_field("新密码", "用户新密码")
  63. submit = SubmitField("修改密码")
  64. def validate_passwd(self, field):
  65. """ 检验新旧密码是否相同 """
  66. if field.data == self.old_passwd.data:
  67. raise ValidationError("新旧密码不能相同")
  68. class ChangeRoleForm(AuthField):
  69. email = AuthField.email_field("邮箱", "用户邮箱")
  70. role = StringField("角色", description="用户角色", validators=[DataRequired(message="必须指定用户角色")])
  71. submit = SubmitField("修改")
  72. def validate_role(self, field):
  73. if not Role.query.filter_by(name=field.data).first():
  74. raise ValidationError("角色不存在")
  75. def validate_email(self, field):
  76. if not User.query.filter_by(email=field.data).first():
  77. raise ValidationError("用户不存在")
  78. def __load_login_page(passwd_login_form=None, email_login_form=None, register_form=None,
  79. on_passwd_login=True, on_email_login=False, on_register=False):
  80. if not passwd_login_form:
  81. passwd_login_form = PasswdLoginForm()
  82. if not email_login_form:
  83. email_login_form = EmailLoginForm()
  84. if not register_form:
  85. register_form = RegisterForm()
  86. Logger.print_load_page_log("user login")
  87. return render_template("auth/login.html",
  88. passwd_login_form=passwd_login_form,
  89. email_login_form=email_login_form,
  90. register_form=register_form,
  91. on_passwd_login=on_passwd_login,
  92. on_email_login=on_email_login,
  93. on_register=on_register)
  94. @auth.route("/")
  95. def auth_page():
  96. if current_user.is_authenticated: # 用户已经成功登陆
  97. return render_template("auth/yours.html")
  98. return __load_login_page()
  99. @auth.route('/login/passwd', methods=["GET", "POST"])
  100. def passwd_login_page():
  101. if current_user.is_authenticated: # 用户已经成功登陆
  102. Logger.print_user_not_allow_opt_log("passwd-login")
  103. return redirect(url_for("auth.auth_page"))
  104. form = PasswdLoginForm()
  105. if form.validate_on_submit():
  106. user = User.query.filter_by(email=form.email.data).first()
  107. if user and user.check_passwd(form.passwd.data) and user.role.has_permission(Role.USABLE):
  108. login_user(user, form.remember.data)
  109. next_page = request.args.get("next")
  110. if next_page is None or not next_page.startswith('/'):
  111. next_page = url_for('base.index_page')
  112. flash("登陆成功")
  113. Logger.print_user_opt_success_log(f"passwd login {form.email.data}")
  114. return redirect(next_page)
  115. flash("账号或密码错误")
  116. Logger.print_user_opt_fail_log(f"passwd login {form.email.data}")
  117. return redirect(url_for("auth.passwd_login_page"))
  118. return __load_login_page(passwd_login_form=form, on_passwd_login=True)
  119. @auth.route('/login/email', methods=["GET", "POST"])
  120. def email_login_page():
  121. if current_user.is_authenticated: # 用户已经成功登陆
  122. Logger.print_user_not_allow_opt_log("email-login")
  123. return redirect(url_for("auth.auth_page"))
  124. form = EmailLoginForm()
  125. if form.validate_on_submit():
  126. user = User.query.filter_by(email=form.email.data).first()
  127. if user and user.role.has_permission(Role.USABLE):
  128. token = user.login_creat_token(form.remember.data)
  129. login_url = urljoin(request.host_url, url_for("auth.email_login_confirm_page", token=token))
  130. send_msg("登录确认", user.email, "login", login_url=login_url)
  131. flash("登录确认邮件已发送至邮箱")
  132. Logger.print_user_opt_success_log(f"email login {form.email.data}")
  133. return redirect(url_for("base.index_page"))
  134. flash("账号不存在")
  135. Logger.print_user_opt_fail_log(f"email login {form.email.data}")
  136. return redirect(url_for("auth.passwd_login_page"))
  137. return __load_login_page(passwd_login_form=form, on_passwd_login=True)
  138. @auth.route('/register', methods=["GET", "POST"])
  139. def register_page():
  140. if current_user.is_authenticated:
  141. Logger.print_user_not_allow_opt_log("register")
  142. return redirect(url_for("auth.auth_page"))
  143. form = RegisterForm()
  144. if form.validate_on_submit():
  145. token = User.register_creat_token(form.email.data, form.passwd.data)
  146. register_url = urljoin(request.host_url, url_for("auth.register_confirm_page", token=token))
  147. send_msg("注册确认", form.email.data, "register", register_url=register_url)
  148. flash("注册提交成功, 请进入邮箱点击确认注册链接")
  149. Logger.print_import_user_opt_success_log(f"register {form.email.data}")
  150. return redirect(url_for("base.index_page"))
  151. return __load_login_page(register_form=form, on_register=True, on_passwd_login=False)
  152. @auth.route('/confirm/register')
  153. def register_confirm_page():
  154. token = request.args.get("token", None)
  155. if token is None:
  156. Logger.print_user_opt_fail_log(f"register confirm (bad token)")
  157. return abort(404)
  158. token = User.register_load_token(token)
  159. if token is None:
  160. Logger.print_user_opt_fail_log(f"register confirm (bad token)")
  161. return abort(404)
  162. if User.query.filter_by(email=token[0]).first():
  163. Logger.print_user_opt_fail_log(f"register confirm (bad token)")
  164. return abort(404)
  165. if User.query.limit(1).first(): # 不是第一个用户
  166. new_user = User(email=token[0], passwd_hash=User.get_passwd_hash(token[1]))
  167. else:
  168. admin = Role.query.filter_by(name="admin").first()
  169. if admin is None:
  170. Logger.print_sys_opt_fail_log(f"get admin(role)")
  171. return abort(500)
  172. new_user = User(email=token[0], passwd_hash=User.get_passwd_hash(token[1]), role=admin)
  173. db.session.add(new_user)
  174. db.session.commit()
  175. Logger.print_user_opt_success_log(f"register confirm {token[0]}")
  176. flash(f"用户{token[0]}认证完成")
  177. return redirect(url_for("base.index_page"))
  178. @auth.route('/confirm/login')
  179. def email_login_confirm_page():
  180. token = request.args.get("token", None)
  181. if token is None:
  182. Logger.print_user_opt_fail_log(f"login confirm (bad token)")
  183. return abort(404)
  184. token = User.login_load_token(token)
  185. if token is None:
  186. Logger.print_user_opt_fail_log(f"login confirm (bad token)")
  187. return abort(404)
  188. user = User.query.filter_by(email=token[0]).first()
  189. if not user:
  190. Logger.print_user_opt_fail_log(f"login confirm (bad token)")
  191. return abort(404)
  192. login_user(user, token[1])
  193. flash("登陆成功")
  194. Logger.print_user_opt_success_log(f"email login {user.email}")
  195. return redirect(url_for("base.index_page"))
  196. @auth.route('/set/passwd', methods=['GET', 'POST'])
  197. @login_required
  198. def change_passwd_page():
  199. form = ChangePasswdForm()
  200. if form.validate_on_submit():
  201. if not current_user.check_passwd(form.old_passwd.data):
  202. Logger.print_user_opt_error_log(f"change passwd")
  203. flash("旧密码错误")
  204. else:
  205. current_user.passwd = form.passwd.data
  206. db.session.commit()
  207. Logger.print_user_opt_success_log(f"change passwd")
  208. flash("密码修改成功")
  209. logout_user()
  210. return redirect(url_for("auth.passwd_login_page"))
  211. return redirect(url_for("auth.change_passwd_page"))
  212. Logger.print_load_page_log("user change passwd")
  213. return render_template("auth/change_passwd.html", form=form)
  214. @auth.route('/logout')
  215. @login_required
  216. def logout_page():
  217. logout_user()
  218. flash("退出登录成功")
  219. Logger.print_user_opt_success_log(f"logout")
  220. return redirect(url_for("base.index_page"))
  221. @auth.route("/user")
  222. def user_page():
  223. user_id = request.args.get("user", None, type=int)
  224. if not user_id:
  225. return abort(404)
  226. elif current_user.is_authenticated and current_user.id == user_id:
  227. return redirect(url_for("auth.auth_page"))
  228. user = User.query.filter_by(id=user_id).first()
  229. if not user:
  230. return abort(404)
  231. Logger.print_load_page_log(f"user {user.email} page")
  232. return render_template("auth/user.html", user=user)
  233. @auth.route("/follower/list")
  234. @login_required
  235. @role_required(Role.CHECK_FOLLOW, "check follower")
  236. def follower_page():
  237. if current_user.follower_count == 0:
  238. return render_template("auth/no_follow.html", title="粉丝", msg="你暂时一个粉丝都没有哦。")
  239. page = request.args.get("page", 1, type=int)
  240. pagination = current_user.follower.paginate(page=page, per_page=8, error_out=False)
  241. Logger.print_load_page_log(f"user {current_user.email} follower")
  242. return render_template("auth/follow.html",
  243. items=[i.follower for i in pagination.items],
  244. pagination=pagination,
  245. title="粉丝")
  246. @auth.route("/followed/list")
  247. @login_required
  248. @role_required(Role.CHECK_FOLLOW, "check followed")
  249. def followed_page():
  250. if current_user.followed_count == 0:
  251. return render_template("auth/no_follow.html", title="关注", msg="你暂时未关注任何人。")
  252. page = request.args.get("page", 1, type=int)
  253. pagination = current_user.followed.paginate(page=page, per_page=8, error_out=False)
  254. Logger.print_load_page_log(f"user {current_user.email} followed")
  255. return render_template("auth/follow.html",
  256. items=[i.followed for i in pagination.items],
  257. pagination=pagination,
  258. title="关注")
  259. @auth.route("/followed/follow")
  260. @login_required
  261. @role_required(Role.FOLLOW, "follow")
  262. def set_follow_page():
  263. user_id = request.args.get("user", None, type=int)
  264. if not user_id or user_id == current_user.id:
  265. return abort(404)
  266. user = User.query.filter_by(id=user_id).first()
  267. if not user:
  268. return abort(404)
  269. try:
  270. db.session.add(Follow(follower=current_user, followed=user))
  271. db.session.commit()
  272. except IntegrityError:
  273. flash("不能重复关注用户")
  274. else:
  275. flash("关注用户成功")
  276. return redirect(url_for("auth.user_page", user=user_id))
  277. @auth.route("/followed/unfollow")
  278. @login_required
  279. @role_required(Role.FOLLOW, "unfollow")
  280. def set_unfollow_page():
  281. user_id = request.args.get("user", None, type=int)
  282. if not user_id or user_id == current_user.id:
  283. return abort(404)
  284. user = User.query.filter_by(id=user_id).first()
  285. if not user:
  286. return abort(404)
  287. Follow.query.filter_by(follower=current_user, followed=user).delete()
  288. flash("取消关注用户成功")
  289. return redirect(url_for("auth.user_page", user=user_id))
  290. @auth.route("/block")
  291. @login_required
  292. @role_required(Role.BLOCK_USER, "block user")
  293. def set_block_page():
  294. user_id = request.args.get("user", None, type=int)
  295. if not user_id or user_id == current_user.id:
  296. return abort(404)
  297. user = User.query.filter_by(id=user_id).first()
  298. if not user:
  299. return abort(404)
  300. block = Role.query.filter_by(name="block").first()
  301. if not block:
  302. Logger.print_sys_opt_fail_log("get block(role)")
  303. return abort(500)
  304. user.role = block
  305. db.session.commit()
  306. return redirect(url_for("auth.user_page", user=user_id))
  307. @auth.route('/role/user', methods=['GET', 'POST'])
  308. @login_required
  309. @role_required(Role.SYSTEM, "change user role")
  310. def change_role_page():
  311. form = ChangeRoleForm()
  312. if form.validate_on_submit():
  313. user = User.query.filter_by(email=form.email.data).first()
  314. if not user:
  315. flash("用户不存在")
  316. return redirect(url_for("auth.change_role_page"))
  317. role = Role.query.filter_by(name=form.role.data).first()
  318. if not role:
  319. flash("角色不存在")
  320. return redirect(url_for("auth.change_role_page"))
  321. user.role = role
  322. db.session.commit()
  323. flash("用户分组修改成功")
  324. Logger.print_sys_opt_success_log(f"move {user.email} to {role.name}")
  325. return redirect(url_for("auth.change_role_page"))
  326. Logger.print_load_page_log("change user role")
  327. return render_template("auth/change_role.html", form=form)