garbage.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import threading
  2. from tool.type_ import *
  3. from tool.time_ import HGSTime, hgs_time_t
  4. from tool.location import HGSLocation, hgs_location_t
  5. class GarbageBagNotUse(Exception):
  6. pass
  7. class GarbageType:
  8. GarbageTypeStrList: List = ["", "recyclable", "kitchen", "hazardous", "other"]
  9. GarbageTypeStrList_ch: List = ["", "可回收", "厨余", "有害", "其他"]
  10. recyclable: enum = 1
  11. kitchen: enum = 2
  12. hazardous: enum = 3
  13. other: enum = 4
  14. class GarbageBag:
  15. def __init__(self, gid: gid_t):
  16. self._gid: gid_t = gid
  17. self._have_use: bool = False
  18. self._have_check: bool = False
  19. self._type: Union[enum, None] = None
  20. self._use_time: Union[HGSTime, None] = None
  21. self._user: Union[uid_t, None] = None
  22. self._loc: Union[HGSLocation, None] = None
  23. self._check = False
  24. self._checker: Union[uid_t, None] = None
  25. self._lock = threading.RLock()
  26. def __repr__(self):
  27. tmp = GarbageType.GarbageTypeStrList
  28. try:
  29. self._lock.acquire()
  30. if not self._have_use:
  31. info = f"GarbageBag: {self._gid} NOT USE"
  32. elif not self._have_check:
  33. info = (f"GarbageBag: {self._gid} "
  34. f"Type: {tmp[self._type]} "
  35. f"Time: {self._use_time.get_time()} "
  36. f"User: {self._user} "
  37. f"Location {self._loc.get_location()} "
  38. f"NOT CHECK")
  39. else:
  40. info = (f"GarbageBag: {self._gid} "
  41. f"Type: {tmp[self._type]} "
  42. f"Time: {self._use_time.get_time()} "
  43. f"User: {self._user} "
  44. f"Location {self._loc.get_location()} "
  45. f"Check: {self._check}"
  46. f"Checker: {self._checker}")
  47. finally:
  48. self._lock.release()
  49. return info
  50. def get_info(self) -> dict[str: str]:
  51. try:
  52. self._lock.acquire()
  53. info = {
  54. "gid": str(self._gid),
  55. "type": str(self._type),
  56. "use_time": "" if (self._use_time is None) else str(self._use_time.get_time()),
  57. "user": str(self._user),
  58. "loc": "" if (self._loc is None) else str(self._loc.get_location()),
  59. "check": '1' if self._check else '0',
  60. "checker": "None" if self._checker is None else self._checker,
  61. }
  62. finally:
  63. self._lock.release()
  64. return info
  65. def is_use(self) -> bool:
  66. try:
  67. self._lock.acquire()
  68. have_use = self._have_use
  69. finally:
  70. self._lock.release()
  71. return have_use
  72. def is_check(self) -> Tuple[bool, bool]:
  73. try:
  74. self._lock.acquire()
  75. have_check = self._have_check
  76. check = self._check
  77. finally:
  78. self._lock.release()
  79. if not have_check:
  80. return False, False
  81. else:
  82. return True, check
  83. def get_gid(self):
  84. try:
  85. self._lock.acquire()
  86. gid = self._gid
  87. finally:
  88. self._lock.release()
  89. return gid
  90. def get_user(self) -> uid_t:
  91. try:
  92. self._lock.acquire()
  93. user = self._user
  94. have_use = self._have_use
  95. finally:
  96. self._lock.release()
  97. if not have_use:
  98. raise GarbageBagNotUse
  99. return user
  100. def get_type(self):
  101. try:
  102. self._lock.acquire()
  103. type_ = self._type
  104. have_use = self._have_use
  105. finally:
  106. self._lock.release()
  107. if not have_use:
  108. raise GarbageBagNotUse
  109. return type_
  110. def config_use(self, garbage_type: enum, use_time: hgs_time_t, user: uid_t, location: hgs_location_t):
  111. try:
  112. self._lock.acquire()
  113. assert not self._have_use
  114. self._have_use = True
  115. if not isinstance(use_time, HGSTime):
  116. use_time = HGSTime(use_time)
  117. if not isinstance(location, HGSLocation):
  118. location = HGSLocation(location)
  119. self._type: enum = garbage_type
  120. self._use_time: HGSTime = use_time
  121. self._user: uid_t = user
  122. self._loc: HGSLocation = location
  123. finally:
  124. self._lock.release()
  125. def config_check(self, use_right: bool, check_uid: uid_t):
  126. try:
  127. self._lock.acquire()
  128. assert self._have_use
  129. assert not self._have_check
  130. self._have_check = True
  131. self._check = use_right
  132. self._checker = check_uid
  133. finally:
  134. self._lock.release()