Browse Source

feat: 提取鉴权和识别表单的函数

SongZihuan 3 years ago
parent
commit
8bd8236b27
7 changed files with 151 additions and 204 deletions
  1. 1 1
      app/__init__.py
  2. 18 29
      app/archive.py
  3. 40 72
      app/auth.py
  4. 45 70
      app/docx.py
  5. 14 26
      app/msg.py
  6. 2 6
      app/oss.py
  7. 31 0
      app/tool.py

+ 1 - 1
app/__init__.py

@@ -1,2 +1,2 @@
+from .tool import role_required, form_required
 from .app import HBlogFlask, Hblog
-

+ 18 - 29
app/archive.py

@@ -1,4 +1,4 @@
-from flask import Blueprint, render_template, abort, redirect, url_for, flash, current_app, request
+from flask import Blueprint, render_template, redirect, url_for, flash
 from flask_login import login_required, current_user
 from flask_wtf import FlaskForm
 from wtforms import StringField, SubmitField
@@ -28,40 +28,29 @@ def archive_page():
 
 @archive.route("create", methods=["POST"])
 @login_required
-def create_archive_page():
-    form = CreateArchiveForm()
-    if form.validate_on_submit():
-        if not current_user.check_role("WriteBlog"):  # 检查相应的权限
-            app.HBlogFlask.print_user_not_allow_opt_log("Create archive")
-            abort(403)
-            return
-
-        name = form.name.data
-        describe = form.describe.data
-        if len(name) > 10:
-            flash("归档名太长了")
-        elif len(describe) > 30:
-            flash("归档描述太长了")
+@app.form_required(CreateArchiveForm, "create archive")
+@app.role_required("WriteBlog", "create archive")
+def create_archive_page(form: CreateArchiveForm):
+    name = form.name.data
+    describe = form.describe.data
+    if len(name) > 10:
+        flash("归档名太长了")
+    elif len(describe) > 30:
+        flash("归档描述太长了")
+    else:
+        if Archive(name, describe, None).create():
+            app.HBlogFlask.print_sys_opt_success_log(f"Create archive {name}")
+            flash(f"创建归档 {name} 成功")
         else:
-            if Archive(name, describe, None).create():
-                app.HBlogFlask.print_sys_opt_success_log(f"Create archive {name}")
-                flash(f"创建归档 {name} 成功")
-            else:
-                app.HBlogFlask.print_sys_opt_fail_log(f"Create archive {name}")
-                flash(f"创建归档 {name} 失败")
-        return redirect(url_for("archive.archive_page"))
-    current_app.logger.warning("Create archive with error form.")
-    abort(404)
+            app.HBlogFlask.print_sys_opt_fail_log(f"Create archive {name}")
+            flash(f"创建归档 {name} 失败")
+    return redirect(url_for("archive.archive_page"))
 
 
 @archive.route("delete/<int:archive_id>")
 @login_required
+@app.role_required("DeleteBlog", "delete archive")
 def delete_archive_page(archive_id: int):
-    if not current_user.check_role("DeleteBlog"):
-        app.HBlogFlask.print_user_not_allow_opt_log("Delete archive")
-        abort(403)
-        return
-
     if Archive(None, None, archive_id).delete():
         app.HBlogFlask.print_sys_opt_success_log(f"Delete archive {archive_id}")
         flash("归档删除成功")

+ 40 - 72
app/auth.py

@@ -1,4 +1,4 @@
-from flask import Blueprint, render_template, redirect, flash, url_for, request, abort, current_app, g
+from flask import Blueprint, render_template, redirect, flash, url_for, request, abort, current_app
 from flask_login import login_required, login_user, current_user, logout_user
 from flask_wtf import FlaskForm
 from wtforms import StringField, PasswordField, BooleanField, SubmitField, ValidationError
@@ -189,12 +189,8 @@ def change_passwd_page():
 
 @auth.route('/delete', methods=['GET', 'POST'])
 @login_required
+@app.role_required("DeleteUser", "delete user")
 def delete_user_page():
-    if not current_user.check_role("DeleteUser"):
-        app.HBlogFlask.print_user_not_allow_opt_log("delete user")
-        abort(403)
-        return
-
     form = DeleteUserForm()
     if form.validate_on_submit():
         user = load_user_by_email(form.email.data)
@@ -216,12 +212,8 @@ def delete_user_page():
 
 @auth.route('/role', methods=['GET'])
 @login_required
+@app.role_required("ConfigureSystem", "load role setting")
 def role_page():
-    if not current_user.check_role("ConfigureSystem"):
-        app.HBlogFlask.print_user_not_allow_opt_log("load role setting")
-        abort(403)
-        return
-
     app.HBlogFlask.print_load_page_log("role setting")
     return render_template("auth/role.html",
                            CreateRoleForm=CreateRoleForm(),
@@ -231,77 +223,53 @@ def role_page():
 
 @auth.route('/role-create', methods=['POST'])
 @login_required
-def role_create_page():
-    form = CreateRoleForm()
-    if form.validate_on_submit():
-        if not current_user.check_role("ConfigureSystem"):
-            app.HBlogFlask.print_user_not_allow_opt_log("create role")
-            abort(403)
-            return
-
-        name = form.name.data
-        if len(name) > 10:
-            flash("角色名字太长")
+@app.form_required(CreateRoleForm, "create role")
+@app.role_required("ConfigureSystem", "create role")
+def role_create_page(form: CreateRoleForm):
+    name = form.name.data
+    if len(name) > 10:
+        flash("角色名字太长")
+    else:
+        if User.create_role(name, form.authority.data.replace(" ", "").split(";")):
+            app.HBlogFlask.print_sys_opt_success_log(f"Create role success: {name}")
+            flash("角色创建成功")
         else:
-            if User.create_role(name, form.authority.data.replace(" ", "").split(";")):
-                app.HBlogFlask.print_sys_opt_success_log(f"Create role success: {name}")
-                flash("角色创建成功")
-            else:
-                app.HBlogFlask.print_sys_opt_success_log(f"Create role fail: {name}")
-                flash("角色创建失败")
-        return redirect(url_for("auth.role_page"))
-
-    abort(404)
-    return
+            app.HBlogFlask.print_sys_opt_success_log(f"Create role fail: {name}")
+            flash("角色创建失败")
+    return redirect(url_for("auth.role_page"))
 
 
 @auth.route('/role-delete', methods=['POST'])
 @login_required
-def role_delete_page():
-    form = DeleteRoleForm()
-    if form.validate_on_submit():
-        if not current_user.check_role("ConfigureSystem"):
-            app.HBlogFlask.print_user_not_allow_opt_log("delete role")
-            abort(403)
-            return
-
-        if User.delete_role(form.name.data):
-            app.HBlogFlask.print_sys_opt_success_log(f"Delete role success: {form.name.data}")
-            flash("角色删除成功")
-        else:
-            app.HBlogFlask.print_sys_opt_fail_log(f"Delete role fail: {form.name.data}")
-            flash("角色删除失败")
-        return redirect(url_for("auth.role_page"))
-
-    abort(404)
-    return
+@app.form_required(DeleteRoleForm, "delete role")
+@app.role_required("ConfigureSystem", "delete role")
+def role_delete_page(form: DeleteRoleForm):
+    if User.delete_role(form.name.data):
+        app.HBlogFlask.print_sys_opt_success_log(f"Delete role success: {form.name.data}")
+        flash("角色删除成功")
+    else:
+        app.HBlogFlask.print_sys_opt_fail_log(f"Delete role fail: {form.name.data}")
+        flash("角色删除失败")
+    return redirect(url_for("auth.role_page"))
 
 
 @auth.route('/role-set', methods=['POST'])
 @login_required
-def role_set_page():
-    form = SetRoleForm()
-    if form.validate_on_submit():
-        if not current_user.check_role("ConfigureSystem"):
-            app.HBlogFlask.print_user_not_allow_opt_log("assign user a role")
-            abort(403)
-            return
-
-        user = load_user_by_email(form.email.data)
-        if user is not None:
-            if user.set_user_role(form.name.data):
-                app.HBlogFlask.print_sys_opt_success_log(f"Role assign {form.email.data} -> {form.name.data}")
-                flash("角色设置成功")
-            else:
-                app.HBlogFlask.print_sys_opt_fail_log(f"Role assign {form.email.data} -> {form.name.data}")
-                flash("角色设置失败")
+@app.form_required(SetRoleForm, "assign user a role")
+@app.role_required("ConfigureSystem", "assign user a role")
+def role_set_page(form: SetRoleForm):
+    user = load_user_by_email(form.email.data)
+    if user is not None:
+        if user.set_user_role(form.name.data):
+            app.HBlogFlask.print_sys_opt_success_log(f"Role assign {form.email.data} -> {form.name.data}")
+            flash("角色设置成功")
         else:
-            app.HBlogFlask.print_sys_opt_fail_log(f"Role assign (bad email) {form.email.data} -> {form.name.data}")
-            flash("邮箱未注册")
-        return redirect(url_for("auth.role_page"))
-
-    abort(404)
-    return
+            app.HBlogFlask.print_sys_opt_fail_log(f"Role assign {form.email.data} -> {form.name.data}")
+            flash("角色设置失败")
+    else:
+        app.HBlogFlask.print_sys_opt_fail_log(f"Role assign (bad email) {form.email.data} -> {form.name.data}")
+        flash("邮箱未注册")
+    return redirect(url_for("auth.role_page"))
 
 
 @auth.context_processor

+ 45 - 70
app/docx.py

@@ -10,7 +10,6 @@ from markdown import markdown
 import app
 from sql.base import DBBit
 from object.blog import BlogArticle, load_blog_by_id
-from object.user import User
 from object.comment import Comment
 from object.archive import load_archive_by_name
 
@@ -101,78 +100,58 @@ def article_down_page(blog_id: int):
 
 @docx.route('/comment/<int:blog>', methods=["POST"])
 @login_required
-def comment_page(blog: int):
-    form = WriteCommentForm()
-    if form.validate_on_submit():
-        auth: User = current_user
-        if not auth.check_role("WriteComment"):  # 检查是否具有权限
-            app.HBlogFlask.print_user_not_allow_opt_log("comment")
-            abort(403)
-            return
-
-        context = form.context.data
-        if Comment(None, blog, auth, context).create():
-            app.HBlogFlask.print_user_opt_success_log("comment")
-            flash("评论成功")
-        else:
-            app.HBlogFlask.print_user_opt_error_log("comment")
-            flash("评论失败")
-        return redirect(url_for("docx.article_page", blog_id=blog))
-    app.HBlogFlask.print_form_error_log("comment")
-    abort(404)
+@app.form_required(WriteCommentForm, "write comment")
+@app.role_required("WriteComment", "write comment")
+def comment_page(blog: int, form: WriteCommentForm):
+    context = form.context.data
+    if Comment(None, blog, current_user, context).create():
+        app.HBlogFlask.print_user_opt_success_log("comment")
+        flash("评论成功")
+    else:
+        app.HBlogFlask.print_user_opt_error_log("comment")
+        flash("评论失败")
+    return redirect(url_for("docx.article_page", blog_id=blog))
 
 
 @docx.route('/create-docx', methods=["POST"])
 @login_required
-def create_docx_page():
-    form = WriteBlogForm()
-    if form.validate_on_submit():
-        auth: User = current_user
-        if not auth.check_role("WriteBlog"):  # 检查是否具有写入权限
-            app.HBlogFlask.print_user_not_allow_opt_log("write blog")
-            abort(403)
-            return
-
-        title = form.title.data
-        if len(title) > 10:
-            flash("标题太长了")
-            abort(400)
-
-        subtitle = form.subtitle.data
-        if len(subtitle) > 10:
-            flash("副标题太长了")
-            abort(400)
-
-        archive = set(str(form.archive.data).replace(" ", "").split(";"))
-        archive_list = []
-        for f in archive:
-            f_ = load_archive_by_name(f)
-            if f_ is not None:
-                archive_list.append(f_)
-
-        context = bleach.linkify(
-            bleach.clean(
-                markdown(form.context.data, output_format='html'), tags=allow_tag, strip=True))
-
-        if BlogArticle(None, current_user, title, subtitle, context, archive=archive_list).create():
-            app.HBlogFlask.print_sys_opt_success_log("write blog")
-            flash(f"博客 {title} 发表成功")
-        else:
-            app.HBlogFlask.print_sys_opt_fail_log("write blog")
-            flash(f"博客 {title} 发表失败")
-        return redirect(url_for("docx.docx_page", page=1))
-    app.HBlogFlask.print_form_error_log("write blog")
-    abort(404)
+@app.form_required(WriteBlogForm, "write blog")
+@app.role_required("WriteBlog", "write blog")
+def create_docx_page(form: WriteBlogForm):
+    title = form.title.data
+    if len(title) > 10:
+        flash("标题太长了")
+        abort(400)
+
+    subtitle = form.subtitle.data
+    if len(subtitle) > 10:
+        flash("副标题太长了")
+        abort(400)
+
+    archive = set(str(form.archive.data).replace(" ", "").split(";"))
+    archive_list = []
+    for f in archive:
+        f_ = load_archive_by_name(f)
+        if f_ is not None:
+            archive_list.append(f_)
+
+    context = bleach.linkify(
+        bleach.clean(
+            markdown(form.context.data, output_format='html'), tags=allow_tag, strip=True))
+
+    if BlogArticle(None, current_user, title, subtitle, context, archive=archive_list).create():
+        app.HBlogFlask.print_sys_opt_success_log("write blog")
+        flash(f"博客 {title} 发表成功")
+    else:
+        app.HBlogFlask.print_sys_opt_fail_log("write blog")
+        flash(f"博客 {title} 发表失败")
+    return redirect(url_for("docx.docx_page", page=1))
 
 
 @docx.route("delete/<int:blog_id>")
 @login_required
+@app.role_required("DeleteBlog", "delete blog")
 def delete_blog_page(blog_id: int):
-    if not current_user.check_role("DeleteBlog"):
-        app.HBlogFlask.print_user_not_allow_opt_log("delete blog")
-        abort(403)
-        return
-
     if BlogArticle(blog_id, None, None, None, None).delete():
         app.HBlogFlask.print_sys_opt_success_log("delete blog")
         flash("博文删除成功")
@@ -184,12 +163,8 @@ def delete_blog_page(blog_id: int):
 
 @docx.route("delete_comment/<int:comment_id>")
 @login_required
+@app.role_required("DeleteComment", "delete comment")
 def delete_comment_page(comment_id: int):
-    if not current_user.check_role("DeleteComment"):
-        app.HBlogFlask.print_user_not_allow_opt_log("delete comment")
-        abort(403)
-        return
-
     if Comment(comment_id, None, None, None).delete():
         app.HBlogFlask.print_sys_opt_success_log("delete comment")
         flash("博文评论成功")
@@ -201,4 +176,4 @@ def delete_comment_page(comment_id: int):
 
 @docx.context_processor
 def inject_base():
-    return {"top_nav": ["", "", "active", "", "", ""]}
+    return {"top_nav": ["", "", "active", "", "", ""]}

+ 14 - 26
app/msg.py

@@ -1,4 +1,4 @@
-from flask import Flask, Blueprint, render_template, abort, redirect, url_for, flash
+from flask import Blueprint, render_template, abort, redirect, url_for, flash
 from flask_wtf import FlaskForm
 from flask_login import login_required, current_user
 from wtforms import TextAreaField, BooleanField, SubmitField
@@ -6,7 +6,6 @@ from wtforms.validators import DataRequired
 
 import app
 from sql.base import DBBit
-from object.user import User
 from object.msg import Message, load_message_list
 
 msg = Blueprint("msg", __name__)
@@ -44,35 +43,24 @@ def msg_page(page: int = 1):
 
 @msg.route('/write', methods=["POST"])
 @login_required
-def write_msg_page():
-    form = WriteForm()
-    if form.validate_on_submit():
-        auth: User = current_user
-        if not auth.check_role("WriteMsg"):  # 检查相应权限
-            app.HBlogFlask.print_user_not_allow_opt_log("write msg")
-            abort(403)
-            return
-
-        context = form.context.data
-        secret = form.secret.data
-        if Message(None, auth, context, secret, None).create():
-            app.HBlogFlask.print_user_opt_success_log("write msg")
-            flash("留言成功")
-        else:
-            app.HBlogFlask.print_user_opt_fail_log("write msg")
-            flash("留言失败")
-        return redirect(url_for("msg.msg_page", page=1))
-    abort(404)
+@app.form_required(WriteForm, "write msg")
+@app.role_required("WriteMsg", "write msg")
+def write_msg_page(form: WriteForm):
+    context = form.context.data
+    secret = form.secret.data
+    if Message(None, current_user, context, secret, None).create():
+        app.HBlogFlask.print_user_opt_success_log("write msg")
+        flash("留言成功")
+    else:
+        app.HBlogFlask.print_user_opt_fail_log("write msg")
+        flash("留言失败")
+    return redirect(url_for("msg.msg_page", page=1))
 
 
 @msg.route('/delete/<int:msg_id>')
 @login_required
+@app.role_required("DeleteMsg", "delete msg")
 def delete_msg_page(msg_id: int):
-    if not current_user.check_role("DeleteMsg"):
-        app.HBlogFlask.print_user_not_allow_opt_log("delete msg")
-        abort(403)
-        return
-
     if Message(msg_id, None, None).delete():
         app.HBlogFlask.print_user_opt_success_log("delete msg")
         flash("留言删除成功")

+ 2 - 6
app/oss.py

@@ -1,5 +1,5 @@
 from flask import Blueprint, redirect, render_template, abort, flash, url_for, request
-from flask_login import login_required, current_user
+from flask_login import login_required
 from flask_wtf import FlaskForm
 from wtforms import FileField, StringField, SubmitField
 from wtforms.validators import DataRequired
@@ -35,12 +35,8 @@ def get_page(name: str):
 
 @oss.route('upload', methods=["GET", "POST"])
 @login_required
+@app.role_required("ConfigureSystem", "upload file")
 def upload_page():
-    if not current_user.check_role("ConfigureSystem"):
-        app.HBlogFlask.print_user_not_allow_opt_log("upload file")
-        abort(403)
-        return
-
     form = UploadForm()
     if form.validate_on_submit():
         file = request.files["file"]

+ 31 - 0
app/tool.py

@@ -0,0 +1,31 @@
+from functools import wraps
+from flask import abort
+from flask_login import current_user
+from flask_wtf import FlaskForm
+from typing import ClassVar
+import app
+
+
+def role_required(role: str, opt: str):
+    def required(func):
+        @wraps(func)
+        def new_func(*args, **kwargs):
+            if not current_user.check_role(role):  # 检查相应的权限
+                app.HBlogFlask.print_user_not_allow_opt_log(opt)
+                return abort(403)
+            return func(*args, **kwargs)
+        return new_func
+    return required
+
+
+def form_required(form: ClassVar[FlaskForm], opt: str):
+    def required(func):
+        @wraps(func)
+        def new_func(*args, **kwargs):
+            f = form()
+            if not f.validate_on_submit():
+                app.HBlogFlask.print_form_error_log(opt)
+                return abort(404)
+            return func(*args, **kwargs, form=f)
+        return new_func
+    return required