mysql_db.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import pymysql
  2. import threading
  3. import traceback
  4. from conf import mysql_url, mysql_name, mysql_passwd
  5. from .base_db import HGSDatabase, DBCloseException, HGSCursor
  6. from tool.type_ import *
  7. class MySQLCursor(HGSCursor):
  8. def __init__(self, cursor: pymysql.cursors.Cursor):
  9. self._cursor: pymysql.cursors.Cursor = cursor
  10. def fetchall(self):
  11. return self._cursor.fetchall()
  12. def fetchone(self):
  13. return self._cursor.fetchone()
  14. class MysqlDB(HGSDatabase):
  15. def __init__(self, host: str = mysql_url, name: str = mysql_name, passwd: str = mysql_passwd):
  16. super(MysqlDB, self).__init__(host, name, passwd)
  17. try:
  18. self._db = pymysql.connect(user=self._name, password=self._passwd, host=self._host, database="hgssystem")
  19. except pymysql.err.OperationalError:
  20. raise
  21. self._cursor: pymysql.cursors.Cursor = self._db.cursor()
  22. self._mysql_cursor = MySQLCursor(self._cursor)
  23. self._lock = threading.RLock()
  24. def close(self):
  25. if self._cursor is not None:
  26. self._cursor.close()
  27. if self._db is not None:
  28. self._db.close()
  29. self._db = None
  30. self._cursor = None
  31. self._mysql_cursor = None
  32. self._lock = None
  33. def is_connect(self) -> bool:
  34. if self._cursor is None or self._db is None:
  35. return False
  36. return True
  37. def get_cursor(self) -> HGSCursor:
  38. if self._cursor is None or self._db is None:
  39. raise DBCloseException
  40. return self._mysql_cursor
  41. def search(self, columns: List[str], table: str,
  42. where: Union[str, List[str]] = None,
  43. limit: Optional[int] = None,
  44. offset: Optional[int] = None,
  45. order_by: Optional[List[Tuple[str, str]]] = None):
  46. if type(where) is list and len(where) > 0:
  47. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  48. elif type(where) is str and len(where) > 0:
  49. where = " WHERE " + where
  50. else:
  51. where: str = ""
  52. if order_by is None:
  53. order_by: str = ""
  54. else:
  55. by = [f" {i[0]} {i[1]} " for i in order_by]
  56. order_by: str = " ORDER BY" + ", ".join(by)
  57. if limit is None:
  58. limit: str = ""
  59. else:
  60. limit = f" LIMIT {limit}"
  61. if offset is None:
  62. offset: str = ""
  63. else:
  64. offset = f" OFFSET {offset}"
  65. columns: str = ", ".join(columns)
  66. return self.__search(f"SELECT {columns} FROM {table} {where} {order_by} {limit} {offset};")
  67. def insert(self, table: str, columns: list, values: Union[str, List[str]]):
  68. columns: str = ", ".join(columns)
  69. if type(values) is str:
  70. values: str = f"({values})"
  71. else:
  72. values: str = ", ".join(f"{v}" for v in values)
  73. return self.__done(f"INSERT INTO {table}({columns}) VALUES {values};")
  74. def delete(self, table: str, where: Union[str, List[str]] = None):
  75. if type(where) is list and len(where) > 0:
  76. where: str = " AND ".join(f"({w})" for w in where)
  77. elif type(where) is not str or len(where) == 0: # 必须指定条件
  78. return None
  79. return self.__done(f"DELETE FROM {table} WHERE {where};")
  80. def update(self, table: str, kw: dict[str:str], where: Union[str, List[str]] = None):
  81. if len(kw) == 0:
  82. return None
  83. if type(where) is list and len(where) > 0:
  84. where: str = " AND ".join(f"({w})" for w in where)
  85. elif type(where) is not str or len(where) == 0: # 必须指定条件
  86. return None
  87. kw_list = [f"{key} = {kw[key]}" for key in kw]
  88. kw_str = ", ".join(kw_list)
  89. return self.__done(f"UPDATE {table} SET {kw_str} WHERE {where};")
  90. def __search(self, sql) -> Union[None, HGSCursor]:
  91. if self._cursor is None or self._db is None:
  92. raise DBCloseException
  93. try:
  94. self._lock.acquire() # 上锁
  95. self._cursor.execute(sql)
  96. except pymysql.MySQLError:
  97. print(f"{sql}")
  98. traceback.print_exc()
  99. return None
  100. finally:
  101. self._lock.release() # 释放锁
  102. return self._mysql_cursor
  103. def __done(self, sql) -> Union[None, HGSCursor]:
  104. if self._cursor is None or self._db is None:
  105. raise DBCloseException
  106. try:
  107. self._lock.acquire()
  108. self._cursor.execute(sql)
  109. except pymysql.MySQLError:
  110. self._db.rollback()
  111. print(f"{sql}")
  112. traceback.print_exc()
  113. return None
  114. finally:
  115. self._db.commit()
  116. self._lock.release()
  117. return self._mysql_cursor