1
0

db.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. import sqlite3
  2. from typing import Optional, Union, List, Tuple, Dict
  3. import logging
  4. import pandas
  5. import word
  6. import re
  7. import os
  8. class DataBase:
  9. __logger = logging.getLogger("database")
  10. __logger.propagate = False
  11. def __init__(self, name, path: str = ""):
  12. self._db_name = os.path.join(path, f"{name}.db")
  13. self.__logger.info(f"Mark {self._db_name}")
  14. def search(self, columns: List[str], table: str,
  15. where: Union[str, List[str]] = None,
  16. limit: Optional[int] = None,
  17. offset: Optional[int] = None,
  18. order_by: Optional[List[Tuple[str, str]]] = None,
  19. group_by: Optional[List[str]] = None,
  20. for_update: bool = False):
  21. if type(where) is list and len(where) > 0:
  22. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  23. elif type(where) is str and len(where) > 0:
  24. where = " WHERE " + where
  25. else:
  26. where: str = ""
  27. if order_by is None:
  28. order_by: str = ""
  29. else:
  30. by = [f" {i[0]} {i[1]} " for i in order_by]
  31. order_by: str = " ORDER BY" + ", ".join(by)
  32. if limit is None or limit == 0:
  33. limit: str = ""
  34. else:
  35. limit = f" LIMIT {limit}"
  36. if offset is None:
  37. offset: str = ""
  38. else:
  39. offset = f" OFFSET {offset}"
  40. if group_by is None:
  41. group_by: str = ""
  42. else:
  43. group_by = "GROUP BY " + ", ".join(group_by)
  44. columns: str = ", ".join(columns)
  45. if for_update:
  46. for_update = "FOR UPDATE"
  47. else:
  48. for_update = ""
  49. return self.__search(f"SELECT {columns} "
  50. f"FROM {table} "
  51. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  52. def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
  53. columns: str = ", ".join(columns)
  54. if type(values) is str:
  55. values: str = f"({values})"
  56. else:
  57. values: str = ", ".join(f"{v}" for v in values)
  58. return self.done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  59. def delete(self, table: str, where: Union[str, List[str]] = None, not_commit: bool = False):
  60. if type(where) is list and len(where) > 0:
  61. where: str = " AND ".join(f"({w})" for w in where)
  62. elif type(where) is not str or len(where) == 0: # 必须指定条件
  63. return None
  64. return self.done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  65. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  66. not_commit: bool = False):
  67. if len(kw) == 0:
  68. return None
  69. if type(where) is list and len(where) > 0:
  70. where: str = " AND ".join(f"({w})" for w in where)
  71. elif type(where) is not str or len(where) == 0: # 必须指定条件
  72. return None
  73. kw_list = [f"{key} = {kw[key]}" for key in kw]
  74. kw_str = ", ".join(kw_list)
  75. return self.done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  76. def __search(self, sql) -> Union[None, List]:
  77. try:
  78. sqlite = sqlite3.connect(self._db_name)
  79. cur = sqlite.cursor()
  80. cur.execute(sql)
  81. ret = cur.fetchall()
  82. except sqlite3.Error:
  83. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  84. return None
  85. return ret
  86. def done(self, sql, not_commit: bool = False) -> Union[None, "Tuple[sqlite3, sqlite3.Cursor]"]:
  87. sqlite = sqlite3.connect(self._db_name)
  88. try:
  89. cur = sqlite.cursor()
  90. cur.execute(sql)
  91. except sqlite3.Error:
  92. sqlite.rollback()
  93. self.__logger.error(f"Sqlite({self._db_name}) SQL {sql} error", exc_info=True)
  94. return None
  95. finally:
  96. if not not_commit:
  97. sqlite.commit()
  98. return sqlite, cur
class WordDatabase(DataBase):
    """DataBase subclass that keeps dictionary entries in a ``Word`` table."""
    word_pattern = re.compile("([a-zA-Z\']+)")  # matches English words (runs of ASCII letters and apostrophes)
    __logger = logging.getLogger("database.dict")
  102. def __init__(self, dict_name: str = "global"):
  103. super(WordDatabase, self).__init__(dict_name + "-dict")
  104. self.dict_name = dict_name
  105. self.done(f'''
  106. CREATE TABLE IF NOT EXISTS Word (
  107. id INTEGER PRIMARY KEY AUTOINCREMENT, -- 编码
  108. box INTEGER NOT NULL DEFAULT 1 CHECK (box < 6 and box > 0),
  109. word TEXT NOT NULL, -- 单词
  110. part TEXT NOT NULL, -- 词性
  111. english TEXT NOT NULL, -- 英文注释
  112. chinese TEXT NOT NULL, -- 中文注释
  113. eg TEXT -- 例句
  114. )''')
  115. self.wd = word.WordDict()
  116. def __add_word(self, q: str):
  117. r = self.wd.get_requests(q)
  118. if not r.is_find:
  119. return None
  120. ret = None
  121. for i in r.res:
  122. w = r.res[i]
  123. if ret is None:
  124. ret = w
  125. name = w.name
  126. name_lower = name.lower().replace("'", "''")
  127. res = self.search(columns=["word"], table="Word", where=f"LOWER(word)='{name_lower}'")
  128. if res is not None and len(res) > 0:
  129. continue
  130. for c in w.comment:
  131. comment = r.res[i].comment[c]
  132. eg = '@@'.join(comment.eg).replace("'", "''")
  133. part = comment.part.replace("'", "''")
  134. english = comment.english.replace("'", "''")
  135. chinese = comment.chinese.replace("'", "''")
  136. self.insert(table='Word',
  137. columns=['word', 'part', 'english', 'chinese', 'eg'],
  138. values=f"'{name_lower}', '{part}', '{english}', '{chinese}', '{eg} '")
  139. self.__logger.info(f"Add word name: {name_lower} part: {part}")
  140. return ret
  141. @staticmethod
  142. def __make_word(q: str, res: list):
  143. w = word.Word(q)
  144. for i in res:
  145. c = word.Word.Comment(i[2], i[3], i[4])
  146. for e in i[5].split("@@"):
  147. c.add_eg(e)
  148. w.add_comment(c)
  149. return w
  150. def find_word(self, q: str) -> Optional[word.Word]:
  151. res = self.search(columns=["id", "word", "part", "english", "chinese", "eg"],
  152. table="Word",
  153. where=f"LOWER(word)='{q.lower()}'")
  154. if res is None:
  155. res = []
  156. if len(res) <= 0:
  157. return self.__add_word(q)
  158. self.__logger.debug(f"Find word (not add) {q}")
  159. return self.__make_word(q, res)
  160. class UpdateResponse:
  161. def __init__(self):
  162. self._success = 0
  163. self._error = 0
  164. self._error_list = []
  165. def add_success(self):
  166. self._success += 1
  167. def add_error(self, q: str):
  168. self._error += 1
  169. self._error_list.append(q)
  170. def response(self):
  171. return self._success, self._error, self._error_list
  172. def import_txt(self, line: str):
  173. response = self.UpdateResponse()
  174. word_list = self.word_pattern.findall(line)
  175. for w in word_list:
  176. try:
  177. if self.find_word(w) is None:
  178. self.__logger.debug(f"update word {w} fail")
  179. response.add_error(w)
  180. else:
  181. self.__logger.debug(f"update word {w} success")
  182. response.add_success()
  183. except Exception as e:
  184. response.add_error(w)
  185. self.__logger.debug(f"update word {w} fail", exc_info=True)
  186. return True, response
  187. @staticmethod
  188. def eg_to_str(eg_filed: str, max_eg: int):
  189. eg = eg_filed.split("@@")
  190. eg_str = ""
  191. count_eg = 0
  192. for e in eg:
  193. count_eg += 1
  194. if max_eg != -1 and count_eg > max_eg:
  195. break
  196. ec = e.split("##")
  197. if len(ec) == 2:
  198. eng, chi = ec
  199. else:
  200. eng = ec[0]
  201. chi = ""
  202. if len(eng.replace(" ", "")) == 0:
  203. continue
  204. eg_str += f"{eng} ({chi})\n"
  205. return eg_str
  206. def export_frame(self, max_eg: int = 3) -> Optional[pandas.DataFrame]:
  207. res = self.search(columns=["box", "word", "part", "english", "chinese", "eg"],
  208. table="Word",
  209. order_by=[('box', 'DESC')])
  210. if res is None:
  211. return None
  212. df = pandas.DataFrame(columns=["Box", "Word", "Part", "English", "Chinese", "Eg"])
  213. export = []
  214. for i in res:
  215. if i[1] in export:
  216. continue
  217. export.append(i[1])
  218. eg_str = self.eg_to_str(i[5], max_eg)
  219. df = df.append({"Box": str(i[0]),
  220. "Word": str(i[1]),
  221. "Part": str(i[2]),
  222. "English": str(i[3]),
  223. "Chinese": str(i[4]),
  224. "Eg": eg_str}, ignore_index=True)
  225. self.__logger.debug(f"export word {i[1]}")
  226. return df
  227. def delete_txt(self, line: str):
  228. count = 0
  229. word_list = self.word_pattern.findall(line)
  230. for w in word_list:
  231. cur = self.delete(table="Word", where=f"LOWER(word)='{w.lower()}'")
  232. if cur[1].rowcount != -1:
  233. self.__logger.debug(f"delete word {w} success")
  234. count += 1
  235. else:
  236. self.__logger.debug(f"delete word {w} fail")
  237. return count
  238. def delete_all(self):
  239. self.__logger.debug(f"delete all word")
  240. cur = self.delete(table="Word")
  241. return cur[1].rowcount