# db.py
import contextlib
import logging
import os
import random
import re
import sqlite3
from typing import Optional, Union, List, Tuple, Dict

import pandas

import configure
from core import word
  10. class DataBase:
  11. __logger = logging.getLogger("main.database")
  12. __logger.setLevel({"DEBUG": logging.DEBUG,
  13. "INFO": logging.INFO,
  14. "WARNING": logging.WARNING,
  15. "ERROR": logging.ERROR}.get(configure.conf["LOG_LEVEL"], logging.INFO))
  16. def __init__(self, name, path: str = ""):
  17. self._db_name = os.path.join(path, f"{name}.db")
  18. self.__logger.info(f"Mark {self._db_name}")
  19. def delete_self(self):
  20. os.remove(self._db_name)
  21. def search(self, columns: List[str], table: str,
  22. where: Union[str, List[str]] = None,
  23. limit: Optional[int] = None,
  24. offset: Optional[int] = None,
  25. order_by: Optional[List[Tuple[str, str]]] = None,
  26. group_by: Optional[List[str]] = None,
  27. for_update: bool = False):
  28. if type(where) is list and len(where) > 0:
  29. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  30. elif type(where) is str and len(where) > 0:
  31. where = " WHERE " + where
  32. else:
  33. where: str = ""
  34. if order_by is None:
  35. order_by: str = ""
  36. else:
  37. by = [f" {i[0]} {i[1]} " for i in order_by]
  38. order_by: str = " ORDER BY" + ", ".join(by)
  39. if limit is None or limit == 0:
  40. limit: str = ""
  41. else:
  42. limit = f" LIMIT {limit}"
  43. if offset is None:
  44. offset: str = ""
  45. else:
  46. offset = f" OFFSET {offset}"
  47. if group_by is None:
  48. group_by: str = ""
  49. else:
  50. group_by = "GROUP BY " + ", ".join(group_by)
  51. columns: str = ", ".join(columns)
  52. if for_update:
  53. for_update = "FOR UPDATE"
  54. else:
  55. for_update = ""
  56. return self.__search(f"SELECT {columns} "
  57. f"FROM {table} "
  58. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  59. def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
  60. columns: str = ", ".join(columns)
  61. if type(values) is str:
  62. values: str = f"({values})"
  63. else:
  64. values: str = ", ".join(f"{v}" for v in values)
  65. return self.done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  66. def delete(self, table: str, where: Union[str, List[str]] = None, not_commit: bool = False):
  67. if type(where) is list and len(where) > 0:
  68. where: str = " AND ".join(f"({w})" for w in where)
  69. elif type(where) is not str or len(where) == 0: # 必须指定条件
  70. return None
  71. return self.done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  72. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  73. not_commit: bool = False):
  74. if len(kw) == 0:
  75. return None
  76. if type(where) is list and len(where) > 0:
  77. where: str = " AND ".join(f"({w})" for w in where)
  78. elif type(where) is not str or len(where) == 0: # 必须指定条件
  79. return None
  80. kw_list = [f"{key} = {kw[key]}" for key in kw]
  81. kw_str = ", ".join(kw_list)
  82. return self.done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  83. def __search(self, sql) -> Union[None, List]:
  84. try:
  85. sqlite = sqlite3.connect(self._db_name)
  86. cur = sqlite.cursor()
  87. cur.execute(sql)
  88. ret = cur.fetchall()
  89. except sqlite3.Error:
  90. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  91. return None
  92. return ret
  93. def done(self, sql, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  94. sqlite = sqlite3.connect(self._db_name)
  95. try:
  96. cur = sqlite.cursor()
  97. cur.execute(sql)
  98. except sqlite3.Error:
  99. sqlite.rollback()
  100. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  101. return None
  102. finally:
  103. if not not_commit:
  104. sqlite.commit()
  105. return sqlite, cur
  106. class WordDatabase(DataBase):
  107. word_pattern = re.compile("([a-zA-Z\']+)") # 匹配英语单词
  108. __logger = logging.getLogger("main.database.dict")
  109. def __init__(self, dict_name, path: str = ""):
  110. super(WordDatabase, self).__init__(dict_name, path)
  111. self.dict_name = dict_name
  112. self.done(f'''
  113. CREATE TABLE IF NOT EXISTS Word (
  114. id INTEGER PRIMARY KEY AUTOINCREMENT, -- 编码
  115. box INTEGER NOT NULL DEFAULT 1 CHECK (box < 6 and box > 0),
  116. word TEXT NOT NULL, -- 单词
  117. part TEXT NOT NULL, -- 词性
  118. english TEXT NOT NULL, -- 英文注释
  119. chinese TEXT NOT NULL, -- 中文注释
  120. eg TEXT -- 例句
  121. )''')
  122. self.wd = word.WordDict()
  123. def __add_word(self, q: str, add: bool):
  124. r = self.wd.get_requests(q)
  125. if not r.is_find:
  126. return None
  127. ret = None
  128. for i in r.res:
  129. w = r.res[i]
  130. if ret is None:
  131. ret = w
  132. name = w.name
  133. name_lower = name.lower().replace("'", "''")
  134. res = self.search(columns=["word"], table="Word", where=f"LOWER(word)='{name_lower}'")
  135. if res is not None and len(res) > 0:
  136. continue
  137. for c in w.comment:
  138. comment = r.res[i].comment[c]
  139. eg = '@@'.join(comment.eg).replace("'", "''")
  140. part = comment.part.replace("'", "''")
  141. english = comment.english.replace("'", "''")
  142. chinese = comment.chinese.replace("'", "''")
  143. if add:
  144. self.insert(table='Word',
  145. columns=['word', 'part', 'english', 'chinese', 'eg'],
  146. values=f"'{name_lower}', '{part}', '{english}', '{chinese}', '{eg} '")
  147. self.__logger.info(f"Add word name: {name_lower} part: {part}")
  148. return ret
  149. @staticmethod
  150. def __make_word(q: str, res: list):
  151. w = word.Word(q)
  152. for i in res:
  153. c = word.Word.Comment(i[2], i[3], i[4])
  154. for e in i[5].split("@@"):
  155. c.add_eg(e)
  156. w.add_comment(c)
  157. return w
  158. def find_word(self, q: str, search: bool = True, add: bool = True) -> Optional[word.Word]:
  159. name_lower = q.lower().replace("'", "''")
  160. res = self.search(columns=["id", "word", "part", "english", "chinese", "eg"],
  161. table="Word",
  162. where=f"LOWER(word)='{name_lower}'")
  163. if res is None:
  164. res = []
  165. if len(res) <= 0:
  166. if search:
  167. return self.__add_word(q, add)
  168. return None
  169. self.__logger.debug(f"Find word (not add) {q}")
  170. return self.__make_word(q, res)
  171. class UpdateResponse:
  172. def __init__(self):
  173. self._success = 0
  174. self._error = 0
  175. self._error_list = []
  176. def add_success(self):
  177. self._success += 1
  178. def add_error(self, q: str):
  179. self._error += 1
  180. self._error_list.append(q)
  181. def response(self):
  182. return self._success, self._error, self._error_list
  183. def import_txt(self, line: str):
  184. response = self.UpdateResponse()
  185. word_list = self.word_pattern.findall(line)
  186. for w in word_list:
  187. try:
  188. if self.find_word(w, True, True) is None:
  189. self.__logger.debug(f"update word {w} fail")
  190. response.add_error(w)
  191. else:
  192. self.__logger.debug(f"update word {w} success")
  193. response.add_success()
  194. except Exception as e:
  195. response.add_error(w)
  196. self.__logger.debug(f"update word {w} fail", exc_info=True)
  197. return True, response
  198. @staticmethod
  199. def eg_to_str(eg_filed: str, max_eg: int, html: bool = False):
  200. eg = eg_filed.split("@@")
  201. eg_str = ""
  202. count_eg = 0
  203. for e in eg:
  204. count_eg += 1
  205. if max_eg != -1 and count_eg > max_eg:
  206. break
  207. ec = e.split("##")
  208. if len(ec) == 2:
  209. eng, chi = ec
  210. else:
  211. eng = ec[0]
  212. chi = ""
  213. if len(eng.replace(" ", "")) == 0:
  214. continue
  215. if html:
  216. eg_str += f"{eng} ({chi})<br>"
  217. else:
  218. eg_str += f"{eng} ({chi})\n"
  219. return eg_str
  220. def export_frame(self, max_eg: int = 3, html: bool = False) -> Optional[pandas.DataFrame]:
  221. res = self.search(columns=["box", "word", "part", "english", "chinese", "eg"],
  222. table="Word",
  223. order_by=[("word", "ASC"), ("box", "ASC")])
  224. if res is None:
  225. return None
  226. df_box = []
  227. df_word = []
  228. df_part = []
  229. df_english = []
  230. df_chinese = []
  231. df_eg = []
  232. for i in res:
  233. if i[1] in df_word:
  234. continue
  235. df_box.append(str(i[0]))
  236. df_word.append(str(i[1]))
  237. df_part.append(str(i[2]))
  238. df_english.append(str(i[3]))
  239. df_chinese.append(str(i[4]))
  240. df_eg.append(self.eg_to_str(i[5], max_eg, html))
  241. self.__logger.debug(f"export word {i[1]}")
  242. return pandas.DataFrame(data={"Box": df_box,
  243. "Word": df_word,
  244. "Part": df_part,
  245. "English note(s)": df_english,
  246. "Chinese note(s)": df_chinese,
  247. "Example sentence(s)": df_eg})
  248. def delete_txt(self, line: str):
  249. count = 0
  250. word_list = self.word_pattern.findall(line)
  251. for w in word_list:
  252. name_lower = w.lower().replace("'", "''")
  253. cur = self.delete(table="Word", where=f"LOWER(word)='{name_lower}'")
  254. if cur[1].rowcount != -1:
  255. self.__logger.debug(f"delete word {w} success")
  256. count += 1
  257. else:
  258. self.__logger.debug(f"delete word {w} fail")
  259. return count
  260. def delete_all(self):
  261. self.__logger.debug(f"delete all word")
  262. cur = self.delete(table="Word")
  263. return cur[1].rowcount
  264. def right_word(self, w: str):
  265. name_lower = w.lower().replace("'", "''")
  266. res = self.search(columns=["MIN(box)"], table="Word", where=f"LOWER(word)='{name_lower}'")
  267. if len(res) == 0:
  268. return False
  269. box = res[0][0]
  270. if box != 5:
  271. box += 1
  272. name_lower = w.lower().replace("'", "''")
  273. self.update(table="Word", kw={"box": f"{box}"}, where=f"LOWER(word)='{name_lower}'")
  274. return True
  275. def wrong_word(self, w: str):
  276. name_lower = w.lower().replace('\'', '\'\'')
  277. self.update(table="Word", kw={"box": "1"}, where=f"LOWER(word)='{name_lower}'")
  278. return True
  279. def rand_word(self):
  280. r = random.randint(0, 15)
  281. if r < 5:
  282. box = 0 # 5
  283. elif r < 9:
  284. box = 1 # 4
  285. elif r < 12:
  286. box = 2 # 3
  287. elif r < 14:
  288. box = 3 # 2
  289. else:
  290. box = 4 # 1
  291. # box 的概率比分别为:5:4:3:2:1
  292. count = 0
  293. while count == 0:
  294. if box == 5:
  295. print()
  296. return None
  297. box += 1
  298. count = self.search(columns=["COUNT(ID)"], table="Word", where=f"box<={box}")[0][0]
  299. get = self.search(columns=["word"], table="Word", where=f"box<={box}",
  300. limit=1, offset=random.randint(0, count - 1))[0][0]
  301. self.__logger.debug(f"Rand word {self.dict_name} from box: {box} count: {count} get: {get}")
  302. return self.find_word(get, False)