123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- import time
- from . import DBBit
- from .db import DB
- from tool.typing import *
- from tool.time import mysql_time, time_from_mysql
- from core.garbage import GarbageBag, GarbageType
def update_garbage_type(where: str, type_: int, db: DB) -> int:
    """
    Set the ``GarbageType`` column on the rows of table ``garbage`` selected by ``where``.

    :param where: SQL condition selecting the rows to change; an empty condition is refused
    :param type_: garbage type value; it is passed to the database as ``str(type_)``
    :param db: database handle whose ``update`` method runs the statement
    :return: ``rowcount`` of the cursor returned by ``db.update``; -1 when ``where`` is empty
        or ``db.update`` returns ``None``
    """
    # Refuse an empty condition rather than forwarding it to db.update.
    if len(where) == 0:
        return -1

    new_values = {"GarbageType": str(type_)}
    cursor = db.update(table="garbage", kw=new_values, where=where)
    return -1 if cursor is None else cursor.rowcount
def update_garbage_check(where: str, result: bool, db: DB) -> int:
    """
    Set the ``CheckResult`` column on the rows of table ``garbage`` selected by ``where``.

    :param where: SQL condition selecting the rows to change; an empty condition is refused
    :param result: check outcome; a truthy value is stored as ``'1'``, a falsy one as ``'0'``
    :param db: database handle whose ``update`` method runs the statement
    :return: ``rowcount`` of the cursor returned by ``db.update``; -1 when ``where`` is empty
        or ``db.update`` returns ``None``
    """
    # Refuse an empty condition rather than forwarding it to db.update.
    if len(where) == 0:
        return -1

    if result:
        flag: str = '1'
    else:
        flag = '0'

    cursor = db.update(table="garbage", kw={"CheckResult": flag}, where=where)
    return -1 if cursor is None else cursor.rowcount
def __get_time(time_str: str) -> float:
    """
    Turn a textual point in time into a timestamp.

    Three spellings are understood:

    * ``'now'`` - the current ``time.time()`` value
    * ``'Date:<text>'`` - ``<text>`` is handed to ``time_from_mysql``
      (presumably a MySQL-formatted date string - confirm against ``tool.time``)
    * anything else - parsed with ``float``

    :param time_str: the text to interpret
    :return: the resulting timestamp
    :raises ValueError: when the text is none of the above and is not a valid float
    """
    if time_str == 'now':
        timestamp = time.time()
    elif time_str.startswith("Date:"):
        # Everything after the 5-character "Date:" prefix is the date text.
        timestamp = time_from_mysql(time_str[5:])
    else:
        timestamp = float(time_str)
    return timestamp
def __get_time_diff(time_str: str) -> float:
    """
    Convert a duration such as ``'30s'``, ``'250ms'``, ``'5min'``, ``'2h'`` or ``'1d'`` to seconds.

    A string without a recognised unit suffix is parsed as a plain number of seconds.

    :param time_str: the duration text; the numeric part must be non-empty
    :return: the duration in seconds
    :raises ValueError: when the numeric part cannot be parsed by ``float``
    """
    # The multi-character suffixes must be tested before "s": "ms" also ends with "s",
    # and each branch has to strip the whole suffix, not just its last character.
    if time_str.endswith("ms") and len(time_str) >= 3:
        return float(time_str[:-2]) / 1000
    if time_str.endswith("min") and len(time_str) >= 4:
        return float(time_str[:-3]) * 60
    if time_str.endswith("s") and len(time_str) >= 2:
        return float(time_str[:-1])
    if time_str.endswith("h") and len(time_str) >= 2:
        return float(time_str[:-1]) * 60 * 60
    if time_str.endswith("d") and len(time_str) >= 2:
        return float(time_str[:-1]) * 24 * 60 * 60
    # No unit (or a bare unit letter with no number): let float() decide.
    return float(time_str)
def __search_fields_time(time_: str, time_name: str) -> str:
    """
    Build the WHERE fragment that filters the column ``time_name`` by the expression ``time_``.

    Supported expressions:

    * ``'<=now'`` - column value not later than ``mysql_time()``
    * ``'<a>,<b>'`` - column value between the two points in time (given in either order)
    * ``'<t>;<diff>'`` - column value within ``<diff>`` of ``<t>`` on both sides
    * ``'<t>'`` - column value equal to ``<t>``

    Points in time are parsed by ``__get_time`` and the tolerance by ``__get_time_diff``.

    :param time_: the filter expression
    :param time_name: name of the column to filter on
    :return: the fragment, always ending in ``"AND "`` (same convention as ``set_where_``) so that
        fragments can be concatenated; an empty string when the expression cannot be parsed
    """
    if time_ == '<=now':
        return f"{time_name}<={mysql_time()} AND "

    points = time_.split(',')
    if len(points) == 2:
        try:
            first, second = __get_time(points[0]), __get_time(points[1])
        except (TypeError, ValueError):
            return ""
        low, high = min(first, second), max(first, second)
        return f"({time_name} BETWEEN {mysql_time(low)} AND {mysql_time(high)}) AND "

    centre_and_diff = time_.split(';')
    if len(centre_and_diff) == 2:
        try:
            centre = __get_time(centre_and_diff[0])
            # abs(): a negative tolerance would otherwise give BETWEEN its bounds in reverse order.
            spread = abs(__get_time_diff(centre_and_diff[1]))
        except (TypeError, ValueError):
            return ""
        low, high = centre - spread, centre + spread
        return f"({time_name} BETWEEN {mysql_time(low)} AND {mysql_time(high)}) AND "

    try:
        exact = __get_time(time_)
    except (TypeError, ValueError):
        return ""
    # No opening parenthesis here: the original emitted an unbalanced "(".
    return f"{time_name}={mysql_time(exact)} AND "
def set_where_(ex: Optional[str], column: str):
    """
    Build the WHERE fragment for one column from a user-supplied expression.

    :param ex: ``None`` for "no filter"; ``"IS NULL"`` (surrounding whitespace ignored) for a NULL
        test; text starting with ``"LIKE "`` or ``"REGEXP "`` is used as the operator and operand
        verbatim; any other text is compared for equality as a quoted literal
    :param column: name of the column the fragment applies to
    :return: the fragment terminated by ``"AND "``, or an empty string when ``ex`` is ``None``
    """
    # NOTE(review): ``ex`` is interpolated into the SQL text without escaping - confirm that
    # callers only pass trusted input.
    if ex is None:
        return ""
    if ex.strip() == "IS NULL":
        return f"{column} IS NULL AND "
    if ex.startswith(("LIKE ", "REGEXP ")):
        return f"{column} {ex} AND "
    return f"{column}='{ex}' AND "
def search_garbage_by_fields(columns, gid, uid, cuid, create_time, use_time, loc, type_, check, db: DB):
    """
    Search table ``garbage`` with one optional filter per field; all given filters are AND-ed.

    :param columns: columns to select, forwarded to ``search_from_garbage_view``
    :param gid: filter expression for ``GarbageID`` (see ``set_where_``), or ``None``
    :param uid: filter expression for ``UserID``, or ``None``
    :param cuid: filter expression for ``CheckerID``, or ``None``
    :param create_time: time expression for ``CreateTime`` (see ``__search_fields_time``), or ``None``
    :param use_time: time expression for ``UseTime``, or ``None``
    :param loc: filter expression for ``Location``, or ``None``
    :param type_: ``'IS NULL'`` or a name found in ``GarbageType.GarbageTypeStrList`` /
        ``GarbageType.GarbageTypeStrList_ch``; other values are ignored
    :param check: ``'IS NULL'``, one of the pass spellings or one of the fail spellings;
        other values are ignored
    :param db: database handle
    :return: whatever ``search_from_garbage_view`` returns for the assembled condition
    """
    fragments = [
        set_where_(gid, "GarbageID"),
        set_where_(uid, "UserID"),
        set_where_(cuid, "CheckerID"),
        set_where_(loc, "Location"),
    ]

    if check is not None:
        if check == 'IS NULL':
            fragments.append("CheckResult IS NULL AND ")
        elif check in ("不通过", "False", "Fail"):
            fragments.append("CheckResult=0 AND ")
        elif check in ("通过", "True", "Pass"):
            fragments.append("CheckResult=1 AND ")

    if type_ is not None:
        if type_ == 'IS NULL':
            fragments.append("GarbageType IS NULL AND ")
        elif type_ in GarbageType.GarbageTypeStrList:
            # The position of the name in the list is the numeric type stored in the table.
            fragments.append(f"GarbageType={GarbageType.GarbageTypeStrList.index(type_)} AND ")
        elif type_ in GarbageType.GarbageTypeStrList_ch:
            fragments.append(f"GarbageType={GarbageType.GarbageTypeStrList_ch.index(type_)} AND ")

    if create_time is not None:
        fragments.append(__search_fields_time(create_time, "CreateTime"))
    if use_time is not None:
        fragments.append(__search_fields_time(use_time, "UseTime"))

    # Every fragment carries its own trailing "AND", with or without a following space.
    # Strip that connector from each one and re-join explicitly, instead of concatenating
    # and slicing a fixed four characters off the end.
    conditions = []
    for fragment in fragments:
        condition = fragment.strip()
        if condition.endswith("AND"):
            condition = condition[:-3].rstrip()
        if condition:
            conditions.append(condition)
    where = " AND ".join(conditions)

    return search_from_garbage_view(columns, where, db)
def search_from_garbage_view(columns, where: str, db: DB):
    """
    Run a search on table ``garbage`` and fetch every matching row.

    :param columns: columns to select
    :param where: SQL condition, forwarded unchanged to ``db.search``
    :param db: database handle
    :return: result of ``fetchall()`` on the cursor, or ``None`` when ``db.search`` returns ``None``
    """
    cursor = db.search(columns=columns, table="garbage", where=where)
    return None if cursor is None else cursor.fetchall()
def count_garbage_by_uid(uid: uid_t, db: DB, time_limit: bool = True):
    """
    Count the rows of ``garbage_time`` that belong to a user.

    :param uid: user id matched against ``UserID``
    :param db: database handle
    :param time_limit: when true, only rows whose ``UseTime`` lies within 3.5 days before or
        after the current time are counted
    :return: the count as an ``int``, or -1 when ``db.search`` returns ``None``
    """
    conditions = [f"UserID = '{uid}'"]
    if time_limit:
        now: time_t = time.time()
        half_window = 3.5 * 24 * 60 * 60  # 3.5 days on each side of "now"
        earliest = now - half_window
        latest = now + half_window
        conditions.append(f"UseTime BETWEEN {mysql_time(earliest)} AND {mysql_time(latest)}")

    cursor = db.search(columns=["Count(GarbageID)"], table="garbage_time", where=conditions)
    if cursor is None:
        return -1

    # An aggregate query is expected to yield exactly one row.
    assert cursor.rowcount == 1
    (total, *_) = cursor.fetchone()
    return int(total)
def get_garbage_by_uid(uid: uid_t, columns, limit, db: DB, offset: int = 0):
    """
    Fetch a page of a user's rows from table ``garbage``, ordered by ``UseTime`` descending.

    :param uid: user id matched against ``UserID``
    :param columns: columns to select
    :param limit: forwarded to ``db.search`` as ``limit``
    :param db: database handle
    :param offset: forwarded to ``db.search`` as ``offset``
    :return: result of ``fetchall()`` on the cursor, or ``None`` when ``db.search`` returns ``None``
    """
    ordering = [("UseTime", "DESC")]
    cursor = db.search(columns=columns,
                       table="garbage",
                       where=f"UserID='{uid}'",
                       limit=limit,
                       offset=offset,
                       order_by=ordering)
    return None if cursor is None else cursor.fetchall()
def __find_garbage(columns: List[str], table: str, where: str, db: DB):
    """
    Look up a single garbage bag row and wrap its first column in a ``GarbageBag``.

    :param columns: columns to select; the first one is used as the bag id
    :param table: table (or view) to search
    :param where: SQL condition expected to match at most one row
    :param db: database handle
    :return: ``(GarbageBag, row)`` on success, ``(None, ())`` when the search fails or finds nothing
    """
    cursor = db.search(columns=columns, table=table, where=where)
    if cursor is None:
        return None, tuple()
    if cursor.rowcount == 0:
        return None, tuple()

    assert cursor.rowcount == 1
    row = cursor.fetchone()
    assert len(row) == len(columns)

    bag = GarbageBag(str(row[0]))
    return bag, row
def find_not_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
    """
    Look a garbage bag up by id in ``garbage_n``.

    :param gid: garbage bag id
    :param db: database handle
    :return: a ``GarbageBag`` carrying only its id, or ``None`` when no row is found
    """
    bag, _row = __find_garbage(columns=["GarbageID"],
                               table="garbage_n",
                               where=f"GarbageID = {gid}",
                               db=db)
    return bag
def find_wait_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
    """
    Look a garbage bag up by id in ``garbage_c`` and load its usage information.

    :param gid: garbage bag id
    :param db: database handle
    :return: a ``GarbageBag`` configured through ``config_use``, or ``None`` when no row is found
    """
    wanted = ["GarbageID", "GarbageType", "UseTime", "UserID", "Location"]
    row: Tuple[int, bytes, str, str, str]
    bag: GarbageBag
    bag, row = __find_garbage(columns=wanted,
                              table="garbage_c",
                              where=f"GarbageID = {gid}",
                              db=db)
    if bag is None:
        return None

    # GarbageType is decoded from bytes before being converted to its numeric value.
    kind: enum = int(row[1].decode())
    used_at: time_t = time_from_mysql(row[2])
    user: uid_t = row[3]
    place: location_t = row[4]

    bag.config_use(kind, used_at, user, place)
    return bag
def find_use_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
    """
    Look a garbage bag up by id in ``garbage_u`` and load its usage and check information.

    :param gid: garbage bag id
    :param db: database handle
    :return: a ``GarbageBag`` configured through ``config_use`` and ``config_check``,
        or ``None`` when no row is found
    """
    wanted = ["GarbageID", "GarbageType", "UseTime", "UserID", "Location",
              "CheckResult", "CheckerID"]
    row: Tuple[int, bytes, str, str, str, bytes, str]
    bag: GarbageBag
    bag, row = __find_garbage(columns=wanted,
                              table="garbage_u",
                              where=f"GarbageID = {gid}",
                              db=db)
    if bag is None:
        return None

    # GarbageType is decoded from bytes before being converted to its numeric value.
    kind: enum = int(row[1].decode())
    used_at: time_t = time_from_mysql(row[2])
    user: uid_t = row[3]
    place: location_t = row[4]
    # The check passed when the stored value equals DBBit.BIT_1.
    passed: bool = row[5] == DBBit.BIT_1
    checker: uid_t = row[6]

    bag.config_use(kind, used_at, user, place)
    bag.config_check(passed, checker)
    return bag
def find_garbage(gid: gid_t, db: DB) -> Union[GarbageBag, None]:
    """
    Find a garbage bag by id, using the finder that matches the row's ``Flat`` state.

    :param gid: garbage bag id
    :param db: database handle
    :return: the loaded ``GarbageBag``, or ``None`` when ``is_garbage_exists`` reports no such bag
    :raises AssertionError: when the bag exists but its state is unknown or the finder returns
        ``None``
    """
    exists, state = is_garbage_exists(gid, db)
    if not exists:
        return None

    # Flat state -> finder: 0 uses garbage_n, 1 uses garbage_c, 2 uses garbage_u.
    finders = {
        0: find_not_use_garbage,
        1: find_wait_garbage,
        2: find_use_garbage,
    }
    finder = finders.get(state)
    bag = finder(gid, db) if finder is not None else None

    assert bag is not None
    return bag
def is_garbage_exists(gid: gid_t, db: DB) -> Tuple[bool, int]:
    """
    Check whether a garbage bag exists in table ``garbage`` and report its ``Flat`` column.

    :param gid: garbage bag id
    :param db: database handle
    :return: ``(True, Flat)`` when the bag is found; ``(False, 0)`` when the search fails
        or no row matches
    """
    cursor = db.search(columns=["GarbageID", "Flat"],
                       table="garbage",
                       where=f"GarbageID = {gid}")
    if cursor is None:
        return False, 0
    if cursor.rowcount == 0:
        return False, 0

    assert cursor.rowcount == 1
    row: Tuple[int, int] = cursor.fetchone()
    flat = row[1]
    return True, flat
def update_garbage(garbage: GarbageBag, db: DB) -> bool:
    """
    Write the use/check state of ``garbage`` back to its row in table ``garbage``.

    The update is refused when the stored bag is already used or checked but ``garbage`` is not.
    A bag that is neither used nor checked needs no write and counts as success.

    :param garbage: the bag whose state should be stored
    :param db: database handle
    :return: ``True`` when nothing had to be written or ``db.update`` returned a cursor;
        ``False`` when the bag is not found, the change is refused, or ``db.update`` returns ``None``
    """
    stored = find_garbage(garbage.get_gid(), db)
    if stored is None:
        return False

    # Never move a stored bag backwards (used -> unused, checked -> unchecked).
    if stored.is_use() and not garbage.is_use():
        return False
    if stored.is_check()[0] and not garbage.is_check()[0]:
        return False

    if not (garbage.is_use() or garbage.is_check()[0]):
        return True  # nothing to modify

    gid = garbage.get_gid()
    info = garbage.get_info()

    # Start from the "unused" row image: Flat 0 and every other column NULL.
    nullable = ("UserID", "UseTime", "GarbageType", "Location", "CheckResult", "CheckerID")
    update_kw = {"Flat": "0", **dict.fromkeys(nullable, "NULL")}

    if garbage.is_use():
        update_kw.update({
            'Flat': "1",
            'UserID': f"'{info['user']}'",
            'UseTime': f"{mysql_time(info['use_time'])}",
            'GarbageType': f"{info['type']}",
            'Location': f"'{info['loc']}'",
        })

    if garbage.is_check()[0]:
        update_kw.update({
            'Flat': "2",
            'CheckResult': f"{info['check']}",
            'CheckerID': f"'{info['checker']}'",
        })

    cursor = db.update("garbage", kw=update_kw, where=f"GarbageID = {gid}")
    return cursor is not None
def create_new_garbage(db: DB) -> Optional[GarbageBag]:
    """
    Insert a new row into table ``garbage`` with ``CreateTime`` set to ``mysql_time()`` and
    ``Flat`` set to 0.

    :param db: database handle
    :return: a ``GarbageBag`` whose id is the cursor's ``lastrowid``, or ``None`` when
        ``db.insert`` returns ``None``
    """
    row_values = f"{mysql_time()}, 0"
    cursor = db.insert(table="garbage", columns=["CreateTime", "Flat"], values=row_values)
    if cursor is None:
        return None

    assert cursor.rowcount == 1
    new_id = cursor.lastrowid
    return GarbageBag(str(new_id))
def del_garbage_not_use(gid: gid_t, db: DB) -> bool:
    """
    Delete one garbage bag, by id, from ``garbage_n``.

    :param gid: garbage bag id
    :param db: database handle
    :return: ``True`` when exactly one row was deleted; ``False`` when ``db.delete`` returns
        ``None`` or no row matched
    """
    cursor = db.delete(table="garbage_n", where=f"GarbageID = {gid}")
    if cursor is None:
        return False
    if cursor.rowcount == 0:
        return False
    assert cursor.rowcount == 1
    return True
def del_garbage_wait_check(gid: gid_t, db: DB) -> bool:
    """
    Delete one garbage bag, by id, from ``garbage_c``.

    :param gid: garbage bag id
    :param db: database handle
    :return: ``True`` when exactly one row was deleted; ``False`` when ``db.delete`` returns
        ``None`` or no row matched
    """
    cursor = db.delete(table="garbage_c", where=f"GarbageID = {gid}")
    if cursor is None:
        return False
    if cursor.rowcount == 0:
        return False
    assert cursor.rowcount == 1
    return True
def del_garbage_has_check(gid: gid_t, db: DB) -> bool:
    """
    Delete one garbage bag, by id, from ``garbage_u``.

    :param gid: garbage bag id
    :param db: database handle
    :return: ``True`` when exactly one row was deleted; ``False`` when ``db.delete`` returns
        ``None`` or no row matched
    """
    cursor = db.delete(table="garbage_u", where=f"GarbageID = {gid}")
    if cursor is None:
        return False
    if cursor.rowcount == 0:
        return False
    assert cursor.rowcount == 1
    return True
def del_garbage(gid, db: DB):
    """
    Delete one garbage bag, by id, from table ``garbage``.

    :param gid: garbage bag id
    :param db: database handle
    :return: ``True`` when exactly one row was deleted; ``False`` when ``db.delete`` returns
        ``None`` or no row matched
    """
    cursor = db.delete(table="garbage", where=f"GarbageID = {gid}")
    if cursor is None:
        return False
    if cursor.rowcount == 0:
        return False
    assert cursor.rowcount == 1
    return True
def del_garbage_where_not_use(where: str, db: DB) -> int:
    """
    Delete every row of ``garbage_n`` matching ``where``.

    :param where: SQL condition, forwarded unchanged to ``db.delete``
    :param db: database handle
    :return: ``rowcount`` of the cursor, or -1 when ``db.delete`` returns ``None``
    """
    cursor = db.delete(table="garbage_n", where=where)
    return -1 if cursor is None else cursor.rowcount
def del_garbage_where_wait_check(where: str, db: DB) -> int:
    """
    Delete every row of ``garbage_c`` matching ``where``.

    :param where: SQL condition, forwarded unchanged to ``db.delete``
    :param db: database handle
    :return: ``rowcount`` of the cursor, or -1 when ``db.delete`` returns ``None``
    """
    cursor = db.delete(table="garbage_c", where=where)
    return -1 if cursor is None else cursor.rowcount
def del_garbage_where_has_check(where: str, db: DB) -> int:
    """
    Delete every row of ``garbage_u`` matching ``where``.

    :param where: SQL condition, forwarded unchanged to ``db.delete``
    :param db: database handle
    :return: ``rowcount`` of the cursor, or -1 when ``db.delete`` returns ``None``
    """
    cursor = db.delete(table="garbage_u", where=where)
    return -1 if cursor is None else cursor.rowcount
def del_garbage_where_scan_not_use(where, db: DB) -> int:
    """
    Count the rows of ``garbage_n`` matching ``where`` without deleting anything.

    :param where: SQL condition, forwarded unchanged to ``db.search``
    :param db: database handle
    :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returns ``None``
    """
    cursor = db.search(columns=["GarbageID"], table="garbage_n", where=where)
    return -1 if cursor is None else cursor.rowcount
def del_garbage_where_scan_wait_check(where: str, db: DB) -> int:
    """
    Count the rows of ``garbage_c`` matching ``where`` without deleting anything.

    :param where: SQL condition, forwarded unchanged to ``db.search``
    :param db: database handle
    :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returns ``None``
    """
    cursor = db.search(columns=["GarbageID"], table="garbage_c", where=where)
    return -1 if cursor is None else cursor.rowcount
def del_garbage_where_scan_has_check(where, db: DB) -> int:
    """
    Count the rows of ``garbage_u`` matching ``where`` without deleting anything.

    :param where: SQL condition, forwarded unchanged to ``db.search``
    :param db: database handle
    :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returns ``None``
    """
    cursor = db.search(columns=["GarbageID"], table="garbage_u", where=where)
    return -1 if cursor is None else cursor.rowcount
def del_all_garbage(db: DB) -> int:
    """
    Delete every row of table ``garbage`` (the condition passed is the constant ``'1'``).

    :param db: database handle
    :return: ``rowcount`` of the cursor, or -1 when ``db.delete`` returns ``None``
    """
    cursor = db.delete(table="garbage", where='1')
    return -1 if cursor is None else cursor.rowcount
def del_all_garbage_scan(db: DB) -> int:
    """
    Count every row of table ``garbage`` (the condition passed is the constant ``"1"``)
    without deleting anything.

    :param db: database handle
    :return: ``rowcount`` of the search cursor, or -1 when ``db.search`` returns ``None``
    """
    cursor = db.search(columns=["GarbageID"], table="garbage", where="1")
    return -1 if cursor is None else cursor.rowcount
|