web.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from flask import Flask, url_for
  2. from flask_login import current_user
  3. import datetime
  4. import math
  5. from conf import Config
  6. from sql.store import get_store_item_list, get_store_item, check_order
  7. from tool.type_ import *
  8. from tool.page import get_page
  9. from core.garbage import GarbageType
  10. from sql import DBBit
  11. from sql.db import DB
  12. from sql.user import find_user_by_name, find_user_by_id
  13. from sql.news import write_news, get_news, get_news_count
  14. from . import web_user
  15. from . import web_goods
  16. class WebsiteBase:
  17. def __init__(self, app: Flask, db: DB):
  18. self._db = db
  19. self._app = app
  20. self._user = current_user # 把参函传递的user移迁为该变量
  21. @property
  22. def db(self):
  23. return self._db
  24. @property
  25. def app(self):
  26. return self._app
  27. @property
  28. def user(self):
  29. return self._user
  30. @property
  31. def rel_user(self):
  32. return self._user._get_current_object()
  33. class AuthWebsite(WebsiteBase):
  34. def load_user_by_name(self, name: uname_t, passwd: passwd_t) -> Optional["web_user.WebUser"]:
  35. user = find_user_by_name(name, passwd, self._db)
  36. if user is None:
  37. return None
  38. return web_user.WebUser(name, uid=user.get_uid())
  39. def load_user_by_id(self, uid: uid_t) -> Optional["web_user.WebUser"]:
  40. user = find_user_by_id(uid, self._db)
  41. if user is None:
  42. return None
  43. name = user.get_name()
  44. return web_user.WebUser(name, uid=uid)
  45. def get_user_garbage_count(self, uid: uid_t):
  46. cur = self._db.search(columns=["count(GarbageID)"],
  47. table="garbage",
  48. where=f"UserID='{uid}'")
  49. if cur is None:
  50. return 0
  51. assert cur.rowcount == 1
  52. return int(cur.fetchone()[0])
  53. def get_user_garbage_list(self, uid: uid_t, limit: int, offset: int = 0):
  54. cur = self._db.search(columns=["UseTime", "Location", "GarbageType", "CheckResult"],
  55. table="garbage",
  56. where=f"UserID='{uid}'",
  57. limit=limit,
  58. offset=offset,
  59. order_by=[("UseTime", "DESC")])
  60. if cur is None or cur.rowcount == 0:
  61. return None
  62. res = []
  63. for i in range(cur.rowcount):
  64. re: Tuple[datetime.datetime, str, bytes, Optional[bytes]] = cur.fetchone()
  65. t = re[0].strftime("%Y-%m-%d %H:%M:%S")
  66. loc = re[1]
  67. type_ = GarbageType.GarbageTypeStrList_ch[int(re[2])]
  68. if re[3] is None:
  69. result = "待确定"
  70. result_class = 'wait'
  71. elif re[3] == DBBit.BIT_1:
  72. result = "投放正确"
  73. result_class = 'pass'
  74. else:
  75. result = "投放错误"
  76. result_class = 'fail'
  77. res.append((t, loc, type_, result, result_class))
  78. return res
  79. def get_user_by_id(self, uid: uid_t):
  80. res = find_user_by_id(uid, self._db)
  81. return res
  82. class StoreWebsite(WebsiteBase):
  83. def get_store_list(self) -> Optional[List]:
  84. return get_store_item_list(self._db)
  85. def get_goods(self, goods_id: int):
  86. goods = get_store_item(goods_id, self._db) # 返回值 ["Name", "Score", "Quantity", "GoodsID"]
  87. if goods is None:
  88. return goods
  89. return web_goods.Goods(*goods)
  90. def check_order(self, order_id: int, uid: uid_t) -> bool:
  91. return check_order(order_id, uid, self._db)
  92. class RankWebsite(WebsiteBase):
  93. def get_rank(self, page: int, order_by: str = "DESC", url: str = "rank_up"):
  94. cur = self._db.search(columns=['count(UserID)'], table='user')
  95. if cur is None:
  96. return None, None
  97. assert cur.rowcount == 1
  98. count = math.ceil(int(cur.fetchone()[0]) / 20)
  99. offset = 20 * (page - 1)
  100. cur = self._db.search(columns=['UserID', 'Name', 'Score', 'Reputation'],
  101. table='user',
  102. where='IsManager=0',
  103. order_by=[('Reputation', order_by), ('Score', order_by), ('UserID', order_by)],
  104. limit=20,
  105. offset=offset) # TODO 封装该函数
  106. if cur is None:
  107. return None, None
  108. res = []
  109. for index in range(cur.rowcount):
  110. i = cur.fetchone()
  111. res.append((f"{offset + index + 1}", i[1], i[0][:Config.tk_show_uid_len], str(i[3]), str(i[2])))
  112. return res, get_page(f"rank.{url}", page, count)
  113. class NewsWebsite(WebsiteBase):
  114. def write_news(self, context: str, uid: uid_t):
  115. return write_news(context, uid, self.db)
  116. def get_news(self, page: int = 1):
  117. count = math.ceil(get_news_count(self.db) / 10)
  118. if page > count:
  119. return False, None, None
  120. res, news_list = get_news(limit=20, offset=((page - 1) * 10), db=self.db)
  121. if not res:
  122. return False, None, None
  123. return True, news_list, get_page("news.index", page, count)
  124. class Website(AuthWebsite, StoreWebsite, RankWebsite, NewsWebsite, WebsiteBase):
  125. def __init__(self, app: Flask, db: DB):
  126. super(Website, self).__init__(app, db)