# mysql_db.py
  1. import pymysql
  2. import threading
  3. import traceback
  4. from conf import Config
  5. from .base_db import HGSDatabase, DBException, DBCloseException
  6. from tool.typing import *
  7. class MysqlDB(HGSDatabase):
  8. def __init__(self,
  9. host: Optional[str] = Config.mysql_url,
  10. name: Optional[str] = Config.mysql_name,
  11. passwd: Optional[str] = Config.mysql_passwd,
  12. port: Optional[str] = Config.mysql_port):
  13. if host is None or name is None:
  14. raise DBException
  15. super(MysqlDB, self).__init__(host, name, passwd, port)
  16. try:
  17. self._db = pymysql.connect(user=self._name,
  18. password=self._passwd,
  19. host=self._host,
  20. port=self._port,
  21. database="hgssystem")
  22. except pymysql.err.OperationalError:
  23. raise
  24. self._cursor = self._db.cursor()
  25. self._lock = threading.RLock()
  26. def close(self):
  27. if self._cursor is not None:
  28. self._cursor.close()
  29. if self._db is not None:
  30. self._db.close()
  31. self._db = None
  32. self._cursor = None
  33. self._lock = None
  34. def is_connect(self) -> bool:
  35. if self._cursor is None or self._db is None:
  36. return False
  37. return True
  38. def get_cursor(self) -> pymysql.cursors.Cursor:
  39. if self._cursor is None or self._db is None:
  40. raise DBCloseException
  41. return self._cursor
  42. def search(self, columns: List[str], table: str,
  43. where: Union[str, List[str]] = None,
  44. limit: Optional[int] = None,
  45. offset: Optional[int] = None,
  46. order_by: Optional[List[Tuple[str, str]]] = None,
  47. group_by: Optional[List[str]] = None,
  48. for_update: bool = False):
  49. if type(where) is list and len(where) > 0:
  50. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  51. elif type(where) is str and len(where) > 0:
  52. where = " WHERE " + where
  53. else:
  54. where: str = ""
  55. if order_by is None:
  56. order_by: str = ""
  57. else:
  58. by = [f" {i[0]} {i[1]} " for i in order_by]
  59. order_by: str = " ORDER BY" + ", ".join(by)
  60. if limit is None or limit == 0:
  61. limit: str = ""
  62. else:
  63. limit = f" LIMIT {limit}"
  64. if offset is None:
  65. offset: str = ""
  66. else:
  67. offset = f" OFFSET {offset}"
  68. if group_by is None:
  69. group_by: str = ""
  70. else:
  71. group_by = "GROUP BY " + ", ".join(group_by)
  72. columns: str = ", ".join(columns)
  73. if for_update:
  74. for_update = "FOR UPDATE"
  75. else:
  76. for_update = ""
  77. return self.__search(f"SELECT {columns} "
  78. f"FROM {table} "
  79. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  80. def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
  81. columns: str = ", ".join(columns)
  82. if type(values) is str:
  83. values: str = f"({values})"
  84. else:
  85. values: str = ", ".join(f"{v}" for v in values)
  86. return self.__done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  87. def delete(self, table: str, where: Union[str, List[str]] = None, not_commit: bool = False):
  88. if type(where) is list and len(where) > 0:
  89. where: str = " AND ".join(f"({w})" for w in where)
  90. elif type(where) is not str or len(where) == 0: # 必须指定条件
  91. return None
  92. return self.__done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  93. def update(self, table: str, kw: dict[str:str], where: Union[str, List[str]] = None, not_commit: bool = False):
  94. if len(kw) == 0:
  95. return None
  96. if type(where) is list and len(where) > 0:
  97. where: str = " AND ".join(f"({w})" for w in where)
  98. elif type(where) is not str or len(where) == 0: # 必须指定条件
  99. return None
  100. kw_list = [f"{key} = {kw[key]}" for key in kw]
  101. kw_str = ", ".join(kw_list)
  102. return self.__done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  103. def __search(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  104. if self._cursor is None or self._db is None:
  105. raise DBCloseException
  106. try:
  107. self._lock.acquire() # 上锁
  108. self._cursor.execute(sql)
  109. except pymysql.MySQLError:
  110. print(f"{sql}")
  111. traceback.print_exc()
  112. return None
  113. finally:
  114. self._lock.release() # 释放锁
  115. return self._cursor
  116. def __done(self, sql, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  117. if self._cursor is None or self._db is None:
  118. raise DBCloseException
  119. try:
  120. self._lock.acquire()
  121. self._cursor.execute(sql)
  122. except pymysql.MySQLError:
  123. self._db.rollback()
  124. print(f"{sql}")
  125. traceback.print_exc()
  126. return None
  127. finally:
  128. if not not_commit:
  129. self._db.commit()
  130. self._lock.release()
  131. return self._cursor
  132. def commit(self):
  133. try:
  134. self._lock.acquire()
  135. self._db.commit()
  136. finally:
  137. self._lock.release()