db.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. import sqlite3
  2. from typing import Optional, Union, List, Tuple, Dict
  3. import logging
  4. import pandas
  5. from core import word
  6. import re
  7. import os
  8. import random
  9. import configure
  10. import time
  11. class DataBase:
  12. __logger = logging.getLogger("main.database")
  13. __logger.setLevel({"DEBUG": logging.DEBUG,
  14. "INFO": logging.INFO,
  15. "WARNING": logging.WARNING,
  16. "ERROR": logging.ERROR}.get(configure.conf["LOG_LEVEL"], logging.INFO))
  17. def __init__(self, name, path: str = ""):
  18. self._db_name = os.path.join(path, f"{name}.db")
  19. self.__logger.info(f"Mark {self._db_name}")
  20. def delete_self(self):
  21. os.remove(self._db_name)
  22. def search(self, sql: str, *args) -> Union[None, List]:
  23. return self.__search(sql, args)
  24. def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
  25. columns: str = ", ".join(columns)
  26. if type(values) is str:
  27. values: str = f"({values})"
  28. else:
  29. values: str = ", ".join(f"{v}" for v in values)
  30. return self.done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  31. def delete(self, table: str, where: Union[str, List[str]] = None, not_commit: bool = False):
  32. if type(where) is list and len(where) > 0:
  33. where: str = " AND ".join(f"({w})" for w in where)
  34. elif type(where) is not str or len(where) == 0: # 必须指定条件
  35. return None
  36. return self.done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  37. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  38. not_commit: bool = False):
  39. if len(kw) == 0:
  40. return None
  41. if type(where) is list and len(where) > 0:
  42. where: str = " AND ".join(f"({w})" for w in where)
  43. elif type(where) is not str or len(where) == 0: # 必须指定条件
  44. return None
  45. kw_list = [f"{key} = {kw[key]}" for key in kw]
  46. kw_str = ", ".join(kw_list)
  47. return self.done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  48. def __search(self, sql, args) -> Union[None, List]:
  49. try:
  50. sqlite = sqlite3.connect(self._db_name)
  51. cur = sqlite.cursor()
  52. cur.execute(sql, args)
  53. ret = cur.fetchall()
  54. except sqlite3.Error:
  55. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  56. return None
  57. return ret
  58. def done(self, sql, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  59. sqlite = sqlite3.connect(self._db_name)
  60. try:
  61. cur = sqlite.cursor()
  62. cur.execute(sql)
  63. except sqlite3.Error:
  64. sqlite.rollback()
  65. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  66. return None
  67. finally:
  68. if not not_commit:
  69. sqlite.commit()
  70. return sqlite, cur
  71. class WordDatabase(DataBase):
  72. word_pattern = re.compile("([a-zA-Z\']+)") # 匹配英语单词
  73. __logger = logging.getLogger("main.database.dict")
  74. def __init__(self, dict_name, path: str = ""):
  75. super(WordDatabase, self).__init__(dict_name, path)
  76. self.dict_name = dict_name
  77. self.done(f'''
  78. CREATE TABLE IF NOT EXISTS Word (
  79. id INTEGER PRIMARY KEY AUTOINCREMENT, -- 编码
  80. box INTEGER NOT NULL DEFAULT 1 CHECK (box < 6 and box > 0),
  81. word TEXT NOT NULL, -- 单词
  82. part TEXT NOT NULL, -- 词性
  83. english TEXT NOT NULL, -- 英文注释
  84. chinese TEXT NOT NULL, -- 中文注释
  85. eg TEXT, -- 例句
  86. mp3 TEXT -- mp3 单词音频
  87. )''')
  88. self.wd = word.WordDict()
  89. http_header = configure.conf.get("HEADER")
  90. if http_header:
  91. self.wd.set_headers(http_header)
  92. http_proxy = configure.conf.get("PROXY")
  93. if http_header:
  94. self.wd.set_proxies(http_proxy)
  95. def __add_word(self, q: str, add: bool):
  96. """
  97. 访问词典, 添加一个新单词
  98. :param add: 表示是否添加到数据库
  99. """
  100. r = self.wd.get_requests(q)
  101. if not r.is_find:
  102. return None
  103. ret = None
  104. for i in r.res:
  105. w = r.res[i]
  106. if ret is None:
  107. ret = w
  108. name = w.name
  109. mp3 = w.mp3
  110. name_lower = name.lower()
  111. res = self.search("SELECT word FROM Word WHERE LOWER(word)=?", name_lower)
  112. if res is not None and len(res) > 0:
  113. continue
  114. for c in w.comment:
  115. comment = r.res[i].comment[c]
  116. eg = '@@'.join(comment.eg) # 例句之间使用@@分隔
  117. part = comment.part
  118. english = comment.english
  119. chinese = comment.chinese
  120. if add:
  121. self.insert(table='Word',
  122. columns=['word', 'part', 'english', 'chinese', 'eg', 'mp3'],
  123. values=f"'{name_lower}', '{part}', '{english}', '{chinese}', '{eg}', '{mp3}'")
  124. self.__logger.info(f"Add word name: {name_lower} part: {part}")
  125. return ret
  126. @staticmethod
  127. def __make_word(q: str, res: list):
  128. """ 将 find_word 获取的SQL数据转换为word对象 """
  129. w = word.Word(q, res[0][7])
  130. box = 6
  131. for i in res:
  132. c = word.Word.Comment(i[2], i[3], i[4])
  133. for e in i[5].split("@@"):
  134. c.add_eg(e)
  135. w.add_comment(c)
  136. box = min(i[6], box)
  137. w.set_box(box)
  138. return w
  139. def find_word(self, q: str, search: bool = True, add: bool = True) -> Optional[word.Word]:
  140. """
  141. 数据库中找单词
  142. :param q: 单词
  143. :param search: 是否在字典查找
  144. :param add: 是否查找后添加到数据库
  145. :return: 单词
  146. """
  147. name_lower = q.lower()
  148. res = self.search("SELECT id, word, part, english, chinese, eg, box, mp3 "
  149. "FROM Word "
  150. "WHERE LOWER(word)=?", name_lower)
  151. if res is None or len(res) <= 0:
  152. if search:
  153. return self.__add_word(q, add)
  154. return None
  155. self.__logger.debug(f"Find word (not add) {q}")
  156. return self.__make_word(q, res)
  157. def find_word_by_index(self, index) -> Optional[word.Word]:
  158. res = self.search("SELECT DISTINCT word "
  159. "FROM Word "
  160. "ORDER BY word "
  161. "LIMIT 1 OFFSET ?", index)
  162. if res is None or len(res) <= 0:
  163. return None
  164. return self.find_word(res[0][0], False, False)
  165. class UpdateResponse:
  166. """ 记录单词导入(更新)的个数, 和失败的单词 """
  167. def __init__(self):
  168. self._success = 0
  169. self._error = 0
  170. self._error_list = []
  171. def add_success(self):
  172. self._success += 1
  173. def add_error(self, q: str):
  174. self._error += 1
  175. self._error_list.append(q)
  176. def response(self):
  177. return self._success, self._error, self._error_list
  178. def import_txt(self, line: str, sleep: int = 1):
  179. """ 在字符串中导入单词 """
  180. response = self.UpdateResponse()
  181. word_list = self.word_pattern.findall(line) # 匹配line中的所有英语单词
  182. for w in word_list:
  183. time.sleep(sleep)
  184. try:
  185. if self.find_word(w, True, True) is None:
  186. self.__logger.debug(f"update word {w} fail")
  187. response.add_error(w)
  188. else:
  189. self.__logger.debug(f"update word {w} success")
  190. response.add_success()
  191. except Exception as e:
  192. response.add_error(w)
  193. self.__logger.debug(f"update word {w} fail", exc_info=True)
  194. return True, response
  195. @staticmethod
  196. def eg_to_str(eg_filed: str, max_eg: int, html: bool = False):
  197. """ 例句转换成HTML格式或人类可读格式 """
  198. eg = eg_filed.split("@@")
  199. eg_str = ""
  200. count_eg = 0
  201. for e in eg:
  202. count_eg += 1
  203. if max_eg != -1 and count_eg > max_eg:
  204. break
  205. ec = e.split("##")
  206. if len(ec) == 2:
  207. eng, chi = ec
  208. else:
  209. eng = ec[0]
  210. chi = ""
  211. if len(eng.replace(" ", "")) == 0:
  212. continue
  213. if html:
  214. eg_str += f"{eng} ({chi})<br>"
  215. else:
  216. eg_str += f"{eng} ({chi})\n"
  217. return eg_str
  218. def export_frame(self, max_eg: int = 3, html: bool = False) -> Optional[pandas.DataFrame]:
  219. """ 导出数据库 Pandas DataFrame """
  220. res = self.search("SELECT box, word, part, english, chinese, eg "
  221. "FROM Word "
  222. "ORDER BY word, box")
  223. if res is None:
  224. return None
  225. df_box = []
  226. df_word = []
  227. df_part = []
  228. df_english = []
  229. df_chinese = []
  230. df_eg = []
  231. for i in res:
  232. if i[1] in df_word:
  233. continue
  234. df_box.append(str(i[0]))
  235. df_word.append(str(i[1]))
  236. df_part.append(str(i[2]))
  237. df_english.append(str(i[3]))
  238. df_chinese.append(str(i[4]))
  239. df_eg.append(self.eg_to_str(i[5], max_eg, html))
  240. self.__logger.debug(f"export word {i[1]}")
  241. return pandas.DataFrame(data={"Box": df_box,
  242. "Word": df_word,
  243. "Part": df_part,
  244. "English note(s)": df_english,
  245. "Chinese note(s)": df_chinese,
  246. "Example sentence(s)": df_eg})
  247. def delete_txt(self, line: str):
  248. count = 0
  249. word_list = self.word_pattern.findall(line)
  250. for w in word_list:
  251. name_lower = w.lower()
  252. cur = self.delete(table="Word", where=f"LOWER(word)='{name_lower}'")
  253. if cur[1].rowcount != -1:
  254. self.__logger.debug(f"delete word {w} success")
  255. count += 1
  256. else:
  257. self.__logger.debug(f"delete word {w} fail")
  258. return count
  259. def delete_all(self):
  260. self.__logger.debug(f"delete all word")
  261. cur = self.delete(table="Word")
  262. return cur[1].rowcount
  263. def right_word(self, w: str):
  264. name_lower = w.lower()
  265. res = self.search("SELECT MIN(box) FROM Word WHERE LOWER(word)=?", name_lower)
  266. if len(res) == 0:
  267. return False
  268. box = res[0][0]
  269. if box != 5:
  270. box += 1
  271. name_lower = w.lower()
  272. self.update(table="Word", kw={"box": f"{box}"}, where=f"LOWER(word)='{name_lower}'")
  273. return True
  274. def wrong_word(self, w: str):
  275. name_lower = w.lower().replace('\'', '\'\'')
  276. self.update(table="Word", kw={"box": "1"}, where=f"LOWER(word)='{name_lower}'")
  277. return True
  278. def rand_word(self):
  279. r = random.randint(0, 15)
  280. if r < 5:
  281. box = 0 # 5
  282. elif r < 9:
  283. box = 1 # 4
  284. elif r < 12:
  285. box = 2 # 3
  286. elif r < 14:
  287. box = 3 # 2
  288. else:
  289. box = 4 # 1
  290. # box 的概率比分别为:5:4:3:2:1
  291. first_box = box
  292. count = self.search("SELECT COUNT(DISTINCT word) FROM Word WHERE box=?", box)[0][0]
  293. while count == 0:
  294. if box == 4:
  295. box = 0
  296. else:
  297. box += 1
  298. if box == first_box:
  299. break
  300. count = self.search("SELECT COUNT(DISTINCT word) FROM Word WHERE box=?", box)[0][0]
  301. get = self.search("SELECT DISTINCT word FROM Word WHERE box=? LIMIT 1 OFFSET ?",
  302. box, random.randint(0, count - 1))[0][0]
  303. self.__logger.debug(f"Rand word {self.dict_name} from box: {box} count: {count} get: {get}")
  304. return self.find_word(get, False)