# user.py (3.7 KB)
  1. from db import DB, DBBit
  2. from tool.type_ import *
  3. from tool.login import creat_uid, randomPassword
  4. from core.user import NormalUser, ManagerUser, User
  5. import conf
  6. from garbage import countGarbageByTime
  7. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  8. cur = db.search(f"SELECT uid, name, manager, score, reputation FROM user WHERE uid = '{uid}';")
  9. if cur is None or cur.rowcount == 0:
  10. return None
  11. assert cur.rowcount == 1
  12. res = cur.fetchone()
  13. assert len(res) == 5
  14. uid: uid_t = res[0]
  15. name: uname_t = str(res[1])
  16. manager: bool = res[2] == DBBit.BIT_1
  17. if manager:
  18. return ManagerUser(name, uid)
  19. else:
  20. score: score_t = res[3]
  21. reputation: score_t = res[4]
  22. rubbish: count_t = countGarbageByTime(uid, db)
  23. return NormalUser(name, uid, reputation, rubbish, score) # rubbish 实际计算
  24. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  25. uid = creat_uid(name, passwd)
  26. return find_user_by_id(uid, db)
  27. def is_user_exists(uid: uid_t, db: DB) -> bool:
  28. cur = db.search(f"SELECT uid FROM user WHERE uid = '{uid}';")
  29. if cur is None or cur.rowcount == 0:
  30. return False
  31. assert cur.rowcount == 1
  32. return True
  33. def update_user(user: User, db: DB) -> bool:
  34. if not is_user_exists(user.get_uid(), db):
  35. return False
  36. uid = user.get_uid()
  37. info: dict = user.get_info()
  38. is_manager = info['manager']
  39. if is_manager == '1':
  40. cur = db.done(f"UPDATE user "
  41. f"SET manager = {is_manager} "
  42. f"WHERE uid = '{uid}';")
  43. else:
  44. score = info['score']
  45. reputation = info['reputation']
  46. cur = db.done(f"UPDATE user "
  47. f"SET manager = {is_manager},"
  48. f" score = {score},"
  49. f" reputation = {reputation} "
  50. f"WHERE uid = '{uid}';")
  51. return cur is not None
  52. def creat_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t, manager: bool, db: DB) -> Optional[User]:
  53. if name is None:
  54. name = randomPassword()
  55. if passwd is None:
  56. passwd = randomPassword()
  57. uid = creat_uid(name, passwd)
  58. if is_user_exists(uid, db):
  59. return None
  60. is_manager = '1' if manager else '0'
  61. cur = db.done(f"INSERT INTO user(uid, name, manager, phone, score, reputation) "
  62. f"VALUES ('{uid}', '{name}', {is_manager}, '{phone}', {conf.default_score}, "
  63. f"{conf.default_reputation});")
  64. if cur is None:
  65. return None
  66. if is_manager:
  67. return ManagerUser(name, uid)
  68. return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
  69. def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
  70. cur = db.done(f"SELECT phone FROM user WHERE uid = {uid};")
  71. if cur is None or cur.rowcount == 0:
  72. return None
  73. assert cur.rowcount == 1
  74. return cur.fetchall()[0]
  75. def del_user(uid: uid_t, db: DB) -> bool:
  76. cur = db.search(f"SELECT gid FROM garbage_time WHERE uid = {uid};")
  77. if cur is None or cur.rowcount != 0:
  78. return False
  79. cur = db.done(f"DELETE FROM user WHERE uid = {uid};")
  80. if cur is None or cur.rowcount == 0:
  81. return False
  82. assert cur.rowcount == 1
  83. return True
  84. if __name__ == '__main__':
  85. mysql_db = DB()
  86. name_ = 'Huan12'
  87. usr = find_user_by_name(name_, "123", mysql_db)
  88. if usr is None:
  89. usr = creat_new_user(name_, "123", "12345678900", False, mysql_db)
  90. print(usr)
  91. for i in range(9):
  92. usr.evaluate(False)
  93. print(usr)
  94. for i in range(1):
  95. usr.evaluate(True)
  96. print(usr)
  97. update_user(usr, mysql_db)
  98. usr = find_user_by_name(name_, "123", mysql_db)
  99. print(usr)