web.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. from flask import Flask
  2. from flask_login import current_user
  3. import math
  4. from sql.store import get_store_item_list, get_store_item, confirm_order
  5. from tool.typing import *
  6. from tool.page import get_page
  7. from core.garbage import GarbageType
  8. from sql import DBBit
  9. from sql.db import DB
  10. from sql.garbage import count_garbage_by_uid, get_garbage_by_uid
  11. from sql.user import find_user_by_name, find_user_by_id, get_rank_for_user, count_all_user
  12. from sql.news import write_news, get_news, get_news_count, delete_news
  13. from sql.store import check_order, get_goods_from_order, set_goods_quantity, set_goods_score, add_new_goods
  14. from . import web_user
  15. from . import web_goods
  class WebsiteBase:
      """Common base of the website facades.

      Holds the Flask application, the database handle and the
      ``current_user`` proxy, and exposes each through a read-only property.
      """

      def __init__(self, app: Flask, db: DB):
          """Remember ``app`` and ``db`` and bind the ``current_user`` proxy."""
          self._app = app
          self._db = db
          # The user used to be handed around as a function argument; it now
          # lives in this attribute instead.
          self._user = current_user

      @property
      def app(self):
          """The Flask application given to the constructor."""
          return self._app

      @property
      def db(self):
          """The database handle given to the constructor."""
          return self._db

      @property
      def user(self):
          """The stored ``current_user`` proxy."""
          return self._user

      @property
      def rel_user(self):
          """The object returned by ``_get_current_object()`` on the stored proxy."""
          proxy = self._user
          return proxy._get_current_object()
  class AuthWebsite(WebsiteBase):
      """User pages: account lookups and per-user garbage records."""

      def load_user_by_name(self, name: uname_t, passwd: passwd_t) -> Optional["web_user.WebUser"]:
          """Look a user up by name and password.

          Returns a ``WebUser`` built from ``name`` and the found user's uid,
          or ``None`` when ``find_user_by_name`` finds nothing.
          """
          found = find_user_by_name(name, passwd, self._db)
          return None if found is None else web_user.WebUser(name, uid=found.get_uid())

      def load_user_by_id(self, uid: uid_t) -> Optional["web_user.WebUser"]:
          """Look a user up by uid.

          Returns a ``WebUser`` built from the found user's name and ``uid``,
          or ``None`` when ``find_user_by_id`` finds nothing.
          """
          found = find_user_by_id(uid, self._db)
          if found is not None:
              return web_user.WebUser(found.get_name(), uid=uid)
          return None

      def get_user_garbage_count(self, uid: uid_t):
          """Return ``count_garbage_by_uid`` for ``uid`` with ``time_limit=False``."""
          return count_garbage_by_uid(uid, self._db, time_limit=False)

      def get_user_garbage_list(self, uid: uid_t, limit: int, offset: int = 0):
          """Return display-ready garbage records for ``uid``.

          Each entry is a tuple ``(time, location, type, result, result_class)``
          where ``time`` is formatted as ``%Y-%m-%d %H:%M:%S``, ``type`` is looked
          up in ``GarbageType.GarbageTypeStrList_ch`` and ``result_class`` is one
          of ``'wait'``, ``'pass'`` or ``'fail'``.
          """
          rows = get_garbage_by_uid(uid,
                                    columns=["UseTime", "Location", "GarbageType", "CheckResult"],
                                    limit=limit,
                                    db=self.db,
                                    offset=offset)
          records = []
          for row in rows:
              used_at = row[0].strftime("%Y-%m-%d %H:%M:%S")
              location = row[1]
              type_name = GarbageType.GarbageTypeStrList_ch[int(row[2])]

              check = row[3]
              if check is None:
                  # Not checked yet.
                  verdict, css_class = "待确定", "wait"
              elif check == DBBit.BIT_1:
                  verdict, css_class = "投放正确", "pass"
              else:
                  verdict, css_class = "投放错误", "fail"

              records.append((used_at, location, type_name, verdict, css_class))
          return records

      def get_user_by_id(self, uid: uid_t):
          """Return whatever ``find_user_by_id`` yields for ``uid``."""
          return find_user_by_id(uid, self._db)
  class StoreWebsite(WebsiteBase):
      """Points store: goods listing, orders and goods maintenance."""

      def get_store_list(self) -> Optional[List]:
          """Return the result of ``get_store_item_list`` for this database."""
          return get_store_item_list(self._db)

      def get_goods(self, goods_id: int):
          """Return a ``web_goods.Goods`` for ``goods_id``, or ``None`` if not found."""
          # Row layout: ["Name", "Score", "Quantity", "GoodsID"]
          row = get_store_item(goods_id, self._db)
          if row is None:
              return None
          # web_goods.Goods(*row) would be shorter, but explicit keywords are
          # easier to read.
          name, score, quantity, found_id = row[0], row[1], row[2], row[3]
          return web_goods.Goods(name=name, score=score, quantity=quantity, goods_id=found_id)

      def check_order(self, order, uid) -> Tuple[Optional[list], Optional[str]]:
          """Validate an order and describe its goods.

          Returns ``(None, None)`` when the module-level ``check_order`` rejects
          the order; otherwise ``(lines, uid)`` where ``lines`` holds one
          formatted string per row from ``get_goods_from_order``.
          """
          if not check_order(order, uid, self._db):
              return None, None
          rows = get_goods_from_order(order, self._db)
          # NOTE(review): "#{row}" interpolates the whole row, not an index or
          # id -- confirm whether that is the intended label.
          lines = [f"#{row} {row[0]} x {row[1]}" for row in rows]
          return lines, uid

      def confirm_order(self, order_id: int, uid: uid_t) -> bool:
          """Delegate to the module-level ``confirm_order``."""
          return confirm_order(order_id, uid, self._db)

      def set_goods_quantity(self, quantity: int, goods_id: int):
          """Delegate to the module-level ``set_goods_quantity``."""
          return set_goods_quantity(quantity, goods_id, self._db)

      def set_goods_score(self, score: score_t, goods_id: int):
          """Delegate to the module-level ``set_goods_score``."""
          return set_goods_score(score, goods_id, self._db)

      def add_new_goods(self, name: str, score: score_t, quantity: int):
          """Delegate to the module-level ``add_new_goods``."""
          return add_new_goods(name, score, quantity, self._db)
  class RankWebsite(WebsiteBase):
      """Leaderboard."""

      def get_rank(self, page: int, order_by: str = "DESC", url: str = "rank_up"):
          """Return one leaderboard page plus its pagination data.

          The page size is 20. The result is a pair: the rows from
          ``get_rank_for_user`` and the value of ``get_page`` for the endpoint
          ``rank.<url>``.
          """
          per_page = 20
          page_count = math.ceil(count_all_user(self._db) / per_page)
          skip = per_page * (page - 1)
          ranking = get_rank_for_user(self.db, per_page, skip, order_by)
          return ranking, get_page(f"rank.{url}", page, page_count)
  class NewsWebsite(WebsiteBase):
      """Blog / news board."""

      def write_news(self, context: str, uid: uid_t):
          """Delegate to the module-level ``write_news``."""
          return write_news(context, uid, self.db)

      def get_news(self, page: int = 1):
          """Return one page of news.

          Returns ``(True, news_list, page_info)`` on success and
          ``(False, None, None)`` when ``page`` is out of range or the query
          reports failure.
          """
          # One page size drives the page count, the offset and the limit.
          # The limit used to be 20 while count and offset used 10, so
          # consecutive pages overlapped by ten entries.
          per_page = 10
          count = math.ceil(get_news_count(self.db) / per_page)
          # page < 1 would give a negative offset; treat it like any other
          # out-of-range page.
          if page < 1 or page > count:
              return False, None, None
          res, news_list = get_news(limit=per_page, offset=(page - 1) * per_page, db=self.db)
          if not res:
              return False, None, None
          return True, news_list, get_page("news.index", page, count)

      def delete_news(self, context_id: str):
          """Delegate to the module-level ``delete_news``."""
          return delete_news(context_id, self._db)
  class DataWebsite(WebsiteBase):
      """Data analysis queries over the garbage records."""

      def count_by_days(self):
          """Count garbage per type, bucketed by the ``%H`` part of ``UseTime``.

          The bucket column is aliased ``days``. Returns all rows, or ``None``
          when the search yields no cursor.
          """
          columns = ["GarbageType", "DATE_FORMAT(UseTime,'%H') AS days", "count(GarbageID) AS count"]
          cursor = self._db.search(columns=columns,
                                   table="garbage",
                                   group_by=["GarbageType", "days"],
                                   order_by=[("GarbageType", "ASC"), ("days", "ASC")],
                                   where="UseTime IS NOT NULL")
          return None if cursor is None else cursor.fetchall()

      def count_by_times(self, days):
          """Count garbage per type and ``days`` value from ``garbage_<days>d``.

          Returns all rows, or ``None`` when the search yields no cursor.
          """
          # NOTE(review): ``days`` is interpolated straight into the table name;
          # presumably callers only pass trusted values -- confirm.
          source_table = f"garbage_{days}d"
          cursor = self._db.search(columns=["GarbageType", "days", "count(GarbageID) AS count"],
                                   table=source_table,
                                   group_by=["GarbageType", "days"],
                                   order_by=[("GarbageType", "ASC"), ("days", "ASC")],
                                   where="UseTime IS NOT NULL")
          return None if cursor is None else cursor.fetchall()

      def count_passing_rate(self):
          """Per garbage type, apply ``get_avg`` to the passed count and the checked total.

          Only rows with ``CheckResult=1`` are counted per type; the second
          argument of ``get_avg`` is the count of all rows whose ``CheckResult``
          is not null. Returns all rows, or ``None`` when there is no cursor or
          it reports zero rows.
          """
          rate_column = ("get_avg(count(GarbageID), "
                         "(SELECT count(g.GarbageID) "
                         "FROM garbage AS g WHERE g.CheckResult is not null)) AS count")
          cursor = self._db.search(columns=["GarbageType", rate_column],
                                   table="garbage",
                                   where=["CheckResult is not null", "CheckResult=1"],
                                   group_by=["GarbageType"],
                                   order_by=[("GarbageType", "ASC")])
          if cursor is not None and cursor.rowcount != 0:
              return cursor.fetchall()
          return None
  class Website(AuthWebsite, StoreWebsite, RankWebsite, NewsWebsite, DataWebsite, WebsiteBase):
      """Single facade combining the auth, store, rank, news and data mixins."""

      def __init__(self, app: Flask, db: DB):
          """Forward ``app`` and ``db`` to the next ``__init__`` in the MRO."""
          super().__init__(app, db)