Browse Source

feat: 新增登陆界面

SongZihuan 3 years ago
parent
commit
5e20a01f02
5 changed files with 444 additions and 128 deletions
  1. 322 96
      control/admin.py
  2. 8 32
      control/station.py
  3. 72 0
      control/tk_event.py
  4. 23 0
      sql/garbage.py
  5. 19 0
      sql/user.py

+ 322 - 96
control/admin.py

@@ -1,96 +1,322 @@
-# import tkinter as tk
-# import tkinter.font as font
-#
-# from tool.type_ import *
-#
-# from sql.db import DB
-# from sql.user import creat_new_user
-# from sql.garbage import creat_new_garbage
-#
-# from equipment.scan_user import write_uid_qr, write_all_uid_qr
-# from equipment.scan_garbage import write_gid_qr
-#
-# from core.user import User
-# from core.garbage import GarbageBag
-#
-#
-# class AdminStationException:
-#     ...
-#
-#
-# class ControlNotLogin(AdminStationException):
-#     ...
-#
-#
-# class CreatGarbageError(AdminStationException):
-#     ...
-#
-#
-# class CreatUserError(AdminStationException):
-#     ...
-#
-#
-# class AdminStationStatus:
-#     def __init__(self,
-#                  win,
-#                  db: DB):
-#         self._win = win
-#         self._db: DB = db
-#         self._admin: Optional[User] = None  # 操作者
-#
-#     def __check_manager_user(self):
-#         if self._admin is None or not self._admin.is_manager():
-#             raise ControlNotLogin
-#
-#     def creat_garbage(self, path: str, num: int = 1) -> List[tuple[str, Optional[GarbageBag]]]:
-#         self.__check_manager_user()
-#
-#         re = []
-#         for _ in range(num):
-#             gar = creat_new_garbage(self._db)
-#             if gar is None:
-#                 raise CreatGarbageError
-#             res = write_gid_qr(gar.get_gid(), path, self._db)
-#             re.append(res)
-#         return re
-#
-#     def creat_user(self, name: uname_t, passwd: passwd_t, phone: str, manager: bool) -> Optional[User]:
-#         user = creat_new_user(name, passwd, phone, manager, self._db)
-#         if user is None:
-#             raise CreatUserError
-#         return user
-#
-#     def creat_user_from_list(self, user_list: List[Tuple[uname_t, passwd_t, str]], manager: bool) -> List[User]:
-#         re = []
-#         for i in user_list:
-#             user = creat_new_user(i[0], i[1], i[2], manager, self._db)
-#             if user is None:
-#                 raise CreatUserError
-#             re.append(user)
-#         return re
-#
-#     def get_uid_qrcode(self, uid: uid_t, path: str) -> Tuple[str, Optional[User]]:
-#         return write_uid_qr(uid, path, self._db)
-#
-#     def get_uid_qrcode_from_list(self, uid_list: List[uid_t], path: str) -> List[Tuple[str, Optional[User]]]:
-#         re = []
-#         for uid in uid_list:
-#             res = write_uid_qr(uid, path, self._db)
-#             re.append(res)
-#         return re
-#
-#     def get_all_uid_qrcode(self, path: str, where: str = "") -> List[str]:
-#         return write_all_uid_qr(path, self._db, where=where)
-#
-#
-# class AdminStation:
-#     def __init__(self, db: DB):
-#         self._status = AdminStationStatus(self, db)
-#
-#         self._window = tk.Tk()
-#         self._sys_height = self._window.winfo_screenheight()
-#         self._sys_width = self._window.winfo_screenwidth()
-#
-#         self._win_height = int(self._sys_height * (2 / 3))
-#         self._win_width = int(self._sys_width * (1 / 3))
-#         self._full_screen = False
+import tkinter as tk
+from tkinter import ttk
+from tkinter import messagebox as msg
+import tkinter.font as font
+import abc
+
+import conf
+from tool.type_ import *
+
+from sql.db import DB
+from sql.user import creat_new_user, del_user, find_user_by_name
+from sql.garbage import creat_new_garbage, del_garbage_not_use, del_garbage_not_use_many
+
+from equipment.scan_user import write_uid_qr, write_all_uid_qr
+from equipment.scan_garbage import write_gid_qr
+
+from core.user import User
+from core.garbage import GarbageBag
+
+from tk_event import TkEventBase, TkEventMain, TkThreading
+
+
+class AdminStationException(Exception):
+    ...
+
+
+class CreatGarbageError(AdminStationException):
+    ...
+
+
+class CreatUserError(AdminStationException):
+    ...
+
+
+class AdminEventBase(TkEventBase):
+    def __init__(self, station, db: DB, title: str = 'unknown'):
+        self.station: AdminStationBase = station
+        self._db: DB = db
+        self._title = title
+
+    def get_title(self) -> str:
+        return self._title
+
+    def is_end(self) -> bool:
+        raise AdminStationException
+
+    def done_after_event(self):
+        raise AdminStationException
+
+
+class AdminStationBase(TkEventMain, metaclass=abc.ABCMeta):
+    def __init__(self, db: DB):
+        self._admin: Optional[User] = None
+        self._db = db
+        super(AdminStationBase, self).__init__()
+
+    def creat_garbage(self, path: str, num: int = 1) -> List[tuple[str, Optional[GarbageBag]]]:
+        re = []
+        for _ in range(num):
+            gar = creat_new_garbage(self._db)
+            if gar is None:
+                raise CreatGarbageError
+            res = write_gid_qr(gar.get_gid(), path, self._db)
+            re.append(res)
+        return re
+
+    def creat_user(self, name: uname_t, passwd: passwd_t, phone: str, manager: bool) -> Optional[User]:
+        user = creat_new_user(name, passwd, phone, manager, self._db)
+        if user is None:
+            raise CreatUserError
+        return user
+
+    def creat_user_from_list(self, user_list: List[Tuple[uname_t, passwd_t, str]], manager: bool) -> List[User]:
+        re = []
+        for i in user_list:
+            user = creat_new_user(i[0], i[1], i[2], manager, self._db)
+            if user is None:
+                raise CreatUserError
+            re.append(user)
+        return re
+
+    def get_gid_qrcode(self, gid: gid_t, path: str) -> Tuple[str, Optional[GarbageBag]]:
+        return write_gid_qr(gid, path, self._db)
+
+    def get_all_gid_qrcode(self, path: str, where: str = "") -> List[str]:
+        return write_all_uid_qr(path, self._db, where=where)
+
+    def get_uid_qrcode(self, uid: uid_t, path: str) -> Tuple[str, Optional[User]]:
+        return write_uid_qr(uid, path, self._db)
+
+    def get_all_uid_qrcode(self, path: str, where: str = "") -> List[str]:
+        return write_all_uid_qr(path, self._db, where=where)
+
+    def del_garbage(self, gid: gid_t) -> bool:
+        return del_garbage_not_use(gid, self._db)
+
+    def del_garbage_many(self, from_: gid_t, to_: gid_t) -> int:
+        return del_garbage_not_use_many(from_, to_, self._db)
+
+    def del_user(self, uid: uid_t) -> bool:
+        return del_user(uid, self._db)
+
+    @abc.abstractmethod
+    def login_call(self):
+        ...
+
+    def login(self, user: User) -> bool:
+        if user is not None and user.is_manager():
+            self._admin = user
+            return True
+        else:
+            return False
+
+    @abc.abstractmethod
+    def show_loading(self, title: str):
+        ...
+
+    @abc.abstractmethod
+    def stop_loading(self):
+        ...
+
+    @abc.abstractmethod
+    def set_after_run(self, ms, func, *args):
+        ...
+
+
+class AdminStation(AdminStationBase):
+    def set_after_run(self, ms, func, *args):  # super.__init__可能会调用
+        self.init_after_run_list.append((ms, func, args))
+
+    def __conf_set_after_run(self):
+        for ms, func, args in self.init_after_run_list:
+            self._window.after(ms, func, *args)
+
+    def set_after_run_now(self, ms, func, *args):
+        self._window.after(ms, func, *args)
+
+    def __init__(self, db: DB, refresh_delay: int = conf.tk_refresh_delay):
+        self.init_after_run_list: List[Tuple[int, Callable, Tuple]] = []
+
+        super().__init__(db)
+        self.refresh_delay = refresh_delay
+
+        self._window = tk.Tk()
+        self.login_window = None
+        self._sys_height = self._window.winfo_screenheight()
+        self._sys_width = self._window.winfo_screenwidth()
+
+        self._win_height = int(self._sys_height * (2 / 3))
+        self._win_width = int(self._sys_width * (2 / 3))
+
+        self._full_screen = False
+        self.__conf_windows()
+
+        self.__conf_font_size()
+        self.__show_login_window()
+        self.__conf_set_after_run()
+
+    def __conf_windows(self):
+        self._window.title('HGSSystem: Manage Station')
+        self._window.geometry(f'{self._win_width}x{self._win_height}')
+        self._window['bg'] = "#F0FFF0"
+        self._window.resizable(False, False)
+        self._window.protocol("WM_DELETE_WINDOW", lambda: self.main_exit())
+
+    def __conf_creak_tk(self):
+        self._frame_list: List[AdminMenu] = []
+        self._program_frame: List[AdminProgram] = []
+
+        self._msg_frame = tk.Frame(self._window)
+        self._msg_label = tk.Label(self._msg_frame), tk.Label(self._msg_frame), tk.StringVar(), tk.StringVar()
+        self._msg_hide = tk.Button(self._msg_frame)
+
+        self._loading_frame = tk.Frame(self._window)
+        self._loading_title: Tuple[tk.Label, tk.Variable] = tk.Label(self._loading_frame), tk.StringVar()
+        self._loading_pro = ttk.Progressbar(self._loading_frame)
+
+    def __conf_font_size(self, n: int = 1):
+        self._login_title_font_size = int(12 * n)
+        self._login_btn_font_size = int(11 * n)
+
+    def __show_login_window(self):
+        self.login_window: Optional[tk.Toplevel] = tk.Toplevel()
+        self.login_window.title("HGSSystem Login")
+
+        height = int(self._sys_height * (1 / 6))
+        width = int(height * 2)
+
+        if width > self._sys_width:
+            width = int(self._sys_width * (2 / 3))
+            height = int(width / 2)
+
+        self.login_window.geometry(f'{width}x{height}')
+        self.login_window['bg'] = "#d1d9e0"
+        self.login_window.resizable(False, False)
+        self.login_window.protocol("WM_DELETE_WINDOW", lambda: self.login_exit())
+        self._login_name = [tk.Label(self.login_window), tk.Entry(self.login_window), tk.StringVar()]
+        self._login_passwd = [tk.Label(self.login_window), tk.Entry(self.login_window), tk.StringVar()]
+        self._login_btn = [tk.Button(self.login_window), tk.Button(self.login_window)]
+
+        self.__conf_login_window()
+        self.hide_main()
+
+    def __conf_login_window(self):
+        title_font = self.__make_font(size=self._login_title_font_size, weight="bold")
+        btn_font = self.__make_font(size=self._login_btn_font_size, weight="bold")
+
+        for lb, text in zip([self._login_name[0], self._login_passwd[0]], ["User:", "Passwd:"]):
+            lb['bg'] = "#d1d9e0"  # 蜜瓜绿
+            lb['font'] = title_font
+            lb['text'] = text
+            lb['anchor'] = 'e'
+
+        for lb, var in zip([self._login_name[1], self._login_passwd[1]], [self._login_name[2], self._login_passwd[2]]):
+            lb['font'] = title_font
+            lb['textvariable'] = var
+
+        self._login_name[0].place(relx=0.00, rely=0.20, relwidth=0.35, relheight=0.15)
+        self._login_passwd[0].place(relx=0.00, rely=0.40, relwidth=0.35, relheight=0.15)
+
+        self._login_name[1].place(relx=0.40, rely=0.20, relwidth=0.45, relheight=0.15)
+        self._login_passwd[1]['show'] = "*"
+        self._login_passwd[1].place(relx=0.40, rely=0.40, relwidth=0.45, relheight=0.15)
+
+        self._login_btn[0]['bg'] = "#a1afc9"
+        self._login_btn[0]['font'] = btn_font
+        self._login_btn[0]['text'] = 'Login'
+        self._login_btn[0]['command'] = lambda: self.login_call()
+        self._login_btn[0].place(relx=0.50, rely=0.70, relwidth=0.16, relheight=0.15)
+
+        self._login_btn[1]['bg'] = "#a1afc9"
+        self._login_btn[1]['font'] = btn_font
+        self._login_btn[1]['text'] = 'Exit'
+        self._login_btn[1]['command'] = lambda: self.login_exit()
+        self._login_btn[1].place(relx=0.70, rely=0.70, relwidth=0.16, relheight=0.15)
+
+    def login_call(self):
+        event = LoginEvent(self, self._db).start(self._login_name[2].get(),
+                                                 self._login_passwd[2].get())
+        self.push_event(event)
+
+    def login(self, user: User):
+        if super(AdminStation, self).login(user):
+            self.login_window.destroy()
+            self.login_window = None
+            self.show_main()
+        else:
+            msg.showerror("Login error", "Please, try again")
+            self._login_name[2].set('')
+            self._login_passwd[2].set('')
+
+    def login_exit(self):
+        if not msg.askokcancel('Sure?', 'Exit manager system.'):
+            return
+        if self.login_window is not None:
+            self.login_window.destroy()
+        self.exit_win()
+
+    def main_exit(self):
+        if not msg.askokcancel('Sure?', 'Exit manager system.'):
+            return
+        self.exit_win()
+
+    def hide_main(self):
+        self._window.withdraw()
+
+    def show_main(self):
+        self._window.update()
+        self._window.deiconify()
+
+    def show_loading(self, title: str):
+        ...
+
+    def stop_loading(self):
+        ...
+
+    @staticmethod
+    def __make_font(family: str = 'noto', **kwargs):
+        return font.Font(family=conf.font_d[family], **kwargs)
+
+    def mainloop(self):
+        self._window.mainloop()
+
+    def exit_win(self):
+        self._window.destroy()
+
+
+class LoginEvent(AdminEventBase):
+    def __init__(self, station: AdminStationBase, db: DB):
+        super().__init__(station, db, "Ranking")
+        self.thread: Optional[TkThreading] = None
+
+    def login(self, name, passwd):
+        return find_user_by_name(name, passwd, self._db)
+
+    def start(self, name, passwd):
+        self.thread = TkThreading(self.login, name, passwd)
+        return self
+
+    def is_end(self) -> bool:
+        return not self.thread.is_alive()
+
+    def done_after_event(self):
+        self.station.login(self.thread.wait_event())
+
+
+class AdminMenu(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def get_menu_frame(self, station: AdminStation) -> tk.Frame:
+        ...
+
+
+class AdminProgram(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def get_admin_frame(self, station: AdminStation) -> tk.Frame:
+        ...
+
+
+if __name__ == '__main__':
+    mysql_db = DB()
+    station_ = AdminStation(mysql_db)
+    station_.mainloop()

+ 8 - 32
control/station.py

@@ -25,7 +25,7 @@ from equipment.scan import HGSCapture, HGSQRCoder, QRCode
 from equipment.scan_user import scan_user
 from equipment.scan_garbage import scan_garbage
 
-from tk_event import TkEventBase
+from tk_event import TkEventBase, TkEventMain, TkThreading
 
 
 class GarbageStationException(Exception):
@@ -64,7 +64,7 @@ class StationEventBase(TkEventBase):
         raise GarbageStationException
 
 
-class GarbageStationBase(metaclass=abc.ABCMeta):
+class GarbageStationBase(TkEventMain, metaclass=abc.ABCMeta):
     status_normal = 1
     status_get_garbage_type = 2
     status_get_garbage_check = 3
@@ -94,8 +94,7 @@ class GarbageStationBase(metaclass=abc.ABCMeta):
 
         self.rank = None
         self.rank_index = 0
-        self._event_list: List[StationEventBase] = []
-        self.set_after_run(conf.tk_refresh_delay, lambda: self.run_event())
+        super(GarbageStationBase, self).__init__()
 
     def update_user_time(self):
         if self.check_user():
@@ -392,32 +391,6 @@ The function has not yet been implemented.
         event = RankingEvent(self, self._db)
         self.push_event(event)
 
-    def push_event(self, event: StationEventBase):
-        self._event_list.append(event)
-        self.show_loading(event.get_title())
-        self.run_event()
-
-    def run_event(self):
-        if len(self._event_list) == 0:
-            return
-
-        new_event: List[StationEventBase] = []
-        done_event: List[StationEventBase] = []
-        for event in self._event_list:
-            if event.is_end():
-                done_event.append(event)
-            else:
-                new_event.append(event)
-        self._event_list = new_event
-        if len(self._event_list) == 0:
-            self.stop_loading()
-
-        for event in done_event:  # 隐藏进度条后执行Event-GUI任务
-            try:
-                event.done_after_event()
-            except:
-                traceback.print_exc()
-
     @abc.abstractmethod
     def show_msg(self, title, info, msg_type='info', big: bool = True):
         ...
@@ -475,7 +448,10 @@ class GarbageStation(GarbageStationBase):
 
     def __conf_set_after_run(self):
         for ms, func, args in self.init_after_run_list:
-            self.__define_after(ms, func, *args)
+            self.set_after_run_now(ms, func, *args)
+
+    def set_after_run_now(self, ms, func, *args):
+        self._window.after(ms, func, *args)
 
     def __init__(self,
                  db: DB,
@@ -508,6 +484,7 @@ class GarbageStation(GarbageStationBase):
         self.__conf_tk()
 
         self.__conf_after()
+        self.__conf_set_after_run()
         self.__switch_to_no_user()
 
     def __creat_tk(self):
@@ -1048,7 +1025,6 @@ class GarbageStation(GarbageStationBase):
         self.__define_after(self.refresh_delay, self.update_control)
         self.__define_after(self.refresh_delay, self.update_scan)
         self.__define_after(self.refresh_delay, self.update_msg)
-        self.__conf_set_after_run()
 
     def __after_func_maker(self, func):
         def new_func(*args, **kwargs):

+ 72 - 0
control/tk_event.py

@@ -1,4 +1,9 @@
 import abc
+import traceback
+import threading
+
+import conf
+from tool.type_ import *
 
 
 class TkEventBase(metaclass=abc.ABCMeta):
@@ -13,3 +18,70 @@ class TkEventBase(metaclass=abc.ABCMeta):
     @abc.abstractmethod
     def done_after_event(self):
         ...
+
+
+class TkEventMain(metaclass=abc.ABCMeta):
+    def __init__(self):
+        self._event_list: List[TkEventBase] = []
+        self.set_after_run(conf.tk_refresh_delay, lambda: self.run_event())
+
+    def push_event(self, event: TkEventBase):
+        self._event_list.append(event)
+        self.show_loading(event.get_title())
+        self.run_event()
+
+    def run_event(self):
+        if len(self._event_list) == 0:
+            return
+
+        new_event: List[TkEventBase] = []
+        done_event: List[TkEventBase] = []
+        for event in self._event_list:
+            if event.is_end():
+                done_event.append(event)
+            else:
+                new_event.append(event)
+        self._event_list = new_event
+        if len(self._event_list) == 0:
+            self.stop_loading()
+
+        for event in done_event:  # 隐藏进度条后执行Event-GUI任务
+            try:
+                event.done_after_event()
+            except:
+                traceback.print_exc()
+        self.set_after_run_now(conf.tk_refresh_delay, self.run_event)
+
+    @abc.abstractmethod
+    def show_loading(self, title: str):
+        ...
+
+    @abc.abstractmethod
+    def stop_loading(self):
+        ...
+
+    @abc.abstractmethod
+    def set_after_run(self, ms, func, *args):
+        ...
+
+    @abc.abstractmethod
+    def set_after_run_now(self, ms, func, *args):
+        ...
+
+
+class TkThreading(threading.Thread):
+    def __init__(self, func, *args, start_now: bool = True):
+        threading.Thread.__init__(self)
+        self.func = func
+        self.args = args
+        self.result = None
+
+        if start_now:
+            self.start()
+
+    def run(self):
+        self.result = self.func(*self.args)
+
+    def wait_event(self):
+        self.join()
+        return self.result

+ 23 - 0
sql/garbage.py

@@ -144,6 +144,29 @@ def creat_new_garbage(db: DB) -> Optional[GarbageBag]:
     return GarbageBag(str(gid))
 
 
+def del_garbage_not_use(gid: gid_t, db: DB) -> bool:
+    cur = db.done(f"DELETE FROM garbage_n WHERE gid = {gid};")
+    if cur is None or cur.rowcount == 0:
+        return False
+    assert cur.rowcount == 1
+    cur = db.done(f"DELETE FROM garbage WHERE gid = {gid};")
+    if cur is None or cur.rowcount == 0:
+        return False
+    assert cur.rowcount == 1
+    return True
+
+
+def del_garbage_not_use_many(gid_from: gid_t, gid_to: gid_t, db: DB) -> int:
+    cur = db.done(f"DELETE FROM garbage "
+                  f"WHERE gid IN (SELECT gid FROM garbage_n WHERE gid BETWEEN {gid_from} and {gid_to});")
+    if cur is None or cur.rowcount == 0:
+        return 0
+    cur = db.done(f"DELETE FROM garbage WHERE gid BETWEEN {gid_from} and {gid_to};")
+    if cur is None or cur.rowcount == 0:
+        return 0
+    return cur.rowcount
+
+
 if __name__ == '__main__':
     mysql_db = DB()
     bag = creat_new_garbage(mysql_db)

+ 19 - 0
sql/user.py

@@ -83,6 +83,25 @@ def creat_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: p
     return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
 
 
+def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
+    cur = db.done(f"SELECT phone FROM user WHERE uid = {uid};")
+    if cur is None or cur.rowcount == 0:
+        return None
+    assert cur.rowcount == 1
+    return cur.fetchall()[0]
+
+
+def del_user(uid: uid_t, db: DB) -> bool:
+    cur = db.search(f"SELECT gid FROM garbage_time WHERE uid = {uid};")
+    if cur is None or cur.rowcount != 0:
+        return False
+    cur = db.done(f"DELETE FROM user WHERE uid = {uid};")
+    if cur is None or cur.rowcount == 0:
+        return False
+    assert cur.rowcount == 1
+    return True
+
+
 if __name__ == '__main__':
     mysql_db = DB()
     name_ = 'Huan12'