mysql_db.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import pymysql
  2. import threading
  3. import traceback
  4. from conf import mysql_url, mysql_name, mysql_passwd
  5. from .base_db import Database, DBCloseException
  6. from tool.type_ import *
  7. class MysqlDB(Database):
  8. def __init__(self, host: str = mysql_url, name: str = mysql_name, passwd: str = mysql_passwd):
  9. super(MysqlDB, self).__init__(host, name, passwd)
  10. try:
  11. self._db = pymysql.connect(user=self._name, password=self._passwd, host=self._host, database="hgssystem")
  12. except pymysql.err.OperationalError:
  13. raise
  14. self._cursor = self._db.cursor()
  15. self._lock = threading.RLock()
  16. def close(self):
  17. if self._cursor is not None:
  18. self._cursor.close()
  19. if self._db is not None:
  20. self._db.close()
  21. self._db = None
  22. self._cursor = None
  23. self._lock = None
  24. def is_connect(self) -> bool:
  25. if self._cursor is None or self._db is None:
  26. return False
  27. return True
  28. def get_cursor(self) -> pymysql.cursors.Cursor:
  29. if self._cursor is None or self._db is None:
  30. raise DBCloseException
  31. return self._cursor
  32. def search(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  33. if self._cursor is None or self._db is None:
  34. raise DBCloseException
  35. try:
  36. self._lock.acquire() # 上锁
  37. self._cursor.execute(sql)
  38. except pymysql.MySQLError:
  39. print(f"{sql}")
  40. traceback.print_exc()
  41. return None
  42. finally:
  43. self._lock.release() # 释放锁
  44. return self._cursor
  45. def done(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  46. if self._cursor is None or self._db is None:
  47. raise DBCloseException
  48. try:
  49. self._lock.acquire()
  50. self._cursor.execute(sql)
  51. except pymysql.MySQLError:
  52. self._db.rollback()
  53. print(f"{sql}")
  54. traceback.print_exc()
  55. return None
  56. finally:
  57. self._db.commit()
  58. self._lock.release()
  59. return self._cursor
  60. def done_(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  61. if self._cursor is None or self._db is None:
  62. raise DBCloseException
  63. try:
  64. self._lock.acquire()
  65. self._cursor.execute(sql)
  66. except pymysql.MySQLError:
  67. self._db.rollback()
  68. print(f"{sql}")
  69. traceback.print_exc()
  70. return None
  71. finally:
  72. self._lock.release()
  73. return self._cursor
  74. def done_commit(self):
  75. try:
  76. self._lock.acquire()
  77. self._db.commit()
  78. except pymysql.err:
  79. traceback.print_exc()
  80. finally:
  81. self._lock.release()