Bläddra i källkod

feat: 使用连接池

SongZihuan 2 år sedan
förälder
incheckning
b11a2f8f38
3 ändrade filer med 56 tillägg och 115 borttagningar
  1. BIN
      requirements.txt
  2. 0 22
      sql/base.py
  3. 56 93
      sql/mysql.py

BIN
requirements.txt


+ 0 - 22
sql/base.py

@@ -41,28 +41,6 @@ class Database(metaclass=abc.ABCMeta):
             handle.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
             self.logger.addHandler(handle)
 
-    @abc.abstractmethod
-    def close(self):
-        """
-        关闭数据库, 此代码执行后任何成员函数再被调用其行为是未定义的
-        :return:
-        """
-        ...
-
-    @abc.abstractmethod
-    def is_connect(self) -> bool:
-        """
-        :return: 是否处于连接状态
-        """
-        ...
-
-    @abc.abstractmethod
-    def get_cursor(self) -> any:
-        """
-        :return: 返回数据库游标
-        """
-        ...
-
     @abc.abstractmethod
     def search(self, sql: str, *args, not_commit: bool = False):
         """

+ 56 - 93
sql/mysql.py

@@ -1,8 +1,9 @@
 import pymysql.cursors
 import pymysql
+from dbutils.pooled_db import PooledDB
 import threading
 from sql.base import Database, DBException, DBCloseException
-from typing import Optional, Union, List, Tuple, Dict
+from typing import Optional, Union
 import inspect
 
 
@@ -11,6 +12,21 @@ class MysqlConnectException(DBCloseException):
 
 
 class MysqlDB(Database):
+    class Result:
+        def __init__(self, cur: pymysql.cursors):
+            self.res: list = cur.fetchall()
+            self.lastrowid: int = cur.lastrowid
+            self.rowcount: int = cur.rowcount
+
+        def fetchall(self):
+            return self.res
+
+        def fetchone(self):
+            return self.res[0]
+
+        def __iter__(self):
+            return self.res.__iter__()
+
     def __init__(self,
                  host: Optional[str],
                  name: Optional[str],
@@ -23,114 +39,61 @@ class MysqlDB(Database):
         super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
         self.database = database
 
-        try:
-            self._db = pymysql.connect(user=self._name,
-                                       password=self._passwd,
-                                       host=self._host,
-                                       port=self._port,
-                                       database=self.database)
-        except pymysql.err.OperationalError:
-            raise
-
-        # mysql 不算线程安全的
-        self._cursor = self._db.cursor()
-        self._lock = threading.RLock()
-        self.logger.info(f"MySQL({self._name}@{self._host}) connect")
-
-    def close(self):
-        self._close()
-
-    def is_connect(self) -> bool:
-        if self._cursor is None or self._db is None:
-            return False
-
-        try:
-            self._db.ping(True)
-        except Exception:
-            return False
-        else:
-            return True
+        self.pool = PooledDB(pymysql,
+                             mincached=1,
+                             maxcached=4,
+                             maxconnections=16,
+                             blocking=True,
+                             host=self._host,
+                             port=self._port,
+                             user=self._name,
+                             passwd=self._passwd,
+                             db=self.database)
 
-    def get_cursor(self) -> pymysql.cursors.Cursor:
-        if self._cursor is None or self._db is None:
-            raise DBCloseException
-        return self._cursor
+        self.logger.info(f"MySQL({self._name}@{self._host}) connect")
 
-    def search(self, sql: str, *args) -> Union[None, pymysql.cursors.Cursor]:
+    def search(self, sql: str, *args) -> Union[None, Result]:
         return self.__search(sql, args)
 
-    def insert(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
-        return self.__done(sql, args, not_commit=not_commit)
+    def insert(self, sql: str, *args) -> Union[None, Result]:
+        return self.__done(sql, args)
 
-    def delete(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
-        return self.__done(sql, args, not_commit=not_commit)
+    def delete(self, sql: str, *args) -> Union[None, Result]:
+        return self.__done(sql, args)
 
-    def update(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
-        return self.__done(sql, args, not_commit=not_commit)
+    def update(self, sql: str, *args) -> Union[None, Result]:
+        return self.__done(sql, args)
 
-    def commit(self):
-        self._commit()
+    def __search(self, sql, args) -> Union[None, Result]:
+        conn = self.pool.connection()
+        cur = conn.cursor()
 
-    def __search(self, sql, args) -> Union[None, pymysql.cursors.Cursor]:
         try:
-            self._lock.acquire()  # 上锁
-            if not self.is_connect():
-                self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
-                return
-            self._cursor.execute(query=sql, args=args)
+            cur.execute(query=sql, args=args)
         except pymysql.MySQLError:
             self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} with {args} error {inspect.stack()[2][2]} "
                               f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
-            return
+            return None
+        else:
+            return MysqlDB.Result(cur)
         finally:
-            self._lock.release()  # 释放锁
-        return self._cursor
+            cur.close()
+            conn.close()
+
+    def __done(self, sql, args) -> Union[None, Result]:
+        conn = self.pool.connection()
+        cur = conn.cursor()
 
-    def __done(self, sql, args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
         try:
-            self._lock.acquire()
-            if not self.is_connect():
-                self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
-                return
-            self._cursor.execute(query=sql, args=args)
+            cur.execute(query=sql, args=args)
+            conn.commit()
         except pymysql.MySQLError:
-            self._db.rollback()
+            conn.rollback()
             self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
                               f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
-            return
-        finally:
-            if not not_commit:
-                self._db.commit()
-            self._lock.release()
-        return self._cursor
-
-    def _connect(self):
-        if self._db is None:
-            raise MysqlConnectException
-
-        try:
-            self._db.ping(False)
-        except Exception:
-            raise MysqlConnectException
-
-    def _close(self):
-        if self._cursor is not None:
-            self._cursor.close()
-        if self._db is not None:
-            self._db.close()
-        self._db = None
-        self._cursor = None
-        self._lock = None
-        self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")
-
-    def _commit(self):
-        try:
-            self._lock.acquire()
-            if not self.is_connect():
-                self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
-                return
-            self._db.commit()
-        except pymysql.MySQLError:
-            self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exec_info=True)
+            return None
+        else:
+            return MysqlDB.Result(cur)
         finally:
-            self._lock.release()
+            cur.close()
+            conn.close()