import threading import time import conf import cv2 import sys import random import traceback import abc import tkinter as tk from tkinter import ttk import tkinter.font as font import datetime from PIL import Image, ImageTk from tool.type_ import * from core.user import User, UserNotSupportError from core.garbage import GarbageBag, GarbageType, GarbageBagNotUse from sql.db import DB from sql.user import update_user, find_user_by_id from sql.garbage import update_garbage from equipment.scan import HGSCapture, HGSQRCoder, QRCode from equipment.scan_user import scan_user from equipment.scan_garbage import scan_garbage from tk_event import TkEventBase class GarbageStationException(Exception): ... class ControlNotLogin(GarbageStationException): ... class ThrowGarbageError(GarbageStationException): ... class CheckGarbageError(GarbageStationException): ... class RankingUserError(GarbageStationException): ... class StationEventBase(TkEventBase): def __init__(self, gb_station, db: DB, title: str = 'unknown'): self.station: GarbageStationBase = gb_station self._db: DB = db self._title = title def get_title(self) -> str: return self._title def is_end(self) -> bool: raise GarbageStationException def done_after_event(self): raise GarbageStationException class GarbageStationBase(metaclass=abc.ABCMeta): status_normal = 1 status_get_garbage_type = 2 status_get_garbage_check = 3 scan_switch_user = 1 scan_throw_garbage = 2 scan_check_garbage = 3 scan_no_to_done = 4 def __init__(self, db: DB, cap: HGSCapture, qr: HGSQRCoder, loc: location_t = conf.base_location): self._db: DB = db self._cap: HGSCapture = cap self._qr: HGSQRCoder = qr self._loc: location_t = loc self._user: Optional[User] = None # 操作者 self._user_last_time: time_t = 0 self._garbage: Optional[GarbageBag] = None self._flat = GarbageStationBase.status_normal self._have_easter_eggs = False self.rank = None self.rank_index = 0 self._event_list: List[StationEventBase] = [] self.set_after_run(conf.tk_refresh_delay, lambda: self.run_event()) def update_user_time(self): if self.check_user(): self._user_last_time = time.time() def get_user(self): return self._user def is_manager(self): if not self.check_user(): return False return self._user.is_manager() def check_user(self): if self._user is None: return False if not self._user.is_manager() and time.time() - self._user_last_time > 20: self._user = None return False return True def __check_user(self): if not self.check_user(): raise ControlNotLogin self._user_last_time = time.time() def __check_normal_user(self): self.__check_user() if self._user.is_manager(): raise UserNotSupportError def __check_manager_user(self): self.__check_user() if not self._user.is_manager(): raise UserNotSupportError def get_user_info(self): self.__check_user() return self._user.get_info() def get_uid_no_update(self): if not self.check_user(): return "" return self._user.get_uid() def get_user_info_no_update(self) -> Dict[str, str]: if not self.check_user(): return {} return self._user.get_info() def get_cap_img(self): return self._cap.get_frame() def switch_user(self, user: User) -> bool: """ 切换用户: 退出/登录 :param user: 新用户 :return: 登录-True, 退出-False """ if self._user is not None and self._user.get_uid() == user.get_uid() and self.check_user(): # 正在登陆期 self._user = None # 退出登录 self._user_last_time = 0 return False self._user = user self._user_last_time = time.time() return True # 登录 def throw_garbage_core(self, garbage: GarbageBag, garbage_type: enum): self.__check_normal_user() if not self._user.throw_rubbish(garbage, garbage_type, self._loc): raise ThrowGarbageError update_garbage(garbage, self._db) update_user(self._user, self._db) def check_garbage_core(self, garbage: GarbageBag, check_result: bool): self.__check_manager_user() user = find_user_by_id(garbage.get_user(), self._db) if user is None: raise GarbageBagNotUse if not self._user.check_rubbish(garbage, check_result, user): raise CheckGarbageError update_garbage(garbage, self._db) update_user(self._user, self._db) update_user(user, self._db) def ranking(self, limit: int = 0, order_by: str = 'DESC') -> list[Tuple[uid_t, uname_t, score_t, score_t]]: """ 获取排行榜的功能 :return: """ if limit > 0: limit = f"LIMIT {int(limit)}" else: limit = "" if order_by != 'ASC' and order_by != 'DESC': order_by = 'DESC' cur = self._db.search((f"SELECT uid, name, score, reputation " f"FROM user " f"WHERE manager = 0 " f"ORDER BY reputation {order_by}, score {order_by} " f"{limit}")) if cur is None: raise RankingUserError return list(cur.fetchall()) def to_get_garbage_type(self, garbage: GarbageBag): self._flat = GarbageStationBase.status_get_garbage_type self.set_garbage(garbage) def to_get_garbage_check(self, garbage: GarbageBag): self._flat = GarbageStationBase.status_get_garbage_check self.set_garbage(garbage) def set_garbage(self, garbage: GarbageBag): self._garbage = garbage def get_garbage(self) -> Optional[GarbageBag]: if not self.check_user(): self._garbage = None return self._garbage def throw_garbage(self, garbage_type: enum): self.update_user_time() if self._flat != GarbageStationBase.status_get_garbage_type or self._garbage is None: self.show_warning("Operation Fail", "You should login first and scan the QR code of the trash bag") return event = ThrowGarbageEvent(self, self._db).start(self._garbage, garbage_type) self.push_event(event) self._flat = GarbageStationBase.status_normal self._garbage = None def check_garbage(self, check: bool): self.update_user_time() if self._flat != GarbageStationBase.status_get_garbage_check or self._garbage is None: self.show_warning("Operation Fail", "You should login first and scan the QR code of the trash bag") return event = CheckGarbageEvent(self, self._db).start(self._garbage, check) self.push_event(event) self._flat = GarbageStationBase.status_normal self._garbage = None def show_garbage_info(self): self.update_user_time() if self._flat != GarbageStationBase.status_get_garbage_check or self._garbage is None: self.show_warning("Operation Fail", "You should login first and scan the QR code of the trash bag") return if not self._garbage.is_use(): self.show_warning("Operation Fail", "The garbage bag has not been used") return info = self._garbage.get_info() garbage_type = GarbageType.GarbageTypeStrList[int(info['type'])] if self._garbage.is_check(): time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(info['use_time']))) check = f'Checker is f{info["checker"][0:conf.tk_show_uid_len]}\n' if info["check"] == '1': check += f'CheckResult is Pass\n' else: check += f'CheckResult is Fail\n' self.show_msg("Garbage Info", (f"Type is {garbage_type}\n" f"User is {info['user'][0:conf.tk_show_uid_len]}\n" f"Location:\n {info['loc']}\n" f"{check}" f"Date:\n {time_str}")) # 遮蔽Pass和Fail按键 elif self._garbage.is_use(): time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(info['use_time']))) self.show_msg("Garbage Info", (f"Type is {garbage_type}\n" f"User is {info['user'][0:conf.tk_show_uid_len]}\n" f"Location:\n {info['loc']}\n" f"Date:\n {time_str}"), big=False) # 不遮蔽Pass和Fail按键 else: self.show_msg("Garbage Info", f"Garbage has not use") # 遮蔽Pass和Fail按键 def show_user_info(self): self.update_user_time() if not self.check_user(): self.show_warning("Operation Fail", "You should login first") return info = self.get_user_info() if info.get('manager', '0') == '1': self.show_msg("About User", (f"Manager User\n" f"UserName: {info['name']}\n" f"UserID:\n {info['uid']}")) else: self.show_msg("About User", (f"Normal User\n" f"UserName: {info['name']}\n" f"UserID:\n {info['uid']}\n" f"Score: {info['score']}\n" f"Reputation: {info['reputation']}\n" f"Rubbish: {info['rubbish']}")) def show_help_info(self): self.update_user_time() self.show_msg("Help", f''' HGSSystem: 1) Scan User QR code 2) Scan Garbage QR code 3) Select the type of garbage 4) Garbage submitted successfully (You can leave now) 5) Waiting for feedback '''.strip()) def show_about_info(self): self.update_user_time() self.show_msg("About", f''' HGSSystem (c) SuperHuan From github HGSSystem is Garbage Sorting System Author: SongZihuan[SuperHuan] Run on python {sys.version} '''.strip()) def show_exit(self): self.update_user_time() if self.is_manager(): self.exit_win() return self.show_warning("Exit", f'Permission not permitted'.strip()) def easter_eggs(self): self.update_user_time() if (not self._have_easter_eggs) and random.randint(0, 10) != 1: # 10% 概率触发 return self._have_easter_eggs = True self.show_msg("Easter Agg", f''' 恭喜触发彩蛋[中文] 尝试一下新的编程语言: aFunlang. 来自: github [斯人若彩虹, 遇上方知有] [期待再次与你相遇] '''.strip()) def show_search_info(self): self.update_user_time() self.show_msg("Search", f''' He will get the camera content and feedback the garbage type. The function has not yet been implemented. '''.strip()) def thread_show_rank(self, rank_list): self.rank = [[]] for i, r in enumerate(rank_list): if len(self.rank[-1]) == 5: self.rank.append([]) color = None if i == 0: color = "#eaff56" elif i == 1: color = "#ffa631" elif i == 2: color = "#ff7500" elif r[0] == self.get_uid_no_update(): color = "#b0a4e3" self.rank[-1].append((i + 1, r[1], r[0], r[2], r[3], color)) if len(self.rank[0]) == 0: self.rank = None self.show_warning("RankError", f'Unable to get leaderboard data') return self.rank_index = 0 self.get_rank(0) def get_rank(self, n: int): self.update_user_time() self.rank_index += n if self.rank_index < 0 or self.rank_index >= len(self.rank): self.show_msg("RankError", f'Unable to get leaderboard data') return self.show_rank(self.rank_index + 1, len(self.rank), self.rank[self.rank_index]) def scan(self): """ 处理扫码事务 二维码扫描的任务包括: 登录, 扔垃圾, 标记垃圾 :return: """ self._cap.get_image() qr_code = self._qr.get_qr_code() if qr_code is None: return GarbageStationBase.scan_no_to_done, None user_scan_event = ScanUserEvent(self, self._db).start(qr_code) self.push_event(user_scan_event) def get_show_rank(self): event = RankingEvent(self, self._db) self.push_event(event) def push_event(self, event: StationEventBase): self._event_list.append(event) self.show_loading(event.get_title()) self.run_event() def run_event(self): if len(self._event_list) == 0: return new_event: List[StationEventBase] = [] done_event: List[StationEventBase] = [] for event in self._event_list: if event.is_end(): done_event.append(event) else: new_event.append(event) self._event_list = new_event if len(self._event_list) == 0: self.stop_loading() for event in done_event: # 隐藏进度条后执行Event-GUI任务 try: event.done_after_event() except: traceback.print_exc() @abc.abstractmethod def show_msg(self, title, info, msg_type='info', big: bool = True): ... @abc.abstractmethod def show_warning(self, title, info): ... @abc.abstractmethod def show_rank(self, page: int, page_c: int, rank_info: List[Tuple[int, uname_t, uid_t, score_t, score_t, Optional[str]]], title: str = 'Ranking'): ... @abc.abstractmethod def hide_msg_rank(self, update: bool = False): ... @abc.abstractmethod def show_loading(self, title: str): ... @abc.abstractmethod def stop_loading(self): ... @abc.abstractmethod def set_after_run(self, ms, func, *args): ... @abc.abstractmethod def update_control(self): ... @abc.abstractmethod def update_scan(self): ... @abc.abstractmethod def update_msg(self): ... @abc.abstractmethod def mainloop(self): ... @abc.abstractmethod def exit_win(self): ... class GarbageStation(GarbageStationBase): def set_after_run(self, ms, func, *args): # super.__init__可能会调用 self.init_after_run_list.append((ms, func, args)) def __conf_set_after_run(self): for ms, func, args in self.init_after_run_list: self.__define_after(ms, func, *args) def __init__(self, db: DB, cap: HGSCapture, qr: HGSQRCoder, loc: location_t = conf.base_location, refresh_delay: int = conf.tk_refresh_delay): self.init_after_run_list: List[Tuple[int, Callable, Tuple]] = [] super(GarbageStation, self).__init__(db, cap, qr, loc) self.refresh_delay = refresh_delay self._window = tk.Tk() self._sys_height = self._window.winfo_screenheight() self._sys_width = self._window.winfo_screenwidth() self._win_height = int(self._sys_height * (2 / 3)) self._win_width = int(self._sys_width * (2 / 3)) self._full_screen = False self.__conf_windows() self._cap_img = None # 存储 PIL.image 的变量 防止gc释放 self._user_im = None self._msg_time: Optional[float] = None # msg 显示时间累计 self._disable_all_btn: bool = False # 禁用所有按钮和操作 self.__creat_tk() self.__conf_tk() self.__conf_after() self.__switch_to_no_user() def __creat_tk(self): self.__conf_font_size() self._title_label = tk.Label(self._window) self._win_ctrl_button: Tuple[tk.Button, tk.Button, tk.Button] = (tk.Button(self._window), tk.Button(self._window), tk.Button(self._window)) win_info_type = Tuple[tk.Label, tk.Label, tk.Variable, str] self._user_frame = tk.Frame(self._window) self._user_name: win_info_type = (tk.Label(self._user_frame), tk.Label(self._user_frame), tk.StringVar(), "UserName") self._user_uid: win_info_type = (tk.Label(self._user_frame), tk.Label(self._user_frame), tk.StringVar(), "UserID") self._user_score: win_info_type = (tk.Label(self._user_frame), tk.Label(self._user_frame), tk.StringVar(), "Score") self._user_rubbish: win_info_type = (tk.Label(self._user_frame), tk.Label(self._user_frame), tk.StringVar(), "Garbage") self._user_eval: win_info_type = (tk.Label(self._user_frame), tk.Label(self._user_frame), tk.StringVar(), "Reputation") self._user_img = tk.Label(self._user_frame) self._throw_ctrl_frame = tk.Frame(self._window) self._throw_ctrl_btn: List[tk.Button] = [tk.Button(self._throw_ctrl_frame), tk.Button(self._throw_ctrl_frame), tk.Button(self._throw_ctrl_frame), tk.Button(self._throw_ctrl_frame)] self._check_ctrl_frame = tk.Frame(self._window) self._check_ctrl_btn: List[tk.Button] = [tk.Button(self._check_ctrl_frame), tk.Button(self._check_ctrl_frame)] self._sys_info_frame = tk.Frame(self._window) self._garbage_id: win_info_type = (tk.Label(self._sys_info_frame), tk.Label(self._sys_info_frame), tk.StringVar(), "GID") self._sys_date: win_info_type = (tk.Label(self._sys_info_frame), tk.Label(self._sys_info_frame), tk.StringVar(), "Date") self._cap_label = tk.Label(self._window) self._user_btn_frame = tk.Frame(self._window) self._user_btn: List[tk.Button] = [tk.Button(self._user_btn_frame), tk.Button(self._user_btn_frame), tk.Button(self._user_btn_frame)] self._msg_frame = tk.Frame(self._window) self._msg_label = tk.Label(self._msg_frame), tk.Label(self._msg_frame), tk.StringVar(), tk.StringVar() self._msg_hide = tk.Button(self._msg_frame) self._rank_frame = tk.Frame(self._window) self._rank_label = [tk.Label(self._rank_frame), tk.Label(self._rank_frame), tk.Label(self._rank_frame), tk.Label(self._rank_frame), tk.Label(self._rank_frame), tk.Label(self._rank_frame)] self._rank_var = [tk.StringVar(), tk.StringVar(), tk.StringVar(), tk.StringVar(), tk.StringVar(), tk.StringVar()] self._rank_btn = [tk.Button(self._rank_frame), tk.Button(self._rank_frame), tk.Button(self._rank_frame)] self._loading_frame = tk.Frame(self._window) self._loading_title: Tuple[tk.Label, tk.Variable] = tk.Label(self._loading_frame), tk.StringVar() self._loading_pro = ttk.Progressbar(self._loading_frame) def __conf_font_size(self, n: Union[int, float] = 1): self._title_font_size = int(27 * n) self._win_ctrl_font_size = int(15 * n) self._win_info_font_size = int(18 * n) self._throw_ctrl_btn_font_size = int(20 * n) self._check_ctrl_btn_font_size = int(20 * n) self._sys_info_font_size = int(18 * n) self._user_btn_font_size = int(20 * n) self._msg_font_size = int(20 * n) self._rank_font_title_size = int(24 * n) self._rank_font_size = int(16 * n) self._loading_tile_font = int(20 * n) def __conf_tk(self): self.__conf_title_label() self.__conf_win_ctrl_button() self.__conf_user_info_label() self.__conf_throw_btn() self.__conf_check_btn() self.__conf_sys_info_label() self.__conf_cap_label() self.__conf_user_btn() self.__conf_msg() self.__conf_rank() self.__conf_loading() self.hide_msg_rank() # 隐藏消息 def __conf_windows(self): self._window.title('HGSSystem: Garbage Station') self._window.geometry(f'{self._win_width}x{self._win_height}') self._window['bg'] = "#F0FFF0" # 蜜瓜绿 self._window.attributes("-topmost", True) self._window.resizable(False, False) self._window.protocol("WM_DELETE_WINDOW", lambda: self.show_exit()) self._window.overrideredirect(False) # 显示标题栏 self._window.bind("", lambda _: self.__set_windows_overrideredirect(True)) # 锁定窗口 def lock_windows(_): if self._disable_all_btn: return self.__set_windows_overrideredirect(True) def unlock_windows(_): if self._disable_all_btn: return if self.is_manager(): self.__set_windows_overrideredirect(False) return self.show_warning("Exit", f'Permission not permitted'.strip()) def full_screen_windows(_): if self._disable_all_btn: return if not self._full_screen or self.is_manager(): self.__full_screen(not self._full_screen) return self.show_warning("Exit", f'Permission not permitted'.strip()) def easter_eggs(_): if self._disable_all_btn: return self.easter_eggs() self._window.bind("", lock_windows) # 锁定窗口 self._window.bind("", unlock_windows) self._window.bind("", full_screen_windows) self._window.bind("", easter_eggs) def __full_screen(self, full: bool = True): self._window.attributes("-fullscreen", full) self._full_screen = full width = self._sys_width * (2 / 3) height = self._sys_height * (2 / 3) self._win_width = self._window.winfo_width() self._win_height = self._window.winfo_height() n = ((self._win_height / height) + (self._win_width / width)) / 2 # 平均放大倍数 self.__conf_font_size(n) self.__conf_tk() def __set_windows_overrideredirect(self, show: bool = False): self._window.overrideredirect(show) def __conf_title_label(self): title_font = self.__make_font(size=self._title_font_size, weight="bold") self._title_label['font'] = title_font self._title_label['bg'] = "#F0FFF0" # 蜜瓜绿 self._title_label['text'] = "HGSSystem: GarbageStation Control Center" self._title_label['anchor'] = 'w' self._title_label.place(relx=0.02, rely=0.0, relwidth=0.6, relheight=0.07) def __conf_win_ctrl_button(self): title_font = self.__make_font(size=self._win_ctrl_font_size) for bt in self._win_ctrl_button: bt: tk.Button bt['font'] = title_font bt['bg'] = "#B0C4DE" # 浅钢青 bt_help: tk.Button = self._win_ctrl_button[0] bt_help['text'] = 'Help' bt_help['bg'] = '#A9A9A9' bt_help['command'] = lambda: self.show_help_info() bt_help.place(relx=0.81, rely=0.01, relwidth=0.05, relheight=0.05) bt_about: tk.Button = self._win_ctrl_button[1] bt_about['text'] = 'About' bt_about['bg'] = '#A9A9A9' bt_about['command'] = lambda: self.show_about_info() bt_about.place(relx=0.87, rely=0.01, relwidth=0.05, relheight=0.05) bt_exit: tk.Button = self._win_ctrl_button[2] bt_exit['text'] = 'Exit' bt_exit['bg'] = '#A9A9A9' bt_exit['command'] = lambda: self.show_exit() bt_exit.place(relx=0.93, rely=0.01, relwidth=0.05, relheight=0.05) def __conf_user_info_label(self): title_font = self.__make_font(size=self._win_info_font_size - 1, weight="bold") info_font = self.__make_font(size=self._win_info_font_size + 1) frame_width = self._win_width * 0.4 frame_height = self._win_height * 0.4 self._user_frame['bg'] = "#FA8072" self._user_frame.place(relx=0.02, rely=0.1, relwidth=0.4, relheight=0.40) self._user_frame['bd'] = 5 self._user_frame['relief'] = "ridge" h_label = 5 h_label_s = 1 h_top = 2 height_count = h_label * 5 + h_label_s * 4 + h_top * 2 height_label = h_label / height_count height = h_top / height_count for lb_list in [self._user_score, self._user_rubbish, self._user_eval, self._user_name, self._user_uid]: lb_list[0]['font'] = title_font lb_list[0]['bg'] = "#FA8072" lb_list[0]['fg'] = "#FFB6C1" lb_list[0]['text'] = lb_list[3] + " " * (10 - len(lb_list[3])) + " :" lb_list[0]['anchor'] = 'e' lb_list[0].place(relx=0.0, rely=height, relwidth=0.35, relheight=height_label) height += height_label + h_label_s / height_count for lb_list in [self._user_score, self._user_rubbish, self._user_eval, self._user_name, self._user_uid]: lb_list[1]['font'] = info_font lb_list[1]['bg'] = "#FA8073" lb_list[1]['fg'] = "#000000" lb_list[1]['textvariable'] = lb_list[2] lb_list[1]['anchor'] = 'w' lb_list[2].set('test') height = h_top / height_count for lb_list in [self._user_score, self._user_rubbish, self._user_eval]: lb_list[1].place(relx=0.36, rely=height, relwidth=0.19, relheight=height_label) height += height_label + h_label_s / height_count for lb_list in [self._user_name, self._user_uid]: lb_list[1].place(relx=0.36, rely=height, relwidth=0.63, relheight=height_label) height += height_label + h_label_s / height_count img_relwidth = 0.30 img_relheight = height_label * 3 + (h_label_s / height_count) * 2 img = (Image.open(conf.pic_d['head']). resize((int(img_relwidth * frame_width), int(img_relheight * frame_height)))) self._user_im = ImageTk.PhotoImage(image=img) self._user_img['image'] = self._user_im self._user_img.place(relx=1 - img_relwidth - 0.06, rely=0.09, relwidth=img_relwidth, relheight=img_relheight) self._user_img['bd'] = 5 self._user_img['relief'] = "ridge" def __conf_throw_btn(self): btn_font = self.__make_font(size=self._throw_ctrl_btn_font_size, weight="bold") btn_info: List[Tuple[str, str]] = [("Recyclable", "#00BFFF"), ("Other", "#A9A9A9"), ("Hazardous", "#DC143C"), ("Kitchen", "#32CD32")] self.__show_throw_frame() for btn, info in zip(self._throw_ctrl_btn, btn_info): btn['font'] = btn_font btn['bg'] = info[1] btn['text'] = info[0] self._throw_ctrl_btn[0]['command'] = lambda: self.throw_garbage(GarbageType.recyclable) self._throw_ctrl_btn[1]['command'] = lambda: self.throw_garbage(GarbageType.other) self._throw_ctrl_btn[2]['command'] = lambda: self.throw_garbage(GarbageType.hazardous) self._throw_ctrl_btn[3]['command'] = lambda: self.throw_garbage(GarbageType.kitchen) self._throw_ctrl_btn[0].place(relx=0.000, rely=0.000, relwidth=0.495, relheight=0.495) self._throw_ctrl_btn[1].place(relx=0.505, rely=0.000, relwidth=0.495, relheight=0.495) self._throw_ctrl_btn[2].place(relx=0.000, rely=0.505, relwidth=0.495, relheight=0.495) self._throw_ctrl_btn[3].place(relx=0.505, rely=0.505, relwidth=0.495, relheight=0.495) def __conf_check_btn(self): btn_font = self.__make_font(size=self._check_ctrl_btn_font_size, weight="bold") btn_info: List[Tuple[str, str]] = [("Fail", "#ef7a82"), ("Pass", "#70f3ff")] self.__show_check_frame() for btn, info in zip(self._check_ctrl_btn, btn_info): btn['font'] = btn_font btn['text'] = info[0] btn['bg'] = info[1] self._check_ctrl_btn[0]['command'] = lambda: self.check_garbage(False) self._check_ctrl_btn[1]['command'] = lambda: self.check_garbage(True) self._check_ctrl_btn[0].place(relx=0.000, rely=0.000, relwidth=0.495, relheight=1) self._check_ctrl_btn[1].place(relx=0.505, rely=0.000, relwidth=0.495, relheight=1) def __conf_sys_info_label(self): title_font = self.__make_font(size=self._sys_info_font_size - 1, weight="bold") info_font = self.__make_font(size=self._sys_info_font_size + 1) self._sys_info_frame['bg'] = "#F0F8FF" self._sys_info_frame.place(relx=0.02, rely=0.51, relwidth=0.4, relheight=0.14) self._sys_info_frame['bd'] = 5 self._sys_info_frame['relief'] = "ridge" h_label = 5 h_label_s = 1 h_top = 2 height_count = h_label * 2 + h_label_s * 1 + h_top * 2 height_label = h_label / height_count height = h_top / height_count for info_list in [self._garbage_id, self._sys_date]: info_list[0]['font'] = title_font info_list[0]['bg'] = "#F0F8FF" info_list[0]['anchor'] = 'e' info_list[0]['text'] = info_list[3] + " " * (10 - len(info_list[3])) + " :" info_list[0].place(relx=0.0, rely=height, relwidth=0.35, relheight=height_label) height += height_label + h_label_s / height_count height = h_top / height_count for info_list in [self._garbage_id, self._sys_date]: info_list[1]['font'] = info_font info_list[1]['bg'] = "#F0F8FF" info_list[1]['textvariable'] = info_list[2] info_list[1]['anchor'] = 'w' info_list[2].set('test') info_list[1].place(relx=0.36, rely=height, relwidth=0.63, relheight=height_label) height += height_label + h_label_s / height_count def __conf_user_btn(self): btn_font = self.__make_font(size=self._user_btn_font_size, weight="bold") btn_info: List[Tuple[str, str]] = [("Detail", "#b0a4e3"), ("Ranking", "#b0a4e3"), ("Search", "#b0a4e3")] self._user_btn_frame.place(relx=0.02, rely=0.66, relwidth=0.19, relheight=0.32) self._user_btn_frame['bg'] = "#F0FFF0" h_label = 5 h_label_s = 1 height_count = h_label * 3 + h_label_s * 2 height_label = h_label / (h_label * 3 + h_label_s * 2) height = 0 for btn, info in zip(self._user_btn, btn_info): btn['font'] = btn_font btn['text'] = info[0] btn['bg'] = info[1] btn.place(relx=0.0, rely=height, relwidth=1.00, relheight=height_label) height += height_label + h_label_s / height_count self._user_btn[0]['state'] = 'disable' self._user_btn[1]['command'] = lambda: self.get_show_rank() self._user_btn[2]['command'] = lambda: self.show_search_info() def __conf_cap_label(self): self._cap_label['bg'] = "#000000" self._cap_label.place(relx=0.22, rely=0.66, relwidth=0.2, relheight=0.32) def __conf_msg(self): title_font = self.__make_font(size=self._msg_font_size + 1, weight="bold") info_font = self.__make_font(size=self._msg_font_size - 1) self._msg_frame['bg'] = "#F5FFFA" self._msg_frame['bd'] = 5 self._msg_frame['relief'] = "ridge" # frame 不会立即显示 self._msg_label[0]['font'] = title_font self._msg_label[0]['bg'] = "#F5FFFA" self._msg_label[0]['anchor'] = 'w' self._msg_label[0]['textvariable'] = self._msg_label[2] self._msg_label[1]['font'] = info_font self._msg_label[1]['bg'] = "#F5FFFA" self._msg_label[1]['anchor'] = 'nw' self._msg_label[1]['justify'] = 'left' self._msg_label[1]['textvariable'] = self._msg_label[3] self._msg_label[0].place(relx=0.05, rely=0.05, relwidth=0.9, relheight=0.1) self._msg_label[1].place(relx=0.075, rely=0.2, relwidth=0.85, relheight=0.64) self._msg_hide['font'] = info_font self._msg_hide['bg'] = "#00CED1" self._msg_hide['text'] = 'close' self._msg_hide['command'] = lambda: self.hide_msg_rank(True) self._msg_hide.place(relx=0.375, rely=0.85, relwidth=0.25, relheight=0.1) def show_msg(self, title, info, msg_type='info', big: bool = True): if self._disable_all_btn: return self._msg_label[2].set(f'{msg_type}: {title}') self._msg_label[3].set(f'{info}') frame_width = self._win_width * 0.53 self._msg_label[1]['wraplength'] = frame_width * 0.85 - 5 # 设定自动换行的像素 if big: self._msg_frame.place(relx=0.45, rely=0.15, relwidth=0.53, relheight=0.70) self._check_ctrl_frame.place_forget() self._throw_ctrl_frame.place_forget() self._rank_frame.place_forget() else: self._msg_frame.place(relx=0.45, rely=0.20, relwidth=0.53, relheight=0.50) self.__show_check_frame() self._throw_ctrl_frame.place_forget() self._rank_frame.place_forget() self._msg_time = time.time() def show_warning(self, title, info): self.show_msg(title, info, msg_type='Warning') def __conf_rank(self): title_font = self.__make_font(size=self._rank_font_title_size, weight="bold") info_font = self.__make_font(size=self._rank_font_size) btn_font = self.__make_font(size=self._msg_font_size - 1) self._rank_frame['bg'] = "#F5F5DC" self._rank_frame['relief'] = "ridge" self._rank_frame['bd'] = 5 # frame 不会立即显示 self._rank_label[0]['font'] = title_font self._rank_label[0]['bg'] = "#F5F5DC" self._rank_label[0]['textvariable'] = self._rank_var[0] self._rank_label[0].place(relx=0.02, rely=0.00, relwidth=0.96, relheight=0.1) for lb, v in zip(self._rank_label[1:], self._rank_var[1:]): lb['font'] = info_font lb['bg'] = "#F5FFFA" lb['textvariable'] = v lb['relief'] = "ridge" lb['bd'] = 2 # 标签结束的高度为 0.12 + 0.15 * 5 = 0.87 for btn, text in zip(self._rank_btn, ("prev", "close", "next")): btn['font'] = btn_font btn['bg'] = "#00CED1" btn['text'] = text self._rank_btn[0].place(relx=0.050, rely=0.88, relwidth=0.25, relheight=0.1) self._rank_btn[0]['command'] = lambda: self.get_rank(-1) self._rank_btn[1].place(relx=0.375, rely=0.88, relwidth=0.25, relheight=0.1) self._rank_btn[1]['command'] = lambda: self.hide_msg_rank(True) self._rank_btn[2].place(relx=0.700, rely=0.88, relwidth=0.25, relheight=0.1) self._rank_btn[2]['command'] = lambda: self.get_rank(+1) def __set_rank_info(self, rank_info: List[Tuple[int, uname_t, uid_t, score_t, score_t, Optional[str]]]): if len(rank_info) > 5: rank_info = rank_info[:5] for lb in self._rank_label[1:]: # 隐藏全部标签 lb.place_forget() height = 0.12 for i, info in enumerate(rank_info): no, name, uid, score, eval_, color = info self._rank_var[i + 1].set(f"NO.{no} {name}\nUID: {uid[0:conf.ranking_tk_show_uid_len]}\n" f"Score: {score} Reputation: {eval_}") if color is None: self._rank_label[i + 1]['bg'] = "#F5FFFA" else: self._rank_label[i + 1]['bg'] = color self._rank_label[i + 1].place(relx=0.04, rely=height, relwidth=0.92, relheight=0.13) height += 0.15 def show_rank(self, page: int, page_c: int, rank_info: List[Tuple[int, uname_t, uid_t, score_t, score_t, Optional[str]]], title: str = 'Ranking'): if self._disable_all_btn: return self._rank_var[0].set(f'{title} ({page}/{page_c})') self._rank_frame.place(relx=0.47, rely=0.15, relwidth=0.47, relheight=0.80) frame_width = self._win_width * 0.53 for lb in self._rank_label[1:]: lb['wraplength'] = frame_width * 0.85 - 5 # 设定自动换行的像素 if page == 1: self._rank_btn[0]['state'] = 'disable' else: self._rank_btn[0]['state'] = 'normal' if page == page_c: self._rank_btn[2]['state'] = 'disable' else: self._rank_btn[2]['state'] = 'normal' self.__set_rank_info(rank_info) self._throw_ctrl_frame.place_forget() self._check_ctrl_frame.place_forget() self._msg_frame.place_forget() self._msg_time = None def hide_msg_rank(self, update: bool = False): self.__show_check_frame() # rank会令此消失 self.__show_throw_frame() # rank和msg令此消失 self._msg_frame.place_forget() self._rank_frame.place_forget() self._msg_time = None if update: self.update_user_time() def __conf_loading(self): title_font = self.__make_font(size=self._loading_tile_font, weight="bold") self._loading_frame['bg'] = "#808080" self._loading_frame['bd'] = 5 self._loading_frame['relief'] = "ridge" # frame 不会立即显示 self._loading_title[0]['font'] = title_font self._loading_title[0]['bg'] = "#808080" self._loading_title[0]['fg'] = "#F8F8FF" self._loading_title[0]['anchor'] = 'w' self._loading_title[0]['textvariable'] = self._loading_title[1] self._loading_title[0].place(relx=0.02, rely=0.00, relwidth=0.96, relheight=0.7) self._loading_pro['mode'] = 'indeterminate' self._loading_pro['orient'] = tk.HORIZONTAL self._loading_pro['maximum'] = 100 self._loading_pro.place(relx=0.02, rely=0.73, relwidth=0.96, relheight=0.22) def show_loading(self, title: str): self.set_all_btn_disable() self._loading_title[1].set(f"Loading: {title}") self._loading_pro['value'] = 0 self._loading_frame.place(relx=0.30, rely=0.40, relwidth=0.40, relheight=0.15) self._loading_pro.start(50) def stop_loading(self): self._loading_frame.place_forget() self._loading_pro.stop() self.set_reset_all_btn() def __show_check_frame(self): self._check_ctrl_frame.place(relx=0.45, rely=0.82, relwidth=0.53, relheight=0.16) def __show_throw_frame(self): self._throw_ctrl_frame.place(relx=0.45, rely=0.1, relwidth=0.53, relheight=0.70) def __define_after(self, ms, func, *args): self._window.after(ms, self.__after_func_maker(func), *args) def __conf_after(self): self.__define_after(self.refresh_delay, self.update_time) self.__define_after(self.refresh_delay, self.update_control) self.__define_after(self.refresh_delay, self.update_scan) self.__define_after(self.refresh_delay, self.update_msg) self.__conf_set_after_run() def __after_func_maker(self, func): def new_func(*args, **kwargs): try: func(*args, **kwargs) except: # 捕获未考虑的错误 self.show_msg("System Exception", "Run Error...", "Error") traceback.print_exc() finally: self._window.after(self.refresh_delay, new_func) return new_func def update_time(self): var: tk.Variable = self._sys_date[2] t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") var.set(f"{t}") def update_control(self): name: tk.Variable = self._user_name[2] uid: tk.Variable = self._user_uid[2] score: tk.Variable = self._user_score[2] rubbish: tk.Variable = self._user_rubbish[2] eval_: tk.Variable = self._user_eval[2] user_info: Dict[str, str] = self.get_user_info_no_update() if user_info.get('uid') is None: name.set('Not-Login') uid.set('Not-Login') eval_.set('---') rubbish.set('---') score.set('---') self.__switch_to_no_user() elif user_info.get('manager', '0') == '1': name.set(user_info.get('name')) uid_get = user_info.get('uid', None) if uid_get is None or len(uid_get) < 32: uid.set('error') else: uid.set(uid_get[0:21]) eval_.set('Manager') rubbish.set('Manager') score.set('Manager') self.__switch_to_manager_user() else: name.set(user_info.get('name')) uid_get = user_info.get('uid', None) if uid_get is None or len(uid_get) < 32: uid.set('error') else: uid.set(uid_get[0:conf.tk_show_uid_len]) eval_.set(user_info.get('reputation')) rubbish.set(user_info.get('rubbish')) score.set(user_info.get('score')) self.__switch_to_normal_user() garbage = self.get_garbage() if garbage is None: self._garbage_id[2].set('---') else: gid = garbage.get_gid() if len(gid) > 20: gid = gid[-20:] self._garbage_id[2].set(gid) def update_scan(self): self.scan() # 需要存储一些数据 谨防被gc释放 _cap_img_info = (Image.fromarray(cv2.cvtColor(self.get_cap_img(), cv2.COLOR_BGR2RGB)). transpose(Image.FLIP_LEFT_RIGHT)) self._cap_img = ImageTk.PhotoImage(image=_cap_img_info) self._cap_label['image'] = self._cap_img def update_msg(self): if self._msg_time is None: return if time.time() - self._msg_time > 10: # 10s 自动关闭消息 self.hide_msg_rank() def __switch_to_normal_user(self): if self._disable_all_btn: return self.normal_user_disable() self.normal_user_able() def __switch_to_manager_user(self): if self._disable_all_btn: return self.manager_user_disable() self.manager_user_able() def __switch_to_no_user(self): self.manager_user_disable() self.normal_user_disable() self._user_btn[0]['state'] = 'disable' def normal_user_disable(self): for btn in self._check_ctrl_btn: btn['state'] = 'disabled' self._user_btn[0]['state'] = 'normal' self._user_btn[0]['command'] = lambda: self.show_user_info() def manager_user_disable(self): for btn in self._throw_ctrl_btn: btn['state'] = 'disabled' self._user_btn[0]['state'] = 'normal' self._user_btn[0]['command'] = lambda: self.show_garbage_info() def normal_user_able(self): for btn in self._throw_ctrl_btn: btn['state'] = 'normal' def manager_user_able(self): for btn in self._check_ctrl_btn: btn['state'] = 'normal' def set_all_btn_disable(self): self.__switch_to_no_user() # 禁用所有操作性按钮 self.hide_msg_rank() for btn in self._user_btn: btn['state'] = 'disable' for btn in self._win_ctrl_button: btn['state'] = 'disable' self._disable_all_btn = True def set_reset_all_btn(self): for btn in self._user_btn: btn['state'] = 'normal' for btn in self._win_ctrl_button: btn['state'] = 'normal' self.update_control() # 位于_user_btn之后, 会自动设定detail按钮 self._disable_all_btn = False @staticmethod def __make_font(family: str = 'noto', **kwargs): return font.Font(family=conf.font_d[family], **kwargs) def mainloop(self): self._window.mainloop() def exit_win(self): self._window.destroy() class ScanUserEvent(StationEventBase): class ScanUserThread(threading.Thread): # 继承父类threading.Thread def __init__(self, qr_: QRCode, db_: DB): threading.Thread.__init__(self) self.thread_db = db_ self.thread_qr = qr_ self.result: Optional[User] = None def run(self): self.result = scan_user(self.thread_qr, self.thread_db) def __init__(self, gb_station: GarbageStationBase, db: DB): super().__init__(gb_station, db, "Scan User") self._user: User = gb_station.get_user() self._qr_code: Optional[QRCode] = None self.thread: Optional[ScanUserEvent.ScanUserThread] = None def start(self, qr_code: QRCode): self._qr_code = qr_code self.thread = ScanUserEvent.ScanUserThread(qr_code, self._db) self.thread.start() return self def is_end(self) -> bool: return self.thread is not None and not self.thread.is_alive() def done_after_event(self): self.thread.join() if self.thread.result is not None: self.station.switch_user(self.thread.result) self.station.update_control() else: event = ScanGarbageEvent(self.station, self._db).start(self._qr_code) self.station.push_event(event) class ScanGarbageEvent(StationEventBase): class ScanUserThread(threading.Thread): # 继承父类threading.Thread def __init__(self, qr_: QRCode, db_: DB): threading.Thread.__init__(self) self.thread_db = db_ self.thread_qr = qr_ self.result: Optional[GarbageBag] = None def run(self): self.result = scan_garbage(self.thread_qr, self.thread_db) def __init__(self, gb_station: GarbageStationBase, db: DB): super().__init__(gb_station, db, "Scan Garbage") self._user: User = gb_station.get_user() self._qr_code: Optional[QRCode] = None self.thread: Optional[ScanGarbageEvent.ScanUserThread] = None def start(self, qr_code: QRCode): self._qr_code = qr_code self.thread = ScanGarbageEvent.ScanUserThread(self._qr_code, self._db) self.thread.start() return self def is_end(self) -> bool: return self.thread is not None and not self.thread.is_alive() def done_after_event(self): self.thread.join() if self.thread.result is not None: if self._user is None: self.station.show_warning("Operation Fail", "The garbage bags have been used.") elif self._user.is_manager(): self.station.to_get_garbage_check(self.thread.result) self.station.show_garbage_info() # 显示信息 self.station.update_control() else: self.station.to_get_garbage_type(self.thread.result) self.station.hide_msg_rank() # 如果有msg也马上隐藏 self.station.update_control() class RankingEvent(StationEventBase): class RankingThread(threading.Thread): def __init__(self, db_: DB): threading.Thread.__init__(self) self.thread_db = db_ self.result: Optional[List[Tuple[uid_t, uname_t, score_t, score_t]]] = None def run(self): cur = self.thread_db.search((f"SELECT uid, name, score, reputation " f"FROM user " f"WHERE manager = 0 " f"ORDER BY reputation DESC, score DESC " f"LIMIT 20;")) if cur is None: self.result = [] self.result = list(cur.fetchall()) def __init__(self, gb_station: GarbageStationBase, db: DB): super().__init__(gb_station, db, "Ranking") self._user: User = gb_station.get_user() self.thread: Optional[RankingEvent.RankingThread] = RankingEvent.RankingThread(self._db) self.thread.start() def is_end(self) -> bool: return not self.thread.is_alive() def done_after_event(self): self.thread.join() if self.thread.result is not None: self.station.thread_show_rank(self.thread.result) class ThrowGarbageEvent(StationEventBase): class ThrowGarbageThread(threading.Thread): def __init__(self, gb_station: GarbageStationBase, garbage: GarbageBag, garbage_type: enum, db_: DB): threading.Thread.__init__(self) self.thread_station = gb_station self.thread_db = db_ self.thread_garbage = garbage self.thread_garbage_type = garbage_type self.result: bool = False def run(self): try: self.thread_station.throw_garbage_core(self.thread_garbage, self.thread_garbage_type) except (ThrowGarbageError, UserNotSupportError, ControlNotLogin): self.thread_station.show_warning("Operation Fail", "The garbage bags have been used.") self.result = False finally: self.result = True def __init__(self, gb_station: GarbageStationBase, db: DB): super().__init__(gb_station, db, "ThrowGarbage") self._user: User = gb_station.get_user() self.thread: Optional[ThrowGarbageEvent.ThrowGarbageThread] = None def start(self, garbage: GarbageBag, garbage_type: enum): self.thread = ThrowGarbageEvent.ThrowGarbageThread(self.station, garbage, garbage_type, self._db) self.thread.start() return self def is_end(self) -> bool: return not self.thread.is_alive() def done_after_event(self): self.thread.join() if not self.thread.result: self.station.show_warning("Operation Fail", "The garbage bag throw error") class CheckGarbageEvent(StationEventBase): class CheckGarbageThread(threading.Thread): def __init__(self, gb_station: GarbageStationBase, garbage: GarbageBag, check: bool, db_: DB): threading.Thread.__init__(self) self.thread_station = gb_station self.thread_db = db_ self.thread_garbage = garbage self.thread_garbage_check = check self.result: bool = False def run(self): try: self.thread_station.check_garbage_core(self.thread_garbage, self.thread_garbage_check) except (ThrowGarbageError, UserNotSupportError, ControlNotLogin, CheckGarbageError): self.thread_station.show_warning("Operation Fail", "The garbage bag has been checked") self.result = False finally: self.result = True def __init__(self, gb_station: GarbageStationBase, db: DB): super().__init__(gb_station, db, "CheckGarbage") self._user: User = gb_station.get_user() self.thread: Optional[CheckGarbageEvent.CheckGarbageThread] = None def start(self, garbage: GarbageBag, garbage_check: bool): self.thread = CheckGarbageEvent.CheckGarbageThread(self.station, garbage, garbage_check, self._db) self.thread.start() return self def is_end(self) -> bool: return not self.thread.is_alive() def done_after_event(self): self.thread.join() if not self.thread.result: self.station.show_warning("Operation Fail", "The garbage bag check error") if __name__ == '__main__': mysql_db = DB() capture = HGSCapture() qr_capture = HGSQRCoder(capture) station = GarbageStation(mysql_db, capture, qr_capture) station.mainloop()