station_event.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. from equipment.scan import QRCode
  2. from equipment.scan_user import scan_user
  3. from equipment.scan_garbage import scan_garbage
  4. from tool.type_ import *
  5. from core.user import User, UserNotSupportError
  6. from core.garbage import GarbageBag
  7. from sql.db import DB
  8. from event import TkThreading, TkEventBase
  9. import station as tk_station
  10. class StationEventBase(TkEventBase):
  11. def __init__(self, station: tk_station.GarbageStationBase, title: str = '未知'):
  12. super(StationEventBase, self).__init__()
  13. self.station: tk_station.GarbageStationBase = station
  14. self._db: DB = station.get_db()
  15. self._title = title
  16. def get_title(self) -> str:
  17. return self._title
  18. class ScanUserEvent(StationEventBase):
  19. """
  20. 任务: 扫码用户, 访问db获取数据
  21. 若QR-CODE不是User码则调用ScanGarbage任务
  22. """
  23. @staticmethod
  24. def func(qr: QRCode, db: DB):
  25. return scan_user(qr, db)
  26. def __init__(self, gb_station):
  27. super().__init__(gb_station, "扫码用户")
  28. self._user: User = gb_station.get_user()
  29. self._qr_code: Optional[QRCode] = None
  30. self.thread = None
  31. def start(self, qr_code: QRCode):
  32. self._qr_code = qr_code
  33. self.thread = TkThreading(self.func, qr_code, self._db)
  34. return self
  35. def is_end(self) -> bool:
  36. return self.thread is not None and not self.thread.is_alive()
  37. def done_after_event(self):
  38. self.thread.join()
  39. if self.thread.result is not None:
  40. self.station.switch_user(self.thread.result)
  41. self.station.update_control()
  42. else:
  43. event = ScanGarbageEvent(self.station).start(self._qr_code)
  44. self.station.push_event(event)
  45. class ScanGarbageEvent(StationEventBase):
  46. """
  47. 任务: 扫码垃圾袋, 访问db获取数据
  48. """
  49. @staticmethod
  50. def func(qr: QRCode, db: DB):
  51. return scan_garbage(qr, db)
  52. def __init__(self, gb_station):
  53. super().__init__(gb_station, "扫码垃圾袋")
  54. self._user: User = gb_station.get_user()
  55. self._qr_code: Optional[QRCode] = None
  56. self.thread = None
  57. def start(self, qr_code: QRCode):
  58. self._qr_code = qr_code
  59. self.thread = TkThreading(self.func, qr_code, self._db)
  60. return self
  61. def is_end(self) -> bool:
  62. return self.thread is not None and not self.thread.is_alive()
  63. def done_after_event(self):
  64. self.thread.join()
  65. if self.thread.result is not None:
  66. if self._user is None:
  67. self.station.show_warning("操作失败", "垃圾袋已经被使用")
  68. elif self._user.is_manager():
  69. self.station.to_get_garbage_check(self.thread.result)
  70. self.station.show_garbage_info() # 显示信息
  71. self.station.update_control()
  72. else:
  73. self.station.to_get_garbage_type(self.thread.result)
  74. self.station.hide_msg_rank() # 如果有msg也马上隐藏
  75. self.station.update_control()
  76. class RankingEvent(StationEventBase):
  77. """
  78. 任务: 获取排行榜数据
  79. """
  80. @staticmethod
  81. def func(db: DB):
  82. cur = db.search((f"SELECT UserID, Name, Score, Reputation "
  83. f"FROM user "
  84. f"WHERE IsManager = 0 "
  85. f"ORDER BY Reputation DESC, Score DESC "
  86. f"LIMIT 20;"))
  87. if cur is None:
  88. return []
  89. return list(cur.fetchall())
  90. def __init__(self, gb_station):
  91. super().__init__(gb_station, "排行榜")
  92. self.thread = TkThreading(self.func, self._db)
  93. def is_end(self) -> bool:
  94. return not self.thread.is_alive()
  95. def done_after_event(self):
  96. res = self.thread.wait_event()
  97. if res is not None:
  98. self.station.thread_show_rank(res)
  99. else:
  100. self.station.show_warning("排行榜错误", f'无法获得排行榜数据')
  101. class ThrowGarbageEvent(StationEventBase):
  102. """
  103. 任务: 提交扔垃圾的结果
  104. """
  105. def func(self, garbage: GarbageBag, garbage_type: enum):
  106. try:
  107. self.station.throw_garbage_core(garbage, garbage_type)
  108. except (tk_station.ThrowGarbageError, UserNotSupportError, tk_station.ControlNotLogin):
  109. return False
  110. else:
  111. return True
  112. def __init__(self, gb_station):
  113. super().__init__(gb_station, "垃圾投放")
  114. self.thread = None
  115. def start(self, garbage: GarbageBag, garbage_type: enum):
  116. self.thread = TkThreading(self.func, garbage, garbage_type)
  117. self.thread.start()
  118. return self
  119. def is_end(self) -> bool:
  120. return not self.thread.is_alive()
  121. def done_after_event(self):
  122. self.thread.join()
  123. if not self.thread.result:
  124. self.station.show_warning("操作失败", "垃圾袋投放失败")
  125. class CheckGarbageEvent(StationEventBase):
  126. """
  127. 任务: 提交检测垃圾的结果
  128. """
  129. def func(self, garbage: GarbageBag, check: bool):
  130. try:
  131. self.station.check_garbage_core(garbage, check)
  132. except (tk_station.ThrowGarbageError, UserNotSupportError,
  133. tk_station.ControlNotLogin, tk_station.CheckGarbageError):
  134. return False
  135. else:
  136. return True
  137. def __init__(self, gb_station):
  138. super().__init__(gb_station, "检测垃圾袋")
  139. self.thread = None
  140. def start(self, garbage: GarbageBag, garbage_check: bool):
  141. self.thread = TkThreading(self.func, garbage, garbage_check)
  142. return self
  143. def is_end(self) -> bool:
  144. return not self.thread.is_alive()
  145. def done_after_event(self):
  146. self.thread.join()
  147. if not self.thread.result:
  148. self.station.show_warning("操作失败", "垃圾袋检测失败")