mysql.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import pymysql.cursors
  2. import pymysql
  3. import threading
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union, List, Tuple, Dict
  6. import inspect
  7. class MysqlConnectException(DBCloseException):
  8. """Mysql Connect error"""
  9. class MysqlDB(Database):
  10. def __init__(self,
  11. host: Optional[str],
  12. name: Optional[str],
  13. passwd: Optional[str],
  14. port: Optional[str],
  15. database: str = "HBlog"):
  16. if host is None or name is None:
  17. raise DBException
  18. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  19. self.database = database
  20. try:
  21. self._db = pymysql.connect(user=self._name,
  22. password=self._passwd,
  23. host=self._host,
  24. port=self._port,
  25. database=self.database)
  26. except pymysql.err.OperationalError:
  27. raise
  28. self._cursor = self._db.cursor()
  29. self._lock = threading.RLock()
  30. self.logger.info(f"MySQL({self._name}@{self._host}) connect")
  31. def close(self):
  32. self._close()
  33. def is_connect(self) -> bool:
  34. if self._cursor is None or self._db is None:
  35. return False
  36. try:
  37. self._db.ping(True)
  38. except Exception:
  39. return False
  40. else:
  41. return True
  42. def get_cursor(self) -> pymysql.cursors.Cursor:
  43. if self._cursor is None or self._db is None:
  44. raise DBCloseException
  45. return self._cursor
  46. def search(self, columns: List[str], table: str,
  47. where: Union[str, List[str]] = None,
  48. limit: Optional[int] = None,
  49. offset: Optional[int] = None,
  50. order_by: Optional[List[Tuple[str, str]]] = None,
  51. group_by: Optional[List[str]] = None,
  52. for_update: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  53. if type(where) is list and len(where) > 0:
  54. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  55. elif type(where) is str and len(where) > 0:
  56. where = " WHERE " + where
  57. else:
  58. where: str = ""
  59. if order_by is None:
  60. order_by: str = ""
  61. else:
  62. by = [f" {i[0]} {i[1]} " for i in order_by]
  63. order_by: str = " ORDER BY" + ", ".join(by)
  64. if limit is None or limit == 0:
  65. limit: str = ""
  66. else:
  67. limit = f" LIMIT {limit}"
  68. if offset is None:
  69. offset: str = ""
  70. else:
  71. offset = f" OFFSET {offset}"
  72. if group_by is None:
  73. group_by: str = ""
  74. else:
  75. group_by = "GROUP BY " + ", ".join(group_by)
  76. columns: str = ", ".join(columns)
  77. if for_update:
  78. for_update = "FOR UPDATE"
  79. else:
  80. for_update = ""
  81. return self.__search(f"SELECT {columns} "
  82. f"FROM {table} "
  83. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  84. def insert(self, table: str, columns: list, values: Union[str, List[str]],
  85. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  86. columns: str = ", ".join(columns)
  87. if type(values) is str:
  88. values: str = f"({values})"
  89. else:
  90. values: str = ", ".join(f"{v}" for v in values)
  91. return self.__done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  92. def delete(self, table: str, where: Union[str, List[str]] = None,
  93. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  94. if type(where) is list and len(where) > 0:
  95. where: str = " AND ".join(f"({w})" for w in where)
  96. elif type(where) is not str or len(where) == 0: # 必须指定条件
  97. return None
  98. return self.__done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  99. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  100. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  101. if len(kw) == 0:
  102. return None
  103. if type(where) is list and len(where) > 0:
  104. where: str = " AND ".join(f"({w})" for w in where)
  105. elif type(where) is not str or len(where) == 0: # 必须指定条件
  106. return None
  107. kw_list = [f"{key} = {kw[key]}" for key in kw]
  108. kw_str = ", ".join(kw_list)
  109. return self.__done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  110. def commit(self):
  111. self._commit()
  112. def __search(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  113. try:
  114. self._lock.acquire() # 上锁
  115. if not self.is_connect():
  116. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} connect error")
  117. return
  118. self._cursor.execute(sql)
  119. except pymysql.MySQLError:
  120. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  121. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  122. return
  123. finally:
  124. self._lock.release() # 释放锁
  125. return self._cursor
  126. def __done(self, sql, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  127. try:
  128. self._lock.acquire()
  129. if not self.is_connect():
  130. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} connect error")
  131. return
  132. self._cursor.execute(sql)
  133. except pymysql.MySQLError:
  134. self._db.rollback()
  135. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  136. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  137. return
  138. finally:
  139. if not not_commit:
  140. self._db.commit()
  141. self._lock.release()
  142. return self._cursor
  143. def _connect(self):
  144. if self._db is None:
  145. raise MysqlConnectException
  146. try:
  147. self._db.ping(False)
  148. except Exception:
  149. raise MysqlConnectException
  150. def _close(self):
  151. if self._cursor is not None:
  152. self._cursor.close()
  153. if self._db is not None:
  154. self._db.close()
  155. self._db = None
  156. self._cursor = None
  157. self._lock = None
  158. self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")
  159. def _commit(self):
  160. try:
  161. self._lock.acquire()
  162. if not self.is_connect():
  163. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  164. return
  165. self._db.commit()
  166. except pymysql.MySQLError:
  167. self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exec_info=True)
  168. finally:
  169. self._lock.release()