garbage.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. import time
  2. from . import DBBit
  3. from .db import DB
  4. from tool.typing import *
  5. from tool.time import mysql_time, time_from_mysql
  6. from core.garbage import GarbageBag, GarbageType
  def update_garbage_type(where: str, type_: int, db: DB) -> int:
      """
      Update the garbage type of the bags selected by a condition.

      :param where: SQL condition selecting the rows of table ``garbage`` to update
      :param type_: new garbage type; written to column ``GarbageType`` as ``str(type_)``
      :param db: database handle
      :return: ``rowcount`` of the update cursor, or -1 when ``where`` is empty
          or ``db.update`` returned ``None``
      """
      # An empty condition is rejected before the database is touched.
      cursor = (
          db.update(table="garbage", kw={"GarbageType": str(type_)}, where=where)
          if len(where) > 0
          else None
      )
      return -1 if cursor is None else cursor.rowcount
  def update_garbage_check(where: str, result: bool, db: DB) -> int:
      """
      Update the check result of the bags selected by a condition.

      :param where: SQL condition selecting the rows of table ``garbage`` to update
      :param result: check outcome; a truthy value is written as ``'1'``, otherwise ``'0'``
      :param db: database handle
      :return: ``rowcount`` of the update cursor, or -1 when ``where`` is empty
          or ``db.update`` returned ``None``
      """
      if len(where) < 1:
          return -1

      flag: str = '0'
      if result:
          flag = '1'

      cursor = db.update(table="garbage", kw={"CheckResult": flag}, where=where)
      return cursor.rowcount if cursor is not None else -1
  def __get_time(time_str: str) -> float:
      """
      Turn a textual time expression into a timestamp.

      Accepted forms:

      * ``'now'`` -- the current ``time.time()``
      * ``'Date:<text>'`` -- ``<text>`` is handed to ``time_from_mysql``
      * anything else -- parsed with ``float``

      :param time_str: time expression
      :return: the resulting timestamp
      :raises ValueError: when the fallback ``float`` conversion fails
      """
      date_prefix = "Date:"
      if time_str == 'now':
          return time.time()
      if not (time_str.startswith(date_prefix) and len(time_str) >= 5):
          return float(time_str)
      # Everything after the "Date:" marker is the MySQL-formatted part.
      return time_from_mysql(time_str[len(date_prefix):])
  def __get_time_diff(time_str: str) -> float:
      """
      Parse a duration such as ``'30s'``, ``'500ms'``, ``'2min'``, ``'1.5h'`` or
      ``'1d'`` into seconds.

      A string without a recognised unit suffix is interpreted as a plain
      number of seconds.

      :param time_str: duration text
      :return: duration in seconds
      :raises ValueError: when the numeric part cannot be parsed by ``float``
      """
      # "ms" must be tested before "s": every "...ms" string also ends with
      # "s", so checking "s" first would try to parse "<n>m" and fail.
      if time_str.endswith("ms") and len(time_str) >= 3:
          return float(time_str[:-2]) / 1000
      if time_str.endswith("min") and len(time_str) >= 4:
          return float(time_str[:-3]) * 60
      if time_str.endswith("s") and len(time_str) >= 2:
          return float(time_str[:-1])
      if time_str.endswith("h") and len(time_str) >= 2:
          return float(time_str[:-1]) * 60 * 60
      if time_str.endswith("d") and len(time_str) >= 2:
          return float(time_str[:-1]) * 24 * 60 * 60
      return float(time_str)
  def __search_fields_time(time_: str, time_name: str) -> str:
      """
      Build an ``AND``-terminated WHERE fragment that filters a time column.

      Supported forms of ``time_``:

      * ``'<=now'`` -- column is not later than ``mysql_time()``
      * ``'a,b'`` -- column lies between two time expressions (given in either order)
      * ``'t;d'`` -- column lies within ``t - d`` .. ``t + d``, where ``d`` is a
        duration understood by ``__get_time_diff``
      * ``'t'`` -- column equals the single time expression

      Each time expression is parsed by ``__get_time``.

      :param time_: time filter expression
      :param time_name: name of the column to filter on
      :return: the fragment, always ending in ``"AND "`` so that it can be
          concatenated with further fragments; an empty string when the
          expression cannot be parsed (the filter is then silently skipped)
      """
      if time_ == '<=now':
          return f"{time_name}<={mysql_time()} AND "

      sp = time_.split(',')
      if len(sp) == 2:
          try:
              time_list = __get_time(sp[0]), __get_time(sp[1])
              a = min(time_list)
              b = max(time_list)
          except (TypeError, ValueError):
              return ""
          else:
              return f"({time_name} BETWEEN {mysql_time(a)} AND {mysql_time(b)}) AND "

      sp = time_.split(';')
      if len(sp) == 2:
          try:
              center, delta = __get_time(sp[0]), __get_time_diff(sp[1])
              a = center - delta
              b = center + delta
          except (TypeError, ValueError):
              return ""
          else:
              return f"({time_name} BETWEEN {mysql_time(a)} AND {mysql_time(b)}) AND "

      try:
          t = __get_time(time_)
      except (TypeError, ValueError):
          return ""
      else:
          # Parentheses are balanced here; an unclosed "(" would make the
          # whole WHERE clause invalid.
          return f"({time_name}={mysql_time(t)}) AND "
  def set_where_(ex: Optional[str], column: str):
      """
      Build one ``AND``-terminated WHERE fragment for ``column``.

      :param ex: filter expression; ``None`` means "no filter". ``IS NULL``
          (surrounding whitespace ignored) tests the column for NULL, a value
          starting with ``LIKE `` or ``REGEXP `` is inserted verbatim after the
          column name, and any other value is compared for equality as a
          quoted literal.
      :param column: column name
      :return: the fragment ending in ``"AND "``, or an empty string when ``ex`` is ``None``
      """
      if ex is None:
          return ""
      if ex.strip() == "IS NULL":
          return f"{column} IS NULL AND "
      if ex.startswith(("LIKE ", "REGEXP ")):
          return f"{column} {ex} AND "
      return f"{column}='{ex}' AND "
  def search_garbage_by_fields(columns, gid, uid, cuid, create_time, use_time, loc, type_, check, db: DB):
      """
      Search table ``garbage`` using optional per-field filters.

      Every filter argument may be ``None`` to leave that field unconstrained.

      :param columns: columns to select
      :param gid: filter for ``GarbageID`` (see ``set_where_`` for accepted forms)
      :param uid: filter for ``UserID``
      :param cuid: filter for ``CheckerID``
      :param create_time: time filter for ``CreateTime``
      :param use_time: time filter for ``UseTime``
      :param loc: filter for ``Location``
      :param type_: ``'IS NULL'`` or a name found in ``GarbageType.GarbageTypeStrList``
          / ``GarbageType.GarbageTypeStrList_ch``; other values are ignored
      :param check: ``'IS NULL'``, a "fail" word or a "pass" word; other values are ignored
      :param db: database handle
      :return: result of ``search_from_garbage_view`` for the assembled condition
      """
      fragments = [
          set_where_(gid, "GarbageID"),
          set_where_(uid, "UserID"),
          set_where_(cuid, "CheckerID"),
          set_where_(loc, "Location"),
      ]

      if check is not None:
          if check == 'IS NULL':
              fragments.append("CheckResult IS NULL AND ")
          elif check in ("不通过", "False", "Fail"):
              fragments.append("CheckResult=0 AND ")
          elif check in ("通过", "True", "Pass"):
              fragments.append("CheckResult=1 AND ")

      if type_ is not None:
          if type_ == 'IS NULL':
              fragments.append("GarbageType IS NULL AND ")
          elif type_ in GarbageType.GarbageTypeStrList:
              # The stored type is the position of the name in the list.
              type_index = GarbageType.GarbageTypeStrList.index(type_)
              fragments.append(f"GarbageType={type_index} AND ")
          elif type_ in GarbageType.GarbageTypeStrList_ch:
              type_index = GarbageType.GarbageTypeStrList_ch.index(type_)
              fragments.append(f"GarbageType={type_index} AND ")

      if create_time is not None:
          fragments.append(__search_fields_time(create_time, "CreateTime"))
      if use_time is not None:
          fragments.append(__search_fields_time(use_time, "UseTime"))

      condition = "".join(fragments)
      if condition:
          condition = condition[:-4]  # drop the trailing AND

      return search_from_garbage_view(columns, condition, db)
  def search_from_garbage_view(columns, where: str, db: DB):
      """
      Select ``columns`` from table ``garbage`` for the rows matching ``where``.

      :param columns: columns to select
      :param where: SQL condition
      :param db: database handle
      :return: all fetched rows, or ``None`` when ``db.search`` returned ``None``
      """
      cursor = db.search(columns=columns, table="garbage", where=where)
      return None if cursor is None else cursor.fetchall()
  def count_garbage_by_uid(uid: uid_t, db: DB, time_limit: bool = True):
      """
      Count the rows of ``garbage_time`` that belong to a user.

      :param uid: user id
      :param db: database handle
      :param time_limit: when true, only rows whose ``UseTime`` lies within
          3.5 days before or after the current time are counted
      :return: the count, or -1 when ``db.search`` returned ``None``
      """
      conditions = [f"UserID = '{uid}'"]
      if time_limit:
          now: time_t = time.time()
          half_window = 3.5 * 24 * 60 * 60  # 3.5 days on either side of now
          window_start = now - half_window
          window_end = now + half_window
          conditions.append(f"UseTime BETWEEN {mysql_time(window_start)} AND {mysql_time(window_end)}")

      cursor = db.search(columns=["Count(GarbageID)"], table="garbage_time", where=conditions)
      if cursor is None:
          return -1

      assert cursor.rowcount == 1
      row = cursor.fetchone()
      return int(row[0])
  def get_garbage_by_uid(uid: uid_t, columns, limit, db: DB, offset: int = 0):
      """
      Fetch a user's rows from table ``garbage``, ordered by ``UseTime`` descending.

      :param uid: user id
      :param columns: columns to select
      :param limit: value passed to ``db.search`` as ``limit``
      :param db: database handle
      :param offset: value passed to ``db.search`` as ``offset``
      :return: all fetched rows, or ``None`` when ``db.search`` returned ``None``
      """
      query = dict(
          columns=columns,
          table="garbage",
          where=f"UserID='{uid}'",
          limit=limit,
          offset=offset,
          order_by=[("UseTime", "DESC")],
      )
      cursor = db.search(**query)
      return cursor.fetchall() if cursor is not None else None
  def __find_garbage(columns: List[str], table: str, where: str, db: DB):
      """
      Look up a single row and wrap its first column in a ``GarbageBag``.

      :param columns: columns to select; the first one is used as the bag id
      :param table: table to search
      :param where: SQL condition expected to match at most one row
      :param db: database handle
      :return: ``(GarbageBag, row)`` when a row was found, otherwise ``(None, ())``
      """
      cursor = db.search(columns=columns, table=table, where=where)
      found = cursor is not None and cursor.rowcount != 0
      if not found:
          return None, ()

      assert cursor.rowcount == 1
      row = cursor.fetchone()
      assert len(row) == len(columns)
      return GarbageBag(str(row[0])), row
  def find_not_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
      """
      Look up a bag by id in ``garbage_n``.

      :param gid: bag id
      :param db: database handle
      :return: a ``GarbageBag`` for the id, or ``None`` when no row was found
      """
      bag, _row = __find_garbage(
          columns=["GarbageID"],
          table="garbage_n",
          where=f"GarbageID = {gid}",
          db=db,
      )
      return bag
  def find_wait_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
      """
      Look up a bag by id in ``garbage_c`` and load its usage information.

      :param gid: bag id
      :param db: database handle
      :return: a ``GarbageBag`` configured through ``config_use``, or ``None``
          when no row was found
      """
      wanted = ["GarbageID", "GarbageType", "UseTime", "UserID", "Location"]
      row: Tuple[int, bytes, str, str, str]
      bag, row = __find_garbage(columns=wanted,
                                table="garbage_c",
                                where=f"GarbageID = {gid}",
                                db=db)
      if bag is None:
          return None

      # GarbageType arrives as bytes and is decoded before conversion to int.
      kind: enum = int(row[1].decode())
      used_at: time_t = time_from_mysql(row[2])
      user: uid_t = row[3]
      place: location_t = row[4]

      bag.config_use(kind, used_at, user, place)
      return bag
  def find_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
      """
      Look up a bag by id in ``garbage_u`` and load its usage and check information.

      :param gid: bag id
      :param db: database handle
      :return: a ``GarbageBag`` configured through ``config_use`` and
          ``config_check``, or ``None`` when no row was found
      """
      wanted = ["GarbageID", "GarbageType", "UseTime", "UserID", "Location",
                "CheckResult", "CheckerID"]
      row: Tuple[int, bytes, str, str, str, bytes, str]
      bag, row = __find_garbage(columns=wanted,
                                table="garbage_u",
                                where=f"GarbageID = {gid}",
                                db=db)
      if bag is None:
          return None

      # GarbageType arrives as bytes and is decoded before conversion to int.
      kind: enum = int(row[1].decode())
      used_at: time_t = time_from_mysql(row[2])
      user: uid_t = row[3]
      place: location_t = row[4]
      # The check passed when the stored value equals DBBit.BIT_1.
      passed: bool = row[5] == DBBit.BIT_1
      checker: uid_t = row[6]

      bag.config_use(kind, used_at, user, place)
      bag.config_check(passed, checker)
      return bag
  def find_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
      """
      Look up a bag by id, using its ``Flat`` value to pick the specific lookup.

      ``Flat`` 0 uses ``find_not_use_garbage``, 1 uses ``find_wait_garbage`` and
      2 uses ``find_use_garbage``.

      :param gid: bag id
      :param db: database handle
      :return: the bag, or ``None`` when ``is_garbage_exists`` reports that it
          does not exist
      """
      exists, flat = is_garbage_exists(gid, db)
      if not exists:
          return None

      finders = {
          0: find_not_use_garbage,
          1: find_wait_garbage,
          2: find_use_garbage,
      }
      finder = finders.get(flat)
      bag = finder(gid, db) if finder is not None else None

      # An existing bag must be resolvable by one of the lookups above.
      assert bag is not None
      return bag
  def is_garbage_exists(gid: gid_t, db: DB) -> Tuple[bool, int]:
      """
      Check whether a bag id is present in table ``garbage``.

      :param gid: bag id
      :param db: database handle
      :return: ``(True, Flat)`` when the row exists, otherwise ``(False, 0)``
      """
      cursor = db.search(columns=["GarbageID", "Flat"],
                         table="garbage",
                         where=f"GarbageID = {gid}")
      missing = cursor is None or cursor.rowcount == 0
      if missing:
          return False, 0

      assert cursor.rowcount == 1
      row: Tuple[int, int] = cursor.fetchone()
      return True, row[1]
  def update_garbage(garbage: GarbageBag, db: DB) -> bool:
      """
      Write the state of ``garbage`` back to its row in table ``garbage``.

      The update is refused when the stored bag is already used or checked but
      the given bag is not. A bag that is neither used nor checked needs no
      write and counts as success.

      :param garbage: bag whose state should be stored
      :param db: database handle
      :return: ``True`` on success (or when nothing had to be written),
          ``False`` when the bag is unknown, the update is refused, or
          ``db.update`` returned ``None``
      """
      stored = find_garbage(garbage.get_gid(), db)
      if stored is None:
          return False

      would_lose_use = stored.is_use() and not garbage.is_use()
      if would_lose_use or (stored.is_check()[0] and not garbage.is_check()[0]):
          return False

      if not (garbage.is_use() or garbage.is_check()[0]):
          return True  # nothing to change

      gid = garbage.get_gid()
      info = garbage.get_info()

      # Start from an "unused" row; the branches below fill in what applies.
      update_kw = {
          "Flat": "0",
          **dict.fromkeys(
              ("UserID", "UseTime", "GarbageType", "Location", "CheckResult", "CheckerID"),
              "NULL",
          ),
      }

      if garbage.is_use():
          update_kw.update({
              "Flat": "1",
              "UserID": f"'{info['user']}'",
              "UseTime": f"{mysql_time(info['use_time'])}",
              "GarbageType": f"{info['type']}",
              "Location": f"'{info['loc']}'",
          })

      if garbage.is_check()[0]:
          update_kw.update({
              "Flat": "2",
              "CheckResult": f"{info['check']}",
              "CheckerID": f"'{info['checker']}'",
          })

      return db.update("garbage", kw=update_kw, where=f"GarbageID = {gid}") is not None
  def create_new_garbage(db: DB) -> Optional[GarbageBag]:
      """
      Insert a new row into table ``garbage`` and return a bag for it.

      The row gets the current ``mysql_time()`` as ``CreateTime`` and ``Flat`` 0.

      :param db: database handle
      :return: a ``GarbageBag`` built from the cursor's ``lastrowid``, or
          ``None`` when ``db.insert`` returned ``None``
      """
      cursor = db.insert(table="garbage",
                         columns=["CreateTime", "Flat"],
                         values=f"{mysql_time()}, 0")
      if cursor is None:
          return None

      assert cursor.rowcount == 1
      return GarbageBag(str(cursor.lastrowid))
  def del_garbage_not_use(gid: gid_t, db: DB) -> bool:
      """
      Delete the bag with the given id from ``garbage_n``.

      :param gid: bag id
      :param db: database handle
      :return: ``True`` when a row was deleted, ``False`` when ``db.delete``
          returned ``None`` or no row matched
      """
      cursor = db.delete(table="garbage_n", where=f"GarbageID = {gid}")
      deleted = cursor is not None and cursor.rowcount != 0
      if deleted:
          # An id identifies exactly one bag.
          assert cursor.rowcount == 1
      return deleted
  def del_garbage_wait_check(gid: gid_t, db: DB) -> bool:
      """
      Delete the bag with the given id from ``garbage_c``.

      :param gid: bag id
      :param db: database handle
      :return: ``True`` when a row was deleted, ``False`` when ``db.delete``
          returned ``None`` or no row matched
      """
      cursor = db.delete(table="garbage_c", where=f"GarbageID = {gid}")
      if cursor is None:
          return False
      if cursor.rowcount == 0:
          return False
      # An id identifies exactly one bag.
      assert cursor.rowcount == 1
      return True
  def del_garbage_has_check(gid: gid_t, db: DB) -> bool:
      """
      Delete the bag with the given id from ``garbage_u``.

      :param gid: bag id
      :param db: database handle
      :return: ``True`` when a row was deleted, ``False`` when ``db.delete``
          returned ``None`` or no row matched
      """
      cursor = db.delete(table="garbage_u", where=f"GarbageID = {gid}")
      nothing_deleted = cursor is None or cursor.rowcount == 0
      if not nothing_deleted:
          # An id identifies exactly one bag.
          assert cursor.rowcount == 1
      return not nothing_deleted
  def del_garbage(gid, db: DB):
      """
      Delete the bag with the given id from table ``garbage``.

      :param gid: bag id
      :param db: database handle
      :return: ``True`` when a row was deleted, ``False`` when ``db.delete``
          returned ``None`` or no row matched
      """
      cursor = db.delete(table="garbage", where=f"GarbageID = {gid}")
      if cursor is not None and cursor.rowcount != 0:
          # An id identifies exactly one bag.
          assert cursor.rowcount == 1
          return True
      return False
  def del_garbage_where_not_use(where: str, db: DB) -> int:
      """
      Delete every row of ``garbage_n`` that matches a condition.

      :param where: SQL condition
      :param db: database handle
      :return: ``rowcount`` of the delete cursor, or -1 when ``db.delete`` returned ``None``
      """
      cursor = db.delete(table="garbage_n", where=where)
      return -1 if cursor is None else cursor.rowcount
  def del_garbage_where_wait_check(where: str, db: DB) -> int:
      """
      Delete every row of ``garbage_c`` that matches a condition.

      :param where: SQL condition
      :param db: database handle
      :return: ``rowcount`` of the delete cursor, or -1 when ``db.delete`` returned ``None``
      """
      cursor = db.delete(table="garbage_c", where=where)
      return cursor.rowcount if cursor is not None else -1
  def del_garbage_where_has_check(where: str, db: DB) -> int:
      """
      Delete every row of ``garbage_u`` that matches a condition.

      :param where: SQL condition
      :param db: database handle
      :return: ``rowcount`` of the delete cursor, or -1 when ``db.delete`` returned ``None``
      """
      cursor = db.delete(table="garbage_u", where=where)
      if cursor is not None:
          return cursor.rowcount
      return -1
  def del_garbage_where_scan_not_use(where, db: DB) -> int:
      """
      Count the rows of ``garbage_n`` that match a condition without deleting them.

      Only a search is issued, so this reports how many rows the same
      condition selects.

      :param where: SQL condition
      :param db: database handle
      :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returned ``None``
      """
      cursor = db.search(columns=["GarbageID"], table="garbage_n", where=where)
      return -1 if cursor is None else cursor.rowcount
  def del_garbage_where_scan_wait_check(where: str, db: DB) -> int:
      """
      Count the rows of ``garbage_c`` that match a condition without deleting them.

      Only a search is issued, so this reports how many rows the same
      condition selects.

      :param where: SQL condition
      :param db: database handle
      :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returned ``None``
      """
      cursor = db.search(columns=["GarbageID"], table="garbage_c", where=where)
      return cursor.rowcount if cursor is not None else -1
  def del_garbage_where_scan_has_check(where, db: DB) -> int:
      """
      Count the rows of ``garbage_u`` that match a condition without deleting them.

      Only a search is issued, so this reports how many rows the same
      condition selects.

      :param where: SQL condition
      :param db: database handle
      :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returned ``None``
      """
      cursor = db.search(columns=["GarbageID"], table="garbage_u", where=where)
      if cursor is not None:
          return cursor.rowcount
      return -1
  def del_all_garbage(db: DB) -> int:
      """
      Delete every row of table ``garbage``.

      :param db: database handle
      :return: ``rowcount`` of the delete cursor, or -1 when ``db.delete`` returned ``None``
      """
      # The condition '1' is always true, so it matches every row.
      cursor = db.delete(table="garbage", where='1')
      return -1 if cursor is None else cursor.rowcount
  def del_all_garbage_scan(db: DB) -> int:
      """
      Count every row of table ``garbage`` without deleting anything.

      :param db: database handle
      :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returned ``None``
      """
      # The condition "1" is always true, so it matches every row.
      cursor = db.search(columns=["GarbageID"], table="garbage", where="1")
      return cursor.rowcount if cursor is not None else -1