1
0

mysql.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import pymysql.cursors
  2. import pymysql
  3. import threading
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union, List, Tuple, Dict
  6. class MysqlConnectException(DBCloseException):
  7. """Mysql Connect error"""
  8. class MysqlDB(Database):
  9. def __init__(self,
  10. host: Optional[str],
  11. name: Optional[str],
  12. passwd: Optional[str],
  13. port: Optional[str]):
  14. if host is None or name is None:
  15. raise DBException
  16. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  17. try:
  18. self._db = pymysql.connect(user=self._name,
  19. password=self._passwd,
  20. host=self._host,
  21. port=self._port,
  22. database="HBlog")
  23. except pymysql.err.OperationalError:
  24. raise
  25. self._cursor = self._db.cursor()
  26. self._lock = threading.RLock()
  27. self.logger.info(f"MySQL({self._name}@{self._host}) connect")
  28. def close(self):
  29. self._close()
  30. def is_connect(self) -> bool:
  31. if self._cursor is None or self._db is None:
  32. return False
  33. try:
  34. self._db.ping(False)
  35. except Exception:
  36. return False
  37. else:
  38. return True
  39. def get_cursor(self) -> pymysql.cursors.Cursor:
  40. if self._cursor is None or self._db is None:
  41. raise DBCloseException
  42. return self._cursor
  43. def search(self, columns: List[str], table: str,
  44. where: Union[str, List[str]] = None,
  45. limit: Optional[int] = None,
  46. offset: Optional[int] = None,
  47. order_by: Optional[List[Tuple[str, str]]] = None,
  48. group_by: Optional[List[str]] = None,
  49. for_update: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  50. if type(where) is list and len(where) > 0:
  51. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  52. elif type(where) is str and len(where) > 0:
  53. where = " WHERE " + where
  54. else:
  55. where: str = ""
  56. if order_by is None:
  57. order_by: str = ""
  58. else:
  59. by = [f" {i[0]} {i[1]} " for i in order_by]
  60. order_by: str = " ORDER BY" + ", ".join(by)
  61. if limit is None or limit == 0:
  62. limit: str = ""
  63. else:
  64. limit = f" LIMIT {limit}"
  65. if offset is None:
  66. offset: str = ""
  67. else:
  68. offset = f" OFFSET {offset}"
  69. if group_by is None:
  70. group_by: str = ""
  71. else:
  72. group_by = "GROUP BY " + ", ".join(group_by)
  73. columns: str = ", ".join(columns)
  74. if for_update:
  75. for_update = "FOR UPDATE"
  76. else:
  77. for_update = ""
  78. return self.__search(f"SELECT {columns} "
  79. f"FROM {table} "
  80. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  81. def insert(self, table: str, columns: list, values: Union[str, List[str]],
  82. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  83. columns: str = ", ".join(columns)
  84. if type(values) is str:
  85. values: str = f"({values})"
  86. else:
  87. values: str = ", ".join(f"{v}" for v in values)
  88. return self.__done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  89. def delete(self, table: str, where: Union[str, List[str]] = None,
  90. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  91. if type(where) is list and len(where) > 0:
  92. where: str = " AND ".join(f"({w})" for w in where)
  93. elif type(where) is not str or len(where) == 0: # 必须指定条件
  94. return None
  95. return self.__done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  96. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None,
  97. not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  98. if len(kw) == 0:
  99. return None
  100. if type(where) is list and len(where) > 0:
  101. where: str = " AND ".join(f"({w})" for w in where)
  102. elif type(where) is not str or len(where) == 0: # 必须指定条件
  103. return None
  104. kw_list = [f"{key} = {kw[key]}" for key in kw]
  105. kw_str = ", ".join(kw_list)
  106. return self.__done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  107. def commit(self):
  108. self._commit()
  109. def __search(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  110. try:
  111. self._lock.acquire() # 上锁
  112. if not self.is_connect():
  113. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} connect error")
  114. return
  115. self._cursor.execute(sql)
  116. except pymysql.MySQLError:
  117. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error", exc_info=True, stack_info=True)
  118. return
  119. finally:
  120. self._lock.release() # 释放锁
  121. return self._cursor
  122. def __done(self, sql, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  123. try:
  124. self._lock.acquire()
  125. if not self.is_connect():
  126. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} connect error")
  127. return
  128. self._cursor.execute(sql)
  129. except pymysql.MySQLError:
  130. self._db.rollback()
  131. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} error", exc_info=True, stack_info=True)
  132. return
  133. finally:
  134. if not not_commit:
  135. self._db.commit()
  136. self._lock.release()
  137. return self._cursor
  138. def _connect(self):
  139. if self._db is None:
  140. raise MysqlConnectException
  141. try:
  142. self._db.ping(False)
  143. except Exception:
  144. raise MysqlConnectException
  145. def _close(self):
  146. if self._cursor is not None:
  147. self._cursor.close()
  148. if self._db is not None:
  149. self._db.close()
  150. self._db = None
  151. self._cursor = None
  152. self._lock = None
  153. self.logger.warning(f"MySQL({self._name}@{self._host}) connect close")
  154. def _commit(self):
  155. try:
  156. self._lock.acquire()
  157. if not self.is_connect():
  158. self.logger.error(f"MySQL({self._name}@{self._host}) SQL {sql} connect error")
  159. return
  160. self._db.commit()
  161. except pymysql.MySQLError:
  162. self.logger.error(f"MySQL({self._name}@{self._host}) commit error", exec_info=True)
  163. finally:
  164. self._lock.release()