garbage.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. import time
  2. from . import DBBit
  3. from .db import DB
  4. from tool.type_ import *
  5. from tool.time_ import mysql_time, time_from_mysql
  6. from core.garbage import GarbageBag, GarbageType
  7. def update_garbage_type(where: str, type_: int, db: DB) -> int:
  8. if len(where) == 0:
  9. return -1
  10. cur = db.update(table="garbage", kw={"GarbageType": str(type_)}, where=where)
  11. if cur is None:
  12. return -1
  13. return cur.rowcount
  14. def update_garbage_check(where: str, check_: bool, db: DB) -> int:
  15. if len(where) == 0:
  16. return -1
  17. i: str = '1' if check_ else '0'
  18. cur = db.update(table="garbage", kw={"CheckResult": i}, where=where)
  19. if cur is None:
  20. return -1
  21. return cur.rowcount
  22. def __get_time(time_str: str) -> float:
  23. if time_str == 'now':
  24. return time.time()
  25. if time_str.startswith("Date:") and len(time_str) >= 5:
  26. return time_from_mysql(time_str[5:])
  27. return float(time_str)
  28. def __get_time_diff(time_str: str) -> float:
  29. if time_str.endswith("s") and len(time_str) >= 2:
  30. return float(time_str[:-1])
  31. elif time_str.endswith("ms") and len(time_str) >= 3:
  32. return float(time_str[:-1]) / 1000
  33. elif time_str.endswith("min") and len(time_str) >= 4:
  34. return float(time_str[:-1]) * 60
  35. elif time_str.endswith("h") and len(time_str) >= 2:
  36. return float(time_str[:-1]) * 60 * 60
  37. elif time_str.endswith("d") and len(time_str) >= 2:
  38. return float(time_str[:-1]) * 24 * 60 * 60
  39. return float(time_str)
  40. def __search_fields_time(time_: str, time_name: str) -> str:
  41. if time_ == '<=now':
  42. return f"{time_name}<={mysql_time()} AND"
  43. sp = time_.split(',')
  44. if len(sp) == 2:
  45. try:
  46. time_list = __get_time(sp[0]), __get_time(sp[1])
  47. a = min(time_list)
  48. b = max(time_list)
  49. except (TypeError, ValueError):
  50. return ""
  51. else:
  52. return f"({time_name} BETWEEN {mysql_time(a)} AND {mysql_time(b)}) AND"
  53. sp = time_.split(';')
  54. if len(sp) == 2:
  55. try:
  56. time_list = __get_time(sp[0]), __get_time_diff(sp[1])
  57. a = time_list[0] - time_list[1]
  58. b = time_list[0] + time_list[1]
  59. except (TypeError, ValueError):
  60. return ""
  61. else:
  62. return f"({time_name} BETWEEN {mysql_time(a)} AND {mysql_time(b)}) AND"
  63. try:
  64. t = __get_time(time_)
  65. except (TypeError, ValueError):
  66. return ""
  67. else:
  68. return f"({time_name}={mysql_time(t)} AND"
  69. def search_garbage_by_fields(columns, gid, uid, cuid, create_time, use_time, loc, type_, check, db: DB):
  70. where = ""
  71. if gid is not None:
  72. where += f"GarbageID={gid} AND "
  73. if uid is not None:
  74. where += f"UserID=‘{uid}’ AND "
  75. if cuid is not None:
  76. where += f"CheckerID='{cuid}' AND "
  77. if loc is not None:
  78. where += f"Location='{loc}' AND "
  79. if check is not None:
  80. if check == "False":
  81. where += f"CheckResult=0 AND "
  82. else:
  83. where += f"CheckResult=1 AND "
  84. if type_ is not None and type_ in GarbageType.GarbageTypeStrList:
  85. res = GarbageType.GarbageTypeStrList.index(type_)
  86. where += f"Phone={res} AND "
  87. if create_time is not None:
  88. where += __search_fields_time(create_time, "CreateTime")
  89. if use_time is not None:
  90. where += __search_fields_time(use_time, "UseTime")
  91. if len(where) != 0:
  92. where = where[0:-4] # 去除末尾的AND
  93. return search_from_garbage_view(columns, where, db)
  94. def search_from_garbage_view(columns, where: str, db: DB):
  95. cur = db.search(columns=columns, table="garbage", where=where)
  96. if cur is None:
  97. return None
  98. res = cur.fetchall()
  99. return res
  100. def count_garbage_by_time(uid: uid_t, db: DB):
  101. ti: time_t = time.time()
  102. start = ti - 3.5 * 24 * 60 * 60 # 前后3.5天
  103. end = ti + 3.5 * 24 * 60 * 60
  104. cur = db.search(columns=["GarbageID"],
  105. table="garbage_time",
  106. where=[f"UserID = '{uid}'", f"UseTime BETWEEN {mysql_time(start)} AND {mysql_time(end)}"])
  107. if cur is None:
  108. return -1
  109. return cur.rowcount
  110. def __find_garbage(columns: List[str], table: str, where: str, db: DB):
  111. cur = db.search(columns=columns, table=table, where=where)
  112. if cur is None or cur.rowcount == 0:
  113. return [None, tuple()]
  114. assert cur.rowcount == 1
  115. res = cur.fetchone()
  116. assert len(res) == len(columns)
  117. return GarbageBag(str(res[0])), res
  118. def find_not_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  119. return __find_garbage(columns=["GarbageID"],
  120. table="garbage_n",
  121. where=f"GarbageID = {gid}",
  122. db=db)[0]
  123. def find_wait_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  124. res: Tuple[int, bytes, str, str, str]
  125. gb: GarbageBag
  126. gb, res = __find_garbage(columns=["GarbageID", "GarbageType", "UseTime", "UserID", "Location"],
  127. table="garbage_c",
  128. where=f"GarbageID = {gid}",
  129. db=db)[0]
  130. if gb is None:
  131. return None
  132. garbage_type: enum = int(res[1].decode())
  133. use_time: time_t = time_from_mysql(res[2])
  134. uid: uid_t = res[3]
  135. loc: location_t = res[4]
  136. gb.config_use(garbage_type, use_time, uid, loc)
  137. return gb
  138. def find_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  139. res: Tuple[int, bytes, str, str, str, bytes]
  140. gb: GarbageBag
  141. gb, res = __find_garbage(columns=["GarbageID", "GarbageType", "UseTime", "UserID", "Location",
  142. "CheckResult", "CheckerID"],
  143. table="garbage_u",
  144. where=f"GarbageID = {gid}",
  145. db=db)[0]
  146. if gb is None:
  147. return None
  148. garbage_type: enum = int(res[1].decode())
  149. use_time: time_t = time_from_mysql(res[2])
  150. uid: uid_t = res[3]
  151. loc: location_t = res[4]
  152. check: bool = res[5] == DBBit.BIT_1
  153. check_uid: uid_t = res[6]
  154. gb.config_use(garbage_type, use_time, uid, loc)
  155. gb.config_check(check, check_uid)
  156. return gb
  157. def find_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  158. res: Tuple[bool, int] = is_garbage_exists(gid, db)
  159. if not res[0]:
  160. return None
  161. elif res[1] == 0:
  162. re = find_not_use_garbage(gid, db)
  163. elif res[1] == 1:
  164. re = find_wait_garbage(gid, db)
  165. elif res[1] == 2:
  166. re = find_use_garbage(gid, db)
  167. else:
  168. re = None
  169. assert re is not None
  170. return re
  171. def is_garbage_exists(gid: gid_t, db: DB) -> Tuple[bool, int]:
  172. cur = db.search(columns=["GarbageID", "Flat"],
  173. table="garbage",
  174. where=f"GarbageID = {gid}")
  175. if cur is None or cur.rowcount == 0:
  176. return False, 0
  177. assert cur.rowcount == 1
  178. res: Tuple[int, int] = cur.fetchone()
  179. return True, res[1]
  180. def update_garbage(garbage: GarbageBag, db: DB) -> bool:
  181. re = find_garbage(garbage.get_gid(), db)
  182. if re is None:
  183. return False
  184. if re.is_use() and not garbage.is_use() or re.is_check()[0] and not garbage.is_check()[0]:
  185. return False
  186. if not garbage.is_use() and not garbage.is_check()[0]:
  187. return True # 不做任何修改
  188. gid = garbage.get_gid()
  189. info = garbage.get_info()
  190. update_kw = {
  191. "Flat": "0",
  192. "UserID": "NULL",
  193. "UseTime": "NULL",
  194. "GarbageType": "NULL",
  195. "Location": "NULL",
  196. "CheckResult": "NULL",
  197. "CheckerID": "NULL"
  198. }
  199. if garbage.is_use():
  200. update_kw['Flat'] = "1"
  201. update_kw['UserID'] = f"'{info['user']}'"
  202. update_kw['UseTime'] = f"{mysql_time(info['use_time'])}"
  203. update_kw['GarbageType'] = f"{info['type']}"
  204. update_kw['Location'] = f"'{info['loc']}'"
  205. if garbage.is_check()[0]:
  206. update_kw['Flat'] = "2"
  207. update_kw['CheckResult'] = f"{info['check']}"
  208. update_kw['CheckerID'] = f"'{info['checker']}'"
  209. res = db.update("garbage", kw=update_kw, where=f"GarbageID = {gid}")
  210. return res is not None
  211. def create_new_garbage(db: DB) -> Optional[GarbageBag]:
  212. cur = db.insert(table="garbage", columns=["CreateTime", "Flat"], values=f"{mysql_time()}, 0")
  213. if cur is None:
  214. return None
  215. assert cur.rowcount == 1
  216. gid = cur.lastrowid
  217. return GarbageBag(str(gid))
  218. def del_garbage_not_use(gid: gid_t, db: DB) -> bool:
  219. cur = db.delete(table="garbage_n", where=f"GarbageID = {gid}")
  220. if cur is None or cur.rowcount == 0:
  221. return False
  222. assert cur.rowcount == 1
  223. return True
  224. def del_garbage_wait_check(gid: gid_t, db: DB) -> bool:
  225. cur = db.delete(table="garbage_c", where=f"GarbageID = {gid}")
  226. if cur is None or cur.rowcount == 0:
  227. return False
  228. assert cur.rowcount == 1
  229. return True
  230. def del_garbage_has_check(gid: gid_t, db: DB) -> bool:
  231. cur = db.delete(table="garbage_u", where=f"GarbageID = {gid}")
  232. if cur is None or cur.rowcount == 0:
  233. return False
  234. assert cur.rowcount == 1
  235. return True
  236. def del_garbage(gid, db: DB):
  237. cur = db.delete(table="garbage", where=f"GarbageID = {gid}")
  238. if cur is None or cur.rowcount == 0:
  239. return False
  240. assert cur.rowcount == 1
  241. return True
  242. def del_garbage_where_not_use(where: str, db: DB) -> int:
  243. cur = db.delete(table="garbage_n", where=where)
  244. if cur is None:
  245. return -1
  246. return cur.rowcount
  247. def del_garbage_where_wait_check(where: str, db: DB) -> int:
  248. cur = db.delete(table="garbage_c", where=where)
  249. if cur is None:
  250. return -1
  251. return cur.rowcount
  252. def del_garbage_where_has_check(where: str, db: DB) -> int:
  253. cur = db.delete(table="garbage_u", where=where)
  254. if cur is None:
  255. return -1
  256. return cur.rowcount
  257. def del_garbage_where_scan_not_use(where, db: DB) -> int:
  258. cur = db.search(columns=["GarbageID"],
  259. table="garbage_n",
  260. where=where)
  261. if cur is None:
  262. return -1
  263. return cur.rowcount
  264. def del_garbage_where_scan_wait_check(where: str, db: DB) -> int:
  265. cur = db.search(columns=["GarbageID"],
  266. table="garbage_c",
  267. where=where)
  268. if cur is None:
  269. return -1
  270. return cur.rowcount
  271. def del_garbage_where_scan_has_check(where, db: DB) -> int:
  272. cur = db.search(columns=["GarbageID"],
  273. table="garbage_u",
  274. where=where)
  275. if cur is None:
  276. return -1
  277. return cur.rowcount
  278. def del_all_garbage(db: DB) -> int:
  279. cur = db.delete(table="garbage", where='1')
  280. if cur is None:
  281. return -1
  282. return cur.rowcount
  283. def del_all_garbage_scan(db: DB) -> int:
  284. cur = db.search(columns=["GarbageID"], table="garbage", where="1")
  285. if cur is None:
  286. return -1
  287. return cur.rowcount