mysql.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import pymysql.cursors
  2. import pymysql
  3. from dbutils.pooled_db import PooledDB
  4. import threading
  5. from sql.base import Database, DBException, DBCloseException
  6. from typing import Optional, Union
  7. import inspect
  8. class MysqlConnectException(DBCloseException):
  9. """Mysql Connect error"""
  10. class MysqlDB(Database):
  11. class Result:
  12. def __init__(self, cur: pymysql.cursors):
  13. self.res: list = cur.fetchall()
  14. self.lastrowid: int = cur.lastrowid
  15. self.rowcount: int = cur.rowcount
  16. def fetchall(self):
  17. return self.res
  18. def fetchone(self):
  19. return self.res[0]
  20. def __iter__(self):
  21. return self.res.__iter__()
  22. def __init__(self,
  23. host: Optional[str],
  24. name: Optional[str],
  25. passwd: Optional[str],
  26. port: Optional[str],
  27. database: str = "HBlog"):
  28. if host is None or name is None:
  29. raise DBException
  30. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  31. self.database = database
  32. self.pool = PooledDB(pymysql,
  33. mincached=1,
  34. maxcached=4,
  35. maxconnections=16,
  36. blocking=True,
  37. host=self._host,
  38. port=self._port,
  39. user=self._name,
  40. passwd=self._passwd,
  41. db=self.database)
  42. self.logger.info(f"MySQL({self._name}@{self._host}) connect")
  43. def search(self, sql: str, *args) -> Union[None, Result]:
  44. return self.__search(sql, args)
  45. def insert(self, sql: str, *args) -> Union[None, Result]:
  46. return self.__done(sql, args)
  47. def delete(self, sql: str, *args) -> Union[None, Result]:
  48. return self.__done(sql, args)
  49. def update(self, sql: str, *args) -> Union[None, Result]:
  50. return self.__done(sql, args)
  51. def __search(self, sql, args) -> Union[None, Result]:
  52. conn = self.pool.connection()
  53. cur = conn.cursor()
  54. try:
  55. cur.execute(query=sql, args=args)
  56. except pymysql.MySQLError:
  57. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} with {args} error {inspect.stack()[2][2]} "
  58. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  59. return None
  60. else:
  61. return MysqlDB.Result(cur)
  62. finally:
  63. cur.close()
  64. conn.close()
  65. def __done(self, sql, args) -> Union[None, Result]:
  66. conn = self.pool.connection()
  67. cur = conn.cursor()
  68. try:
  69. cur.execute(query=sql, args=args)
  70. conn.commit()
  71. except pymysql.MySQLError:
  72. conn.rollback()
  73. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  74. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  75. return None
  76. else:
  77. return MysqlDB.Result(cur)
  78. finally:
  79. cur.close()
  80. conn.close()