mysql.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import pymysql.cursors
  2. import pymysql
  3. import threading
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union, List, Tuple, Dict
  6. import inspect
  7. class MysqlConnectException(DBCloseException):
  8. """Mysql Connect error"""
  9. class MysqlDB(Database):
  10. def __init__(self,
  11. host: Optional[str],
  12. name: Optional[str],
  13. passwd: Optional[str],
  14. port: Optional[str],
  15. database: str = "HBlog"):
  16. if host is None or name is None:
  17. raise DBException
  18. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  19. self.database = database
  20. try:
  21. self._db = pymysql.connect(user=self._name,
  22. password=self._passwd,
  23. host=self._host,
  24. port=self._port,
  25. database=self.database)
  26. except pymysql.err.OperationalError:
  27. raise
  28. # mysql 不算线程安全的
  29. self._cursor = self._db.cursor()
  30. self._lock = threading.RLock()
  31. self.logger.info(f"MySQL({self._name}@{self._host}) connect")
  32. def close(self):
  33. self._close()
  34. def is_connect(self) -> bool:
  35. if self._cursor is None or self._db is None:
  36. return False
  37. try:
  38. self._db.ping(True)
  39. except Exception:
  40. return False
  41. else:
  42. return True
  43. def get_cursor(self) -> pymysql.cursors.Cursor:
  44. if self._cursor is None or self._db is None:
  45. raise DBCloseException
  46. return self._cursor
  47. def search(self, sql: str, *args) -> Union[None, pymysql.cursors.Cursor]:
  48. return self.__search(sql, args)
  49. def insert(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  50. return self.__done(sql, args, not_commit=not_commit)
  51. def delete(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  52. return self.__done(sql, args, not_commit=not_commit)
  53. def update(self, sql: str, *args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  54. return self.__done(sql, args, not_commit=not_commit)
  55. def commit(self):
  56. self._commit()
  57. def __search(self, sql, args) -> Union[None, pymysql.cursors.Cursor]:
  58. try:
  59. self._lock.acquire() # 上锁
  60. if not self.is_connect():
  61. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  62. return
  63. self._cursor.execute(query=sql, args=args)
  64. except pymysql.MySQLError:
  65. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} with {args} error {inspect.stack()[2][2]} "
  66. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  67. return
  68. finally:
  69. self._lock.release() # 释放锁
  70. return self._cursor
  71. def __done(self, sql, args, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  72. try:
  73. self._lock.acquire()
  74. if not self.is_connect():
  75. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  76. return
  77. self._cursor.execute(query=sql, args=args)
  78. except pymysql.MySQLError:
  79. self._db.rollback()
  80. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error {inspect.stack()[2][2]} "
  81. f"{inspect.stack()[2][1]} {inspect.stack()[2][3]}", exc_info=True, stack_info=True)
  82. return
  83. finally:
  84. if not not_commit:
  85. self._db.commit()
  86. self._lock.release()
  87. return self._cursor
  88. def _connect(self):
  89. if self._db is None:
  90. raise MysqlConnectException
  91. try:
  92. self._db.ping(False)
  93. except Exception:
  94. raise MysqlConnectException
  95. def _close(self):
  96. if self._cursor is not None:
  97. self._cursor.close()
  98. if self._db is not None:
  99. self._db.close()
  100. self._db = None
  101. self._cursor = None
  102. self._lock = None
  103. self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")
  104. def _commit(self):
  105. try:
  106. self._lock.acquire()
  107. if not self.is_connect():
  108. self.logger.error(f"MySQL({self._name}@{self._host}) connect error")
  109. return
  110. self._db.commit()
  111. except pymysql.MySQLError:
  112. self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exec_info=True)
  113. finally:
  114. self._lock.release()