Преглед на файлове

feat: 添加底层依赖代码的注释

SongZihuan преди 3 години
родител
ревизия
23f155a27a
променени са 12 файла, в които са добавени 162 реда и са изтрити 102 реда
  1. 20 15
      core/blog.py
  2. 6 9
      core/comment.py
  3. 6 8
      core/file.py
  4. 3 3
      core/msg.py
  5. 12 15
      core/user.py
  6. 25 4
      sql/blog.py
  7. 14 3
      sql/comment.py
  8. 16 2
      sql/file.py
  9. 15 8
      sql/msg.py
  10. 31 8
      sql/user.py
  11. 5 20
      view/auth.py
  12. 9 7
      view/docx.py

+ 20 - 15
core/blog.py

@@ -1,11 +1,14 @@
+from typing import Optional
+
 from sql.blog import (get_blog_list,
                       get_blog_count,
-                      get_blog_list_with_file,
-                      get_blog_with_file_count,
+                      get_file_blog_list,
+                      get_file_blog_count,
                       get_blog_list_not_top,
                       read_blog,
-                      write_blog,
-                      get_blog_user_count)
+                      create_blog,
+                      delete_blog,
+                      get_user_user_count)
 import core.user
 import core.file
 import core.comment
@@ -15,16 +18,15 @@ class LoadBlogError(Exception):
     pass
 
 
-def load_blog_by_id(blog_id) -> "BlogArticle":
+def load_blog_by_id(blog_id) -> "Optional[BlogArticle]":
     blog_id = blog_id
     blog = read_blog(blog_id)
     if len(blog) == 0:
-        raise LoadBlogError
+        return None
 
-    try:
-        auth = core.user.User.load_user_by_id(blog[0])
-    except core.user.LoaderUserError:
-        raise LoadBlogError
+    auth = core.user.User.load_user_by_id(blog[0])
+    if auth is None:
+        return None
 
     title = blog[1]
     subtitle = blog[2]
@@ -54,17 +56,20 @@ class BlogArticle:
             if not_top:
                 return get_blog_list_not_top(limit=limit, offset=offset)
             return get_blog_list(limit=limit, offset=offset)
-        return get_blog_list_with_file(file_id, limit=limit, offset=offset)
+        return get_file_blog_list(file_id, limit=limit, offset=offset)
 
     @staticmethod
     def get_blog_count(file_id=None, auth=None):
         if file_id is None:
             return get_blog_count()
         if auth is None:
-            return get_blog_with_file_count(file_id)
-        return get_blog_user_count(auth.get_user_id())
+            return get_file_blog_count(file_id)
+        return get_user_user_count(auth.get_user_id())
 
-    def write_blog(self):
+    def create(self):
         if self.blog_id is not None:  # 只有 blog_id为None时才使用
             return False
-        return write_blog(self.user.get_user_id(), self.title, self.subtitle, self.context, self.file)
+        return create_blog(self.user.get_user_id(), self.title, self.subtitle, self.context, self.file)
+
+    def delete(self):
+        return delete_blog(self.blog_id)

+ 6 - 9
core/comment.py

@@ -1,21 +1,18 @@
-from sql.comment import get_comment_list, write_comment, get_user_comment_count
+from sql.comment import read_comment, create_comment, get_user_comment_count
 import core.user
 
 
-class LoadCommentError(Exception):
-    pass
-
-
 def load_comment_list(blog_id: int):
-    comment_list = get_comment_list(blog_id)
+    comment_list = read_comment(blog_id)
     ret = []
     for comment in comment_list:
-        ret.append(Comment(blog_id, core.user.User(comment[1], None, None, comment[0]), comment[2], comment[3]))
+        ret.append(Comment(comment[0], blog_id, core.user.User(comment[2], None, None, comment[1]), comment[3], comment[4]))
     return ret
 
 
 class Comment:
-    def __init__(self, blog_id: int, auth: "core.user.User", context: str, update_time=None):
+    def __init__(self, comment_id, blog_id: int, auth: "core.user.User", context: str, update_time=None):
+        self.comment_id = comment_id
         self.blog_id = blog_id
         self.auth = auth
         self.context = context
@@ -26,4 +23,4 @@ class Comment:
         return get_user_comment_count(auth.get_user_id())
 
     def create_comment(self):
-        return write_comment(self.blog_id, self.auth.get_user_id(), self.context)
+        return create_comment(self.blog_id, self.auth.get_user_id(), self.context)

+ 6 - 8
core/file.py

@@ -1,14 +1,12 @@
-from sql.file import get_file_id, create_file, get_file_list, get_blog_file
+from typing import Optional
 
+from sql.file import get_file_id_by_name, create_file, get_file_list, read_file
 
-class LoadFileError(Exception):
-    ...
 
-
-def load_file_by_name(name: str) -> "File":
-    file_id, describe = get_file_id(name)
+def load_file_by_name(name: str) -> "Optional[File]":
+    file_id, describe = get_file_id_by_name(name)
     if file_id is None:
-        raise LoadFileError
+        return None
     return File(name, describe, file_id)
 
 
@@ -27,7 +25,7 @@ class File:
 
     @staticmethod
     def get_blog_file(blog_id: int):
-        file = get_blog_file(blog_id)
+        file = read_file(blog_id)
         file_list = []
         for i in file:
             file_list.append(File(i[1], i[2], i[0]))

+ 3 - 3
core/msg.py

@@ -1,11 +1,11 @@
 from typing import Optional
 
-from sql.msg import get_msg_list, get_msg_count, write_msg, get_user_msg_count
+from sql.msg import read_msg, get_msg_count, create_msg, get_user_msg_count
 import core.user
 
 
 def load_message_list(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
-    msg = get_msg_list(limit=limit, offset=offset, show_secret=show_secret)
+    msg = read_msg(limit=limit, offset=offset, show_secret=show_secret)
     ret = []
     for i in msg:
         ret.append(Message(i[0], core.user.User(i[2], None, None, i[1]), i[3], i[5], i[4]))
@@ -27,4 +27,4 @@ class Message:
         return get_user_msg_count(auth.get_user_id())
 
     def create_msg(self):
-        return write_msg(self.auth.get_user_id(), self.context, self.secret)
+        return create_msg(self.auth.get_user_id(), self.context, self.secret)

+ 12 - 15
core/user.py

@@ -1,18 +1,15 @@
 from flask_login import UserMixin, AnonymousUserMixin
 from werkzeug.security import generate_password_hash, check_password_hash
 from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
+from typing import Optional
 
 from configure import conf
-from sql.user import read_user, check_role, get_user_email, add_user, get_role_name
+from sql.user import read_user, check_role, get_user_email, create_user, get_role_name
 import core.blog
 import core.comment
 import core.msg
 
 
-class LoaderUserError(Exception):
-    pass
-
-
 class AnonymousUser(AnonymousUserMixin):
     def __init__(self):
         super(AnonymousUser, self).__init__()
@@ -28,16 +25,23 @@ class AnonymousUser(AnonymousUserMixin):
         return 0
 
 
-def load_user_by_email(email: str) -> "User":
+def load_user_by_email(email: str) -> "Optional[User]":
     user = read_user(email)
     if len(user) == 0:
-        raise LoaderUserError
+        return None
     passwd_hash = user[0]
     role = user[1]
     user_id = user[2]
     return User(email, passwd_hash, role, user_id)
 
 
+def load_user_by_id(user_id):
+    email = get_user_email(user_id)
+    if email is None:
+        return None
+    return load_user_by_email(email)
+
+
 class User(UserMixin):
     def __init__(self, email, passwd_hash, role, user_id):
         self.email = email
@@ -63,13 +67,6 @@ class User(UserMixin):
             email = f"{self.email[0]}****{self.email[5:]}"
             return email
 
-    @staticmethod
-    def load_user_by_id(user_id):
-        email = get_user_email(user_id)
-        if email is None:
-            raise LoaderUserError
-        return load_user_by_email(email)
-
     @property
     def comment_count(self):
         return 0
@@ -124,4 +121,4 @@ class User(UserMixin):
         return check_role(self.role, operate)
 
     def create_user(self):
-        return add_user(self.email, self.passwd_hash)
+        return create_user(self.email, self.passwd_hash)

+ 25 - 4
sql/blog.py

@@ -3,7 +3,8 @@ from typing import Optional, List
 import core.file
 
 
-def write_blog(auth_id: int, title: str, subtitle:str, context: str, file_list: List[core.file.File]) -> bool:
+def create_blog(auth_id: int, title: str, subtitle:str, context: str, file_list: List[core.file.File]) -> bool:
+    """写入新的blog"""
     cur = db.insert(table="blog", columns=["Auth", "Title", "SubTitle", "Context"],
                     values=f"{auth_id}, '{title}', '{subtitle}', '{context}'")
     if cur is None or cur.rowcount == 0:
@@ -18,6 +19,7 @@ def write_blog(auth_id: int, title: str, subtitle:str, context: str, file_list:
 
 
 def read_blog(blog_id: int) -> list:
+    """读取blog内容"""
     cur = db.search(columns=["Auth", "Title", "SubTitle", "Context", "Quote", "Spider", "UpdateTime", "Top"],
                     table="blog",
                     where=f"ID={blog_id}")
@@ -26,7 +28,21 @@ def read_blog(blog_id: int) -> list:
     return cur.fetchone()
 
 
+def delete_blog(blog_id: int):
+    cur = db.delete(table="blog_file", where=f"BlogID={blog_id}")
+    if cur is None:
+        return False
+    cur = db.delete(table="comment", where=f"BlogID={blog_id}")
+    if cur is None:
+        return False
+    cur = db.delete(table="blog", where=f"ID={blog_id}")
+    if cur is None or cur.rowcount == 0:
+        return False
+    return True
+
+
 def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
+    """获得 blog 列表"""
     cur = db.search(columns=["ID", "Title", "SubTitle", "UpdateTime", "Top"], table="blog_with_top",
                     limit=limit,
                     offset=offset)
@@ -36,6 +52,7 @@ def get_blog_list(limit: Optional[int] = None, offset: Optional[int] = None) ->
 
 
 def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = None) -> list:
+    """ 获得blog列表 忽略置顶 """
     cur = db.search(columns=["ID", "Title", "SubTitle", "UpdateTime"], table="blog",
                     order_by=[("UpdateTime", "DESC")],
                     limit=limit,
@@ -46,13 +63,15 @@ def get_blog_list_not_top(limit: Optional[int] = None, offset: Optional[int] = N
 
 
 def get_blog_count() -> int:
+    """ 统计 blog 个数 """
     cur = db.search(columns=["count(ID)"], table="blog")
     if cur is None or cur.rowcount == 0:
         return 0
     return cur.fetchone()[0]
 
 
-def get_blog_list_with_file(file_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
+def get_file_blog_list(file_id, limit: Optional[int] = None, offset: Optional[int] = None) -> list:
+    """ 获得指定归档的 blog 列表 """
     cur = db.search(columns=["BlogID", "Title", "SubTitle", "UpdateTime", "Top"], table="blog_with_file",
                     where=f"FileID={file_id}",
                     limit=limit,
@@ -62,7 +81,8 @@ def get_blog_list_with_file(file_id, limit: Optional[int] = None, offset: Option
     return cur.fetchall()
 
 
-def get_blog_with_file_count(file_id) -> int:
+def get_file_blog_count(file_id) -> int:
+    """ 统计指定归档的 blog 个数 """
     cur = db.search(columns=["count(ID)"], table="blog_with_file",
                     where=f"FileID={file_id}")
     if cur is None or cur.rowcount == 0:
@@ -70,7 +90,8 @@ def get_blog_with_file_count(file_id) -> int:
     return cur.fetchone()[0]
 
 
-def get_blog_user_count(user_id: int) -> int:
+def get_user_user_count(user_id: int) -> int:
+    """ 获得指定用户的 blog 个数 """
     cur = db.search(columns=["count(ID)"], table="blog",
                     where=f"Auth={user_id}")
     if cur is None or cur.rowcount == 0:

+ 14 - 3
sql/comment.py

@@ -1,8 +1,9 @@
 from sql import db
 
 
-def get_comment_list(blog_id: int):
-    cur = db.search(columns=["Auth", "Email", "Context", "UpdateTime"],
+def read_comment(blog_id: int):
+    """ 读取文章的 comment """
+    cur = db.search(columns=["ID", "Auth", "Email", "Context", "UpdateTime"],
                     table="comment_user",
                     where=f"BlogID={blog_id}")
     if cur is None or cur.rowcount == 0:
@@ -10,7 +11,8 @@ def get_comment_list(blog_id: int):
     return cur.fetchall()
 
 
-def write_comment(blog_id: int, user_id: int, context: str):
+def create_comment(blog_id: int, user_id: int, context: str):
+    """ 新建 comment """
     cur = db.insert(table="comment",
                     columns=["BlogID", "Auth", "Context"],
                     values=f"{blog_id}, {user_id}, '{context}'")
@@ -19,7 +21,16 @@ def write_comment(blog_id: int, user_id: int, context: str):
     return True
 
 
+def delete_comment(comment_id):
+    """ 删除评论 """
+    cur = db.delete(table="comment", where=f"ID={comment_id}")
+    if cur is None or cur.rowcount == 0:
+        return False
+    return True
+
+
 def get_user_comment_count(user_id: int):
+    """ 读取指定用户的 comment 个数 """
     cur = db.search(columns=["count(ID)"], table="comment",
                     where=f"Auth={user_id}")
     if cur is None or cur.rowcount == 0:

+ 16 - 2
sql/file.py

@@ -3,6 +3,7 @@ from typing import Optional
 
 
 def create_file(name: str, describe: str):
+    """ 创建新归档 """
     cur = db.insert(table="file",
                     columns=["Name", "DescribeText"],
                     values=f"'{name}', '{describe}'")
@@ -11,7 +12,8 @@ def create_file(name: str, describe: str):
     return True
 
 
-def get_blog_file(blog_id: int):
+def read_file(blog_id: int):
+    """ 获取文章的归档 """
     cur = db.search(columns=["FileID", "FileName", "DescribeText"], table="blog_file_with_name",
                     where=f"BlogID={blog_id}")
     if cur is None or cur.rowcount == 0:
@@ -19,7 +21,18 @@ def get_blog_file(blog_id: int):
     return cur.fetchall()
 
 
+def delete_file(file_id: int):
+    cur = db.delete(table="blog_file", where=f"FileID={file_id}")
+    if cur is None:
+        return False
+    cur = db.delete(table="file", where=f"ID={file_id}")
+    if cur is None or cur.rowcount == 0:
+        return False
+    return True
+
+
 def get_file_list(limit: Optional[int] = None, offset: Optional[int] = None):
+    """ 获取归档列表 """
     cur = db.search(columns=["ID", "Name", "DescribeText", "Count"], table="file_with_count",
                     limit=limit,
                     offset=offset)
@@ -28,7 +41,8 @@ def get_file_list(limit: Optional[int] = None, offset: Optional[int] = None):
     return cur.fetchall()
 
 
-def get_file_id(name: str):
+def get_file_id_by_name(name: str):
+    """ 获取归档 ID """
     cur = db.search(columns=["ID", "DescribeText"], table="file",
                     where=f"Name='{name}'")
     if cur is None or cur.rowcount == 0:

+ 15 - 8
sql/msg.py

@@ -2,7 +2,14 @@ from sql import db
 from typing import Optional
 
 
-def get_msg_list(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
+def create_msg(auth: int, context: str, secret: bool = False):
+    cur = db.insert(table="message",
+                    columns=["Auth", "Context", "Secret"],
+                    values=f"{auth}, '{context}', {1 if secret else 0}")
+    return cur is not None and cur.rowcount == 1
+
+
+def read_msg(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
     if show_secret:
         where = None
     else:
@@ -17,6 +24,13 @@ def get_msg_list(limit: Optional[int] = None, offset: Optional[int] = None, show
     return cur.fetchall()
 
 
+def delete_msg(msg_id: int):
+    cur = db.delete(table="message", where=f"ID={msg_id}")
+    if cur is None or cur.rowcount == 0:
+        return False
+    return True
+
+
 def get_msg_count():
     cur = db.search(columns=["count(ID)"], table="message")
     if cur is None or cur.rowcount == 0:
@@ -30,10 +44,3 @@ def get_user_msg_count(user_id: int):
     if cur is None or cur.rowcount == 0:
         return 0
     return cur.fetchone()[0]
-
-
-def write_msg(auth: int, context: str, secret: bool = False):
-    cur = db.insert(table="message",
-                    columns=["Auth", "Context", "Secret"],
-                    values=f"{auth}, '{context}', {1 if secret else 0}")
-    return cur is not None and cur.rowcount == 1

+ 31 - 8
sql/user.py

@@ -3,14 +3,8 @@ from sql.base import DBBit
 import core.user
 
 
-def get_user_email(user_id):
-    cur = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
-    if cur is None or cur.rowcount == 0:
-        return None
-    return cur.fetchone()[0]
-
-
 def read_user(email: str):
+    """ 读取用户 """
     cur = db.search(columns=["PasswdHash", "Role", "ID"], table="user", where=f"Email='{email}'")
     if cur is None or cur.rowcount == 0:
         return []
@@ -18,7 +12,8 @@ def read_user(email: str):
     return cur.fetchone()
 
 
-def add_user(email: str, passwd: str):
+def create_user(email: str, passwd: str):
+    """ 创建用户 """
     cur = db.search(columns=["count(Email)"], table="user")  # 统计个数
     passwd = core.user.User.get_passwd_hash(passwd)
     if cur is None or cur.rowcount == 0 or cur.fetchone()[0] == 0:
@@ -27,7 +22,33 @@ def add_user(email: str, passwd: str):
         db.insert(table='user', columns=['Email', 'PasswdHash'], values=f"'{email}', '{passwd}'")
 
 
+def delete_user(user_id: int):
+    """ 删除用户 """
+    cur = db.delete(table="message", where=f"Auth={user_id}")
+    if cur is None:
+        return False
+    cur = db.delete(table="comment", where=f"Auth={user_id}")
+    if cur is None:
+        return False
+    cur = db.delete(table="blog", where=f"Auth={user_id}")
+    if cur is None:
+        return False
+    cur = db.delete(table="user", where=f"ID={user_id}")
+    if cur is None or cur.rowcount == 0:
+        return False
+    return True
+
+
+def get_user_email(user_id):
+    """ 获取用户邮箱 """
+    cur = db.search(columns=["Email"], table="user", where=f"ID='{user_id}'")
+    if cur is None or cur.rowcount == 0:
+        return None
+    return cur.fetchone()[0]
+
+
 def get_role_name(role: int):
+    """ 获取用户角色名称 """
     cur = db.search(columns=["RoleName"], table="role", where=f"RoleID={role}")
     if cur is None or cur.rowcount == 0:
         return None
@@ -35,6 +56,7 @@ def get_role_name(role: int):
 
 
 def check_role(role: int, operate: str):
+    """ 检查角色权限(通过角色ID) """
     cur = db.search(columns=[operate], table="role", where=f"RoleID={role}")
     if cur is None or cur.rowcount == 0:
         return False
@@ -42,6 +64,7 @@ def check_role(role: int, operate: str):
 
 
 def check_role_by_name(role: str, operate: str):
+    """ 检查角色权限(通过角色名) """
     cur = db.search(columns=[operate], table="role", where=f"RoleName='{role}")
     if cur is None or cur.rowcount == 0:
         return False

+ 5 - 20
view/auth.py

@@ -6,7 +6,7 @@ from wtforms.validators import DataRequired, Length, EqualTo
 from typing import Optional
 
 from view.base import App
-from core.user import User, LoaderUserError, load_user_by_email
+from core.user import User, load_user_by_email
 from flask_wtf import FlaskForm
 from send_email import send_msg
 
@@ -31,11 +31,7 @@ class RegisterForm(FlaskForm):
     submit = SubmitField("注册")
 
     def validate_email(self, field):
-        try:
-            load_user_by_email(field.data)
-        except LoaderUserError:
-            return
-        else:
+        if load_user_by_email(field.data) is not None:
             raise ValidationError("Email already register")
 
 
@@ -53,11 +49,7 @@ def login_page():
 
     form = LoginForm()
     if form.validate_on_submit():
-        try:
-            user = load_user_by_email(form.email.data)
-        except LoaderUserError:
-            user = None
-
+        user = load_user_by_email(form.email.data)
         if user is not None and user.check_passwd(form.passwd.data):
             login_user(user, form.remember.data)
             next_page = request.args.get("next")
@@ -97,11 +89,7 @@ def confirm_page():
         abort(404)
         return
 
-    try:
-        load_user_by_email(token[0])
-    except LoaderUserError:
-        pass
-    else:
+    if load_user_by_email(token[0]) is not None:
         abort(404)
         return
 
@@ -134,7 +122,4 @@ class AuthApp(App):
 
         @self.login_manager.user_loader
         def user_loader(email: str):
-            try:
-                return load_user_by_email(email)
-            except LoaderUserError:
-                return None
+            return load_user_by_email(email)

+ 9 - 7
view/docx.py

@@ -14,7 +14,7 @@ from sql.base import DBBit
 from core.blog import BlogArticle, load_blog_by_id
 from core.user import User
 from core.comment import Comment
-from core.file import load_file_by_name, LoadFileError
+from core.file import load_file_by_name
 
 docx = Blueprint("docx", __name__)
 app: Optional[Flask] = None
@@ -70,6 +70,9 @@ def file_page(file: int, page: int = 1):
 @docx.route('/article/<int:blog_id>')
 def article_page(blog_id: int):
     article = load_blog_by_id(blog_id)
+    if article is None:
+        abort(404)
+        return
     return render_template("docx/article.html",
                            article=article,
                            file_list=article.file,
@@ -87,7 +90,7 @@ def comment_page(blog: int):
             return
 
         context = form.context.data
-        if Comment(blog, auth, context).create_comment():
+        if Comment(None, blog, auth, context).create_comment():
             flash("评论成功")
         else:
             flash("评论失败")
@@ -112,16 +115,15 @@ def create_docx_page():
         file = set(str(form.file.data).replace(" ", "").split(";"))
         file_list = []
         for f in file:
-            try:
-                file_list.append(load_file_by_name(f))
-            except LoadFileError:
-                pass
+            f_ = load_file_by_name(f)
+            if f_ is not None:
+                file_list.append(f_)
 
         context = bleach.linkify(
             bleach.clean(
                 markdown(form.context.data, output_format='html'), tags=allow_tag, strip=True))
 
-        if BlogArticle(None, current_user, title, subtitle, context, file=file_list).write_blog():
+        if BlogArticle(None, current_user, title, subtitle, context, file=file_list).create():
             flash(f"博客 {title} 发表成功")
         else:
             flash(f"博客 {title} 发表失败")