# mysql.py
  1. import pymysql.cursors
  2. import pymysql
  3. import threading
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union, List, Tuple, Dict
  6. import inspect
  7. class MysqlConnectException(DBCloseException):
  8. """Mysql Connect error"""
  9. class MysqlDB(Database):
  10. def __init__(self,
  11. host: Optional[str],
  12. name: Optional[str],
  13. passwd: Optional[str],
  14. port: Optional[str]):
  15. if host is None or name is None:
  16. raise DBException
  17. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  18. try:
  19. self._db = pymysql.connect(user=self._name,
  20. password=self._passwd,
  21. host=self._host,
  22. port=self._port,
  23. database="HBlog")
  24. except pymysql.err.OperationalError:
  25. raise
  26. self._cursor = self._db.cursor()
  27. self._lock = threading.RLock()
  28. self.logger.info(f"MySQL({self._name}@{self._host}) connect")
  29. def close(self):
  30. self._close()
  31. def is_connect(self) -> bool:
  32. if self._cursor is None or self._db is None:
  33. return False
  34. try:
  35. self._db.ping(True)
  36. except Exception:
  37. return False
  38. else:
  39. return True
  40. def get_cursor(self) -> pymysql.cursors.Cursor:
  41. if self._cursor is None or self._db is None:
  42. raise DBCloseException
  43. return self._cursor
  44. def search(self, columns: List[str], table: str,
  45. where: Union[str, List[str]] = None,
  46. limit: Optional[int] = None,
  47. offset: Optional[int] = None,
  48. order_by: Optional[List[Tuple[str, str]]] = None,
  49. group_by: Optional[List[str]] = None,
  50. for_update: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  51. if type(where) is list and len(where) > 0:
  52. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  53. elif type(where) is str and len(where) > 0:
  54. where = " WHERE " + where
  55. else:
  56. where: str = ""
  57. if order_by is None:
  58. order_by: str = ""
  59. else:
  60. by = [f" {i[0]} {i[1]} " for i in order_by]
  61. order_by: str = " ORDER BY" + ", ".join(by)
  62. if limit is None or limit == 0:
  63. limit: str = ""
  64. else:
  65. limit = f" LIMIT {limit}"
  66. if offset is None:
  67. offset: str = ""
  68. else:
  69. offset = f" OFFSET {offset}"
  70. if group_by is None:
  71. group_by: str = ""
  72. else:
  73. group_by = "GROUP BY " + ", ".join(group_by)
  74. columns: str = ", ".join(columns)
  75. if for_update:
  76. for_update = "FOR UPDATE"
  77. else:
  78. for_update = ""
  79. return self.__search(f"SELECT {columns} "
  80. f"FROM {table} "
  81. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  82. def insert(self, table: str, columns: list, values: Union[str, List[str]],
  83. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  84. columns: str = ", ".join(columns)
  85. if type(values) is str:
  86. values: str = f"({values})"
  87. else:
  88. values: str = ", ".join(f"{v}" for v in values)
  89. return self.__done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  90. def delete(self, table: str, where: Union[str, List[str]] = None,
  91. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  92. if type(where) is list and len(where) > 0:
  93. where: str = " AND ".join(f"({w})" for w in where)
  94. elif type(where) is not str or len(where) == 0: # 必须指定条件
  95. return None
  96. return self.__done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  97. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  98. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  99. if len(kw) == 0:
  100. return None
  101. if type(where) is list and len(where) > 0:
  102. where: str = " AND ".join(f"({w})" for w in where)
  103. elif type(where) is not str or len(where) == 0: # 必须指定条件
  104. return None
  105. kw_list = [f"{key} = {kw[key]}" for key in kw]
  106. kw_str = ", ".join(kw_list)
  107. return self.__done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  108. def commit(self):
  109. self._commit()
  110. def __search(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  111. try:
  112. self._lock.acquire() # 上锁
  113. if not self.is_connect():
  114. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} connect error")
  115. return
  116. self._cursor.execute(sql)
  117. except pymysql.MySQLError:
  118. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  119. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  120. return
  121. finally:
  122. self._lock.release() # 释放锁
  123. return self._cursor
  124. def __done(self, sql, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  125. try:
  126. self._lock.acquire()
  127. if not self.is_connect():
  128. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} connect error")
  129. return
  130. self._cursor.execute(sql)
  131. except pymysql.MySQLError:
  132. self._db.rollback()
  133. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  134. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  135. return
  136. finally:
  137. if not not_commit:
  138. self._db.commit()
  139. self._lock.release()
  140. return self._cursor
  141. def _connect(self):
  142. if self._db is None:
  143. raise MysqlConnectException
  144. try:
  145. self._db.ping(False)
  146. except Exception:
  147. raise MysqlConnectException
  148. def _close(self):
  149. if self._cursor is not None:
  150. self._cursor.close()
  151. if self._db is not None:
  152. self._db.close()
  153. self._db = None
  154. self._cursor = None
  155. self._lock = None
  156. self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")
  157. def _commit(self):
  158. try:
  159. self._lock.acquire()
  160. if not self.is_connect():
  161. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  162. return
  163. self._db.commit()
  164. except pymysql.MySQLError:
  165. self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exec_info=True)
  166. finally:
  167. self._lock.release()