Browse Source

feat: SQL搜索语句使用硬编码

SongZihuan 2 years ago
parent
commit
78d242c2f2
10 changed files with 93 additions and 103 deletions
  1. 0 1
      app/home.py
  2. 1 1
      app/test.py
  3. 1 1
      app/tool.py
  4. 11 9
      app/user.py
  5. 7 8
      app/word_list.py
  6. 1 1
      configure/__init__.py
  7. 5 4
      core/aliyun.py
  8. 62 73
      core/db.py
  9. 2 5
      core/word.py
  10. 3 0
      job/new_data.py

+ 0 - 1
app/home.py

@@ -90,7 +90,6 @@ def register():
         return redirect(url_for("home.index"))
 
     current_app.new_invite_passwd()
-    print(register_form.template.data)
     flat, user = create_user(register_form.template.data, register_form.name.data, register_form.passwd.data)
     if user is not None:
         current_app.logger.debug(f"{register_form.name.data} with {register_form.template.data} register success")

+ 1 - 1
app/test.py

@@ -288,7 +288,7 @@ def upload():
     file = request.files["file"]
     user = current_user._get_current_object()
     job: Upload = Upload.upload.get(user.user)
-    if Upload.upload.get(user.user) is not None and job.is_alive():
+    if job is not None and job.is_alive():
         flash("Please wait for the current task to complete")
         return abort(423)
     Upload(user, file.stream.read().decode('utf-8').split('\n')).start()

+ 1 - 1
app/tool.py

@@ -25,7 +25,7 @@ def form_required(form: ClassVar[FlaskForm], callback: Optional[Callable] = None
                 if callback is None:
                     return abort(404)
                 return callback(form=f, **kw, **kwargs)
-            g.form = f
+            g.form = f  # 使用g传递form参数
             return func(*args, **kwargs)
         return new_func
     return required

+ 11 - 9
app/user.py

@@ -14,8 +14,8 @@ class AnonymousUser(AnonymousUserMixin):
 
 class UserWordDataBase(WordDatabase, UserMixin):
     def __check(self, key: int, value: str):
-        if len(self.search(table="User", columns=["value"], where=f"key={key}")) == 0:
-            self.insert(table="User", columns=["key", "value"], values=f"{key}, '{value}'")  # 默认密码
+        if len(self.search("SELECT value FROM User WHERE key=?", key)) == 0:
+            self.insert(table="User", columns=["key", "value"], values=f"{key}, '{value}'")
 
     def __init__(self, user: str, path: str):
         super().__init__(user, path)
@@ -26,24 +26,24 @@ class UserWordDataBase(WordDatabase, UserMixin):
                     value TEXT NOT NULL  -- 密码hash
                 )''')
 
-        self.__check(1, generate_password_hash('88888888'))
+        self.__check(1, generate_password_hash('88888888'))  # 默认密码
         self.__check(2, time.strftime('%Y#%m#%d', time.localtime(time.time())))  # 更新时间
         self.__check(3, '0')  # right
         self.__check(4, '0')  # wrong
-        self.__check(5, '')  # wrong
+        self.__check(5, '')  # 最近列表
 
-        self.check_time()
+        self.check_time()  # 更新最后登录时间
         self.user = user
 
     def get_id(self):
         return self.user
 
     def set_value(self, key: int, value):
-        value = str(value).replace("'", "''")
+        value = str(value)
         self.update(table="User", kw={"value": f"'{value}'"}, where=f"key={key}")
 
     def get_value(self, key: int, default=None) -> Optional[str]:
-        res = self.search(table="User", columns=["value"], where=f"key={key}")
+        res = self.search("SELECT value FROM User WHERE key=?", key)
         if len(res) == 0:
             return default
         return res[0][0]
@@ -58,8 +58,9 @@ class UserWordDataBase(WordDatabase, UserMixin):
         self.set_value(1, generate_password_hash(passwd))
 
     def check_time(self):
+        """ 更新时间数据, 按日计 """
         now_time = time.strftime('%Y#%m#%d', time.localtime(time.time()))
-        if self.get_value(2) != now_time:
+        if self.get_value(2) != now_time:  # 最后更新日期(精确到日)与当前不同,则重置数据
             self.set_value(2, now_time)
             self.set_value(3, 0)
             self.set_value(4, 0)
@@ -93,7 +94,7 @@ class UserWordDataBase(WordDatabase, UserMixin):
         self.delete_self()
 
     def get_box_count(self) -> Tuple[list, list, int, int]:
-        res = self.search(columns=["COUNT(word)", "COUNT(DISTINCT word)", "box"], table="Word", group_by=["box"])
+        res = self.search("SELECT COUNT(word), COUNT(DISTINCT word), box FROM Word GROUP BY box")
         ret = [0, 0, 0, 0, 0]
         ret_distinct = [0, 0, 0, 0, 0]
         for i in res:
@@ -157,6 +158,7 @@ def have_user(name: str):
 
 
 def load_user(name: str, passwd: Optional[str]):
+    """ 加载一个用户,如果密码为None表示不检查密码"""
     if not os.path.exists(os.path.join(conf["DB_PATH"], f"{name}.db")):
         return None
     user = UserWordDataBase(name, conf["DB_PATH"])

+ 7 - 8
app/word_list.py

@@ -41,10 +41,10 @@ def show_all_word():
         return abort(400)
     if page < 1:
         abort(400)
-    res = user.search(columns=["word", "box", "part", "english", "chinese", "eg"],
-                      table="Word", limit=20, offset=(page - 1) * 20,
-                      order_by=[("word", "ASC"), ("box", "ASC")])
-    word_count = user.search(columns=["COUNT(id)"], table="Word")[0][0]
+    res = user.search("SELECT word, box, part, english, chinese, eg FROM Word "
+                      "ORDER BY word, box "
+                      "LIMIT 20 OFFSET ?", (page - 1) * 20)
+    word_count = user.search("SELECT COUNT(id) FROM Word")[0][0]
     page_count = word_count // 20
     if word_count % 20 > 0:
         page_count += 1
@@ -66,10 +66,9 @@ def show_box_word(box: int):
         return abort(400)
     if page < 1:
         abort(400)
-    res = user.search(columns=["word", "box", "part", "english", "chinese", "eg"],
-                      table="Word", limit=20, offset=(page - 1) * 20, where=f"box={box}",
-                      order_by=[("word", "ASC"), ("box", "ASC")])
-    word_count = user.search(columns=["COUNT(id)"], table="Word", where=f"box={box}")[0][0]
+    res = user.search("SELECT word, box, part, english, chinese, eg FROM Word WHERE box=? "
+                      "ORDER BY word, box LIMIT 20 OFFSET ?", box, (page - 1) * 20)
+    word_count = user.search("SELECT COUNT(id) FROM Word WHERE box=?", box)[0][0]
     page_count = word_count // 20
     if word_count % 20 > 0:
         page_count += 1

+ 1 - 1
configure/__init__.py

@@ -13,7 +13,7 @@ conf = {
     "LOG_STDERR": True,
     "LOG_LEVEL": "DEBUG",
     "SERVER_NAME": None,
-    "MAX_CONTENT_LENGTH": 5 * 1024 * 1024,
+    "MAX_CONTENT_LENGTH": 5 * 1024 * 1024,  # Flask 文件上传最大大小
     "LOG_FILE_NAME_PID": False,
     "INVITE_URL": "12345678",
     "ALIYUN_KEY": "",

+ 5 - 4
core/aliyun.py

@@ -18,7 +18,7 @@ class AliyunTls:
         def __init__(self, url: str, key: str, secret: str, app: str, res: "Result", **kwargs):
             self.__res = res
             self.__kwargs = kwargs
-            self.__th = threading.Thread(target=self.__test_run)
+            self.__th = threading.Thread(target=self.__run)
             self.url = url
             self.key = key
             self.secret = secret
@@ -35,19 +35,19 @@ class AliyunTls:
         def on_error(self, message, *args):
             self.__res.success = False
 
-        def data_to_base64(self, data, *_):
+        def on_data(self, data, *_):
             try:
                 self.__res.byte += data
             except Exception:
                 self.__res.success = False
 
-        def __test_run(self):
+        def __run(self):
             tts = nls.NlsSpeechSynthesizer(
                 url=self.url,
                 akid=self.key,
                 aksecret=self.secret,
                 appkey=self.app,
-                on_data=self.data_to_base64,
+                on_data=self.on_data,
                 on_error=self.on_error,
             )
 
@@ -64,6 +64,7 @@ class AliyunTls:
         }
 
     def start(self, text: str):
+        """ 会阻塞程序进行 """
         return AliyunTls.Tls(self.url, self.key, self.secret, self.app, Result(text), **self.kwargs).start()
 
 

+ 62 - 73
core/db.py

@@ -24,49 +24,8 @@ class DataBase:
     def delete_self(self):
         os.remove(self._db_name)
 
-    def search(self, columns: List[str], table: str,
-               where: Union[str, List[str]] = None,
-               limit: Optional[int] = None,
-               offset: Optional[int] = None,
-               order_by: Optional[List[Tuple[str, str]]] = None,
-               group_by: Optional[List[str]] = None,
-               for_update: bool = False):
-        if type(where) is list and len(where) > 0:
-            where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
-        elif type(where) is str and len(where) > 0:
-            where = " WHERE " + where
-        else:
-            where: str = ""
-
-        if order_by is None:
-            order_by: str = ""
-        else:
-            by = [f" {i[0]} {i[1]} " for i in order_by]
-            order_by: str = " ORDER BY" + ", ".join(by)
-
-        if limit is None or limit == 0:
-            limit: str = ""
-        else:
-            limit = f" LIMIT {limit}"
-
-        if offset is None:
-            offset: str = ""
-        else:
-            offset = f" OFFSET {offset}"
-
-        if group_by is None:
-            group_by: str = ""
-        else:
-            group_by = "GROUP BY " + ", ".join(group_by)
-
-        columns: str = ", ".join(columns)
-        if for_update:
-            for_update = "FOR UPDATE"
-        else:
-            for_update = ""
-        return self.__search(f"SELECT {columns} "
-                             f"FROM {table} "
-                             f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
+    def search(self, sql: str, *args) -> Union[None, List]:
+        return self.__search(sql, args)
 
     def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
         columns: str = ", ".join(columns)
@@ -98,11 +57,11 @@ class DataBase:
         kw_str = ", ".join(kw_list)
         return self.done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
 
-    def __search(self, sql) -> Union[None, List]:
+    def __search(self, sql, args) -> Union[None, List]:
         try:
             sqlite = sqlite3.connect(self._db_name)
             cur = sqlite.cursor()
-            cur.execute(sql)
+            cur.execute(sql, args)
             ret = cur.fetchall()
         except sqlite3.Error:
             self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
@@ -144,7 +103,19 @@ class WordDatabase(DataBase):
         )''')
         self.wd = word.WordDict()
 
+        http_header = configure.conf.get("HEADER")
+        if http_header:
+            self.wd.set_headers(http_header)
+
+        http_proxy = configure.conf.get("PROXY")
+        if http_header:
+            self.wd.set_proxies(http_proxy)
+
     def __add_word(self, q: str, add: bool):
+        """
+        访问词典, 添加一个新单词
+        :param add: 表示是否添加到数据库
+        """
         r = self.wd.get_requests(q)
         if not r.is_find:
             return None
@@ -155,16 +126,16 @@ class WordDatabase(DataBase):
                 ret = w
             name = w.name
             mp3 = w.mp3
-            name_lower = name.lower().replace("'", "''")
-            res = self.search(columns=["word"], table="Word", where=f"LOWER(word)='{name_lower}'")
+            name_lower = name.lower()
+            res = self.search("SELECT word FROM Word WHERE LOWER(word)=?", name_lower)
             if res is not None and len(res) > 0:
                 continue
             for c in w.comment:
                 comment = r.res[i].comment[c]
-                eg = '@@'.join(comment.eg).replace("'", "''")
-                part = comment.part.replace("'", "''")
-                english = comment.english.replace("'", "''")
-                chinese = comment.chinese.replace("'", "''")
+                eg = '@@'.join(comment.eg)  # 例句之间使用@@分隔
+                part = comment.part
+                english = comment.english
+                chinese = comment.chinese
                 if add:
                     self.insert(table='Word',
                                 columns=['word', 'part', 'english', 'chinese', 'eg', 'mp3'],
@@ -174,6 +145,7 @@ class WordDatabase(DataBase):
 
     @staticmethod
     def __make_word(q: str, res: list):
+        """ 将 find_word 获取的SQL数据转换为word对象 """
         w = word.Word(q, res[0][7])
         box = 6
         for i in res:
@@ -186,10 +158,17 @@ class WordDatabase(DataBase):
         return w
 
     def find_word(self, q: str, search: bool = True, add: bool = True) -> Optional[word.Word]:
-        name_lower = q.lower().replace("'", "''")
-        res = self.search(columns=["id", "word", "part", "english", "chinese", "eg", "box", "mp3"],
-                          table="Word",
-                          where=f"LOWER(word)='{name_lower}'")
+        """
+        数据库中找单词
+        :param q: 单词
+        :param search: 是否在字典查找
+        :param add: 是否查找后添加到数据库
+        :return: 单词
+        """
+        name_lower = q.lower()
+        res = self.search("SELECT id, word, part, english, chinese, eg, box, mp3 "
+                          "FROM Word "
+                          "WHERE LOWER(word)=?", name_lower)
         if res is None or len(res) <= 0:
             if search:
                 return self.__add_word(q, add)
@@ -198,14 +177,16 @@ class WordDatabase(DataBase):
         return self.__make_word(q, res)
 
     def find_word_by_index(self, index) -> Optional[word.Word]:
-        res = self.search(columns=["DISTINCT word"],
-                          table="Word", order_by=[("Word", "ASC")],
-                          offset=index, limit=1)
+        res = self.search("SELECT DISTINCT word "
+                          "FROM Word "
+                          "ORDER BY word "
+                          "LIMIT 1 OFFSET ?", index)
         if res is None or len(res) <= 0:
             return None
         return self.find_word(res[0][0], False, False)
 
     class UpdateResponse:
+        """ 记录单词导入(更新)的个数, 和失败的单词 """
         def __init__(self):
             self._success = 0
             self._error = 0
@@ -222,8 +203,9 @@ class WordDatabase(DataBase):
             return self._success, self._error, self._error_list
 
     def import_txt(self, line: str, sleep: int = 1):
+        """ 在字符串中导入单词 """
         response = self.UpdateResponse()
-        word_list = self.word_pattern.findall(line)
+        word_list = self.word_pattern.findall(line)  # 匹配line中的所有英语单词
         for w in word_list:
             time.sleep(sleep)
             try:
@@ -240,6 +222,7 @@ class WordDatabase(DataBase):
 
     @staticmethod
     def eg_to_str(eg_filed: str, max_eg: int, html: bool = False):
+        """ 例句转换成HTML格式或人类可读格式 """
         eg = eg_filed.split("@@")
         eg_str = ""
         count_eg = 0
@@ -263,9 +246,10 @@ class WordDatabase(DataBase):
         return eg_str
 
     def export_frame(self, max_eg: int = 3, html: bool = False) -> Optional[pandas.DataFrame]:
-        res = self.search(columns=["box", "word", "part", "english", "chinese", "eg"],
-                          table="Word",
-                          order_by=[("word", "ASC"), ("box", "ASC")])
+        """ 导出数据库 Pandas DataFrame """
+        res = self.search("SELECT box, word, part, english, chinese, eg "
+                          "FROM Word "
+                          "ORDER BY word, box")
         if res is None:
             return None
 
@@ -296,7 +280,7 @@ class WordDatabase(DataBase):
         count = 0
         word_list = self.word_pattern.findall(line)
         for w in word_list:
-            name_lower = w.lower().replace("'", "''")
+            name_lower = w.lower()
             cur = self.delete(table="Word", where=f"LOWER(word)='{name_lower}'")
             if cur[1].rowcount != -1:
                 self.__logger.debug(f"delete word {w} success")
@@ -311,14 +295,14 @@ class WordDatabase(DataBase):
         return cur[1].rowcount
 
     def right_word(self, w: str):
-        name_lower = w.lower().replace("'", "''")
-        res = self.search(columns=["MIN(box)"], table="Word", where=f"LOWER(word)='{name_lower}'")
+        name_lower = w.lower()
+        res = self.search("SELECT MIN(box) FROM Word WHERE LOWER(word)=?", name_lower)
         if len(res) == 0:
             return False
         box = res[0][0]
         if box != 5:
             box += 1
-            name_lower = w.lower().replace("'", "''")
+            name_lower = w.lower()
             self.update(table="Word", kw={"box": f"{box}"}, where=f"LOWER(word)='{name_lower}'")
         return True
 
@@ -341,13 +325,18 @@ class WordDatabase(DataBase):
             box = 4  # 1
         # box 的概率比分别为:5:4:3:2:1
 
-        count = 0
+        first_box = box
+        count = self.search("SELECT COUNT(DISTINCT word) FROM Word WHERE box=?", box)[0][0]
         while count == 0:
-            if box == 5:
-                return None
-            box += 1
-            count = self.search(columns=["COUNT(DISTINCT word)"], table="Word", where=f"box<={box}")[0][0]
-        get = self.search(columns=["DISTINCT word"], table="Word", where=f"box<={box}",
-                          limit=1, offset=random.randint(0, count - 1))[0][0]
+            if box == 4:
+                box = 0
+            else:
+                box += 1
+
+            if box == first_box:
+                break
+            count = self.search("SELECT COUNT(DISTINCT word) FROM Word WHERE box=?", box)[0][0]
+        get = self.search("SELECT DISTINCT word FROM Word WHERE box=? LIMIT 1 OFFSET ?",
+                          box, random.randint(0, count - 1))[0][0]
         self.__logger.debug(f"Rand word {self.dict_name} from box: {box} count: {count} get: {get}")
         return self.find_word(get, False)

+ 2 - 5
core/word.py

@@ -23,11 +23,8 @@ class WordDict:
             user_agent = ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) "
                           "Chrome/17.0.963.56 Safari/535.11 ")
 
-        if proxies is None:
-            proxies = {'http': "http://localhost:8889", 'https': "http://localhost:8889"}  # 不走系统代理
-
         self.headers = {"User-Agent": user_agent}  # 配置请求头
-        self.proxies = proxies
+        self.proxies = {}
 
     def set_headers(self, headers: dict):
         self.headers.update(headers)
@@ -140,7 +137,7 @@ class Word:
 
         def add_eg(self, eg: str):
             eg = eg.strip()
-            if eg == "##" or len(eg) == 0:
+            if eg == "##" or len(eg) == 0:  # 中英文使用##拼接
                 return
             self.eg.append(eg)
 

+ 3 - 0
job/new_data.py

@@ -1,3 +1,6 @@
+""" 旧版数据库导入为新版数据库 """
+""" 新版加入了mp3 """
+
 import sqlite3
 from core.aliyun import tls
 import traceback