user.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. from db import DB, DBBit
  2. from tool.type_ import *
  3. from tool.login import creat_uid, randomPassword
  4. from core.user import NormalUser, ManagerUser, User
  5. import conf
  6. from garbage import countGarbageByTime
  def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
      """Look up a single user row by ``uid`` and wrap it in a user object.

      :param uid: identifier matched against the ``uid`` column of ``user``.
      :param db: database handle; its ``search`` method runs the SELECT.
      :return: ``None`` when ``search`` yields no cursor or no row matches;
          a ``ManagerUser`` when the ``manager`` column equals
          ``DBBit.BIT_1``; otherwise a ``NormalUser``.
      """
      cursor = db.search(f"SELECT uid, name, manager, score, reputation FROM user WHERE uid = '{uid}';")
      if cursor is None or cursor.rowcount == 0:
          return None

      # uid is expected to be unique, so at most one row may come back.
      assert cursor.rowcount == 1
      row = cursor.fetchone()
      assert len(row) == 5

      found_uid: uid_t = row[0]
      user_name: uname_t = str(row[1])
      if row[2] == DBBit.BIT_1:
          return ManagerUser(user_name, found_uid)

      user_score: score_t = row[3]
      user_reputation: score_t = row[4]
      # The rubbish count is not stored in the row; it is actually computed here.
      garbage_count: count_t = countGarbageByTime(found_uid, db)
      return NormalUser(user_name, found_uid, user_reputation, garbage_count, user_score)
  def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
      """Find a user from a name/password pair.

      The pair is turned into a uid with ``creat_uid`` and the lookup is
      delegated to ``find_user_by_id``.

      :return: whatever ``find_user_by_id`` returns for the derived uid.
      """
      return find_user_by_id(creat_uid(name, passwd), db)
  def is_user_exists(uid: uid_t, db: DB) -> bool:
      """Report whether a row with the given ``uid`` exists in ``user``.

      :return: ``True`` when the SELECT matches a row; ``False`` when
          ``db.search`` yields no cursor or the cursor reports zero rows.
      """
      cursor = db.search(f"SELECT uid FROM user WHERE uid = '{uid}';")
      if cursor is not None and cursor.rowcount != 0:
          # uid is expected to be unique.
          assert cursor.rowcount == 1
          return True
      return False
  def update_user(user: User, db: DB) -> bool:
      """Write the state of ``user`` back to its row in the ``user`` table.

      The values come from ``user.get_info()``. When its ``'manager'`` entry
      equals ``'1'`` only the manager flag is written; otherwise ``score`` and
      ``reputation`` are written as well.

      :return: ``False`` when no row exists for the user's uid or when
          ``db.done`` yields no cursor; ``True`` otherwise.
      """
      if not is_user_exists(user.get_uid(), db):
          return False

      uid = user.get_uid()
      info: dict = user.get_info()
      manager_flag = info['manager']

      if manager_flag == '1':
          statement = (f"UPDATE user "
                       f"SET manager = {manager_flag} "
                       f"WHERE uid = '{uid}';")
      else:
          new_score = info['score']
          new_reputation = info['reputation']
          statement = (f"UPDATE user "
                       f"SET manager = {manager_flag},"
                       f" score = {new_score},"
                       f" reputation = {new_reputation} "
                       f"WHERE uid = '{uid}';")

      return db.done(statement) is not None
  def creat_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t, manager: bool, db: DB) -> Optional[User]:
      """Insert a new row into ``user`` and return the matching user object.

      :param name: display name; when ``None`` it defaults to
          ``User-`` followed by the last six characters of ``phone``.
      :param passwd: password; when ``None`` one is generated with
          ``randomPassword()``.
          NOTE(review): a generated password is not handed back to the caller.
      :param phone: phone number stored in the ``phone`` column.
      :param manager: whether the new account is a manager.
      :param db: database handle; its ``done`` method runs the INSERT.
      :return: ``None`` when the derived uid already exists or when
          ``db.done`` yields no cursor; a ``ManagerUser`` when ``manager`` is
          true; otherwise a ``NormalUser`` with the default reputation and
          score from ``conf`` and a rubbish count of 0.
      """
      if name is None:
          name = f'User-{phone[-6:]}'
      if passwd is None:
          passwd = randomPassword()

      uid = creat_uid(name, passwd)
      if is_user_exists(uid, db):
          return None

      # SQL literal for the manager column; both values are non-empty strings.
      is_manager = '1' if manager else '0'
      cur = db.done(f"INSERT INTO user(uid, name, manager, phone, score, reputation) "
                    f"VALUES ('{uid}', '{name}', {is_manager}, '{phone}', {conf.default_score}, "
                    f"{conf.default_reputation});")
      if cur is None:
          return None

      # Branch on the boolean, not on ``is_manager``: the string '0' is truthy,
      # so testing it would return a ManagerUser for every new account.
      if manager:
          return ManagerUser(name, uid)
      return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
  def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
      """Return the phone number stored for ``uid``.

      :param uid: identifier matched against the ``uid`` column of ``user``.
      :param db: database handle; its ``search`` method runs the SELECT.
      :return: the value of the ``phone`` column, or ``None`` when
          ``db.search`` yields no cursor or no row matches.
      """
      # Quote the uid and read through ``search`` like every other SELECT on
      # this table in the module.
      cur = db.search(f"SELECT phone FROM user WHERE uid = '{uid}';")
      if cur is None or cur.rowcount == 0:
          return None
      assert cur.rowcount == 1
      # Return the single selected column, not the whole row.
      return cur.fetchone()[0]
  def del_user(uid: uid_t, db: DB) -> bool:
      """Delete the user with the given ``uid`` if nothing references it.

      :return: ``True`` when exactly one row was deleted. ``False`` when the
          reference check yields no cursor, when ``garbage_time`` still has
          rows for this uid, or when the DELETE yields no cursor or removes
          no row.
      """
      # Make sure nothing still references this user.
      references = db.search(f"SELECT gid FROM garbage_time WHERE uid = '{uid}';")
      if references is None or references.rowcount != 0:
          return False

      deleted = db.done(f"DELETE FROM user WHERE uid = '{uid}';")
      if deleted is not None and deleted.rowcount != 0:
          assert deleted.rowcount == 1
          return True
      return False
  def del_user_from_where_scan(where: str, db: DB) -> int:
      """Count the rows of ``user`` that match a raw SQL condition.

      The query text is also printed to standard output.

      :param where: SQL fragment placed verbatim after ``WHERE``.
      :return: the cursor's ``rowcount``, or ``-1`` when ``db.search`` yields
          no cursor.
      """
      query = f"SELECT uid FROM user WHERE {where};"
      cursor = db.search(query)
      print(query)
      return -1 if cursor is None else cursor.rowcount
  def del_user_from_where(where: str, db: DB) -> int:
      """Delete every row of ``user`` that matches a raw SQL condition.

      Nothing is deleted when ``garbage_user`` still has rows matching the
      same condition.

      :param where: SQL fragment placed verbatim after ``WHERE`` in both
          queries; it is not escaped, so it must come from a trusted source.
      :param db: database handle providing ``search`` and ``done``.
      :return: the number of deleted rows; ``0`` when matching rows in
          ``garbage_user`` block the deletion; ``-1`` when either query
          yields no cursor.
      """
      # Make sure nothing still references the users about to be removed.
      cur = db.search(f"SELECT gid FROM garbage_user WHERE {where};")
      if cur is None:
          # A failed check is an error, reported with the same sentinel as a
          # failed DELETE rather than as "nothing deleted".
          return -1
      if cur.rowcount != 0:
          # Stay within the declared ``int`` return type (was ``False``).
          return 0

      cur = db.done(f"DELETE FROM user WHERE {where};")
      if cur is None:
          return -1
      return cur.rowcount
  if __name__ == '__main__':
      # Manual smoke run: fetch or create a user, evaluate it a few times,
      # save it and read it back.
      mysql_db = DB()
      name_ = 'Huan12'

      usr = find_user_by_name(name_, "123", mysql_db)
      if usr is None:
          usr = creat_new_user(name_, "123", "12345678900", False, mysql_db)
      print(usr)

      # Nine evaluations with False ...
      for _ in range(9):
          usr.evaluate(False)
          print(usr)

      # ... followed by a single one with True.
      for _ in range(1):
          usr.evaluate(True)
          print(usr)

      # Store the changes, then reload to show what was written.
      update_user(usr, mysql_db)
      usr = find_user_by_name(name_, "123", mysql_db)
      print(usr)
  111. update_user(usr, mysql_db)
  112. usr = find_user_by_name(name_, "123", mysql_db)
  113. print(usr)