mysql.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import pymysql.cursors
  2. import pymysql
  3. import threading
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union, List, Tuple, Dict
  6. import inspect
  class MysqlConnectException(DBCloseException):
      """Signal that the MySQL connection is missing or no longer answers.

      Specialises ``DBCloseException`` so callers may catch either the
      MySQL-specific error or the more general closed-database error.
      """
  class MysqlDB(Database):
      """MySQL backend built on one PyMySQL connection and one shared cursor.

      All statements run on the same cursor while holding a re-entrant lock,
      so the instance can be shared between threads.  Statement failures are
      logged through ``self.logger`` and reported to the caller as ``None``
      rather than raised.

      NOTE(review): ``self.logger``, ``self._host``, ``self._name``,
      ``self._passwd`` and ``self._port`` are expected to be provided by
      ``Database.__init__``, which is not visible here -- confirm.
      """

      def __init__(self,
                   host: Optional[str],
                   name: Optional[str],
                   passwd: Optional[str],
                   port: Optional[str],
                   database: str = "HBlog"):
          """Connect to the server and create the shared cursor.

          Args:
              host: Server host; must not be ``None``.
              name: User name; must not be ``None``.
              passwd: Password forwarded to the base class.
              port: Port forwarded to the base class.
                  NOTE(review): annotated as ``str`` but PyMySQL expects an
                  integer port -- presumably the base class converts it.
              database: Name of the schema to select.

          Raises:
              DBException: If ``host`` or ``name`` is ``None``.
              pymysql.err.OperationalError: Propagated unchanged when the
                  connection cannot be established.
          """
          if host is None or name is None:
              raise DBException

          super().__init__(host=host, name=name, passwd=passwd, port=port)
          self.database = database

          # Connection errors are deliberately left to propagate to the caller.
          self._db = pymysql.connect(user=self._name,
                                     password=self._passwd,
                                     host=self._host,
                                     port=self._port,
                                     database=self.database)
          self._cursor = self._db.cursor()
          self._lock = threading.RLock()
          self.logger.info(f"MySQL({self._name}@{self._host}) connect")

      def close(self):
          """Close the cursor and the connection."""
          self._close()

      def is_connect(self) -> bool:
          """Return ``True`` if the connection exists and answers a ping.

          The ping is issued with ``reconnect=True``, so a dropped connection
          is re-established when possible before reporting success.
          """
          if self._cursor is None or self._db is None:
              return False
          try:
              self._db.ping(True)
          except Exception:
              # Any failure while pinging/reconnecting means "not connected".
              return False
          return True

      def get_cursor(self) -> pymysql.cursors.Cursor:
          """Return the shared cursor.

          Raises:
              DBCloseException: If the connection or cursor has been closed.
          """
          if self._cursor is None or self._db is None:
              raise DBCloseException
          return self._cursor

      def search(self, sql: str, *args) -> Optional[pymysql.cursors.Cursor]:
          """Run a read statement; return the cursor, or ``None`` on failure."""
          return self.__search(sql, args)

      def insert(self, sql: str, *args, not_commit: bool = False) -> Optional[pymysql.cursors.Cursor]:
          """Run an INSERT; commit unless ``not_commit``. ``None`` on failure."""
          return self.__done(sql, args, not_commit=not_commit)

      def delete(self, sql: str, *args, not_commit: bool = False) -> Optional[pymysql.cursors.Cursor]:
          """Run a DELETE; commit unless ``not_commit``. ``None`` on failure."""
          return self.__done(sql, args, not_commit=not_commit)

      def update(self, sql: str, *args, not_commit: bool = False) -> Optional[pymysql.cursors.Cursor]:
          """Run an UPDATE; commit unless ``not_commit``. ``None`` on failure."""
          return self.__done(sql, args, not_commit=not_commit)

      def commit(self):
          """Commit the current transaction, logging (not raising) failures."""
          self._commit()

      def __search(self, sql, args) -> Optional[pymysql.cursors.Cursor]:
          """Execute ``sql`` with ``args`` under the lock, without committing.

          Returns:
              The shared cursor holding the result, or ``None`` when the
              connection is down or the statement raised a MySQL error.
          """
          with self._lock:
              if not self.is_connect():
                  self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
                  return None
              try:
                  self._cursor.execute(query=sql, args=args)
              except pymysql.MySQLError:
                  # Frame 0 is this method, 1 the public wrapper, 2 its caller.
                  # inspect.stack() is expensive, so fetch the frame only once.
                  caller = inspect.stack()[2]
                  self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} with {args} error {caller[2]} "
                                    f"{caller[1]} {caller[3]}", exc_info=True, stack_info=True)
                  return None
          return self._cursor

      def __done(self, sql, args, not_commit: bool = False) -> Optional[pymysql.cursors.Cursor]:
          """Execute a write statement under the lock and optionally commit.

          The statement is rolled back when it raises a MySQL error.  The
          commit happens only after a successful execute; an error raised by
          the commit itself propagates to the caller, and the lock is released
          in every case.

          Args:
              sql: Statement text with PyMySQL placeholders.
              args: Parameters bound to the placeholders.
              not_commit: When true, leave the transaction open.

          Returns:
              The shared cursor, or ``None`` when the connection is down or
              the statement failed.
          """
          with self._lock:
              if not self.is_connect():
                  self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
                  return None
              try:
                  self._cursor.execute(query=sql, args=args)
              except pymysql.MySQLError:
                  self._db.rollback()
                  # Frame 0 is this method, 1 the public wrapper, 2 its caller.
                  caller = inspect.stack()[2]
                  self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {caller[2]} "
                                    f"{caller[1]} {caller[3]}", exc_info=True, stack_info=True)
                  return None
              # Commit only on the success path: nothing to commit after a
              # rollback or when the connection check failed.
              if not not_commit:
                  self._db.commit()
          return self._cursor

      def _connect(self):
          """Verify the connection is alive without trying to reconnect.

          Raises:
              MysqlConnectException: If there is no connection object or the
                  ping fails.
          """
          if self._db is None:
              raise MysqlConnectException
          try:
              self._db.ping(False)
          except Exception as err:
              raise MysqlConnectException from err

      def _close(self):
          """Close cursor and connection and mark the instance as closed.

          The lock object is intentionally kept: later calls must still be
          able to take it so they reach the ``is_connect()`` guard and fail
          gracefully instead of crashing on a missing lock.
          """
          if self._cursor is not None:
              self._cursor.close()
          if self._db is not None:
              self._db.close()
          self._db = None
          self._cursor = None
          self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")

      def _commit(self):
          """Commit under the lock; log connection and MySQL errors."""
          with self._lock:
              if not self.is_connect():
                  self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
                  return
              try:
                  self._db.commit()
              except pymysql.MySQLError:
                  # The keyword is ``exc_info``; a misspelt keyword makes the
                  # logger itself raise TypeError.
                  self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exc_info=True)
  113. self._lock.release()