garbage.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import threading
  2. from tool.type_ import *
  3. from tool.time_ import HGSTime, hgs_time_t
  4. from tool.location import HGSLocation, hgs_location_t
  5. class GarbageBagNotUse(Exception):
  6. pass
  7. class GarbageType:
  8. GarbageTypeStrList: List = ["", "recyclable", "kitchen", "hazardous", "other"]
  9. recyclable: enum = 1
  10. kitchen: enum = 2
  11. hazardous: enum = 3
  12. other: enum = 4
  13. class GarbageBag:
  14. def __init__(self, gid: gid_t):
  15. self._gid: gid_t = gid
  16. self._have_use: bool = False
  17. self._have_check: bool = False
  18. self._type: Union[enum, None] = None
  19. self._use_time: Union[HGSTime, None] = None
  20. self._user: Union[uid_t, None] = None
  21. self._loc: Union[HGSLocation, None] = None
  22. self._check = False
  23. self._checker: Union[uid_t, None] = None
  24. self._lock = threading.RLock()
  25. def __repr__(self):
  26. tmp = GarbageType.GarbageTypeStrList
  27. try:
  28. self._lock.acquire()
  29. if not self._have_use:
  30. info = f"GarbageBag: {self._gid} NOT USE"
  31. elif not self._have_check:
  32. info = (f"GarbageBag: {self._gid} "
  33. f"Type: {tmp[self._type]} "
  34. f"Time: {self._use_time.get_time()} "
  35. f"User: {self._user} "
  36. f"Location {self._loc.get_location()} "
  37. f"NOT CHECK")
  38. else:
  39. info = (f"GarbageBag: {self._gid} "
  40. f"Type: {tmp[self._type]} "
  41. f"Time: {self._use_time.get_time()} "
  42. f"User: {self._user} "
  43. f"Location {self._loc.get_location()} "
  44. f"Check: {self._check}"
  45. f"Checker: {self._checker}")
  46. finally:
  47. self._lock.release()
  48. return info
  49. def get_info(self) -> dict[str: str]:
  50. try:
  51. self._lock.acquire()
  52. info = {
  53. "gid": str(self._gid),
  54. "type": str(self._type),
  55. "use_time": "" if (self._use_time is None) else str(self._use_time.get_time()),
  56. "user": str(self._user),
  57. "loc": "" if (self._loc is None) else str(self._loc.get_location()),
  58. "check": 'Pass' if self._check else 'Fail',
  59. "checker": "None" if self._checker is None else self._checker,
  60. }
  61. finally:
  62. self._lock.release()
  63. return info
  64. def is_use(self) -> bool:
  65. try:
  66. self._lock.acquire()
  67. have_use = self._have_use
  68. finally:
  69. self._lock.release()
  70. return have_use
  71. def is_check(self) -> Tuple[bool, bool]:
  72. try:
  73. self._lock.acquire()
  74. have_check = self._have_check
  75. check = self._check
  76. finally:
  77. self._lock.release()
  78. if not have_check:
  79. return False, False
  80. else:
  81. return True, check
  82. def get_gid(self):
  83. try:
  84. self._lock.acquire()
  85. gid = self._gid
  86. finally:
  87. self._lock.release()
  88. return gid
  89. def get_user(self) -> uid_t:
  90. try:
  91. self._lock.acquire()
  92. user = self._user
  93. have_use = self._have_use
  94. finally:
  95. self._lock.release()
  96. if not have_use:
  97. raise GarbageBagNotUse
  98. return user
  99. def get_type(self):
  100. try:
  101. self._lock.acquire()
  102. type_ = self._type
  103. have_use = self._have_use
  104. finally:
  105. self._lock.release()
  106. if not have_use:
  107. raise GarbageBagNotUse
  108. return type_
  109. def config_use(self, garbage_type: enum, use_time: hgs_time_t, user: uid_t, location: hgs_location_t):
  110. try:
  111. self._lock.acquire()
  112. assert not self._have_use
  113. self._have_use = True
  114. if not isinstance(use_time, HGSTime):
  115. use_time = HGSTime(use_time)
  116. if not isinstance(location, HGSLocation):
  117. location = HGSLocation(location)
  118. self._type: enum = garbage_type
  119. self._use_time: HGSTime = use_time
  120. self._user: uid_t = user
  121. self._loc: HGSLocation = location
  122. finally:
  123. self._lock.release()
  124. def config_check(self, use_right: bool, check_uid: uid_t):
  125. try:
  126. self._lock.acquire()
  127. assert self._have_use
  128. assert not self._have_check
  129. self._have_check = True
  130. self._check = use_right
  131. self._checker = check_uid
  132. finally:
  133. self._lock.release()