mysql.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import pymysql
  2. from dbutils.pooled_db import PooledDB
  3. from dbutils.steady_db import SteadyDBCursor
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union
  6. import inspect
  7. class MysqlConnectException(DBCloseException):
  8. """Mysql Connect error"""
  9. class MysqlDB(Database):
  10. class Result:
  11. def __init__(self, cur: SteadyDBCursor):
  12. self.res: list = cur.fetchall()
  13. self.lastrowid: int = cur.lastrowid
  14. self.rowcount: int = cur.rowcount
  15. def fetchall(self):
  16. return self.res
  17. def fetchone(self):
  18. return self.res[0]
  19. def __iter__(self):
  20. return self.res.__iter__()
  21. class Connection:
  22. def __init__(self, conn):
  23. self.conn = conn
  24. self.cur = conn.cursor()
  25. def get_cursor(self):
  26. return self.cur
  27. def commit(self):
  28. self.conn.commit()
  29. def rollback(self):
  30. self.conn.rollback()
  31. def close(self):
  32. self.cur.close()
  33. self.conn.close()
  34. def __init__(self,
  35. host: Optional[str],
  36. name: Optional[str],
  37. passwd: Optional[str],
  38. port: Optional[str],
  39. database: str = "HBlog"):
  40. if host is None or name is None:
  41. raise DBException
  42. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  43. self.database = database
  44. self.pool = PooledDB(pymysql,
  45. mincached=1,
  46. maxcached=4,
  47. maxconnections=16,
  48. blocking=True,
  49. host=self._host,
  50. port=self._port,
  51. user=self._name,
  52. passwd=self._passwd,
  53. db=self.database)
  54. self.logger.info(f"MySQL({self._name}@{self._host}) connect")
  55. def get_connection(self):
  56. return MysqlDB.Connection(self.pool.connection())
  57. def search(self, sql: str, *args) -> Union[None, Result]:
  58. return self.__search(sql, args)
  59. def insert(self, sql: str, *args, connection: Connection = None) -> Union[None, Result]:
  60. return self.__done(sql, args, connection)
  61. def delete(self, sql: str, *args, connection: Connection = None) -> Union[None, Result]:
  62. return self.__done(sql, args, connection)
  63. def update(self, sql: str, *args, connection: Connection = None) -> Union[None, Result]:
  64. return self.__done(sql, args, connection)
  65. def __search(self, sql, args) -> Union[None, Result]:
  66. conn = self.pool.connection()
  67. cur = conn.cursor()
  68. try:
  69. cur.execute(query=sql, args=args)
  70. except pymysql.MySQLError:
  71. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} with {args} error {inspect.stack()[2][2]} "
  72. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  73. return None
  74. else:
  75. return MysqlDB.Result(cur)
  76. finally:
  77. cur.close()
  78. conn.close()
  79. def __done(self, sql, args, connection: Connection = None) -> Union[None, Result]:
  80. if connection:
  81. cur = connection.get_cursor()
  82. conn = None
  83. else:
  84. conn = self.pool.connection()
  85. cur = conn.cursor()
  86. try:
  87. cur.execute(query=sql, args=args)
  88. if conn:
  89. conn.commit()
  90. except pymysql.MySQLError:
  91. if conn:
  92. conn.rollback()
  93. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  94. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  95. return None
  96. else:
  97. return MysqlDB.Result(cur)
  98. finally:
  99. if not connection:
  100. cur.close()
  101. conn.close()