Kaynağa Gözat

feat: 调整python-web的导入机制

SongZihuan 3 yıl önce
ebeveyn
işleme
5ea5ab56a3
13 değiştirilmiş dosya ile 293 ekleme ve 306 silme
  1. 0 2
      app/auth/__init__.py
  2. 9 11
      app/auth/views.py
  3. 0 156
      app/auth/web.py
  4. 0 1
      app/index/__init__.py
  5. 0 2
      app/rank/__init__.py
  6. 6 11
      app/rank/views.py
  7. 0 31
      app/rank/web.py
  8. 10 17
      app/store/views.py
  9. 0 72
      app/store/web.py
  10. 11 3
      app/views.py
  11. 116 0
      app/web.py
  12. 45 0
      app/web_goods.py
  13. 96 0
      app/web_user.py

+ 0 - 2
app/auth/__init__.py

@@ -1,2 +0,0 @@
-from . import views as auth_views
-from . import web as auth_web

+ 9 - 11
app/auth/views.py

@@ -9,11 +9,10 @@ import qrcode
 from io import BytesIO
 
 from tool.type_ import *
-from sql.db import DB
-from .web import AuthWebsite, WebUser
+from app import views
+from app import web_user
 
 auth = Blueprint("auth", __name__)
-auth_website: Optional[AuthWebsite] = None
 app: Optional[Flask] = None
 
 login_manager = LoginManager()
@@ -32,7 +31,7 @@ def login():
     if form.validate_on_submit():
         name = form.name.data
         passwd = form.passwd.data
-        check = auth_website.load_user_by_name(name, passwd)
+        check = views.website.load_user_by_name(name, passwd)
 
         if check is not None:
             login_user(user=check, remember=True)
@@ -47,7 +46,7 @@ def login():
 
 @login_manager.user_loader
 def load_user(user: uid_t):
-    return auth_website.load_user_by_id(user)
+    return views.website.load_user_by_id(user)
 
 
 @auth.route("/logout")
@@ -61,7 +60,7 @@ def logout():
 @auth.route("/about")
 @login_required
 def about():
-    user: WebUser = current_user
+    user: web_user.WebUser = current_user
     user.update_info()
     return render_template("auth/about.html", order=user.order, order_list=user.get_order_goods_list())
 
@@ -69,7 +68,7 @@ def about():
 @auth.route("/order")
 @login_required
 def order_qr():
-    user: WebUser = current_user
+    user: web_user.WebUser = current_user
     user.update_info()
     order, user = user.get_qr_code()
     image = qrcode.make(data=url_for("store.check", user=user, order=order, _external=True))
@@ -80,10 +79,9 @@ def order_qr():
     return render_template("auth/qr.html", qr_base64=base64_str, order=order)
 
 
-def creat_auth_website(app_: Flask, db: DB):
-    global auth_website, app
-    if auth_website is None:
+def creat_auth_website(app_: Flask):
+    global app
+    if app is None:
         app = app_
-        auth_website = AuthWebsite(app, db)
         login_manager.init_app(app)
         app.register_blueprint(auth, url_prefix="/auth")

+ 0 - 156
app/auth/web.py

@@ -1,156 +0,0 @@
-from flask import Flask
-import datetime
-
-from conf import Config
-
-from tool.login import create_uid
-from tool.type_ import *
-
-from core.user import User
-from core.garbage import GarbageType
-
-from sql import DBBit
-from sql.db import DB
-from sql.user import find_user_by_name, find_user_by_id
-from . import views
-
-
-class WebUser:
-    def __init__(self, name: uname_t, passwd: passwd_t = None, uid: uid_t = None):
-        self._name = name
-        if uid is None:
-            self._uid = create_uid(name, passwd)
-        else:
-            self._uid = uid
-        self.score = "0"
-        self.reputation = "0"
-        self.rubbish = "0"
-        self.group = "普通成员"
-        self.is_anonymous = False
-        self.update_info()
-
-    def update_info(self):
-        user = views.auth_website.get_user_by_id(self._uid)
-        if user is None:
-            return
-        if user.is_manager():
-            self.group = "管理员"
-            self.score = "0"
-            self.reputation = "0"
-            self.rubbish = "0"
-        else:
-            self.group = "普通成员"
-            res = user.get_info()
-            self.score = res.get('score', '0')
-            self.reputation = res.get('reputation', '0')
-            self.rubbish = res.get('rubbish', '0')
-
-    def is_manager(self):
-        return self.group == "管理员"
-
-    def get_qr_code(self):
-        return self.order, self._uid
-
-    @property
-    def name(self):
-        return self._name
-
-    @property
-    def uid(self):
-        return self._uid[:Config.tk_show_uid_len]
-
-    @property
-    def is_active(self):
-        return views.auth_website.load_user_by_id(self._uid) is not None
-
-    @property
-    def is_authenticated(self):
-        return views.auth_website.load_user_by_id(self._uid) is not None
-
-    @property
-    def order(self) -> str:
-        cur = views.auth_website.db.search(columns=["OrderID"],
-                                           table="orders",
-                                           where=f"UserID = '{self._uid}' and status=0")
-        if cur is None or cur.rowcount == 0:
-            return "None"
-        assert cur.rowcount == 1
-        return str(cur.fetchone()[0])
-
-    def get_order_goods_list(self):
-        order = self.order
-        if order is None:
-            return []
-        cur = views.auth_website.db.search(columns=["Name", "Quantity"],
-                                           table="order_goods_view",
-                                           where=f"OrderID = '{order}'")
-        if cur is None:
-            return []
-
-        res = []
-        for i in range(cur.rowcount):
-            re = cur.fetchone()
-            res.append(f"#{i} {re[0]} x {re[1]}")
-        return res
-
-    def get_id(self):
-        return self._uid
-
-    def get_garbage_list(self):
-        return views.auth_website.get_user_garbage_list(self._uid, limit=20)
-
-    def get_user(self) -> User:
-        res = views.auth_website.get_user_by_id(self._uid)
-        return res
-
-
-class AuthWebsite:
-    def __init__(self, app: Flask, db: DB):
-        self._app = app
-        self._db = db
-
-    @property
-    def db(self):
-        return self._db
-
-    def load_user_by_name(self, name: uname_t, passwd: passwd_t) -> Optional[WebUser]:
-        user = find_user_by_name(name, passwd, self._db)
-        if user is None:
-            return None
-        return WebUser(name, uid=user.get_uid())
-
-    def load_user_by_id(self, uid: uid_t) -> Optional[WebUser]:
-        user = find_user_by_id(uid, self._db)
-        if user is None:
-            return None
-        name = user.get_name()
-        return WebUser(name, uid=uid)
-
-    def get_user_garbage_list(self, uid: uid_t, limit: int):
-        cur = self._db.search(columns=["UseTime", "Location", "GarbageType", "CheckResult"],
-                              table="garbage",
-                              where=f"UserID='{uid}'",
-                              limit=limit)
-        if cur is None or cur.rowcount == 0:
-            return None
-        res = []
-        for i in range(cur.rowcount):
-            re: Tuple[datetime.datetime, str, bytes, Optional[bytes]] = cur.fetchone()
-            t = re[0].strftime("%Y-%m-%d %H:%M:%S")
-            loc = re[1]
-            type_ = GarbageType.GarbageTypeStrList_ch[int(re[2])]
-            if re[3] is None:
-                result = "待确定"
-                result_class = 'wait'
-            elif re[3] == DBBit.BIT_1:
-                result = "投放正确"
-                result_class = 'pass'
-            else:
-                result = "投放错误"
-                result_class = 'fail'
-            res.append((t, loc, type_, result, result_class))
-        return res
-
-    def get_user_by_id(self, uid: uid_t):
-        res = find_user_by_id(uid, self._db)
-        return res

+ 0 - 1
app/index/__init__.py

@@ -1 +0,0 @@
-from . import views as hello_views

+ 0 - 2
app/rank/__init__.py

@@ -1,2 +0,0 @@
-from . import views as rank_views
-from . import web as rank_web

+ 6 - 11
app/rank/views.py

@@ -1,30 +1,25 @@
 from flask import render_template, Blueprint, Flask
-from .web import RankWebsite
-from sql.db import DB
 from tool.type_ import Optional
+from app import views
 
 rank = Blueprint("rank", __name__)
-rank_website: Optional[RankWebsite] = None
 app: Optional[Flask] = None
 
 
 @rank.route('/up/<int:page>')
 def rank_up(page: int):
-    global rank_website
-    data = rank_website.get_rank(page, "DESC")
+    data = views.website.get_rank(page, "DESC")
     return render_template("rank/ranking.html", rank_info=data, ranking_name="高分榜")
 
 
 @rank.route('/down/<int:page>')
 def rank_down(page: int):
-    global rank_website
-    data = rank_website.get_rank(page, "ASC")
+    data = views.website.get_rank(page, "ASC")
     return render_template("rank/ranking.html", rank_info=data, ranking_name="警示榜")
 
 
-def creat_ranking_website(app_: Flask, db: DB):
-    global rank_website, app
-    if rank_website is None:
+def creat_ranking_website(app_: Flask):
+    global app
+    if app is None:
         app = app_
         app.register_blueprint(rank, url_prefix="/rank")
-        rank_website = RankWebsite(db, app_)

+ 0 - 31
app/rank/web.py

@@ -1,31 +0,0 @@
-from flask import Flask
-
-from conf import Config
-from sql.db import DB
-from tool.type_ import Optional, List, Tuple
-
-
-class RankWebsite:
-    def __init__(self, db: DB, app: Flask):
-        self._db = db
-        self.app = app
-
-    @property
-    def db(self):
-        return self._db
-
-    def get_rank(self, page: int, order_by: str = "DESC") -> Optional[List[Tuple]]:
-        offset = 20 * (page - 1)
-        cur = self._db.search(columns=['UserID', 'Name', 'Score', 'Reputation'],
-                              table='user',
-                              where='IsManager=0',
-                              order_by=[('Reputation', order_by), ('Score', order_by), ('UserID', order_by)],
-                              limit=20,
-                              offset=offset)
-        if cur is None:
-            return None
-        res = []
-        for index in range(cur.rowcount):
-            i = cur.fetchone()
-            res.append((f"{offset + index + 1}", i[1], i[0][:Config.tk_show_uid_len], str(i[3]), str(i[2])))
-        return res

+ 10 - 17
app/store/views.py

@@ -5,15 +5,12 @@ from flask_wtf import FlaskForm
 from flask_login import login_required
 import functools
 
-from sql.db import DB
-
 from tool.type_ import Optional
-from . import web
-from ..auth.web import WebUser
+from app import views
+from app import web_user
 
 store = Blueprint("store", __name__)
 app: Optional[Flask] = None
-store_web: Optional[web.StoreWebsite] = None
 
 
 class BuyForm(FlaskForm):
@@ -31,7 +28,7 @@ def buy(goods_id: int):
         except (TypeError, ValueError):
             flash("请输入正确的数量")
         else:
-            goods = store_web.get_goods(goods_id)
+            goods = views.website.get_goods(goods_id)
             if goods is None:
                 flash("商品错误")
             res, order_id = goods.buy_for_user(quantity, current_user)
@@ -53,8 +50,8 @@ def buy(goods_id: int):
 @login_required
 def index():
     form = BuyForm()
-    store_list = store_web.get_store_list()
-    user: WebUser = current_user
+    store_list = views.website.get_store_list()
+    user: web_user.WebUser = current_user
     user.update_info()
     return render_template("store/store.html", store_list=store_list, store_form=form)
 
@@ -65,7 +62,6 @@ def manager_required(f):
         if current_user.is_anonymous or not current_user.is_authenticated or not current_user.is_manager():
             abort(403)
         return f(*args, **kwargs)
-
     return func
 
 
@@ -73,17 +69,14 @@ def manager_required(f):
 @login_required
 @manager_required
 def check(user, order):
-    print(f"wwww{order=}")
-    if not store_web.check_order(order, user):
+    if not views.website.check_order(order, user):
         abort(404)
-    else:
-        flash(f"订单: {order} 处理成功")
+    flash(f"订单: {order} 处理成功")
     return redirect(url_for("hello.index"))
 
 
-def creat_store_website(app_: Flask, db: DB):
-    global store_web, app
-    if store_web is None:
+def creat_store_website(app_: Flask):
+    global app
+    if app is None:
         app = app_
         app.register_blueprint(store, url_prefix="/store")
-        store_web = web.StoreWebsite(db, app)

+ 0 - 72
app/store/web.py

@@ -1,72 +0,0 @@
-from flask import Flask
-
-from sql.db import DB
-from sql.user import update_user
-from sql.store import get_store_item_list, get_store_item, update_goods, get_order_id, write_goods, check_order
-from tool.type_ import *
-
-from core.user import User, UserNotSupportError
-
-from . import views
-from ..auth import web as auth_web
-from ..auth import views as auth_views
-
-
-class Goods:
-    def __init__(self, name: str, score: score_t, quantity: int, goods_id: int):
-        self._name = name
-        self._quantity = quantity
-        self._score = score
-        self._id = goods_id
-
-    def buy_for_user(self, quantity: int, web_user: auth_web.WebUser) -> Tuple[int, int]:
-        assert quantity > 0
-        user: User = web_user.get_user()
-        if user is None:
-            return -4, 0
-
-        score_ = quantity * self._score
-        if quantity > self._quantity or quantity == 0:
-            return -2, 0
-        try:
-            score = user.get_score()
-        except UserNotSupportError:
-            return -1, 0
-        if score < score_:
-            return -3, 0
-
-        user.add_score(-score_)
-        update_user(user, auth_views.auth_website.db)
-
-        self._quantity -= quantity
-        update_goods(self._id, self._quantity, views.store_web.db)
-
-        order_id = get_order_id(user.get_uid(), views.store_web.db)
-        if order_id is None:
-            return -4, 0
-
-        if not write_goods(self._id, quantity, order_id, views.store_web.db):
-            return -4, 0
-        return 0, order_id
-
-
-class StoreWebsite:
-    def __init__(self, db: DB, app: Flask):
-        self._db = db
-        self.app = app
-
-    @property
-    def db(self):
-        return self._db
-
-    def get_store_list(self) -> Optional[List]:
-        return get_store_item_list(self._db)
-
-    def get_goods(self, goods_id: int):
-        goods = get_store_item(goods_id, self._db)  # 返回值 ["Name", "Score", "Quantity", "GoodsID"]
-        if goods is None:
-            return goods
-        return Goods(*goods)
-
-    def check_order(self, order_id: int, uid: uid_t) -> bool:
-        return check_order(order_id, uid, self._db)

+ 11 - 3
app/views.py

@@ -4,11 +4,19 @@ from app.rank.views import creat_ranking_website
 from app.auth.views import creat_auth_website
 from app.store.views import creat_store_website
 
+from tool.type_ import *
 from sql.db import DB
+from . import web
+
+website: "Optional[web.Website]" = None
 
 
 def register(app: Flask, db: DB):
+    global website
+    if website is None:
+        website = web.Website(app, db)
+
     creat_hello_website(app)
-    creat_ranking_website(app, db)
-    creat_auth_website(app, db)
-    creat_store_website(app, db)
+    creat_ranking_website(app)
+    creat_auth_website(app)
+    creat_store_website(app)

+ 116 - 0
app/web.py

@@ -0,0 +1,116 @@
+from sql.store import get_store_item_list, get_store_item, check_order
+
+from flask import Flask
+import datetime
+
+from conf import Config
+
+from tool.type_ import *
+
+from core.garbage import GarbageType
+
+from sql import DBBit
+from sql.db import DB
+from sql.user import find_user_by_name, find_user_by_id
+
+from . import web_user
+from . import web_goods
+
+
+class WebsiteBase:
+    def __init__(self, app: Flask, db: DB):
+        self._db = db
+        self._app = app
+
+
+class AuthWebsite(WebsiteBase):
+    @property
+    def db(self):
+        return self._db
+
+    def load_user_by_name(self, name: uname_t, passwd: passwd_t) -> Optional["web_user.WebUser"]:
+        user = find_user_by_name(name, passwd, self._db)
+        if user is None:
+            return None
+        return web_user.WebUser(name, uid=user.get_uid())
+
+    def load_user_by_id(self, uid: uid_t) -> Optional["web_user.WebUser"]:
+        user = find_user_by_id(uid, self._db)
+        if user is None:
+            return None
+        name = user.get_name()
+        return web_user.WebUser(name, uid=uid)
+
+    def get_user_garbage_list(self, uid: uid_t, limit: int):
+        cur = self._db.search(columns=["UseTime", "Location", "GarbageType", "CheckResult"],
+                              table="garbage",
+                              where=f"UserID='{uid}'",
+                              limit=limit)
+        if cur is None or cur.rowcount == 0:
+            return None
+        res = []
+        for i in range(cur.rowcount):
+            re: Tuple[datetime.datetime, str, bytes, Optional[bytes]] = cur.fetchone()
+            t = re[0].strftime("%Y-%m-%d %H:%M:%S")
+            loc = re[1]
+            type_ = GarbageType.GarbageTypeStrList_ch[int(re[2])]
+            if re[3] is None:
+                result = "待确定"
+                result_class = 'wait'
+            elif re[3] == DBBit.BIT_1:
+                result = "投放正确"
+                result_class = 'pass'
+            else:
+                result = "投放错误"
+                result_class = 'fail'
+            res.append((t, loc, type_, result, result_class))
+        return res
+
+    def get_user_by_id(self, uid: uid_t):
+        res = find_user_by_id(uid, self._db)
+        return res
+
+
+class StoreWebsite(WebsiteBase):
+    @property
+    def db(self):
+        return self._db
+
+    def get_store_list(self) -> Optional[List]:
+        return get_store_item_list(self._db)
+
+    def get_goods(self, goods_id: int):
+        goods = get_store_item(goods_id, self._db)  # 返回值 ["Name", "Score", "Quantity", "GoodsID"]
+        if goods is None:
+            return goods
+        return web_goods.Goods(*goods)
+
+    def check_order(self, order_id: int, uid: uid_t) -> bool:
+        return check_order(order_id, uid, self._db)
+
+
+class RankWebsite(WebsiteBase):
+    @property
+    def db(self):
+        return self._db
+
+    def get_rank(self, page: int, order_by: str = "DESC") -> Optional[List[Tuple]]:
+        offset = 20 * (page - 1)
+        cur = self._db.search(columns=['UserID', 'Name', 'Score', 'Reputation'],
+                              table='user',
+                              where='IsManager=0',
+                              order_by=[('Reputation', order_by), ('Score', order_by), ('UserID', order_by)],
+                              limit=20,
+                              offset=offset)
+        if cur is None:
+            return None
+        res = []
+        for index in range(cur.rowcount):
+            i = cur.fetchone()
+            res.append((f"{offset + index + 1}", i[1], i[0][:Config.tk_show_uid_len], str(i[3]), str(i[2])))
+        return res
+
+
+class Website(AuthWebsite, StoreWebsite, RankWebsite, WebsiteBase):
+    def __init__(self, app: Flask, db: DB):
+        super(Website, self).__init__(app, db)

+ 45 - 0
app/web_goods.py

@@ -0,0 +1,45 @@
+from sql.user import update_user
+from sql.store import update_goods, get_order_id, write_goods
+
+from tool.type_ import *
+from core.user import User, UserNotSupportError
+from . import views
+from . import web_user
+
+
+class Goods:
+    def __init__(self, name: str, score: score_t, quantity: int, goods_id: int):
+        self._name = name
+        self._quantity = quantity
+        self._score = score
+        self._id = goods_id
+
+    def buy_for_user(self, quantity: int, user_: web_user.WebUser) -> Tuple[int, int]:
+        assert quantity > 0
+        user: User = user_.get_user()
+        if user is None:
+            return -4, 0
+
+        score_ = quantity * self._score
+        if quantity > self._quantity or quantity <= 0:
+            return -2, 0
+        try:
+            score = user.get_score()
+        except UserNotSupportError:
+            return -1, 0
+        if score < score_:
+            return -3, 0
+
+        user.add_score(-score_)
+        update_user(user, views.website.db)
+
+        self._quantity -= quantity
+        update_goods(self._id, self._quantity, views.website.db)
+
+        order_id = get_order_id(user.get_uid(), views.website.db)
+        if order_id is None:
+            return -4, 0
+
+        if not write_goods(self._id, quantity, order_id, views.website.db):
+            return -4, 0
+        return 0, order_id

+ 96 - 0
app/web_user.py

@@ -0,0 +1,96 @@
+from conf import Config
+
+from tool.login import create_uid
+from tool.type_ import *
+
+from core.user import User
+from . import views
+
+
+class WebUser:
+    def __init__(self, name: uname_t, passwd: passwd_t = None, uid: uid_t = None):
+        self._name = name
+        if uid is None:
+            self._uid = create_uid(name, passwd)
+        else:
+            self._uid = uid
+        self.score = "0"
+        self.reputation = "0"
+        self.rubbish = "0"
+        self.group = "普通成员"
+        self.is_anonymous = False
+        self.update_info()
+
+    def update_info(self):
+        user = views.website.get_user_by_id(self._uid)
+        if user is None:
+            return
+        if user.is_manager():
+            self.group = "管理员"
+            self.score = "0"
+            self.reputation = "0"
+            self.rubbish = "0"
+        else:
+            self.group = "普通成员"
+            res = user.get_info()
+            self.score = res.get('score', '0')
+            self.reputation = res.get('reputation', '0')
+            self.rubbish = res.get('rubbish', '0')
+
+    def is_manager(self):
+        return self.group == "管理员"
+
+    def get_qr_code(self):
+        return self.order, self._uid
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def uid(self):
+        return self._uid[:Config.tk_show_uid_len]
+
+    @property
+    def is_active(self):
+        return views.website.load_user_by_id(self._uid) is not None
+
+    @property
+    def is_authenticated(self):
+        return views.website.load_user_by_id(self._uid) is not None
+
+    @property
+    def order(self) -> str:
+        cur = views.website.db.search(columns=["OrderID"],
+                                      table="orders",
+                                      where=f"UserID = '{self._uid}' and status=0")
+        if cur is None or cur.rowcount == 0:
+            return "None"
+        assert cur.rowcount == 1
+        return str(cur.fetchone()[0])
+
+    def get_order_goods_list(self):
+        order = self.order
+        if order is None:
+            return []
+        cur = views.website.db.search(columns=["Name", "Quantity"],
+                                      table="order_goods_view",
+                                      where=f"OrderID = '{order}'")
+        if cur is None:
+            return []
+
+        res = []
+        for i in range(cur.rowcount):
+            re = cur.fetchone()
+            res.append(f"#{i} {re[0]} x {re[1]}")
+        return res
+
+    def get_id(self):
+        return self._uid
+
+    def get_garbage_list(self):
+        return views.website.get_user_garbage_list(self._uid, limit=20)
+
+    def get_user(self) -> User:
+        res = views.website.get_user_by_id(self._uid)
+        return res