1
0

mysql.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import pymysql.cursors
  2. import pymysql
  3. import threading
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union, List, Tuple, Dict
  6. import inspect
  7. class MysqlConnectException(DBCloseException):
  8. """Mysql Connect error"""
  9. class MysqlDB(Database):
  10. def __init__(self,
  11. host: Optional[str],
  12. name: Optional[str],
  13. passwd: Optional[str],
  14. port: Optional[str],
  15. database: str = "HBlog"):
  16. if host is None or name is None:
  17. raise DBException
  18. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  19. self.database = database
  20. try:
  21. self._db = pymysql.connect(user=self._name,
  22. password=self._passwd,
  23. host=self._host,
  24. port=self._port,
  25. database=self.database)
  26. except pymysql.err.OperationalError:
  27. raise
  28. self._cursor = self._db.cursor()
  29. self._lock = threading.RLock()
  30. self.logger.info(f"MySQL({self._name}@{self._host}) connect")
  31. def close(self):
  32. self._close()
  33. def is_connect(self) -> bool:
  34. if self._cursor is None or self._db is None:
  35. return False
  36. try:
  37. self._db.ping(True)
  38. except Exception:
  39. return False
  40. else:
  41. return True
  42. def get_cursor(self) -> pymysql.cursors.Cursor:
  43. if self._cursor is None or self._db is None:
  44. raise DBCloseException
  45. return self._cursor
  46. def search(self, sql: str, *args) -> Union[None, pymysql.cursors.Cursor]:
  47. return self.__search(sql, args)
  48. def insert(self, table: str, columns: list, values: Union[str, List[str]],
  49. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  50. columns: str = ", ".join(columns)
  51. if type(values) is str:
  52. values: str = f"({values})"
  53. else:
  54. values: str = ", ".join(f"{v}" for v in values)
  55. return self.__done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  56. def delete(self, table: str, where: Union[str, List[str]] = None,
  57. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  58. if type(where) is list and len(where) > 0:
  59. where: str = " AND ".join(f"({w})" for w in where)
  60. elif type(where) is not str or len(where) == 0: # 必须指定条件
  61. return None
  62. return self.__done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  63. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  64. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  65. if len(kw) == 0:
  66. return None
  67. if type(where) is list and len(where) > 0:
  68. where: str = " AND ".join(f"({w})" for w in where)
  69. elif type(where) is not str or len(where) == 0: # 必须指定条件
  70. return None
  71. kw_list = [f"{key} = {kw[key]}" for key in kw]
  72. kw_str = ", ".join(kw_list)
  73. return self.__done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  74. def commit(self):
  75. self._commit()
  76. def __search(self, sql, args) -> Union[None, pymysql.cursors.Cursor]:
  77. try:
  78. self._lock.acquire() # 上锁
  79. if not self.is_connect():
  80. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  81. return
  82. self._cursor.execute(query=sql, args=args)
  83. except pymysql.MySQLError:
  84. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} with {args} error {inspect.stack()[2][2]} "
  85. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  86. return
  87. finally:
  88. self._lock.release() # 释放锁
  89. return self._cursor
  90. def __done(self, sql, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  91. try:
  92. self._lock.acquire()
  93. if not self.is_connect():
  94. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  95. return
  96. self._cursor.execute(sql)
  97. except pymysql.MySQLError:
  98. self._db.rollback()
  99. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  100. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  101. return
  102. finally:
  103. if not not_commit:
  104. self._db.commit()
  105. self._lock.release()
  106. return self._cursor
  107. def _connect(self):
  108. if self._db is None:
  109. raise MysqlConnectException
  110. try:
  111. self._db.ping(False)
  112. except Exception:
  113. raise MysqlConnectException
  114. def _close(self):
  115. if self._cursor is not None:
  116. self._cursor.close()
  117. if self._db is not None:
  118. self._db.close()
  119. self._db = None
  120. self._cursor = None
  121. self._lock = None
  122. self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")
  123. def _commit(self):
  124. try:
  125. self._lock.acquire()
  126. if not self.is_connect():
  127. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  128. return
  129. self._db.commit()
  130. except pymysql.MySQLError:
  131. self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exec_info=True)
  132. finally:
  133. self._lock.release()