db.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. import sqlite3
  2. from typing import Optional, Union, List, Tuple, Dict
  3. import logging
  4. import pandas
  5. from core import word
  6. import re
  7. import os
  8. import random
  9. import configure
  10. import time
  11. class DataBase:
  12. __logger = logging.getLogger("main.database")
  13. __logger.setLevel({"DEBUG": logging.DEBUG,
  14. "INFO": logging.INFO,
  15. "WARNING": logging.WARNING,
  16. "ERROR": logging.ERROR}.get(configure.conf["LOG_LEVEL"], logging.INFO))
  17. def __init__(self, name, path: str = ""):
  18. self._db_name = os.path.join(path, f"{name}.db")
  19. self.__logger.info(f"Mark {self._db_name}")
  20. def delete_self(self):
  21. os.remove(self._db_name)
  22. def search(self, columns: List[str], table: str,
  23. where: Union[str, List[str]] = None,
  24. limit: Optional[int] = None,
  25. offset: Optional[int] = None,
  26. order_by: Optional[List[Tuple[str, str]]] = None,
  27. group_by: Optional[List[str]] = None,
  28. for_update: bool = False):
  29. if type(where) is list and len(where) > 0:
  30. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  31. elif type(where) is str and len(where) > 0:
  32. where = " WHERE " + where
  33. else:
  34. where: str = ""
  35. if order_by is None:
  36. order_by: str = ""
  37. else:
  38. by = [f" {i[0]} {i[1]} " for i in order_by]
  39. order_by: str = " ORDER BY" + ", ".join(by)
  40. if limit is None or limit == 0:
  41. limit: str = ""
  42. else:
  43. limit = f" LIMIT {limit}"
  44. if offset is None:
  45. offset: str = ""
  46. else:
  47. offset = f" OFFSET {offset}"
  48. if group_by is None:
  49. group_by: str = ""
  50. else:
  51. group_by = "GROUP BY " + ", ".join(group_by)
  52. columns: str = ", ".join(columns)
  53. if for_update:
  54. for_update = "FOR UPDATE"
  55. else:
  56. for_update = ""
  57. return self.__search(f"SELECT {columns} "
  58. f"FROM {table} "
  59. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  60. def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
  61. columns: str = ", ".join(columns)
  62. if type(values) is str:
  63. values: str = f"({values})"
  64. else:
  65. values: str = ", ".join(f"{v}" for v in values)
  66. return self.done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  67. def delete(self, table: str, where: Union[str, List[str]] = None, not_commit: bool = False):
  68. if type(where) is list and len(where) > 0:
  69. where: str = " AND ".join(f"({w})" for w in where)
  70. elif type(where) is not str or len(where) == 0: # 必须指定条件
  71. return None
  72. return self.done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  73. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  74. not_commit: bool = False):
  75. if len(kw) == 0:
  76. return None
  77. if type(where) is list and len(where) > 0:
  78. where: str = " AND ".join(f"({w})" for w in where)
  79. elif type(where) is not str or len(where) == 0: # 必须指定条件
  80. return None
  81. kw_list = [f"{key} = {kw[key]}" for key in kw]
  82. kw_str = ", ".join(kw_list)
  83. return self.done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  84. def __search(self, sql) -> Union[None, List]:
  85. try:
  86. sqlite = sqlite3.connect(self._db_name)
  87. cur = sqlite.cursor()
  88. cur.execute(sql)
  89. ret = cur.fetchall()
  90. except sqlite3.Error:
  91. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  92. return None
  93. return ret
  94. def done(self, sql, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  95. sqlite = sqlite3.connect(self._db_name)
  96. try:
  97. cur = sqlite.cursor()
  98. cur.execute(sql)
  99. except sqlite3.Error:
  100. sqlite.rollback()
  101. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  102. return None
  103. finally:
  104. if not not_commit:
  105. sqlite.commit()
  106. return sqlite, cur
  107. class WordDatabase(DataBase):
  108. word_pattern = re.compile("([a-zA-Z\']+)") # 匹配英语单词
  109. __logger = logging.getLogger("main.database.dict")
  110. def __init__(self, dict_name, path: str = ""):
  111. super(WordDatabase, self).__init__(dict_name, path)
  112. self.dict_name = dict_name
  113. self.done(f'''
  114. CREATE TABLE IF NOT EXISTS Word (
  115. id INTEGER PRIMARY KEY AUTOINCREMENT, -- 编码
  116. box INTEGER NOT NULL DEFAULT 1 CHECK (box < 6 and box > 0),
  117. word TEXT NOT NULL, -- 单词
  118. part TEXT NOT NULL, -- 词性
  119. english TEXT NOT NULL, -- 英文注释
  120. chinese TEXT NOT NULL, -- 中文注释
  121. eg TEXT, -- 例句
  122. mp3 TEXT -- mp3 单词音频
  123. )''')
  124. self.wd = word.WordDict()
  125. def __add_word(self, q: str, add: bool):
  126. r = self.wd.get_requests(q)
  127. if not r.is_find:
  128. return None
  129. ret = None
  130. for i in r.res:
  131. w = r.res[i]
  132. if ret is None:
  133. ret = w
  134. name = w.name
  135. mp3 = w.mp3
  136. name_lower = name.lower().replace("'", "''")
  137. res = self.search(columns=["word"], table="Word", where=f"LOWER(word)='{name_lower}'")
  138. if res is not None and len(res) > 0:
  139. continue
  140. for c in w.comment:
  141. comment = r.res[i].comment[c]
  142. eg = '@@'.join(comment.eg).replace("'", "''")
  143. part = comment.part.replace("'", "''")
  144. english = comment.english.replace("'", "''")
  145. chinese = comment.chinese.replace("'", "''")
  146. if add:
  147. self.insert(table='Word',
  148. columns=['word', 'part', 'english', 'chinese', 'eg', 'mp3'],
  149. values=f"'{name_lower}', '{part}', '{english}', '{chinese}', '{eg}', '{mp3}'")
  150. self.__logger.info(f"Add word name: {name_lower} part: {part}")
  151. return ret
  152. @staticmethod
  153. def __make_word(q: str, res: list):
  154. w = word.Word(q, res[0][7])
  155. box = 6
  156. for i in res:
  157. c = word.Word.Comment(i[2], i[3], i[4])
  158. for e in i[5].split("@@"):
  159. c.add_eg(e)
  160. w.add_comment(c)
  161. box = min(i[6], box)
  162. w.set_box(box)
  163. return w
  164. def find_word(self, q: str, search: bool = True, add: bool = True) -> Optional[word.Word]:
  165. name_lower = q.lower().replace("'", "''")
  166. res = self.search(columns=["id", "word", "part", "english", "chinese", "eg", "box", "mp3"],
  167. table="Word",
  168. where=f"LOWER(word)='{name_lower}'")
  169. if res is None or len(res) <= 0:
  170. if search:
  171. return self.__add_word(q, add)
  172. return None
  173. self.__logger.debug(f"Find word (not add) {q}")
  174. return self.__make_word(q, res)
  175. class UpdateResponse:
  176. def __init__(self):
  177. self._success = 0
  178. self._error = 0
  179. self._error_list = []
  180. def add_success(self):
  181. self._success += 1
  182. def add_error(self, q: str):
  183. self._error += 1
  184. self._error_list.append(q)
  185. def response(self):
  186. return self._success, self._error, self._error_list
  187. def import_txt(self, line: str, sleep: int = 1):
  188. response = self.UpdateResponse()
  189. word_list = self.word_pattern.findall(line)
  190. for w in word_list:
  191. time.sleep(sleep)
  192. try:
  193. if self.find_word(w, True, True) is None:
  194. self.__logger.debug(f"update word {w} fail")
  195. response.add_error(w)
  196. else:
  197. self.__logger.debug(f"update word {w} success")
  198. response.add_success()
  199. except Exception as e:
  200. response.add_error(w)
  201. self.__logger.debug(f"update word {w} fail", exc_info=True)
  202. return True, response
  203. @staticmethod
  204. def eg_to_str(eg_filed: str, max_eg: int, html: bool = False):
  205. eg = eg_filed.split("@@")
  206. eg_str = ""
  207. count_eg = 0
  208. for e in eg:
  209. count_eg += 1
  210. if max_eg != -1 and count_eg > max_eg:
  211. break
  212. ec = e.split("##")
  213. if len(ec) == 2:
  214. eng, chi = ec
  215. else:
  216. eng = ec[0]
  217. chi = ""
  218. if len(eng.replace(" ", "")) == 0:
  219. continue
  220. if html:
  221. eg_str += f"{eng} ({chi})<br>"
  222. else:
  223. eg_str += f"{eng} ({chi})\n"
  224. return eg_str
  225. def export_frame(self, max_eg: int = 3, html: bool = False) -> Optional[pandas.DataFrame]:
  226. res = self.search(columns=["box", "word", "part", "english", "chinese", "eg"],
  227. table="Word",
  228. order_by=[("word", "ASC"), ("box", "ASC")])
  229. if res is None:
  230. return None
  231. df_box = []
  232. df_word = []
  233. df_part = []
  234. df_english = []
  235. df_chinese = []
  236. df_eg = []
  237. for i in res:
  238. if i[1] in df_word:
  239. continue
  240. df_box.append(str(i[0]))
  241. df_word.append(str(i[1]))
  242. df_part.append(str(i[2]))
  243. df_english.append(str(i[3]))
  244. df_chinese.append(str(i[4]))
  245. df_eg.append(self.eg_to_str(i[5], max_eg, html))
  246. self.__logger.debug(f"export word {i[1]}")
  247. return pandas.DataFrame(data={"Box": df_box,
  248. "Word": df_word,
  249. "Part": df_part,
  250. "English note(s)": df_english,
  251. "Chinese note(s)": df_chinese,
  252. "Example sentence(s)": df_eg})
  253. def delete_txt(self, line: str):
  254. count = 0
  255. word_list = self.word_pattern.findall(line)
  256. for w in word_list:
  257. name_lower = w.lower().replace("'", "''")
  258. cur = self.delete(table="Word", where=f"LOWER(word)='{name_lower}'")
  259. if cur[1].rowcount != -1:
  260. self.__logger.debug(f"delete word {w} success")
  261. count += 1
  262. else:
  263. self.__logger.debug(f"delete word {w} fail")
  264. return count
  265. def delete_all(self):
  266. self.__logger.debug(f"delete all word")
  267. cur = self.delete(table="Word")
  268. return cur[1].rowcount
  269. def right_word(self, w: str):
  270. name_lower = w.lower().replace("'", "''")
  271. res = self.search(columns=["MIN(box)"], table="Word", where=f"LOWER(word)='{name_lower}'")
  272. if len(res) == 0:
  273. return False
  274. box = res[0][0]
  275. if box != 5:
  276. box += 1
  277. name_lower = w.lower().replace("'", "''")
  278. self.update(table="Word", kw={"box": f"{box}"}, where=f"LOWER(word)='{name_lower}'")
  279. return True
  280. def wrong_word(self, w: str):
  281. name_lower = w.lower().replace('\'', '\'\'')
  282. self.update(table="Word", kw={"box": "1"}, where=f"LOWER(word)='{name_lower}'")
  283. return True
  284. def rand_word(self):
  285. r = random.randint(0, 15)
  286. if r < 5:
  287. box = 0 # 5
  288. elif r < 9:
  289. box = 1 # 4
  290. elif r < 12:
  291. box = 2 # 3
  292. elif r < 14:
  293. box = 3 # 2
  294. else:
  295. box = 4 # 1
  296. # box 的概率比分别为:5:4:3:2:1
  297. count = 0
  298. while count == 0:
  299. if box == 5:
  300. print()
  301. return None
  302. box += 1
  303. count = self.search(columns=["COUNT(DISTINCT word)"], table="Word", where=f"box<={box}")[0][0]
  304. get = self.search(columns=["DISTINCT word"], table="Word", where=f"box<={box}",
  305. limit=1, offset=random.randint(0, count - 1))[0][0]
  306. self.__logger.debug(f"Rand word {self.dict_name} from box: {box} count: {count} get: {get}")
  307. return self.find_word(get, False)