|
@@ -1,21 +1,20 @@
|
|
|
from flask import Flask
|
|
|
from flask_login import current_user
|
|
|
-import datetime
|
|
|
import math
|
|
|
|
|
|
-from conf import Config
|
|
|
-
|
|
|
from sql.store import get_store_item_list, get_store_item, confirm_order
|
|
|
|
|
|
-from tool.type_ import *
|
|
|
+from tool.typing import *
|
|
|
from tool.page import get_page
|
|
|
|
|
|
from core.garbage import GarbageType
|
|
|
|
|
|
from sql import DBBit
|
|
|
from sql.db import DB
|
|
|
-from sql.user import find_user_by_name, find_user_by_id
|
|
|
+from sql.garbage import count_garbage_by_uid, get_garbage_by_uid
|
|
|
+from sql.user import find_user_by_name, find_user_by_id, get_rank_for_user, count_all_user
|
|
|
from sql.news import write_news, get_news, get_news_count
|
|
|
+from sql.store import check_order, get_goods_from_order
|
|
|
|
|
|
from . import web_user
|
|
|
from . import web_goods
|
|
@@ -59,33 +58,23 @@ class AuthWebsite(WebsiteBase):
|
|
|
return web_user.WebUser(name, uid=uid)
|
|
|
|
|
|
def get_user_garbage_count(self, uid: uid_t):
|
|
|
- cur = self._db.search(columns=["count(GarbageID)"],
|
|
|
- table="garbage",
|
|
|
- where=f"UserID='{uid}'")
|
|
|
- if cur is None:
|
|
|
- return 0
|
|
|
- assert cur.rowcount == 1
|
|
|
- return int(cur.fetchone()[0])
|
|
|
+ return count_garbage_by_uid(uid, self._db, time_limit=False)
|
|
|
|
|
|
def get_user_garbage_list(self, uid: uid_t, limit: int, offset: int = 0):
|
|
|
- cur = self._db.search(columns=["UseTime", "Location", "GarbageType", "CheckResult"],
|
|
|
- table="garbage",
|
|
|
- where=f"UserID='{uid}'",
|
|
|
- limit=limit,
|
|
|
- offset=offset,
|
|
|
- order_by=[("UseTime", "DESC")])
|
|
|
- if cur is None or cur.rowcount == 0:
|
|
|
- return None
|
|
|
+ garbage_list = get_garbage_by_uid(uid,
|
|
|
+ columns=["UseTime", "Location", "GarbageType", "CheckResult"],
|
|
|
+ limit=limit,
|
|
|
+ db=self.db,
|
|
|
+ offset=offset)
|
|
|
res = []
|
|
|
- for i in range(cur.rowcount):
|
|
|
- re: Tuple[datetime.datetime, str, bytes, Optional[bytes]] = cur.fetchone()
|
|
|
- t = re[0].strftime("%Y-%m-%d %H:%M:%S")
|
|
|
- loc = re[1]
|
|
|
- type_ = GarbageType.GarbageTypeStrList_ch[int(re[2])]
|
|
|
- if re[3] is None:
|
|
|
+ for i in garbage_list:
|
|
|
+ t = i[0].strftime("%Y-%m-%d %H:%M:%S")
|
|
|
+ loc = i[1]
|
|
|
+ type_ = GarbageType.GarbageTypeStrList_ch[int(i[2])]
|
|
|
+ if i[3] is None:
|
|
|
result = "待确定"
|
|
|
result_class = 'wait'
|
|
|
- elif re[3] == DBBit.BIT_1:
|
|
|
+ elif i[3] == DBBit.BIT_1:
|
|
|
result = "投放正确"
|
|
|
result_class = 'pass'
|
|
|
else:
|
|
@@ -107,26 +96,16 @@ class StoreWebsite(WebsiteBase):
|
|
|
goods = get_store_item(goods_id, self._db) # 返回值 ["Name", "Score", "Quantity", "GoodsID"]
|
|
|
if goods is None:
|
|
|
return goods
|
|
|
- return web_goods.Goods(*goods)
|
|
|
+ # 更快的写法应该是 web_goods.Goods(*goods), 但目前的写法可读性更佳
|
|
|
+ return web_goods.Goods(name=goods[0], score=goods[1], quantity=goods[2], goods_id=goods[3])
|
|
|
|
|
|
def check_order(self, order, uid) -> Tuple[Optional[list], Optional[str]]:
|
|
|
- cur = self._db.search(columns=["UserID"],
|
|
|
- table="orders",
|
|
|
- where=[f"OrderID='{order}'", f"UserID='{uid}'"])
|
|
|
- if cur is None or cur.rowcount != 1:
|
|
|
- return None, None
|
|
|
- uid = cur.fetchone()[0]
|
|
|
-
|
|
|
- cur = self._db.search(columns=["Name", "Quantity"],
|
|
|
- table="order_goods_view",
|
|
|
- where=f"OrderID = '{order}'")
|
|
|
- if cur is None:
|
|
|
+ if not check_order(order, uid, self._db):
|
|
|
return None, None
|
|
|
-
|
|
|
+ goods = get_goods_from_order(order, self._db)
|
|
|
res = []
|
|
|
- for i in range(cur.rowcount):
|
|
|
- re = cur.fetchone()
|
|
|
- res.append(f"#{i} {re[0]} x {re[1]}")
|
|
|
+ for i in goods:
|
|
|
+ res.append(f"#{i} {i[0]} x {i[1]}")
|
|
|
return res, uid
|
|
|
|
|
|
def confirm_order(self, order_id: int, uid: uid_t) -> bool:
|
|
@@ -135,26 +114,9 @@ class StoreWebsite(WebsiteBase):
|
|
|
|
|
|
class RankWebsite(WebsiteBase):
|
|
|
def get_rank(self, page: int, order_by: str = "DESC", url: str = "rank_up"):
|
|
|
- cur = self._db.search(columns=['count(UserID)'], table='user')
|
|
|
- if cur is None:
|
|
|
- return None, None
|
|
|
- assert cur.rowcount == 1
|
|
|
- count = math.ceil(int(cur.fetchone()[0]) / 20)
|
|
|
-
|
|
|
+ count = math.ceil(count_all_user(self._db) / 20)
|
|
|
offset = 20 * (page - 1)
|
|
|
- cur = self._db.search(columns=['UserID', 'Name', 'Score', 'Reputation'],
|
|
|
- table='user',
|
|
|
- where='IsManager=0',
|
|
|
- order_by=[('Reputation', order_by), ('Score', order_by), ('UserID', order_by)],
|
|
|
- limit=20,
|
|
|
- offset=offset) # TODO 封装该函数
|
|
|
- if cur is None:
|
|
|
- return None, None
|
|
|
- res = []
|
|
|
- for index in range(cur.rowcount):
|
|
|
- i = cur.fetchone()
|
|
|
- res.append((f"{offset + index + 1}", i[1], i[0][:Config.show_uid_len], str(i[3]), str(i[2])))
|
|
|
- return res, get_page(f"rank.{url}", page, count)
|
|
|
+ return get_rank_for_user(self.db, 20, offset, order_by), get_page(f"rank.{url}", page, count)
|
|
|
|
|
|
|
|
|
class NewsWebsite(WebsiteBase):
|
|
@@ -165,11 +127,9 @@ class NewsWebsite(WebsiteBase):
|
|
|
count = math.ceil(get_news_count(self.db) / 10)
|
|
|
if page > count:
|
|
|
return False, None, None
|
|
|
-
|
|
|
res, news_list = get_news(limit=20, offset=((page - 1) * 10), db=self.db)
|
|
|
if not res:
|
|
|
return False, None, None
|
|
|
-
|
|
|
return True, news_list, get_page("news.index", page, count)
|
|
|
|
|
|
|