|
@@ -2,36 +2,258 @@ import time
|
|
|
import conf
|
|
|
import cv2
|
|
|
import sys
|
|
|
+import random
|
|
|
import traceback
|
|
|
import tkinter as tk
|
|
|
import tkinter.font as font
|
|
|
-from tool.type_ import *
|
|
|
import datetime
|
|
|
from PIL import Image, ImageTk
|
|
|
-from control import Control, global_control, ControlScanType, ControlNotLogin, ThrowGarbageError, CheckGarbageError
|
|
|
+
|
|
|
+from tool.type_ import *
|
|
|
+
|
|
|
from core.user import User, UserNotSupportError
|
|
|
from core.garbage import GarbageBag, GarbageType, GarbageBagNotUse
|
|
|
-import random
|
|
|
+
|
|
|
+from sql.db import DB
|
|
|
+from sql.user import update_user, find_user_by_id, creat_new_user
|
|
|
+from sql.garbage import update_garbage, creat_new_garbage
|
|
|
+
|
|
|
+from equipment.scan import HGSCapture, HGSQRCoder, QRCode
|
|
|
+from equipment.scan_user import scan_user, write_uid_qr, write_all_uid_qr
|
|
|
+from equipment.scan_garbage import scan_garbage, write_gid_qr
|
|
|
|
|
|
|
|
|
class GarbageStationException(Exception):
|
|
|
...
|
|
|
|
|
|
|
|
|
+class ControlNotLogin(GarbageStationException):
|
|
|
+ ...
|
|
|
+
|
|
|
+
|
|
|
+class ThrowGarbageError(GarbageStationException):
|
|
|
+ ...
|
|
|
+
|
|
|
+
|
|
|
+class CheckGarbageError(GarbageStationException):
|
|
|
+ ...
|
|
|
+
|
|
|
+
|
|
|
+class CreatGarbageError(GarbageStationException):
|
|
|
+ ...
|
|
|
+
|
|
|
+
|
|
|
+class CreatUserError(GarbageStationException):
|
|
|
+ ...
|
|
|
+
|
|
|
+
|
|
|
+class RankingUserError(GarbageStationException):
|
|
|
+ ...
|
|
|
+
|
|
|
+
|
|
|
class GarbageStationStatus:
|
|
|
status_normal = 1
|
|
|
status_get_garbage_type = 2
|
|
|
status_get_garbage_check = 3
|
|
|
|
|
|
- def __init__(self, win, ctrl: Control = global_control):
|
|
|
+ scan_switch_user = 1
|
|
|
+ scan_throw_garbage = 2
|
|
|
+ scan_check_garbage = 3
|
|
|
+ scan_no_to_done = 4
|
|
|
+
|
|
|
+ def __init__(self,
|
|
|
+ win,
|
|
|
+ db: DB,
|
|
|
+ cap: HGSCapture,
|
|
|
+ qr: HGSQRCoder,
|
|
|
+ loc: location_t = conf.base_location):
|
|
|
+ self._db: DB = db
|
|
|
+ self._cap = cap
|
|
|
+ self._qr = qr
|
|
|
self._win: GarbageStation = win
|
|
|
- self._ctrl = ctrl
|
|
|
+ self._loc: location_t = loc
|
|
|
+
|
|
|
+ self._user: Optional[User] = None # 操作者
|
|
|
+ self._user_last_time: time_t = 0
|
|
|
+
|
|
|
self._garbage: Optional[GarbageBag] = None
|
|
|
+
|
|
|
self._flat = GarbageStationStatus.status_normal
|
|
|
self._have_easter_eggs = False
|
|
|
+
|
|
|
self.rank = None
|
|
|
self.rank_index = 0
|
|
|
|
|
|
+ def update_user_time(self):
|
|
|
+ if self.check_user():
|
|
|
+ self._user_last_time = time.time()
|
|
|
+
|
|
|
+ def is_manager(self):
|
|
|
+ if not self.check_user():
|
|
|
+ return False
|
|
|
+ return self._user.is_manager()
|
|
|
+
|
|
|
+ def check_user(self):
|
|
|
+ if self._user is None:
|
|
|
+ return False
|
|
|
+ if not self._user.is_manager() and time.time() - self._user_last_time > 20:
|
|
|
+ self._user = None
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
+ def __check_user(self):
|
|
|
+ if not self.check_user():
|
|
|
+ raise ControlNotLogin
|
|
|
+ self._user_last_time = time.time()
|
|
|
+
|
|
|
+ def __check_normal_user(self):
|
|
|
+ self.__check_user()
|
|
|
+ if self._user.is_manager():
|
|
|
+ raise UserNotSupportError
|
|
|
+
|
|
|
+ def __check_manager_user(self):
|
|
|
+ self.__check_user()
|
|
|
+ if not self._user.is_manager():
|
|
|
+ raise UserNotSupportError
|
|
|
+
|
|
|
+ def get_user_info(self):
|
|
|
+ self.__check_user()
|
|
|
+ return self._user.get_info()
|
|
|
+
|
|
|
+ def get_uid_no_update(self):
|
|
|
+ if not self.check_user():
|
|
|
+ return ""
|
|
|
+ return self._user.get_uid()
|
|
|
+
|
|
|
+ def get_user_info_no_update(self) -> Dict[str, str]:
|
|
|
+ if not self.check_user():
|
|
|
+ return {}
|
|
|
+ return self._user.get_info()
|
|
|
+
|
|
|
+ def scan(self) -> Tuple[int, any]:
|
|
|
+ """
|
|
|
+ 处理扫码事务
|
|
|
+ 二维码扫描的任务包括: 登录, 扔垃圾, 标记垃圾
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ self._cap.get_image()
|
|
|
+ qr_code = self._qr.get_qr_code()
|
|
|
+ if qr_code is None:
|
|
|
+ return GarbageStationStatus.scan_no_to_done, None
|
|
|
+
|
|
|
+ user: Optional[User] = scan_user(qr_code, self._db)
|
|
|
+ if user is not None:
|
|
|
+ return GarbageStationStatus.scan_switch_user, user
|
|
|
+
|
|
|
+ garbage: Optional[GarbageBag] = scan_garbage(qr_code, self._db)
|
|
|
+ if garbage is not None:
|
|
|
+ if self._user is None:
|
|
|
+ raise ControlNotLogin
|
|
|
+ if self._user.is_manager():
|
|
|
+ return GarbageStationStatus.scan_check_garbage, garbage
|
|
|
+ return GarbageStationStatus.scan_throw_garbage, garbage
|
|
|
+
|
|
|
+ return GarbageStationStatus.scan_no_to_done, None
|
|
|
+
|
|
|
+ def get_cap_img(self):
|
|
|
+ return self._cap.get_frame()
|
|
|
+
|
|
|
+ def switch_user(self, user: User) -> bool:
|
|
|
+ """
|
|
|
+ 切换用户: 退出/登录
|
|
|
+ :param user: 新用户
|
|
|
+ :return: 登录-True, 退出-False
|
|
|
+ """
|
|
|
+ if self._user is not None and self._user.get_uid() == user.get_uid() and self.check_user(): # 正在登陆期
|
|
|
+ self._user = None # 退出登录
|
|
|
+ self._user_last_time = 0
|
|
|
+ return False
|
|
|
+ self._user = user
|
|
|
+ self._user_last_time = time.time()
|
|
|
+ return True # 登录
|
|
|
+
|
|
|
+ def __throw_garbage(self, garbage: GarbageBag, garbage_type: enum):
|
|
|
+ self.__check_normal_user()
|
|
|
+ if not self._user.throw_rubbish(garbage, garbage_type, self._loc):
|
|
|
+ raise ThrowGarbageError
|
|
|
+ update_garbage(garbage, self._db)
|
|
|
+ update_user(self._user, self._db)
|
|
|
+
|
|
|
+ def __check_garbage(self, garbage: GarbageBag, check_result: bool):
|
|
|
+ self.__check_manager_user()
|
|
|
+ user = find_user_by_id(garbage.get_user(), self._db)
|
|
|
+ if user is None:
|
|
|
+ raise GarbageBagNotUse
|
|
|
+ if not self._user.check_rubbish(garbage, check_result, user):
|
|
|
+ raise CheckGarbageError
|
|
|
+ update_garbage(garbage, self._db)
|
|
|
+ update_user(self._user, self._db)
|
|
|
+ update_user(user, self._db)
|
|
|
+
|
|
|
+ def creat_garbage(self, path: str, num: int = 1) -> List[tuple[str, Optional[GarbageBag]]]:
|
|
|
+ self.__check_manager_user()
|
|
|
+ if self._user is None:
|
|
|
+ raise ControlNotLogin
|
|
|
+
|
|
|
+ re = []
|
|
|
+ for _ in range(num):
|
|
|
+ gar = creat_new_garbage(self._db)
|
|
|
+ if gar is None:
|
|
|
+ raise CreatGarbageError
|
|
|
+ res = write_gid_qr(gar.get_gid(), path, self._db)
|
|
|
+ re.append(res)
|
|
|
+ return re
|
|
|
+
|
|
|
+ def creat_user(self, name: uname_t, passwd: passwd_t, phone: str, manager: bool) -> Optional[User]:
|
|
|
+ user = creat_new_user(name, passwd, phone, manager, self._db)
|
|
|
+ if user is None:
|
|
|
+ raise CreatUserError
|
|
|
+ return user
|
|
|
+
|
|
|
+ def creat_user_from_list(self, user_list: List[Tuple[uname_t, passwd_t, str]], manager: bool) -> List[User]:
|
|
|
+ re = []
|
|
|
+ for i in user_list:
|
|
|
+ user = creat_new_user(i[0], i[1], i[2], manager, self._db)
|
|
|
+ if user is None:
|
|
|
+ raise CreatUserError
|
|
|
+ re.append(user)
|
|
|
+ return re
|
|
|
+
|
|
|
+ def get_uid_qrcode(self, uid: uid_t, path: str) -> Tuple[str, Optional[User]]:
|
|
|
+ return write_uid_qr(uid, path, self._db)
|
|
|
+
|
|
|
+ def get_uid_qrcode_from_list(self, uid_list: List[uid_t], path: str) -> List[Tuple[str, Optional[User]]]:
|
|
|
+ re = []
|
|
|
+ for uid in uid_list:
|
|
|
+ res = write_uid_qr(uid, path, self._db)
|
|
|
+ re.append(res)
|
|
|
+ return re
|
|
|
+
|
|
|
+ def get_all_uid_qrcode(self, path: str, where: str = "") -> List[str]:
|
|
|
+ return write_all_uid_qr(path, self._db, where=where)
|
|
|
+
|
|
|
+ def ranking(self, limit: int = 0, order_by: str = 'DESC') -> list[Tuple[uid_t, uname_t, score_t, score_t]]:
|
|
|
+ """
|
|
|
+ 获取排行榜的功能
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ if limit > 0:
|
|
|
+ limit = f"LIMIT {int(limit)}"
|
|
|
+ else:
|
|
|
+ limit = ""
|
|
|
+
|
|
|
+ if order_by != 'ASC' and order_by != 'DESC':
|
|
|
+ order_by = 'DESC'
|
|
|
+
|
|
|
+ cur = self._db.search((f"SELECT uid, name, score, reputation "
|
|
|
+ f"FROM user "
|
|
|
+ f"WHERE manager = 0 "
|
|
|
+ f"ORDER BY reputation {order_by}, score {order_by} "
|
|
|
+ f"{limit}"))
|
|
|
+ if cur is None:
|
|
|
+ raise RankingUserError
|
|
|
+ return list(cur.fetchall())
|
|
|
+
|
|
|
def to_get_garbage_type(self, garbage: GarbageBag):
|
|
|
self._flat = GarbageStationStatus.status_get_garbage_type
|
|
|
self.set_garbage(garbage)
|
|
@@ -44,22 +266,10 @@ class GarbageStationStatus:
|
|
|
self._garbage = garbage
|
|
|
|
|
|
def get_garbage(self) -> Optional[GarbageBag]:
|
|
|
- if not self._ctrl.check_user():
|
|
|
+ if not self.check_user():
|
|
|
self._garbage = None
|
|
|
return self._garbage
|
|
|
|
|
|
- def get_user_info_no_update(self):
|
|
|
- return self._ctrl.get_user_info_no_update()
|
|
|
-
|
|
|
- def scan(self):
|
|
|
- return self._ctrl.scan()
|
|
|
-
|
|
|
- def get_cap_img(self):
|
|
|
- return self._ctrl.get_cap_img()
|
|
|
-
|
|
|
- def switch_user(self, user: User):
|
|
|
- return self._ctrl.switch_user(user)
|
|
|
-
|
|
|
def throw_garbage(self, garbage_type: enum):
|
|
|
self.update_user_time()
|
|
|
if self._flat != GarbageStationStatus.status_get_garbage_type or self._garbage is None:
|
|
@@ -67,7 +277,7 @@ class GarbageStationStatus:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
- self._ctrl.throw_garbage(self._garbage, garbage_type)
|
|
|
+ self.__throw_garbage(self._garbage, garbage_type)
|
|
|
except (ThrowGarbageError, UserNotSupportError, ControlNotLogin):
|
|
|
self._win.show_warning("Operation Fail", "The garbage bags have been used.")
|
|
|
raise
|
|
@@ -82,7 +292,7 @@ class GarbageStationStatus:
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
- self._ctrl.check_garbage(self._garbage, check)
|
|
|
+ self.__check_garbage(self._garbage, check)
|
|
|
except (ThrowGarbageError, UserNotSupportError, CheckGarbageError, GarbageBagNotUse):
|
|
|
self._win.show_warning("Operation Fail", "The garbage bag has been checked")
|
|
|
finally:
|
|
@@ -108,11 +318,11 @@ class GarbageStationStatus:
|
|
|
|
|
|
def show_user_info(self):
|
|
|
self.update_user_time()
|
|
|
- if not self._ctrl.check_user():
|
|
|
+ if not self.check_user():
|
|
|
self._win.show_warning("Operation Fail", "You should login first")
|
|
|
return
|
|
|
|
|
|
- info = self._ctrl.get_user_info()
|
|
|
+ info = self.get_user_info()
|
|
|
if info.get('manager', '0') == '1':
|
|
|
self._win.show_msg("About User", (f"Manager User\n"
|
|
|
f"UserName: {info['name']}\n"
|
|
@@ -149,14 +359,11 @@ Run on python {sys.version}
|
|
|
|
|
|
def show_exit(self):
|
|
|
self.update_user_time()
|
|
|
- if self._ctrl.is_manager():
|
|
|
+ if self.is_manager():
|
|
|
self._win.exit_win()
|
|
|
return
|
|
|
self._win.show_warning("Exit", f'Permission not permitted'.strip())
|
|
|
|
|
|
- def is_manager(self):
|
|
|
- return self._ctrl.is_manager()
|
|
|
-
|
|
|
def easter_eggs(self):
|
|
|
self.update_user_time()
|
|
|
if (not self._have_easter_eggs) and random.randint(0, 10) != 1: # 10% 概率触发
|
|
@@ -179,7 +386,7 @@ The function has not yet been implemented.
|
|
|
|
|
|
def get_show_rank(self):
|
|
|
self.update_user_time()
|
|
|
- rank_list = self._ctrl.ranking(limit=20)
|
|
|
+ rank_list = self.ranking(limit=20)
|
|
|
self.rank = [[]]
|
|
|
for i, r in enumerate(rank_list):
|
|
|
if len(self.rank[-1]) == 5:
|
|
@@ -191,7 +398,7 @@ The function has not yet been implemented.
|
|
|
color = "#ffa631"
|
|
|
elif i == 2:
|
|
|
color = "#ff7500"
|
|
|
- elif r[0] == self._ctrl.get_uid_no_update():
|
|
|
+ elif r[0] == self.get_uid_no_update():
|
|
|
color = "#b0a4e3"
|
|
|
self.rank[-1].append((i + 1, r[1], r[0], r[2], r[3], color))
|
|
|
if len(self.rank[0]) == 0:
|
|
@@ -214,14 +421,16 @@ The function has not yet been implemented.
|
|
|
lambda: self.show_rank(+1),
|
|
|
self.rank[self.rank_index])
|
|
|
|
|
|
- def update_user_time(self):
|
|
|
- self._ctrl.update_user_time()
|
|
|
-
|
|
|
|
|
|
class GarbageStation:
|
|
|
- def __init__(self, ctrl: Control = global_control, refresh_delay: int = conf.tk_refresh_delay):
|
|
|
+ def __init__(self,
|
|
|
+ db: DB,
|
|
|
+ cap: HGSCapture,
|
|
|
+ qr: HGSQRCoder,
|
|
|
+ loc: location_t = conf.base_location,
|
|
|
+ refresh_delay: int = conf.tk_refresh_delay):
|
|
|
self.refresh_delay = refresh_delay
|
|
|
- self._status = GarbageStationStatus(self, ctrl)
|
|
|
+ self._status = GarbageStationStatus(self, db, cap, qr, loc)
|
|
|
|
|
|
self._window = tk.Tk()
|
|
|
self._sys_height = self._window.winfo_screenheight()
|
|
@@ -781,7 +990,7 @@ class GarbageStation:
|
|
|
self._garbage_id[2].set(gid)
|
|
|
|
|
|
def update_scan(self):
|
|
|
- res: Tuple[enum, any] = ControlScanType.no_to_done, None
|
|
|
+ res: Tuple[enum, any] = GarbageStationStatus.scan_no_to_done, None
|
|
|
try:
|
|
|
res = self._status.scan()
|
|
|
except ControlNotLogin:
|
|
@@ -793,14 +1002,14 @@ class GarbageStation:
|
|
|
self._cap_img = ImageTk.PhotoImage(image=_cap_img_info)
|
|
|
self._cap_label['image'] = self._cap_img
|
|
|
|
|
|
- if res[0] == ControlScanType.switch_user:
|
|
|
+ if res[0] == GarbageStationStatus.scan_switch_user:
|
|
|
self._status.switch_user(res[1])
|
|
|
self.update_control()
|
|
|
- elif res[0] == ControlScanType.throw_garbage:
|
|
|
+ elif res[0] == GarbageStationStatus.scan_throw_garbage:
|
|
|
self._status.to_get_garbage_type(res[1])
|
|
|
self.hide_msg_rank() # 如果有msg也马上隐藏
|
|
|
self.update_control()
|
|
|
- elif res[0] == ControlScanType.check_garbage:
|
|
|
+ elif res[0] == GarbageStationStatus.scan_check_garbage:
|
|
|
self._status.to_get_garbage_check(res[1])
|
|
|
self._status.show_garbage_info() # 显示信息
|
|
|
self.update_control()
|
|
@@ -857,5 +1066,8 @@ class GarbageStation:
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- station = GarbageStation()
|
|
|
+ mysql_db = DB()
|
|
|
+ capture = HGSCapture()
|
|
|
+ qr_capture = HGSQRCoder(capture)
|
|
|
+ station = GarbageStation(mysql_db, capture, qr_capture)
|
|
|
station.mainloop()
|