admin_event.py 11 KB


  1. import time
  2. from tool.type_ import *
  3. from sql.db import DB
  4. from sql.user import find_user_by_name
  5. from core.user import User
  6. from core.garbage import GarbageBag
  7. from event import TkEventBase, TkThreading
  8. import admin
  9. import admin_program
  10. class AdminEventBase(TkEventBase):
  11. def __init__(self, station):
  12. super(AdminEventBase, self).__init__()
  13. self.station: admin.AdminStationBase = station
  14. self._db: DB = station.get_db()
  15. def get_title(self) -> str:
  16. return "AdminEvent"
  17. class LoginEvent(AdminEventBase):
  18. def __init__(self, station):
  19. super().__init__(station)
  20. self.thread: Optional[TkThreading] = None
  21. def login(self, name, passwd):
  22. return find_user_by_name(name, passwd, self._db)
  23. def start(self, name, passwd):
  24. self.thread = TkThreading(self.login, name, passwd)
  25. return self
  26. def is_end(self) -> bool:
  27. return not self.thread.is_alive()
  28. def done_after_event(self):
  29. self.station.login(self.thread.wait_event())
  30. class TestProgressEvent(AdminEventBase):
  31. @staticmethod
  32. def func(sleep_time):
  33. time.sleep(sleep_time)
  34. def __init__(self, station):
  35. super(TestProgressEvent, self).__init__(station)
  36. self.thread = TkThreading(self.func, 5)
  37. class createUserEvent(AdminEventBase):
  38. def func(self, name, passwd, phone, is_manager):
  39. return self.station.create_user(name, passwd, phone, is_manager)
  40. def __init__(self, station):
  41. super(createUserEvent, self).__init__(station)
  42. self._name = None
  43. def start(self, name, passwd, phone, is_manager):
  44. self._name = name
  45. self.thread = TkThreading(self.func, name, passwd, phone, is_manager)
  46. return self
  47. def done_after_event(self):
  48. res: Optional[User] = self.thread.wait_event()
  49. if res is None:
  50. self.station.show_msg("createUserError", f"Can't not create user: {self._name}", "Warning")
  51. else:
  52. name = res.get_name()
  53. self.station.show_msg("createUser", f"create user {name} success")
  54. class createGarbageEvent(AdminEventBase):
  55. def func(self, path, count):
  56. return self.station.create_garbage(path, count)
  57. def __init__(self, station):
  58. super(createGarbageEvent, self).__init__(station)
  59. self._name = None
  60. def start(self, path, count):
  61. self.thread = TkThreading(self.func, path, count)
  62. return self
  63. def done_after_event(self):
  64. res: list[tuple[str, Optional[GarbageBag]]] = self.thread.wait_event()
  65. self.station.show_msg("createGarbage", f"create {len(res)} garbage finished.")
  66. class DelUserEvent(AdminEventBase):
  67. def func(self, uid):
  68. return self.station.del_user(uid)
  69. def __init__(self, station):
  70. super(DelUserEvent, self).__init__(station)
  71. self._name = None
  72. def start(self, uid):
  73. self.thread = TkThreading(self.func, uid)
  74. return self
  75. def done_after_event(self):
  76. res: bool = self.thread.wait_event()
  77. if res:
  78. self.station.show_msg("DeleteUser", f"Delete user finished.")
  79. else:
  80. self.station.show_msg("DeleteUserError", f"Delete user failed.", "Warning")
  81. class DelUserFromWhereScanEvent(AdminEventBase):
  82. def func(self, where):
  83. return self.station.del_user_from_where_scan(where)
  84. def __init__(self, station):
  85. super(DelUserFromWhereScanEvent, self).__init__(station)
  86. def start(self, where):
  87. self.thread = TkThreading(self.func, where)
  88. return self
  89. def done_after_event(self):
  90. res: int = self.thread.wait_event()
  91. if res != -1:
  92. self.station.show_msg("DeleteUserScan", f"Delete count: {res}")
  93. else:
  94. self.station.show_msg("DeleteUserScanError", f"`Where` must be SQL")
  95. class DelUserFromWhereEvent(AdminEventBase):
  96. def func(self, where):
  97. return self.station.del_user_from_where(where)
  98. def __init__(self, station):
  99. super(DelUserFromWhereEvent, self).__init__(station)
  100. def start(self, where):
  101. self.thread = TkThreading(self.func, where)
  102. return self
  103. def done_after_event(self):
  104. res: int = self.thread.wait_event()
  105. if res != -1:
  106. self.station.show_msg("DeleteUser", f"Delete {res} user success")
  107. else:
  108. self.station.show_msg("DeleteUserError", f"`Where` must be SQL")
  109. class DelGarbageEvent(AdminEventBase):
  110. def func(self, gid, where):
  111. if where == 0:
  112. return 1 if self.station.del_garbage(gid) else -1
  113. elif where == 1:
  114. return 1 if self.station.del_garbage_not_use(gid) else -1
  115. elif where == 2:
  116. return 1 if self.station.del_garbage_wait_check(gid) else -1
  117. elif where == 3:
  118. return 1 if self.station.del_garbage_has_check(gid) else -1
  119. return -1
  120. def __init__(self, station):
  121. super(DelGarbageEvent, self).__init__(station)
  122. def start(self, gid, where):
  123. self.thread = TkThreading(self.func, gid, where)
  124. return self
  125. def done_after_event(self):
  126. res: int = self.thread.wait_event()
  127. if res != -1:
  128. self.station.show_msg("DeleteGarbage", f"Delete {res} garbage success")
  129. else:
  130. self.station.show_msg("DeleteGarbageError", f"Delete error")
  131. class DelGarbageWhereEvent(AdminEventBase):
  132. def func(self, where, where_sql):
  133. if where == 1:
  134. return self.station.del_garbage_where_not_use(where_sql)
  135. elif where == 2:
  136. return self.station.del_garbage_where_wait_check(where_sql)
  137. elif where == 3:
  138. return self.station.del_garbage_where_has_check(where_sql)
  139. return -1
  140. def __init__(self, station):
  141. super(DelGarbageWhereEvent, self).__init__(station)
  142. def start(self, where, where_sql):
  143. self.thread = TkThreading(self.func, where, where_sql)
  144. return self
  145. def done_after_event(self):
  146. res: int = self.thread.wait_event()
  147. if res != -1:
  148. self.station.show_msg("DeleteGarbage", f"Delete {res} garbage success")
  149. else:
  150. self.station.show_msg("DeleteGarbageError", f"Delete error")
  151. class DelGarbageWhereScanEvent(AdminEventBase):
  152. def func(self, where, where_sql):
  153. if where == 1:
  154. return self.station.del_garbage_where_scan_not_use(where_sql)
  155. elif where == 2:
  156. return self.station.del_garbage_where_scan_wait_check(where_sql)
  157. elif where == 3:
  158. return self.station.del_garbage_where_scan_has_check(where_sql)
  159. return -1
  160. def __init__(self, station):
  161. super(DelGarbageWhereScanEvent, self).__init__(station)
  162. def start(self, where, where_sql):
  163. self.thread = TkThreading(self.func, where, where_sql)
  164. return self
  165. def done_after_event(self):
  166. res: int = self.thread.wait_event()
  167. if res != -1:
  168. self.station.show_msg("DeleteGarbageScan", f"Delete count: {res}")
  169. else:
  170. self.station.show_msg("DeleteGarbageScanError", f"Delete scan error")
  171. class DelAllGarbageScanEvent(AdminEventBase):
  172. def func(self):
  173. return self.station.del_all_garbage_scan()
  174. def __init__(self, station):
  175. super(DelAllGarbageScanEvent, self).__init__(station)
  176. self.thread = TkThreading(self.func)
  177. def done_after_event(self):
  178. res: int = self.thread.wait_event()
  179. if res != -1:
  180. self.station.show_msg("DeleteAllGarbageScan", f"Delete count: {res}")
  181. else:
  182. self.station.show_msg("DeleteAllGarbageError", f"Delete scan error")
  183. class DelAllGarbageEvent(AdminEventBase):
  184. def func(self):
  185. return self.station.del_all_garbage()
  186. def __init__(self, station):
  187. super(DelAllGarbageEvent, self).__init__(station)
  188. self.thread = TkThreading(self.func)
  189. def done_after_event(self):
  190. res: int = self.thread.wait_event()
  191. if res != -1:
  192. self.station.show_msg("DeleteAllGarbage", f"Delete all[{res}] garbage success")
  193. else:
  194. self.station.show_msg("DeleteAllGarbageError", f"Delete error")
  195. class SearchUserEvent(AdminEventBase):
  196. def func(self, columns, uid, name, phone):
  197. return self.station.search_user(columns, uid, name, phone)
  198. def __init__(self, station):
  199. super(SearchUserEvent, self).__init__(station)
  200. self.program: Optional[admin_program.SearchUserProgram] = None
  201. def start(self, columns, uid, name, phone, program):
  202. self.thread = TkThreading(self.func, columns, uid, name, phone)
  203. self.program = program
  204. return self
  205. def done_after_event(self):
  206. res: list[list] = self.thread.wait_event()
  207. if res is None or self.program is None:
  208. self.station.show_msg("Search User Error", f"Search error")
  209. return
  210. for i in self.program.view.get_children():
  211. self.program.view.delete(i)
  212. for i in res:
  213. self.program.view.insert('', 'end', values=i)
  214. class SearchUserAdvancedEvent(AdminEventBase):
  215. def func(self, columns, sql):
  216. return self.station.search_user_advanced(columns, sql)
  217. def __init__(self, station):
  218. super(SearchUserAdvancedEvent, self).__init__(station)
  219. self.program: Optional[admin_program.SearchUserAdvancedProgram] = None
  220. def start(self, columns, sql, program):
  221. self.thread = TkThreading(self.func, columns, sql)
  222. self.program = program
  223. return self
  224. def done_after_event(self):
  225. res: list[list] = self.thread.wait_event()
  226. if res is None or self.program is None:
  227. self.station.show_msg("Search User Advanced Error", f"Search error")
  228. return
  229. for i in self.program.view.get_children():
  230. self.program.view.delete(i)
  231. for i in res:
  232. self.program.view.insert('', 'end', values=i)
  233. class SearchGarbageAdvancedEvent(AdminEventBase):
  234. def func(self, columns, sql):
  235. return self.station.search_garbage_advanced(columns, sql)
  236. def __init__(self, station):
  237. super(SearchGarbageAdvancedEvent, self).__init__(station)
  238. self.program: Optional[admin_program.SearchGarbageAdvancedProgram] = None
  239. def start(self, columns, sql, program):
  240. self.thread = TkThreading(self.func, columns, sql)
  241. self.program = program
  242. return self
  243. def done_after_event(self):
  244. res: list[list] = self.thread.wait_event()
  245. if res is None or self.program is None:
  246. self.station.show_msg("Search Garbage Advanced Error", f"Search error")
  247. return
  248. for i in self.program.view.get_children():
  249. self.program.view.delete(i)
  250. for i in res:
  251. self.program.view.insert('', 'end', values=i)
  252. class SearchAdvancedEvent(AdminEventBase):
  253. def func(self, columns, sql):
  254. return self.station.search_advanced(columns, sql)
  255. def __init__(self, station):
  256. super(SearchAdvancedEvent, self).__init__(station)
  257. self.program: Optional[admin_program.SearchAdvancedProgram] = None
  258. def start(self, columns, sql, program):
  259. self.thread = TkThreading(self.func, columns, sql)
  260. self.program = program
  261. return self
  262. def done_after_event(self):
  263. res: list[list] = self.thread.wait_event()
  264. if res is None or self.program is None:
  265. self.station.show_msg("Search Advanced Error", f"Search error")
  266. return
  267. for i in self.program.view.get_children():
  268. self.program.view.delete(i)
  269. for i in res:
  270. self.program.view.insert('', 'end', values=i)