Преглед изворни кода

feat: 添加用户删除和新增的代码

SongZihuan пре 3 година
родитељ
комит
5dc733e334
7 измењених фајлова са 260 додато и 28 уклоњено
  1. 15 9
      control/admin.py
  2. 101 3
      control/admin_event.py
  3. 112 7
      control/admin_program.py
  4. 7 2
      control/event.py
  5. 3 3
      equipment/scan_garbage.py
  6. 21 3
      sql/user.py
  7. 1 1
      tool/login.py

+ 15 - 9
control/admin.py

@@ -8,8 +8,8 @@ from tool.type_ import *
 from tool.tk import make_font
 
 from sql.db import DB
-from sql.user import creat_new_user, del_user
-from sql.garbage import creat_new_garbage, del_garbage_not_use, del_garbage_not_use_many
+from sql.user import creat_new_user, del_user, del_user_from_where, del_user_from_where_scan
+from sql.garbage import creat_new_garbage, del_garbage_not_use, del_garbage_not_use_many, find_garbage
 
 from equipment.scan_user import write_uid_qr, write_all_uid_qr
 from equipment.scan_garbage import write_gid_qr
@@ -41,21 +41,21 @@ class AdminStationBase(TkEventMain, metaclass=abc.ABCMeta):
     def get_db(self):
         return self._db
 
-    def creat_garbage(self, path: str, num: int = 1) -> List[tuple[str, Optional[GarbageBag]]]:
+    def creat_garbage(self, path: Optional[str], num: int = 1) -> List[tuple[str, Optional[GarbageBag]]]:
         re = []
         for _ in range(num):
             gar = creat_new_garbage(self._db)
             if gar is None:
                 raise CreatGarbageError
-            res = write_gid_qr(gar.get_gid(), path, self._db)
-            re.append(res)
+            if path is not None:
+                res = write_gid_qr(gar.get_gid(), path, self._db)
+                re.append(res)
+            else:
+                re.append(("", gar))
         return re
 
     def creat_user(self, name: uname_t, passwd: passwd_t, phone: str, manager: bool) -> Optional[User]:
-        user = creat_new_user(name, passwd, phone, manager, self._db)
-        if user is None:
-            raise CreatUserError
-        return user
+        return creat_new_user(name, passwd, phone, manager, self._db)
 
     def creat_user_from_list(self, user_list: List[Tuple[uname_t, passwd_t, str]], manager: bool) -> List[User]:
         re = []
@@ -87,6 +87,12 @@ class AdminStationBase(TkEventMain, metaclass=abc.ABCMeta):
     def del_user(self, uid: uid_t) -> bool:
         return del_user(uid, self._db)
 
+    def del_user_from_where_scan(self, where: str) -> int:
+        return del_user_from_where_scan(where, self._db)
+
+    def del_user_from_where(self, where: str) -> int:
+        return del_user_from_where(where, self._db)
+
     @abc.abstractmethod
     def login_call(self):
         ...

+ 101 - 3
control/admin_event.py

@@ -2,7 +2,11 @@ import time
 
 from tool.type_ import *
 from sql.db import DB
-from sql.user import find_user_by_name
+from sql.user import find_user_by_name, creat_new_user
+from sql.garbage import creat_new_garbage
+
+from core.user import User
+from core.garbage import GarbageBag
 
 from event import TkEventBase, TkThreading, TkEventException
 import admin
@@ -46,5 +50,99 @@ class TestProgressEvent(AdminEventBase):
         super(TestProgressEvent, self).__init__(station)
         self.thread = TkThreading(self.func, 5)
 
-    def is_end(self) -> bool:
-        return not self.thread.is_alive()
+
+class CreatUserEvent(AdminEventBase):
+    def func(self, name, passwd, phone, is_manager):
+        return self.station.creat_user(name, passwd, phone, is_manager)
+
+    def __init__(self, station):
+        super(CreatUserEvent, self).__init__(station)
+        self._name = None
+
+    def start(self, name, passwd, phone, is_manager):
+        self._name = name
+        self.thread = TkThreading(self.func, name, passwd, phone, is_manager)
+        return self
+
+    def done_after_event(self):
+        res: Optional[User] = self.thread.wait_event()
+        if res is None:
+            self.station.show_msg("CreatUserError", f"Can't not creat user: {self._name}", "Warning")
+        else:
+            name = res.get_name()
+            self.station.show_msg("CreatUser", f"Creat user {name} success")
+
+
+class CreatGarbageEvent(AdminEventBase):
+    def func(self, path, count):
+        return self.station.creat_garbage(path, count)
+
+    def __init__(self, station):
+        super(CreatGarbageEvent, self).__init__(station)
+        self._name = None
+
+    def start(self, path, count):
+        self.thread = TkThreading(self.func, path, count)
+        return self
+
+    def done_after_event(self):
+        res: list[tuple[str, Optional[GarbageBag]]] = self.thread.wait_event()
+        self.station.show_msg("CreatGarbage", f"Creat {len(res)} garbage finished.")
+
+
+class DelUserEvent(AdminEventBase):
+    def func(self, uid):
+        return self.station.del_user(uid)
+
+    def __init__(self, station):
+        super(DelUserEvent, self).__init__(station)
+        self._name = None
+
+    def start(self, uid):
+        self.thread = TkThreading(self.func, uid)
+        return self
+
+    def done_after_event(self):
+        res: bool = self.thread.wait_event()
+        if res:
+            self.station.show_msg("DeleteUser", f"Delete user finished.")
+        else:
+            self.station.show_msg("DeleteUserError", f"Delete user failed.", "Warning")
+
+
+class DelUserFromWhereScanEvent(AdminEventBase):
+    def func(self, where):
+        return self.station.del_user_from_where_scan(where)
+
+    def __init__(self, station):
+        super(DelUserFromWhereScanEvent, self).__init__(station)
+
+    def start(self, where):
+        self.thread = TkThreading(self.func, where)
+        return self
+
+    def done_after_event(self):
+        res: int = self.thread.wait_event()
+        if res != -1:
+            self.station.show_msg("DeleteUserScan", f"Delete count: {res}")
+        else:
+            self.station.show_msg("DeleteUserScanError", f"`Where` must be SQL")
+
+
+class DelUserFromWhereEvent(AdminEventBase):
+    def func(self, where):
+        return self.station.del_user_from_where(where)
+
+    def __init__(self, station):
+        super(DelUserFromWhereEvent, self).__init__(station)
+
+    def start(self, where):
+        self.thread = TkThreading(self.func, where)
+        return self
+
+    def done_after_event(self):
+        res: int = self.thread.wait_event()
+        if res != -1:
+            self.station.show_msg("DeleteUser", f"Delete {res} user success")
+        else:
+            self.station.show_msg("DeleteUserError", f"`Where` must be SQL")

+ 112 - 7
control/admin_program.py

@@ -1,9 +1,11 @@
 import abc
 import tkinter as tk
 import tkinter.ttk as ttk
+from tkinter.filedialog import askdirectory
 
 from tool.type_ import *
 from tool.tk import make_font, set_tk_disable_from_list
+from tool.login import creat_uid
 
 import conf
 import admin
@@ -95,11 +97,12 @@ class CreatUserProgramBase(AdminProgram):
         self.var: List[tk.Variable] = [tk.StringVar() for _ in range(3)]
         self.btn: List[tk.Button] = [tk.Button(self.frame) for _ in range(2)]  # creat(生成用户) try(计算uid)
 
-        self._conf("#FA8072")  # 默认颜色
+        self._conf("#FA8072", False)  # 默认颜色
         self.__conf_font()
 
-    def _conf(self, bg_color):
+    def _conf(self, bg_color, is_manager: bool):
         self.bg_color = bg_color
+        self.is_manager = is_manager
         return self
 
     def __conf_font(self, n: int = 1):
@@ -131,12 +134,43 @@ class CreatUserProgramBase(AdminProgram):
             enter.place(relx=0.35, rely=height, relwidth=0.60, relheight=0.17)
             height += 0.30
 
-        for btn, text, x in zip(self.btn, ["Creat", "GetUID"], [0.2, 0.6]):
+        for btn, text, x, func in zip(self.btn,
+                                      ["Creat", "GetUID"],
+                                      [0.2, 0.6],
+                                      [lambda: self.creat_by_name(), lambda: self.get_uid()]):
             btn['font'] = btn_font
             btn['text'] = text
             btn['bg'] = conf.tk_btn_bg
+            btn['command'] = func
             btn.place(relx=x, rely=0.7, relwidth=0.2, relheight=0.08)
 
+    def __get_info(self) -> Optional[Tuple[uname_t, passwd_t, str]]:
+        name: uname_t = self.var[0].get()
+        passwd: passwd_t = self.var[1].get()
+        phone: str = self.var[2].get()
+
+        if len(name) == 0 or len(passwd) == 0 or len(phone) != 11:
+            self.station.show_msg("UserInfoError", "Please, enter UserName/Passwd/Phone(11)")
+            return None
+
+        return name, passwd, phone
+
+    def creat_by_name(self):
+        res = self.__get_info()
+        if res is None:
+            return
+        name, passwd, phone = res
+        event = tk_event.CreatUserEvent(self.station).start(name, passwd, phone, self.is_manager)
+        self.station.push_event(event)
+
+    def get_uid(self):
+        res = self.__get_info()
+        if res is None:
+            return
+        name, passwd, phone = res
+        uid = creat_uid(name, passwd, phone)
+        self.station.show_msg("UserID", f"UserName: {name}\nUserID: {uid}")
+
     def set_disable(self):
         set_tk_disable_from_list(self.btn)
         set_tk_disable_from_list(self.enter)
@@ -154,7 +188,7 @@ class CreatNormalUserProgram(CreatUserProgramBase):
 class CreatManagerUserProgram(CreatUserProgramBase):
     def __init__(self, station, win, color):
         super(CreatManagerUserProgram, self).__init__(station, win, color, "CreatManagerUser")
-        self._conf("#4b5cc4")
+        self._conf("#4b5cc4", True)
 
 
 class CreatAutoNormalUserProgram(AdminProgram):
@@ -198,8 +232,16 @@ class CreatAutoNormalUserProgram(AdminProgram):
         self.btn['font'] = btn_font
         self.btn['text'] = "Creat"
         self.btn['bg'] = conf.tk_btn_bg
+        self.btn['command'] = lambda: self.creat_user()
         self.btn.place(relx=0.4, rely=0.7, relwidth=0.2, relheight=0.08)
 
+    def creat_user(self):
+        phone = self.var.get()
+        if len(phone) != 11:
+            self.station.show_msg("UserInfoError", "Please, enter Phone(11)")
+        event = tk_event.CreatUserEvent(self.station).start(None, None, phone, False)
+        self.station.push_event(event)
+
     def set_disable(self):
         self.btn['state'] = 'disable'
         self.enter['state'] = 'disable'
@@ -251,12 +293,34 @@ class CreatGarbageProgram(AdminProgram):
             enter.place(relx=0.35, rely=height, relwidth=0.60, relheight=0.35)
             height += 0.43
 
-        for btn, text, x in zip([self.creat_btn, self.file_btn], ["Creat", "ChoosePath"], [0.2, 0.6]):
+        for btn, text, x, func in zip([self.creat_btn, self.file_btn],
+                                      ["Creat", "ChoosePath"],
+                                      [0.2, 0.6],
+                                      [lambda: self.creat_garbage(), lambda: self.choose_file()]):
             btn['font'] = btn_font
             btn['text'] = text
             btn['bg'] = conf.tk_btn_bg
+            btn['command'] = func
             btn.place(relx=x, rely=0.7, relwidth=0.2, relheight=0.08)
 
+    def choose_file(self):
+        path = askdirectory(title='path to save qr code')
+        self.var[1].set(path)
+
+    def creat_garbage(self):
+        try:
+            count = int(self.var[0].get())
+            if count <= 0:
+                raise ValueError
+        except (ValueError, TypeError):
+            self.station.show_msg("TypeError", "Count must be number > 1")
+        else:
+            path = self.var[1].get()
+            if len(path) == 0:
+                path = None
+            event = tk_event.CreatGarbageEvent(self.station).start(path, count)
+            self.station.push_event(event)
+
     def set_disable(self):
         self.creat_btn['state'] = 'disable'
         self.file_btn['state'] = 'disable'
@@ -331,14 +395,35 @@ class DeleteUserProgram(AdminProgram):
         self.uid_title.place(relx=0.01, rely=0.25, relwidth=0.30, relheight=0.50)
         self.uid_enter.place(relx=0.35, rely=0.25, relwidth=0.60, relheight=0.50)
 
-        for btn, text in zip(self.btn, ["Delete", "Delete"]):
+        for btn, text, func in zip(self.btn,
+                                   ["Delete By Uid", "Delete By Name"],
+                                   [lambda: self.del_by_uid(), lambda: self.del_by_name()]):
             btn['font'] = btn_font
             btn['text'] = text
             btn['bg'] = conf.tk_btn_bg
+            btn['command'] = func
 
         self.btn[0].place(relx=0.6, rely=0.32, relwidth=0.2, relheight=0.08)
         self.btn[1].place(relx=0.6, rely=0.75, relwidth=0.2, relheight=0.08)
 
+    def del_by_uid(self):
+        uid = self.uid_var.get()
+        if len(uid) != 32:
+            self.station.show_msg("UserID Error", "Len of UserID must be 32", "Error")
+            return
+        event = tk_event.DelUserEvent(self.station).start(uid)
+        self.station.push_event(event)
+
+    def del_by_name(self):
+        name = self.name_var[0].get()
+        passwd = self.name_var[1].get()
+        if len(name) == 0 or len(passwd) == 0:
+            self.station.show_msg("UserName/Password Error", "You should enter UserName and Password", "Error")
+            return
+        uid = creat_uid(name, passwd)
+        event = tk_event.DelUserEvent(self.station).start(uid)
+        self.station.push_event(event)
+
     def set_disable(self):
         set_tk_disable_from_list(self.btn)
         set_tk_disable_from_list(self.name_enter)
@@ -389,12 +474,32 @@ class DeleteUsersProgram(AdminProgram):
         self.title.place(relx=0.01, rely=0.25, relwidth=0.30, relheight=0.50)
         self.enter.place(relx=0.35, rely=0.25, relwidth=0.60, relheight=0.50)
 
-        for btn, text, x in zip(self.btn, ["Delete", "Scan"], [0.2, 0.6]):
+        for btn, text, x, func in zip(self.btn,
+                                      ["Delete", "Scan"],
+                                      [0.2, 0.6],
+                                      [lambda: self.delete_user(), lambda: self.scan_user()]):
             btn['font'] = btn_font
             btn['text'] = text
             btn['bg'] = conf.tk_btn_bg
+            btn['command'] = func
             btn.place(relx=x, rely=0.6, relwidth=0.2, relheight=0.08)
 
+    def delete_user(self):
+        where = self.var.get()
+        if len(where) == 0:
+            self.station.show_msg("`Where`Error", "`Where` must be SQL", "Warning")
+            return
+        event = tk_event.DelUserFromWhereEvent(self.station).start(where)
+        self.station.push_event(event)
+
+    def scan_user(self):
+        where = self.var.get()
+        if len(where) == 0:
+            self.station.show_msg("`Where`Error", "`Where` must be SQL", "Warning")
+            return
+        event = tk_event.DelUserFromWhereScanEvent(self.station).start(where)
+        self.station.push_event(event)
+
     def set_disable(self):
         set_tk_disable_from_list(self.btn)
         self.enter['state'] = 'disable'

+ 7 - 2
control/event.py

@@ -15,7 +15,7 @@ class TkEventBase(metaclass=abc.ABCMeta):
         self.thread: Optional[TkThreading] = None
 
     def is_end(self) -> bool:
-        if self.thread is not None and self.thread.is_alive():
+        if self.thread is not None and not self.thread.is_alive():
             return True
         return False
 
@@ -88,7 +88,12 @@ class TkThreading(threading.Thread):
             self.start()
 
     def run(self):
-        self.result = self.func(*self.args)
+        try:
+            self.result = self.func(*self.args)
+        except:
+            ...
+        finally:
+            del self.func, self.args
 
     def wait_event(self):
         self.join()

+ 3 - 3
equipment/scan_garbage.py

@@ -45,13 +45,13 @@ def make_gid_image(gid: gid_t, path: str):
 
 
 def write_gid_qr(gid: gid_t, path: str, db: DB) -> Tuple[str, Optional[GarbageBag]]:
-    user = find_garbage(gid, db)
-    if user is None:
+    garbage = find_garbage(gid, db)
+    if garbage is None:
         return "", None
 
     path = __get_gid_qr_file_name(gid, path)
     if make_gid_image(gid, path):
-        return path, user
+        return path, garbage
     return "", None
 
 

+ 21 - 3
sql/user.py

@@ -64,7 +64,7 @@ def update_user(user: User, db: DB) -> bool:
 
 def creat_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t, manager: bool, db: DB) -> Optional[User]:
     if name is None:
-        name = randomPassword()
+        name = f'User-{phone[-6:]}'
 
     if passwd is None:
         passwd = randomPassword()
@@ -92,16 +92,34 @@ def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
 
 
 def del_user(uid: uid_t, db: DB) -> bool:
-    cur = db.search(f"SELECT gid FROM garbage_time WHERE uid = {uid};")
+    cur = db.search(f"SELECT gid FROM garbage_time WHERE uid = '{uid}';")  # 确保没有引用
     if cur is None or cur.rowcount != 0:
         return False
-    cur = db.done(f"DELETE FROM user WHERE uid = {uid};")
+    cur = db.done(f"DELETE FROM user WHERE uid = '{uid}';")
     if cur is None or cur.rowcount == 0:
         return False
     assert cur.rowcount == 1
     return True
 
 
+def del_user_from_where_scan(where: str, db: DB) -> int:
+    cur = db.search(f"SELECT uid FROM user WHERE {where};")
+    print(f"SELECT uid FROM user WHERE {where};")
+    if cur is None:
+        return -1
+    return cur.rowcount
+
+
+def del_user_from_where(where: str, db: DB) -> int:
+    cur = db.search(f"SELECT gid FROM garbage_user WHERE {where};")  # 确保没有引用
+    if cur is None or cur.rowcount != 0:
+        return False
+    cur = db.done(f"DELETE FROM user WHERE {where};")
+    if cur is None:
+        return -1
+    return cur.rowcount
+
+
 if __name__ == '__main__':
     mysql_db = DB()
     name_ = 'Huan12'

+ 1 - 1
tool/login.py

@@ -13,7 +13,7 @@ def check_login(uid: uid_t, name: uname_t, passwd: passwd_t, salt: str = conf.pa
 
 
 def randomPassword():
-    passwd_char = 'qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890<>,.?/:;''!@#$%^&*()-_=+*'
+    passwd_char = 'qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890@'
     passwd = []
     for i in range(randint(16, 22)):
         passwd.append(passwd_char[randint(0, len(passwd_char) - 1)])