123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- import pymysql.cursors
- import pymysql
- import threading
- from sql.base import Database, DBException, DBCloseException
- from typing import Optional, Union, List, Tuple, Dict
- import inspect
- class MysqlConnectException(DBCloseException):
- """Mysql Connect error"""
- class MysqlDB(Database):
- def __init__(self,
- host: Optional[str],
- name: Optional[str],
- passwd: Optional[str],
- port: Optional[str],
- database: str = "HBlog"):
- if host is None or name is None:
- raise DBException
- super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
- self.database = database
- try:
- self._db = pymysql.connect(user=self._name,
- password=self._passwd,
- host=self._host,
- port=self._port,
- database=self.database)
- except pymysql.err.OperationalError:
- raise
- # mysql 不算线程安全的
- self._cursor = self._db.cursor()
- self._lock = threading.RLock()
- self.logger.info(f"MySQL({self._name}@{self._host}) connect")
- def close(self):
- self._close()
- def is_connect(self) -> bool:
- if self._cursor is None or self._db is None:
- return False
- try:
- self._db.ping(True)
- except Exception:
- return False
- else:
- return True
- def get_cursor(self) -> pymysql.cursors.Cursor:
- if self._cursor is None or self._db is None:
- raise DBCloseException
- return self._cursor
- def search(self, sql: str, *args) -> Union[None, pymysql.cursors.Cursor]:
- return self.__search(sql, args)
- def insert(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
- return self.__done(sql, args, not_commit=not_commit)
- def delete(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
- return self.__done(sql, args, not_commit=not_commit)
- def update(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
- return self.__done(sql, args, not_commit=not_commit)
- def commit(self):
- self._commit()
- def __search(self, sql, args) -> Union[None, pymysql.cursors.Cursor]:
- try:
- self._lock.acquire() # 上锁
- if not self.is_connect():
- self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
- return
- self._cursor.execute(query=sql, args=args)
- except pymysql.MySQLError:
- self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} with {args} error {inspect.stack()[2][2]} "
- f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
- return
- finally:
- self._lock.release() # 释放锁
- return self._cursor
- def __done(self, sql, args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
- try:
- self._lock.acquire()
- if not self.is_connect():
- self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
- return
- self._cursor.execute(query=sql, args=args)
- except pymysql.MySQLError:
- self._db.rollback()
- self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
- f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
- return
- finally:
- if not not_commit:
- self._db.commit()
- self._lock.release()
- return self._cursor
- def _connect(self):
- if self._db is None:
- raise MysqlConnectException
- try:
- self._db.ping(False)
- except Exception:
- raise MysqlConnectException
- def _close(self):
- if self._cursor is not None:
- self._cursor.close()
- if self._db is not None:
- self._db.close()
- self._db = None
- self._cursor = None
- self._lock = None
- self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")
- def _commit(self):
- try:
- self._lock.acquire()
- if not self.is_connect():
- self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
- return
- self._db.commit()
- except pymysql.MySQLError:
- self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exec_info=True)
- finally:
- self._lock.release()
|