user.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from db import DB, mysql_db, DBBit
  2. from tool.type_ import *
  3. from tool.login import creat_uid, randomPassword
  4. from core.user import NormalUser, ManagerUser, User
  5. import conf
  6. from garbage import countGarbageByTime
  7. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  8. cur = db.search(f"SELECT uid, name, manager, score, reputation FROM user WHERE uid = '{uid}';")
  9. if cur is None or cur.rowcount == 0:
  10. return None
  11. assert cur.rowcount == 1
  12. res = cur.fetchone()
  13. assert len(res) == 5
  14. uid: uid_t = res[0]
  15. name: uname_t = str(res[1])
  16. manager: bool = res[2] == DBBit.BIT_1
  17. if manager:
  18. return ManagerUser(name, uid)
  19. else:
  20. score: score_t = res[3]
  21. reputation: score_t = res[4]
  22. rubbish: count_t = countGarbageByTime(uid, db)
  23. return NormalUser(name, uid, reputation, rubbish, score) # rubbish 实际计算
  24. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  25. uid = creat_uid(name, passwd)
  26. return find_user_by_id(uid, db)
  27. def is_user_exists(uid: uid_t, db: DB) -> bool:
  28. cur = db.search(f"SELECT uid FROM user WHERE uid = '{uid}';")
  29. if cur is None or cur.rowcount == 0:
  30. return False
  31. assert cur.rowcount == 1
  32. return True
  33. def update_user(user: User, db: DB) -> bool:
  34. if not is_user_exists(user.get_uid(), db):
  35. return False
  36. uid = user.get_uid()
  37. info: dict = user.get_info()
  38. is_manager = info['manager']
  39. if is_manager == '1':
  40. cur = db.done(f"UPDATE user "
  41. f"SET manager = {is_manager} "
  42. f"WHERE uid = '{uid}';")
  43. else:
  44. score = info['score']
  45. reputation = info['reputation']
  46. cur = db.done(f"UPDATE user "
  47. f"SET manager = {is_manager},"
  48. f" score = {score},"
  49. f" reputation = {reputation} "
  50. f"WHERE uid = '{uid}';")
  51. return cur is not None
  52. def creat_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t, manager: bool, db: DB) -> Optional[User]:
  53. if name is None:
  54. name = randomPassword()
  55. if passwd is None:
  56. passwd = randomPassword()
  57. uid = creat_uid(name, passwd)
  58. if is_user_exists(uid, db):
  59. return None
  60. is_manager = manager if '1' else '0'
  61. cur = db.done(f"INSERT INTO user(uid, name, manager, phone, score, reputation) "
  62. f"VALUES ('{uid}', '{name}', {is_manager}, '{phone}', {conf.default_score}, "
  63. f"{conf.default_reputation});")
  64. if cur is None:
  65. return None
  66. if is_manager:
  67. return ManagerUser(name, uid)
  68. return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
  69. if __name__ == '__main__':
  70. name_ = 'Huan12'
  71. usr = find_user_by_name(name_, "123", mysql_db)
  72. if usr is None:
  73. usr = creat_new_user(name_, "123", "12345678900", False, mysql_db)
  74. print(usr)
  75. for i in range(9):
  76. usr.evaluate(False)
  77. print(usr)
  78. for i in range(1):
  79. usr.evaluate(True)
  80. print(usr)
  81. update_user(usr, mysql_db)
  82. usr = find_user_by_name(name_, "123", mysql_db)
  83. print(usr)