db.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. import sqlite3
  2. from typing import Optional, Union, List, Tuple, Dict
  3. import logging
  4. import pandas
  5. from . import word
  6. import re
  7. import os
  8. import random
  9. import configure
  10. class DataBase:
  11. __logger = logging.getLogger("main.database")
  12. __logger.setLevel({"DEBUG": logging.DEBUG,
  13. "INFO": logging.INFO,
  14. "WARNING": logging.WARNING,
  15. "ERROR": logging.ERROR}.get(configure.conf["LOG_LEVEL"], logging.INFO))
  16. def __init__(self, name, path: str = ""):
  17. self._db_name = os.path.join(path, f"{name}.db")
  18. self.__logger.info(f"Mark {self._db_name}")
  19. def delete_self(self):
  20. os.remove(self._db_name)
  21. def search(self, columns: List[str], table: str,
  22. where: Union[str, List[str]] = None,
  23. limit: Optional[int] = None,
  24. offset: Optional[int] = None,
  25. order_by: Optional[List[Tuple[str, str]]] = None,
  26. group_by: Optional[List[str]] = None,
  27. for_update: bool = False):
  28. if type(where) is list and len(where) > 0:
  29. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  30. elif type(where) is str and len(where) > 0:
  31. where = " WHERE " + where
  32. else:
  33. where: str = ""
  34. if order_by is None:
  35. order_by: str = ""
  36. else:
  37. by = [f" {i[0]} {i[1]} " for i in order_by]
  38. order_by: str = " ORDER BY" + ", ".join(by)
  39. if limit is None or limit == 0:
  40. limit: str = ""
  41. else:
  42. limit = f" LIMIT {limit}"
  43. if offset is None:
  44. offset: str = ""
  45. else:
  46. offset = f" OFFSET {offset}"
  47. if group_by is None:
  48. group_by: str = ""
  49. else:
  50. group_by = "GROUP BY " + ", ".join(group_by)
  51. columns: str = ", ".join(columns)
  52. if for_update:
  53. for_update = "FOR UPDATE"
  54. else:
  55. for_update = ""
  56. return self.__search(f"SELECT {columns} "
  57. f"FROM {table} "
  58. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  59. def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
  60. columns: str = ", ".join(columns)
  61. if type(values) is str:
  62. values: str = f"({values})"
  63. else:
  64. values: str = ", ".join(f"{v}" for v in values)
  65. return self.done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  66. def delete(self, table: str, where: Union[str, List[str]] = None, not_commit: bool = False):
  67. if type(where) is list and len(where) > 0:
  68. where: str = " AND ".join(f"({w})" for w in where)
  69. elif type(where) is not str or len(where) == 0: # 必须指定条件
  70. return None
  71. return self.done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  72. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  73. not_commit: bool = False):
  74. if len(kw) == 0:
  75. return None
  76. if type(where) is list and len(where) > 0:
  77. where: str = " AND ".join(f"({w})" for w in where)
  78. elif type(where) is not str or len(where) == 0: # 必须指定条件
  79. return None
  80. kw_list = [f"{key} = {kw[key]}" for key in kw]
  81. kw_str = ", ".join(kw_list)
  82. return self.done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  83. def __search(self, sql) -> Union[None, List]:
  84. try:
  85. sqlite = sqlite3.connect(self._db_name)
  86. cur = sqlite.cursor()
  87. cur.execute(sql)
  88. ret = cur.fetchall()
  89. except sqlite3.Error:
  90. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  91. return None
  92. return ret
  93. def done(self, sql, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  94. sqlite = sqlite3.connect(self._db_name)
  95. try:
  96. cur = sqlite.cursor()
  97. cur.execute(sql)
  98. except sqlite3.Error:
  99. sqlite.rollback()
  100. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  101. return None
  102. finally:
  103. if not not_commit:
  104. sqlite.commit()
  105. return sqlite, cur
  106. class WordDatabase(DataBase):
  107. word_pattern = re.compile("([a-zA-Z\']+)") # 匹配英语单词
  108. __logger = logging.getLogger("main.database.dict")
  109. def __init__(self, dict_name, path: str = ""):
  110. super(WordDatabase, self).__init__(dict_name, path)
  111. self.dict_name = dict_name
  112. self.done(f'''
  113. CREATE TABLE IF NOT EXISTS Word (
  114. id INTEGER PRIMARY KEY AUTOINCREMENT, -- 编码
  115. box INTEGER NOT NULL DEFAULT 1 CHECK (box < 6 and box > 0),
  116. word TEXT NOT NULL, -- 单词
  117. part TEXT NOT NULL, -- 词性
  118. english TEXT NOT NULL, -- 英文注释
  119. chinese TEXT NOT NULL, -- 中文注释
  120. eg TEXT -- 例句
  121. )''')
  122. self.wd = word.WordDict()
  123. def __add_word(self, q: str):
  124. r = self.wd.get_requests(q)
  125. if not r.is_find:
  126. return None
  127. ret = None
  128. for i in r.res:
  129. w = r.res[i]
  130. if ret is None:
  131. ret = w
  132. name = w.name
  133. name_lower = name.lower().replace("'", "''")
  134. res = self.search(columns=["word"], table="Word", where=f"LOWER(word)='{name_lower}'")
  135. if res is not None and len(res) > 0:
  136. continue
  137. for c in w.comment:
  138. comment = r.res[i].comment[c]
  139. eg = '@@'.join(comment.eg).replace("'", "''")
  140. part = comment.part.replace("'", "''")
  141. english = comment.english.replace("'", "''")
  142. chinese = comment.chinese.replace("'", "''")
  143. self.insert(table='Word',
  144. columns=['word', 'part', 'english', 'chinese', 'eg'],
  145. values=f"'{name_lower}', '{part}', '{english}', '{chinese}', '{eg} '")
  146. self.__logger.info(f"Add word name: {name_lower} part: {part}")
  147. return ret
  148. @staticmethod
  149. def __make_word(q: str, res: list):
  150. w = word.Word(q)
  151. for i in res:
  152. c = word.Word.Comment(i[2], i[3], i[4])
  153. for e in i[5].split("@@"):
  154. c.add_eg(e)
  155. w.add_comment(c)
  156. return w
  157. def find_word(self, q: str) -> Optional[word.Word]:
  158. res = self.search(columns=["id", "word", "part", "english", "chinese", "eg"],
  159. table="Word",
  160. where=f"LOWER(word)='{q.lower()}'")
  161. if res is None:
  162. res = []
  163. if len(res) <= 0:
  164. return self.__add_word(q)
  165. self.__logger.debug(f"Find word (not add) {q}")
  166. return self.__make_word(q, res)
  167. class UpdateResponse:
  168. def __init__(self):
  169. self._success = 0
  170. self._error = 0
  171. self._error_list = []
  172. def add_success(self):
  173. self._success += 1
  174. def add_error(self, q: str):
  175. self._error += 1
  176. self._error_list.append(q)
  177. def response(self):
  178. return self._success, self._error, self._error_list
  179. def import_txt(self, line: str):
  180. response = self.UpdateResponse()
  181. word_list = self.word_pattern.findall(line)
  182. for w in word_list:
  183. try:
  184. if self.find_word(w) is None:
  185. self.__logger.debug(f"update word {w} fail")
  186. response.add_error(w)
  187. else:
  188. self.__logger.debug(f"update word {w} success")
  189. response.add_success()
  190. except Exception as e:
  191. response.add_error(w)
  192. self.__logger.debug(f"update word {w} fail", exc_info=True)
  193. return True, response
  194. @staticmethod
  195. def eg_to_str(eg_filed: str, max_eg: int):
  196. eg = eg_filed.split("@@")
  197. eg_str = ""
  198. count_eg = 0
  199. for e in eg:
  200. count_eg += 1
  201. if max_eg != -1 and count_eg > max_eg:
  202. break
  203. ec = e.split("##")
  204. if len(ec) == 2:
  205. eng, chi = ec
  206. else:
  207. eng = ec[0]
  208. chi = ""
  209. if len(eng.replace(" ", "")) == 0:
  210. continue
  211. eg_str += f"{eng} ({chi})\n"
  212. return eg_str
  213. def export_frame(self, max_eg: int = 3) -> Optional[pandas.DataFrame]:
  214. res = self.search(columns=["box", "word", "part", "english", "chinese", "eg"],
  215. table="Word",
  216. order_by=[('box', 'DESC')])
  217. if res is None:
  218. return None
  219. df = pandas.DataFrame(columns=["Box", "Word", "Part", "English", "Chinese", "Eg"])
  220. export = []
  221. for i in res:
  222. if i[1] in export:
  223. continue
  224. export.append(i[1])
  225. eg_str = self.eg_to_str(i[5], max_eg)
  226. df = df.append({"Box": str(i[0]),
  227. "Word": str(i[1]),
  228. "Part": str(i[2]),
  229. "English": str(i[3]),
  230. "Chinese": str(i[4]),
  231. "Eg": eg_str}, ignore_index=True)
  232. self.__logger.debug(f"export word {i[1]}")
  233. return df
  234. def delete_txt(self, line: str):
  235. count = 0
  236. word_list = self.word_pattern.findall(line)
  237. for w in word_list:
  238. cur = self.delete(table="Word", where=f"LOWER(word)='{w.lower()}'")
  239. if cur[1].rowcount != -1:
  240. self.__logger.debug(f"delete word {w} success")
  241. count += 1
  242. else:
  243. self.__logger.debug(f"delete word {w} fail")
  244. return count
  245. def delete_all(self):
  246. self.__logger.debug(f"delete all word")
  247. cur = self.delete(table="Word")
  248. return cur[1].rowcount
  249. def rand_word(self):
  250. r = random.randint(0, 16)
  251. if r < 5:
  252. box = 2
  253. elif r < 9:
  254. box = 3
  255. elif r < 12:
  256. box = 4
  257. elif r < 14:
  258. box = 5
  259. else:
  260. box = 6
  261. # box 的概率比分别为:5:4:3:2:1
  262. count = 0
  263. while count == 0:
  264. if box == 1:
  265. return None
  266. box -= 1
  267. count = self.search(columns=["COUNT(ID)"], table="Word", where=f"box={box}")[0][0]
  268. get = self.search(columns=["word"], table="Word", limit=1, offset=random.randint(0, count))[0][0]
  269. self.__logger.debug(f"Rand word {self.dict_name} from box: {box} count: {count} get: {get}")
  270. return self.find_word(get)