1
0

mysql.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import pymysql
  2. import threading
  3. import traceback
  4. from sql.base import Database, DBException, DBCloseException
  5. from typing import Optional, Union, List, Tuple, Dict
  6. class MysqlDB(Database):
  7. def __init__(self,
  8. host: Optional[str],
  9. name: Optional[str],
  10. passwd: Optional[str],
  11. port: Optional[str]):
  12. if host is None or name is None:
  13. raise DBException
  14. super(MysqlDB, self).__init__(host=host, name=name, passwd=passwd, port=port)
  15. try:
  16. self._db = pymysql.connect(user=self._name,
  17. password=self._passwd,
  18. host=self._host,
  19. port=self._port,
  20. database="HBlog")
  21. except pymysql.err.OperationalError:
  22. raise
  23. self._cursor = self._db.cursor()
  24. self._lock = threading.RLock()
  25. def close(self):
  26. if self._cursor is not None:
  27. self._cursor.close()
  28. if self._db is not None:
  29. self._db.close()
  30. self._db = None
  31. self._cursor = None
  32. self._lock = None
  33. def is_connect(self) -> bool:
  34. if self._cursor is None or self._db is None:
  35. return False
  36. return True
  37. def get_cursor(self) -> pymysql.cursors.Cursor:
  38. if self._cursor is None or self._db is None:
  39. raise DBCloseException
  40. return self._cursor
  41. def search(self, columns: List[str], table: str,
  42. where: Union[str, List[str]] = None,
  43. limit: Optional[int] = None,
  44. offset: Optional[int] = None,
  45. order_by: Optional[List[Tuple[str, str]]] = None,
  46. group_by: Optional[List[str]] = None,
  47. for_update: bool = False):
  48. if type(where) is list and len(where) > 0:
  49. where: str = " WHERE " + " AND ".join(f"({w})" for w in where)
  50. elif type(where) is str and len(where) > 0:
  51. where = " WHERE " + where
  52. else:
  53. where: str = ""
  54. if order_by is None:
  55. order_by: str = ""
  56. else:
  57. by = [f" {i[0]} {i[1]} " for i in order_by]
  58. order_by: str = " ORDER BY" + ", ".join(by)
  59. if limit is None or limit == 0:
  60. limit: str = ""
  61. else:
  62. limit = f" LIMIT {limit}"
  63. if offset is None:
  64. offset: str = ""
  65. else:
  66. offset = f" OFFSET {offset}"
  67. if group_by is None:
  68. group_by: str = ""
  69. else:
  70. group_by = "GROUP BY " + ", ".join(group_by)
  71. columns: str = ", ".join(columns)
  72. if for_update:
  73. for_update = "FOR UPDATE"
  74. else:
  75. for_update = ""
  76. return self.__search(f"SELECT {columns} "
  77. f"FROM {table} "
  78. f"{where} {group_by} {order_by} {limit} {offset} {for_update};")
  79. def insert(self, table: str, columns: list, values: Union[str, List[str]], not_commit: bool = False):
  80. columns: str = ", ".join(columns)
  81. if type(values) is str:
  82. values: str = f"({values})"
  83. else:
  84. values: str = ", ".join(f"{v}" for v in values)
  85. return self.__done(f"INSERT INTO {table}({columns}) VALUES {values};", not_commit=not_commit)
  86. def delete(self, table: str, where: Union[str, List[str]] = None, not_commit: bool = False):
  87. if type(where) is list and len(where) > 0:
  88. where: str = " AND ".join(f"({w})" for w in where)
  89. elif type(where) is not str or len(where) == 0: # 必须指定条件
  90. return None
  91. return self.__done(f"DELETE FROM {table} WHERE {where};", not_commit=not_commit)
  92. def update(self, table: str, kw: "Dict[str:str]", where: Union[str, List[str]] = None, not_commit: bool = False):
  93. if len(kw) == 0:
  94. return None
  95. if type(where) is list and len(where) > 0:
  96. where: str = " AND ".join(f"({w})" for w in where)
  97. elif type(where) is not str or len(where) == 0: # 必须指定条件
  98. return None
  99. kw_list = [f"{key} = {kw[key]}" for key in kw]
  100. kw_str = ", ".join(kw_list)
  101. return self.__done(f"UPDATE {table} SET {kw_str} WHERE {where};", not_commit=not_commit)
  102. def __search(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  103. if self._cursor is None or self._db is None:
  104. raise DBCloseException
  105. try:
  106. self._lock.acquire() # 上锁
  107. self._cursor.execute(sql)
  108. except pymysql.MySQLError:
  109. print(f"sql='{sql}'")
  110. traceback.print_exc()
  111. return None
  112. finally:
  113. self._lock.release() # 释放锁
  114. return self._cursor
  115. def __done(self, sql, not_commit: bool = False) -> Union[None, pymysql.cursors.Cursor]:
  116. if self._cursor is None or self._db is None:
  117. raise DBCloseException
  118. try:
  119. self._lock.acquire()
  120. self._cursor.execute(sql)
  121. except pymysql.MySQLError:
  122. self._db.rollback()
  123. print(f"sql={sql}")
  124. traceback.print_exc()
  125. return None
  126. finally:
  127. if not not_commit:
  128. self._db.commit()
  129. self._lock.release()
  130. return self._cursor
  131. def commit(self):
  132. try:
  133. self._lock.acquire()
  134. self._db.commit()
  135. finally:
  136. self._lock.release()