# garbage.py: database helpers for querying, updating and deleting garbage-bag records.

  1. import time
  2. from . import DBBit
  3. from .db import DB
  4. from tool.typing import *
  5. from tool.time import mysql_time, time_from_mysql
  6. from core.garbage import GarbageBag, GarbageType
  7. def update_garbage_type(where: str, type_: int, db: DB) -> int:
  8. """
  9. 更新垃圾袋类型
  10. :param where: 条件
  11. :param type_: 类型
  12. :param db: 数据库
  13. :return:
  14. """
  15. if len(where) == 0:
  16. return -1
  17. cur = db.update(table="garbage", kw={"GarbageType": str(type_)}, where=where)
  18. if cur is None:
  19. return -1
  20. return cur.rowcount
  21. def update_garbage_check(where: str, result: bool, db: DB) -> int:
  22. """
  23. 更新垃圾袋检测结果
  24. :param where: 条件
  25. :param result: 结果
  26. :param db: 数据库
  27. :return:
  28. """
  29. if len(where) == 0:
  30. return -1
  31. i: str = '1' if result else '0'
  32. cur = db.update(table="garbage", kw={"CheckResult": i}, where=where)
  33. if cur is None:
  34. return -1
  35. return cur.rowcount
  36. def __get_time(time_str: str) -> float:
  37. if time_str == 'now':
  38. return time.time()
  39. if time_str.startswith("Date:") and len(time_str) >= 5:
  40. return time_from_mysql(time_str[5:])
  41. return float(time_str)
  42. def __get_time_diff(time_str: str) -> float:
  43. if time_str.endswith("s") and len(time_str) >= 2:
  44. return float(time_str[:-1])
  45. elif time_str.endswith("ms") and len(time_str) >= 3:
  46. return float(time_str[:-1]) / 1000
  47. elif time_str.endswith("min") and len(time_str) >= 4:
  48. return float(time_str[:-1]) * 60
  49. elif time_str.endswith("h") and len(time_str) >= 2:
  50. return float(time_str[:-1]) * 60 * 60
  51. elif time_str.endswith("d") and len(time_str) >= 2:
  52. return float(time_str[:-1]) * 24 * 60 * 60
  53. return float(time_str)
  54. def __search_fields_time(time_: str, time_name: str) -> str:
  55. if time_ == '<=now':
  56. return f"{time_name}<={mysql_time()} AND"
  57. sp = time_.split(',')
  58. if len(sp) == 2:
  59. try:
  60. time_list = __get_time(sp[0]), __get_time(sp[1])
  61. a = min(time_list)
  62. b = max(time_list)
  63. except (TypeError, ValueError):
  64. return ""
  65. else:
  66. return f"({time_name} BETWEEN {mysql_time(a)} AND {mysql_time(b)}) AND"
  67. sp = time_.split(';')
  68. if len(sp) == 2:
  69. try:
  70. time_list = __get_time(sp[0]), __get_time_diff(sp[1])
  71. a = time_list[0] - time_list[1]
  72. b = time_list[0] + time_list[1]
  73. except (TypeError, ValueError):
  74. return ""
  75. else:
  76. return f"({time_name} BETWEEN {mysql_time(a)} AND {mysql_time(b)}) AND"
  77. try:
  78. t = __get_time(time_)
  79. except (TypeError, ValueError):
  80. return ""
  81. else:
  82. return f"({time_name}={mysql_time(t)} AND"
  83. def search_garbage_by_fields(columns, gid, uid, cuid, create_time, use_time, loc, type_, check, db: DB):
  84. where = ""
  85. if gid is not None:
  86. where += f"GarbageID={gid} AND "
  87. if uid is not None:
  88. where += f"UserID=‘{uid}’ AND "
  89. if cuid is not None:
  90. where += f"CheckerID='{cuid}' AND "
  91. if loc is not None:
  92. where += f"Location='{loc}' AND "
  93. if check is not None:
  94. if check == "False":
  95. where += f"CheckResult=0 AND "
  96. else:
  97. where += f"CheckResult=1 AND "
  98. if type_ is not None and type_ in GarbageType.GarbageTypeStrList:
  99. res = GarbageType.GarbageTypeStrList.index(type_)
  100. where += f"Phone={res} AND "
  101. if create_time is not None:
  102. where += __search_fields_time(create_time, "CreateTime")
  103. if use_time is not None:
  104. where += __search_fields_time(use_time, "UseTime")
  105. if len(where) != 0:
  106. where = where[0:-4] # 去除末尾的AND
  107. return search_from_garbage_view(columns, where, db)
  108. def search_from_garbage_view(columns, where: str, db: DB):
  109. cur = db.search(columns=columns, table="garbage", where=where)
  110. if cur is None:
  111. return None
  112. res = cur.fetchall()
  113. return res
  114. def count_garbage_by_uid(uid: uid_t, db: DB, time_limit: bool = True):
  115. if time_limit:
  116. ti: time_t = time.time()
  117. start = ti - 3.5 * 24 * 60 * 60 # 前后3.5天
  118. end = ti + 3.5 * 24 * 60 * 60
  119. where = [f"UserID = '{uid}'", f"UseTime BETWEEN {mysql_time(start)} AND {mysql_time(end)}"]
  120. else:
  121. where = [f"UserID = '{uid}'"]
  122. cur = db.search(columns=["Count(GarbageID)"],
  123. table="garbage_time",
  124. where=where)
  125. if cur is None:
  126. return -1
  127. assert cur.rowcount == 1
  128. return int(cur.fetchone()[0])
  129. def get_garbage_by_uid(uid: uid_t, columns, limit, db: DB, offset: int = 0):
  130. cur = db.search(columns=columns,
  131. table="garbage",
  132. where=f"UserID='{uid}'",
  133. limit=limit,
  134. offset=offset,
  135. order_by=[("UseTime", "DESC")])
  136. if cur is None:
  137. return None
  138. return cur.fetchall()
  139. def __find_garbage(columns: List[str], table: str, where: str, db: DB):
  140. cur = db.search(columns=columns, table=table, where=where)
  141. if cur is None or cur.rowcount == 0:
  142. return None, tuple()
  143. assert cur.rowcount == 1
  144. res = cur.fetchone()
  145. assert len(res) == len(columns)
  146. return GarbageBag(str(res[0])), res
  147. def find_not_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  148. return __find_garbage(columns=["GarbageID"],
  149. table="garbage_n",
  150. where=f"GarbageID = {gid}",
  151. db=db)[0]
  152. def find_wait_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  153. res: Tuple[int, bytes, str, str, str]
  154. gb: GarbageBag
  155. gb, res = __find_garbage(columns=["GarbageID", "GarbageType", "UseTime", "UserID", "Location"],
  156. table="garbage_c",
  157. where=f"GarbageID = {gid}",
  158. db=db)
  159. if gb is None:
  160. return None
  161. garbage_type: enum = int(res[1].decode())
  162. use_time: time_t = time_from_mysql(res[2])
  163. uid: uid_t = res[3]
  164. loc: location_t = res[4]
  165. gb.config_use(garbage_type, use_time, uid, loc)
  166. return gb
  167. def find_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  168. res: Tuple[int, bytes, str, str, str, bytes]
  169. gb: GarbageBag
  170. gb, res = __find_garbage(columns=["GarbageID", "GarbageType", "UseTime", "UserID", "Location",
  171. "CheckResult", "CheckerID"],
  172. table="garbage_u",
  173. where=f"GarbageID = {gid}",
  174. db=db)
  175. if gb is None:
  176. return None
  177. garbage_type: enum = int(res[1].decode())
  178. use_time: time_t = time_from_mysql(res[2])
  179. uid: uid_t = res[3]
  180. loc: location_t = res[4]
  181. check: bool = res[5] == DBBit.BIT_1
  182. check_uid: uid_t = res[6]
  183. gb.config_use(garbage_type, use_time, uid, loc)
  184. gb.config_check(check, check_uid)
  185. return gb
  186. def find_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
  187. res: Tuple[bool, int] = is_garbage_exists(gid, db)
  188. if not res[0]:
  189. return None
  190. elif res[1] == 0:
  191. re = find_not_use_garbage(gid, db)
  192. elif res[1] == 1:
  193. re = find_wait_garbage(gid, db)
  194. elif res[1] == 2:
  195. re = find_use_garbage(gid, db)
  196. else:
  197. re = None
  198. assert re is not None
  199. return re
  200. def is_garbage_exists(gid: gid_t, db: DB) -> Tuple[bool, int]:
  201. cur = db.search(columns=["GarbageID", "Flat"],
  202. table="garbage",
  203. where=f"GarbageID = {gid}")
  204. if cur is None or cur.rowcount == 0:
  205. return False, 0
  206. assert cur.rowcount == 1
  207. res: Tuple[int, int] = cur.fetchone()
  208. return True, res[1]
  209. def update_garbage(garbage: GarbageBag, db: DB) -> bool:
  210. re = find_garbage(garbage.get_gid(), db)
  211. if re is None:
  212. return False
  213. if re.is_use() and not garbage.is_use() or re.is_check()[0] and not garbage.is_check()[0]:
  214. return False
  215. if not garbage.is_use() and not garbage.is_check()[0]:
  216. return True # 不做任何修改
  217. gid = garbage.get_gid()
  218. info = garbage.get_info()
  219. update_kw = {
  220. "Flat": "0",
  221. "UserID": "NULL",
  222. "UseTime": "NULL",
  223. "GarbageType": "NULL",
  224. "Location": "NULL",
  225. "CheckResult": "NULL",
  226. "CheckerID": "NULL"
  227. }
  228. if garbage.is_use():
  229. update_kw['Flat'] = "1"
  230. update_kw['UserID'] = f"'{info['user']}'"
  231. update_kw['UseTime'] = f"{mysql_time(info['use_time'])}"
  232. update_kw['GarbageType'] = f"{info['type']}"
  233. update_kw['Location'] = f"'{info['loc']}'"
  234. if garbage.is_check()[0]:
  235. update_kw['Flat'] = "2"
  236. update_kw['CheckResult'] = f"{info['check']}"
  237. update_kw['CheckerID'] = f"'{info['checker']}'"
  238. res = db.update("garbage", kw=update_kw, where=f"GarbageID = {gid}")
  239. return res is not None
  240. def create_new_garbage(db: DB) -> Optional[GarbageBag]:
  241. cur = db.insert(table="garbage", columns=["CreateTime", "Flat"], values=f"{mysql_time()}, 0")
  242. if cur is None:
  243. return None
  244. assert cur.rowcount == 1
  245. gid = cur.lastrowid
  246. return GarbageBag(str(gid))
  247. def del_garbage_not_use(gid: gid_t, db: DB) -> bool:
  248. cur = db.delete(table="garbage_n", where=f"GarbageID = {gid}")
  249. if cur is None or cur.rowcount == 0:
  250. return False
  251. assert cur.rowcount == 1
  252. return True
  253. def del_garbage_wait_check(gid: gid_t, db: DB) -> bool:
  254. cur = db.delete(table="garbage_c", where=f"GarbageID = {gid}")
  255. if cur is None or cur.rowcount == 0:
  256. return False
  257. assert cur.rowcount == 1
  258. return True
  259. def del_garbage_has_check(gid: gid_t, db: DB) -> bool:
  260. cur = db.delete(table="garbage_u", where=f"GarbageID = {gid}")
  261. if cur is None or cur.rowcount == 0:
  262. return False
  263. assert cur.rowcount == 1
  264. return True
  265. def del_garbage(gid, db: DB):
  266. cur = db.delete(table="garbage", where=f"GarbageID = {gid}")
  267. if cur is None or cur.rowcount == 0:
  268. return False
  269. assert cur.rowcount == 1
  270. return True
  271. def del_garbage_where_not_use(where: str, db: DB) -> int:
  272. cur = db.delete(table="garbage_n", where=where)
  273. if cur is None:
  274. return -1
  275. return cur.rowcount
  276. def del_garbage_where_wait_check(where: str, db: DB) -> int:
  277. cur = db.delete(table="garbage_c", where=where)
  278. if cur is None:
  279. return -1
  280. return cur.rowcount
  281. def del_garbage_where_has_check(where: str, db: DB) -> int:
  282. cur = db.delete(table="garbage_u", where=where)
  283. if cur is None:
  284. return -1
  285. return cur.rowcount
  286. def del_garbage_where_scan_not_use(where, db: DB) -> int:
  287. cur = db.search(columns=["GarbageID"],
  288. table="garbage_n",
  289. where=where)
  290. if cur is None:
  291. return -1
  292. return cur.rowcount
  293. def del_garbage_where_scan_wait_check(where: str, db: DB) -> int:
  294. cur = db.search(columns=["GarbageID"],
  295. table="garbage_c",
  296. where=where)
  297. if cur is None:
  298. return -1
  299. return cur.rowcount
  300. def del_garbage_where_scan_has_check(where, db: DB) -> int:
  301. cur = db.search(columns=["GarbageID"],
  302. table="garbage_u",
  303. where=where)
  304. if cur is None:
  305. return -1
  306. return cur.rowcount
  307. def del_all_garbage(db: DB) -> int:
  308. cur = db.delete(table="garbage", where='1')
  309. if cur is None:
  310. return -1
  311. return cur.rowcount
  312. def del_all_garbage_scan(db: DB) -> int:
  313. cur = db.search(columns=["GarbageID"], table="garbage", where="1")
  314. if cur is None:
  315. return -1
  316. return cur.rowcount