event.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import abc
  2. import traceback
  3. import threading
  4. import conf
  5. from tool.type_ import *
  6. class TkEventException(Exception):
  7. ...
  8. class TkEventBase(metaclass=abc.ABCMeta):
  9. def __init__(self):
  10. self.thread: Optional[TkThreading] = None
  11. def is_end(self) -> bool:
  12. if self.thread is not None and not self.thread.is_alive():
  13. return True
  14. return False
  15. @abc.abstractmethod
  16. def get_title(self) -> str:
  17. ...
  18. def done_after_event(self):
  19. if self.thread is not None:
  20. self.thread.wait_event()
  21. class TkEventMain(metaclass=abc.ABCMeta):
  22. def __init__(self):
  23. self._event_list: List[TkEventBase] = []
  24. self.set_after_run(conf.tk_refresh_delay, lambda: self.run_event())
  25. def push_event(self, event: TkEventBase):
  26. self._event_list.append(event)
  27. self.show_loading(event.get_title())
  28. self.run_event()
  29. def run_event(self):
  30. if len(self._event_list) == 0:
  31. return
  32. new_event: List[TkEventBase] = []
  33. done_event: List[TkEventBase] = []
  34. for event in self._event_list:
  35. if event.is_end():
  36. done_event.append(event)
  37. else:
  38. new_event.append(event)
  39. self._event_list = new_event
  40. if len(self._event_list) == 0:
  41. self.stop_loading()
  42. for event in done_event: # 隐藏进度条后执行Event-GUI任务
  43. try:
  44. event.done_after_event()
  45. except:
  46. traceback.print_exc()
  47. self.set_after_run_now(conf.tk_refresh_delay, self.run_event)
  48. @abc.abstractmethod
  49. def show_loading(self, title: str):
  50. ...
  51. @abc.abstractmethod
  52. def stop_loading(self):
  53. ...
  54. @abc.abstractmethod
  55. def set_after_run(self, ms, func, *args):
  56. ...
  57. @abc.abstractmethod
  58. def set_after_run_now(self, ms, func, *args):
  59. ...
  60. class TkThreading(threading.Thread):
  61. def __init__(self, func, *args, start_now: bool = True):
  62. threading.Thread.__init__(self)
  63. self.func = func
  64. self.args = args
  65. self.result = None
  66. if start_now:
  67. self.start()
  68. def run(self):
  69. try:
  70. self.result = self.func(*self.args)
  71. except:
  72. ...
  73. finally:
  74. del self.func, self.args
  75. def wait_event(self):
  76. self.join()
  77. return self.result