db.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. import sqlite3
  2. from typing import Optional, Union, List, Tuple, Dict
  3. import logging
  4. import pandas
  5. from core import word
  6. import re
  7. import os
  8. import random
  9. import configure
  10. import time
  11. class DataBase:
  12. __logger = logging.getLogger("main.database")
  13. __logger.setLevel({"DEBUG": logging.DEBUG,
  14. "INFO": logging.INFO,
  15. "WARNING": logging.WARNING,
  16. "ERROR": logging.ERROR}.get(configure.conf["LOG_LEVEL"], logging.INFO))
  17. def __init__(self, name, path: str = ""):
  18. self._db_name = os.path.join(path, f"{name}.db")
  19. self.__logger.info(f"Mark {self._db_name}")
  20. def delete_self(self):
  21. os.remove(self._db_name)
  22. def search(self, sql: str, *args) -> Union[None, List]:
  23. return self.__search(sql, args)
  24. def insert(self, sql: str, *args, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  25. return self.done(sql, args, not_commit=not_commit)
  26. def delete(self, sql: str, *args, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  27. return self.done(sql, args, not_commit=not_commit)
  28. def update(self, sql: str, *args, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  29. return self.done(sql, args, not_commit=not_commit)
  30. def __search(self, sql, args) -> Union[None, List]:
  31. try:
  32. sqlite = sqlite3.connect(self._db_name)
  33. cur = sqlite.cursor()
  34. cur.execute(sql, args)
  35. ret = cur.fetchall()
  36. except sqlite3.Error:
  37. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  38. return None
  39. return ret
  40. def done(self, sql, args=(), not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  41. sqlite = sqlite3.connect(self._db_name)
  42. try:
  43. cur = sqlite.cursor()
  44. cur.execute(sql, args)
  45. except sqlite3.Error:
  46. sqlite.rollback()
  47. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  48. return None
  49. finally:
  50. if not not_commit:
  51. sqlite.commit()
  52. return sqlite, cur
  53. class WordDatabase(DataBase):
  54. word_pattern = re.compile("([a-zA-Z\']+)") # 匹配英语单词
  55. __logger = logging.getLogger("main.database.dict")
  56. def __init__(self, dict_name, path: str = ""):
  57. super(WordDatabase, self).__init__(dict_name, path)
  58. self.dict_name = dict_name
  59. self.done(f'''
  60. CREATE TABLE IF NOT EXISTS main.Word (
  61. id INTEGER PRIMARY KEY AUTOINCREMENT, -- 编码
  62. box INTEGER NOT NULL DEFAULT 1 CHECK (box < 6 and box > 0),
  63. word TEXT NOT NULL, -- 单词
  64. part TEXT NOT NULL, -- 词性
  65. english TEXT NOT NULL, -- 英文注释
  66. chinese TEXT NOT NULL, -- 中文注释
  67. eg TEXT, -- 例句
  68. mp3 TEXT -- mp3 单词音频
  69. )''')
  70. self.wd = word.WordDict()
  71. http_header = configure.conf.get("HEADER")
  72. if http_header:
  73. self.wd.set_headers(http_header)
  74. http_proxy = configure.conf.get("PROXY")
  75. if http_header:
  76. self.wd.set_proxies(http_proxy)
  77. def __add_word(self, q: str, add: bool):
  78. """
  79. 访问词典, 添加一个新单词
  80. :param add: 表示是否添加到数据库
  81. """
  82. r = self.wd.get_requests(q)
  83. if not r.is_find:
  84. return None
  85. ret = None
  86. for i in r.res:
  87. w = r.res[i]
  88. if ret is None:
  89. ret = w
  90. name = w.name
  91. mp3 = w.mp3
  92. name_lower = name.lower()
  93. res = self.search("SELECT word FROM main.Word WHERE LOWER(word)=?", name_lower)
  94. if res is not None and len(res) > 0:
  95. continue
  96. for c in w.comment:
  97. comment = r.res[i].comment[c]
  98. eg = '@@'.join(comment.eg) # 例句之间使用@@分隔
  99. part = comment.part
  100. english = comment.english
  101. chinese = comment.chinese
  102. if add:
  103. self.insert("INSERT INTO main.Word(word, part, english, chinese, eg, mp3) "
  104. "VALUES (?, ?, ?, ?, ?, ?)",
  105. name_lower, part, english, chinese, eg, mp3)
  106. self.__logger.info(f"Add word name: {name_lower} part: {part}")
  107. return ret
  108. @staticmethod
  109. def __make_word(q: str, res: list):
  110. """ 将 find_word 获取的SQL数据转换为word对象 """
  111. w = word.Word(q, res[0][7])
  112. box = 6
  113. for i in res:
  114. c = word.Word.Comment(i[2], i[3], i[4])
  115. for e in i[5].split("@@"):
  116. c.add_eg(e)
  117. w.add_comment(c)
  118. box = min(i[6], box)
  119. w.set_box(box)
  120. return w
  121. def find_word(self, q: str, search: bool = True, add: bool = True) -> Optional[word.Word]:
  122. """
  123. 数据库中找单词
  124. :param q: 单词
  125. :param search: 是否在字典查找
  126. :param add: 是否查找后添加到数据库
  127. :return: 单词
  128. """
  129. name_lower = q.lower()
  130. res = self.search("SELECT id, word, part, english, chinese, eg, box, mp3 "
  131. "FROM main.Word "
  132. "WHERE LOWER(word)=?", name_lower)
  133. if res is None or len(res) <= 0:
  134. if search:
  135. return self.__add_word(q, add)
  136. return None
  137. self.__logger.debug(f"Find word (not add) {q}")
  138. return self.__make_word(q, res)
  139. def find_word_by_index(self, index) -> Optional[word.Word]:
  140. res = self.search("SELECT DISTINCT word "
  141. "FROM main.Word "
  142. "ORDER BY word "
  143. "LIMIT 1 OFFSET ?", index)
  144. if res is None or len(res) <= 0:
  145. return None
  146. return self.find_word(res[0][0], False, False)
  147. class UpdateResponse:
  148. """ 记录单词导入(更新)的个数, 和失败的单词 """
  149. def __init__(self):
  150. self._success = 0
  151. self._error = 0
  152. self._error_list = []
  153. def add_success(self):
  154. self._success += 1
  155. def add_error(self, q: str):
  156. self._error += 1
  157. self._error_list.append(q)
  158. def response(self):
  159. return self._success, self._error, self._error_list
  160. def import_txt(self, line: str, sleep: int = 1):
  161. """ 在字符串中导入单词 """
  162. response = self.UpdateResponse()
  163. word_list = self.word_pattern.findall(line) # 匹配line中的所有英语单词
  164. for w in word_list:
  165. time.sleep(sleep)
  166. try:
  167. if self.find_word(w, True, True) is None:
  168. self.__logger.debug(f"update word {w} fail")
  169. response.add_error(w)
  170. else:
  171. self.__logger.debug(f"update word {w} success")
  172. response.add_success()
  173. except Exception as e:
  174. response.add_error(w)
  175. self.__logger.debug(f"update word {w} fail", exc_info=True)
  176. return True, response
  177. @staticmethod
  178. def eg_to_str(eg_filed: str, max_eg: int, html: bool = False):
  179. """ 例句转换成HTML格式或人类可读格式 """
  180. eg = eg_filed.split("@@")
  181. eg_str = ""
  182. count_eg = 0
  183. for e in eg:
  184. count_eg += 1
  185. if max_eg != -1 and count_eg > max_eg:
  186. break
  187. ec = e.split("##")
  188. if len(ec) == 2:
  189. eng, chi = ec
  190. else:
  191. eng = ec[0]
  192. chi = ""
  193. if len(eng.replace(" ", "")) == 0:
  194. continue
  195. if html:
  196. eg_str += f"{eng} ({chi})<br>"
  197. else:
  198. eg_str += f"{eng} ({chi})\n"
  199. return eg_str
  200. def export_frame(self, max_eg: int = 3, html: bool = False) -> Optional[pandas.DataFrame]:
  201. """ 导出数据库 Pandas DataFrame """
  202. res = self.search("SELECT box, word, part, english, chinese, eg "
  203. "FROM main.Word "
  204. "ORDER BY word, box")
  205. if res is None:
  206. return None
  207. df_box = []
  208. df_word = []
  209. df_part = []
  210. df_english = []
  211. df_chinese = []
  212. df_eg = []
  213. for i in res:
  214. if i[1] in df_word:
  215. continue
  216. df_box.append(str(i[0]))
  217. df_word.append(str(i[1]))
  218. df_part.append(str(i[2]))
  219. df_english.append(str(i[3]))
  220. df_chinese.append(str(i[4]))
  221. df_eg.append(self.eg_to_str(i[5], max_eg, html))
  222. self.__logger.debug(f"export word {i[1]}")
  223. return pandas.DataFrame(data={"Box": df_box,
  224. "Word": df_word,
  225. "Part": df_part,
  226. "English note(s)": df_english,
  227. "Chinese note(s)": df_chinese,
  228. "Example sentence(s)": df_eg})
  229. def delete_txt(self, line: str):
  230. count = 0
  231. word_list = self.word_pattern.findall(line)
  232. for w in word_list:
  233. name_lower = w.lower()
  234. cur = self.delete("DELETE FROM main.Word WHERE LOWER(word)-?", name_lower)
  235. if cur[1].rowcount != -1:
  236. self.__logger.debug(f"delete word {w} success")
  237. count += 1
  238. else:
  239. self.__logger.debug(f"delete word {w} fail")
  240. return count
  241. def delete_all(self):
  242. self.__logger.debug(f"delete all word")
  243. cur = self.delete("DELETE FROM main.Word WHERE true")
  244. return cur[1].rowcount
  245. def right_word(self, w: str):
  246. name_lower = w.lower()
  247. res = self.search("SELECT MIN(box) FROM main.Word WHERE LOWER(word)=?", name_lower)
  248. if len(res) == 0:
  249. return False
  250. box = res[0][0]
  251. if box != 5:
  252. box += 1
  253. name_lower = w.lower()
  254. self.update("UPDATE main.Word SET box=? WHERE LOWER(word)=?", box, name_lower)
  255. return True
  256. def wrong_word(self, w: str):
  257. name_lower = w.lower().replace('\'', '\'\'')
  258. self.update("UPDATE main.Word SET box=1 WHERE LOWER(word)=?", name_lower)
  259. return True
  260. def rand_word(self):
  261. r = random.randint(0, 15)
  262. if r < 5:
  263. box = 0 # 5
  264. elif r < 9:
  265. box = 1 # 4
  266. elif r < 12:
  267. box = 2 # 3
  268. elif r < 14:
  269. box = 3 # 2
  270. else:
  271. box = 4 # 1
  272. # box 的概率比分别为:5:4:3:2:1
  273. first_box = box
  274. count = self.search("SELECT COUNT(DISTINCT word) FROM main.Word WHERE box=?", box)[0][0]
  275. while count == 0:
  276. if box == 4:
  277. box = 0
  278. else:
  279. box += 1
  280. if box == first_box:
  281. break
  282. count = self.search("SELECT COUNT(DISTINCT word) FROM main.Word WHERE box=?", box)[0][0]
  283. get = self.search("SELECT DISTINCT word FROM main.Word WHERE box=? LIMIT 1 OFFSET ?",
  284. box, random.randint(0, count - 1))[0][0]
  285. self.__logger.debug(f"Rand word {self.dict_name} from box: {box} count: {count} get: {get}")
  286. return self.find_word(get, False)