web.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. from sql.store import get_store_item_list, get_store_item, check_order
  2. from flask import Flask
  3. import datetime
  4. from conf import Config
  5. from tool.type_ import *
  6. from core.garbage import GarbageType
  7. from sql import DBBit
  8. from sql.db import DB
  9. from sql.user import find_user_by_name, find_user_by_id
  10. from . import web_user
  11. from . import web_goods
  12. class WebsiteBase:
  13. def __init__(self, app: Flask, db: DB):
  14. self._db = db
  15. self._app = app
  16. class AuthWebsite(WebsiteBase):
  17. @property
  18. def db(self):
  19. return self._db
  20. def load_user_by_name(self, name: uname_t, passwd: passwd_t) -> Optional["web_user.WebUser"]:
  21. user = find_user_by_name(name, passwd, self._db)
  22. if user is None:
  23. return None
  24. return web_user.WebUser(name, uid=user.get_uid())
  25. def load_user_by_id(self, uid: uid_t) -> Optional["web_user.WebUser"]:
  26. user = find_user_by_id(uid, self._db)
  27. if user is None:
  28. return None
  29. name = user.get_name()
  30. return web_user.WebUser(name, uid=uid)
  31. def get_user_garbage_list(self, uid: uid_t, limit: int):
  32. cur = self._db.search(columns=["UseTime", "Location", "GarbageType", "CheckResult"],
  33. table="garbage",
  34. where=f"UserID='{uid}'",
  35. limit=limit)
  36. if cur is None or cur.rowcount == 0:
  37. return None
  38. res = []
  39. for i in range(cur.rowcount):
  40. re: Tuple[datetime.datetime, str, bytes, Optional[bytes]] = cur.fetchone()
  41. t = re[0].strftime("%Y-%m-%d %H:%M:%S")
  42. loc = re[1]
  43. type_ = GarbageType.GarbageTypeStrList_ch[int(re[2])]
  44. if re[3] is None:
  45. result = "待确定"
  46. result_class = 'wait'
  47. elif re[3] == DBBit.BIT_1:
  48. result = "投放正确"
  49. result_class = 'pass'
  50. else:
  51. result = "投放错误"
  52. result_class = 'fail'
  53. res.append((t, loc, type_, result, result_class))
  54. return res
  55. def get_user_by_id(self, uid: uid_t):
  56. res = find_user_by_id(uid, self._db)
  57. return res
  58. class StoreWebsite(WebsiteBase):
  59. @property
  60. def db(self):
  61. return self._db
  62. def get_store_list(self) -> Optional[List]:
  63. return get_store_item_list(self._db)
  64. def get_goods(self, goods_id: int):
  65. goods = get_store_item(goods_id, self._db) # 返回值 ["Name", "Score", "Quantity", "GoodsID"]
  66. if goods is None:
  67. return goods
  68. return web_goods.Goods(*goods)
  69. def check_order(self, order_id: int, uid: uid_t) -> bool:
  70. return check_order(order_id, uid, self._db)
  71. class RankWebsite(WebsiteBase):
  72. @property
  73. def db(self):
  74. return self._db
  75. def get_rank(self, page: int, order_by: str = "DESC") -> Optional[List[Tuple]]:
  76. offset = 20 * (page - 1)
  77. cur = self._db.search(columns=['UserID', 'Name', 'Score', 'Reputation'],
  78. table='user',
  79. where='IsManager=0',
  80. order_by=[('Reputation', order_by), ('Score', order_by), ('UserID', order_by)],
  81. limit=20,
  82. offset=offset)
  83. if cur is None:
  84. return None
  85. res = []
  86. for index in range(cur.rowcount):
  87. i = cur.fetchone()
  88. res.append((f"{offset + index + 1}", i[1], i[0][:Config.tk_show_uid_len], str(i[3]), str(i[2])))
  89. return res
  90. class Website(AuthWebsite, StoreWebsite, RankWebsite, WebsiteBase):
  91. def __init__(self, app: Flask, db: DB):
  92. super(Website, self).__init__(app, db)