user.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. from db import DB, DBBit
  2. from tool.type_ import *
  3. from tool.login import create_uid, randomPassword
  4. from core.user import NormalUser, ManagerUser, User
  5. import conf
  6. from garbage import count_garbage_by_time
  7. def search_user_by_fields(columns, uid: uid_t, name: uname_t, phone: phone_t, db: DB):
  8. where = ""
  9. if uid is not None:
  10. where += f"UserID=‘{uid}’ AND "
  11. if name is not None:
  12. where += f"Name='{name}' AND "
  13. if phone is not None:
  14. where += f"Phone='{phone}' AND "
  15. if len(where) != 0:
  16. where = where[0:-4] # 去除末尾的AND
  17. return search_from_user_view(columns, where, db)
  18. def search_from_user_view(columns, where: str, db: DB):
  19. if len(where) > 0:
  20. where = f"WHERE {where} "
  21. column = ", ".join(columns)
  22. cur = db.search(f"SELECT {column} FROM user_view {where};")
  23. if cur is None:
  24. return None
  25. result = cur.fetchall()
  26. re = []
  27. for res in result:
  28. n = [res[a] for a in range(5)]
  29. n.append("True" if res[5] == DBBit.BIT_1 else "False")
  30. re.append(n)
  31. return re
  32. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  33. cur = db.search(f"SELECT uid, name, manager, score, reputation FROM user WHERE uid = '{uid}';")
  34. if cur is None or cur.rowcount == 0:
  35. return None
  36. assert cur.rowcount == 1
  37. res = cur.fetchone()
  38. assert len(res) == 5
  39. uid: uid_t = res[0]
  40. name: uname_t = str(res[1])
  41. manager: bool = res[2] == DBBit.BIT_1
  42. if manager:
  43. return ManagerUser(name, uid)
  44. else:
  45. score: score_t = res[3]
  46. reputation: score_t = res[4]
  47. rubbish: count_t = count_garbage_by_time(uid, db)
  48. return NormalUser(name, uid, reputation, rubbish, score) # rubbish 实际计算
  49. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  50. uid = create_uid(name, passwd)
  51. return find_user_by_id(uid, db)
  52. def is_user_exists(uid: uid_t, db: DB) -> bool:
  53. cur = db.search(f"SELECT uid FROM user WHERE uid = '{uid}';")
  54. if cur is None or cur.rowcount == 0:
  55. return False
  56. assert cur.rowcount == 1
  57. return True
  58. def update_user(user: User, db: DB) -> bool:
  59. if not is_user_exists(user.get_uid(), db):
  60. return False
  61. uid = user.get_uid()
  62. info: dict = user.get_info()
  63. is_manager = info['manager']
  64. if is_manager == '1':
  65. cur = db.done(f"UPDATE user "
  66. f"SET manager = {is_manager} "
  67. f"WHERE uid = '{uid}';")
  68. else:
  69. score = info['score']
  70. reputation = info['reputation']
  71. cur = db.done(f"UPDATE user "
  72. f"SET manager = {is_manager},"
  73. f" score = {score},"
  74. f" reputation = {reputation} "
  75. f"WHERE uid = '{uid}';")
  76. return cur is not None
  77. def create_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t,
  78. manager: bool, db: DB) -> Optional[User]:
  79. if name is None:
  80. name = f'User-{phone[-6:]}'
  81. if passwd is None:
  82. passwd = randomPassword()
  83. uid = create_uid(name, passwd)
  84. if is_user_exists(uid, db):
  85. return None
  86. is_manager = '1' if manager else '0'
  87. cur = db.done(f"INSERT INTO user(uid, name, manager, phone, score, reputation) "
  88. f"VALUES ('{uid}', '{name}', {is_manager}, '{phone}', {conf.default_score}, "
  89. f"{conf.default_reputation});")
  90. if cur is None:
  91. return None
  92. if is_manager:
  93. return ManagerUser(name, uid)
  94. return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
  95. def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
  96. cur = db.done(f"SELECT phone FROM user WHERE uid = {uid};")
  97. if cur is None or cur.rowcount == 0:
  98. return None
  99. assert cur.rowcount == 1
  100. return cur.fetchall()[0]
  101. def del_user(uid: uid_t, db: DB) -> bool:
  102. cur = db.search(f"SELECT gid FROM garbage_time WHERE uid = '{uid}';") # 确保没有引用
  103. if cur is None or cur.rowcount != 0:
  104. return False
  105. cur = db.done(f"DELETE FROM user WHERE uid = '{uid}';")
  106. if cur is None or cur.rowcount == 0:
  107. return False
  108. assert cur.rowcount == 1
  109. return True
  110. def del_user_from_where_scan(where: str, db: DB) -> int:
  111. cur = db.search(f"SELECT uid FROM user WHERE {where};")
  112. print(f"SELECT uid FROM user WHERE {where};")
  113. if cur is None:
  114. return -1
  115. return cur.rowcount
  116. def del_user_from_where(where: str, db: DB) -> int:
  117. cur = db.search(f"SELECT gid FROM garbage_time WHERE {where};") # 确保没有引用
  118. if cur is None or cur.rowcount != 0:
  119. return False
  120. cur = db.done(f"DELETE FROM user WHERE {where};")
  121. if cur is None:
  122. return -1
  123. return cur.rowcount
  124. if __name__ == '__main__':
  125. mysql_db = DB()
  126. name_ = 'Huan12'
  127. usr = find_user_by_name(name_, "123", mysql_db)
  128. if usr is None:
  129. usr = create_new_user(name_, "123", "12345678900", False, mysql_db)
  130. print(usr)
  131. for i in range(9):
  132. usr.evaluate(False)
  133. print(usr)
  134. for i in range(1):
  135. usr.evaluate(True)
  136. print(usr)
  137. update_user(usr, mysql_db)
  138. usr = find_user_by_name(name_, "123", mysql_db)
  139. print(usr)