mysql_db.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import pymysql
  2. import threading
  3. from conf import mysql_url, mysql_name, mysql_passwd
  4. from .base_db import Database, DBException, DBCloseException
  5. from tool.type_ import *
  6. class MysqlDB(Database):
  7. def __init__(self, host: str = mysql_url, name: str = mysql_name, passwd: str = mysql_passwd):
  8. super(MysqlDB, self).__init__(host, name, passwd)
  9. try:
  10. self._db = pymysql.connect(user=name, password=passwd, host=host, database="hgssystem")
  11. except pymysql.err.OperationalError:
  12. raise DBException
  13. self._cursor = self._db.cursor()
  14. self._lock = threading.RLock()
  15. def __del__(self):
  16. self.close()
  17. def close(self):
  18. if self._cursor is not None:
  19. self._cursor.close()
  20. if self._db is not None:
  21. self._db.close()
  22. self._db = None
  23. self._cursor = None
  24. self._lock = None
  25. def is_connect(self) -> bool:
  26. if self._cursor is None or self._db is None:
  27. return False
  28. return True
  29. def get_cursor(self) -> pymysql.cursors.Cursor:
  30. if self._cursor is None or self._db is None:
  31. raise DBCloseException
  32. return self._cursor
  33. def search(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  34. if self._cursor is None or self._db is None:
  35. raise DBCloseException
  36. try:
  37. self._lock.acquire() # 上锁
  38. self._cursor.execute(sql)
  39. except pymysql.MySQLError:
  40. return None
  41. finally:
  42. self._lock.release() # 释放锁
  43. return self._cursor
  44. def done(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  45. if self._cursor is None or self._db is None:
  46. raise DBCloseException
  47. try:
  48. self._lock.acquire()
  49. self._cursor.execute(sql)
  50. except pymysql.MySQLError:
  51. self._db.rollback()
  52. return None
  53. finally:
  54. self._db.commit()
  55. self._lock.release()
  56. return self._cursor
  57. def done_(self, sql) -> Union[None, pymysql.cursors.Cursor]:
  58. if self._cursor is None or self._db is None:
  59. raise DBCloseException
  60. try:
  61. self._lock.acquire()
  62. self._cursor.execute(sql)
  63. except pymysql.MySQLError:
  64. self._db.rollback()
  65. finally:
  66. self._lock.release()
  67. return self._cursor
  68. def done_commit(self):
  69. try:
  70. self._lock.acquire()
  71. self._db.commit()
  72. finally:
  73. self._lock.release()
  74. if __name__ == '__main__':
  75. # 测试程序
  76. mysql_db = MysqlDB()
  77. mysql_db.search("SELECT * FROM user;")
  78. res_ = mysql_db.get_cursor().fetchall()
  79. print(res_)
  80. mysql_db.search("SELECT * FROM user WHERE UserID = 0;")
  81. res_ = mysql_db.get_cursor().fetchall()
  82. print(res_)
  83. mysql_db.close()