# user.py -- SQL helpers for querying and updating rows of the `user` table.
import logging

from db import DB, DBBit
from tool.type_ import *
from tool.login import create_uid, randomPassword
from tool.time_ import mysql_time
from core.user import NormalUser, ManagerUser, User
import conf
from garbage import count_garbage_by_time
  8. def update_user_score(where: str, score: score_t, db: DB) -> int:
  9. if len(where) == 0 or score < 0:
  10. return -1
  11. cur = db.done(f"UPDATE user SET Score={score} WHERE {where};")
  12. if cur is None:
  13. return -1
  14. return cur.rowcount
  15. def update_user_reputation(where: str, reputation: score_t, db: DB) -> int:
  16. if len(where) == 0 or reputation <= 1 or reputation >= 1000:
  17. return -1
  18. cur = db.done(f"UPDATE user SET Reputation={reputation} WHERE {where};")
  19. if cur is None:
  20. return -1
  21. return cur.rowcount
  22. def search_user_by_fields(columns, uid: uid_t, name: uname_t, phone: phone_t, db: DB):
  23. where = ""
  24. if uid is not None:
  25. where += f"UserID=‘{uid}’ AND "
  26. if name is not None:
  27. where += f"Name='{name}' AND "
  28. if phone is not None:
  29. where += f"Phone='{phone}' AND "
  30. if len(where) != 0:
  31. where = where[0:-4] # 去除末尾的AND
  32. return search_from_user_view(columns, where, db)
  33. def search_from_user_view(columns, where: str, db: DB):
  34. if len(where) > 0:
  35. where = f"WHERE {where} "
  36. column = ", ".join(columns)
  37. cur = db.search(f"SELECT {column} FROM user {where};")
  38. if cur is None:
  39. return None
  40. result = cur.fetchall()
  41. re = []
  42. for res in result:
  43. n = [res[a] for a in range(5)]
  44. n.append("True" if res[5] == DBBit.BIT_1 else "False")
  45. re.append(n)
  46. return re
  47. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  48. cur = db.search(f"SELECT UserID, Name, IsManager, Score, Reputation FROM user WHERE UserID = '{uid}';")
  49. if cur is None or cur.rowcount == 0:
  50. return None
  51. assert cur.rowcount == 1
  52. res = cur.fetchone()
  53. assert len(res) == 5
  54. uid: uid_t = res[0]
  55. name: uname_t = str(res[1])
  56. manager: bool = res[2] == DBBit.BIT_1
  57. if manager:
  58. return ManagerUser(name, uid)
  59. else:
  60. score: score_t = res[3]
  61. reputation: score_t = res[4]
  62. rubbish: count_t = count_garbage_by_time(uid, db)
  63. return NormalUser(name, uid, reputation, rubbish, score) # rubbish 实际计算
  64. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  65. uid = create_uid(name, passwd)
  66. return find_user_by_id(uid, db)
  67. def is_user_exists(uid: uid_t, db: DB) -> bool:
  68. cur = db.search(f"SELECT UserID FROM user WHERE UserID = '{uid}';")
  69. if cur is None or cur.rowcount == 0:
  70. return False
  71. assert cur.rowcount == 1
  72. return True
  73. def update_user(user: User, db: DB) -> bool:
  74. if not is_user_exists(user.get_uid(), db):
  75. return False
  76. uid = user.get_uid()
  77. info: dict = user.get_info()
  78. is_manager = info['manager']
  79. if is_manager == '1':
  80. cur = db.done(f"UPDATE user "
  81. f"SET IsManager = {is_manager} "
  82. f"WHERE UserID = '{uid}';")
  83. else:
  84. score = info['score']
  85. reputation = info['reputation']
  86. cur = db.done(f"UPDATE user "
  87. f"SET IsManager = {is_manager},"
  88. f" Score = {score},"
  89. f" Reputation = {reputation} "
  90. f"WHERE UserID = '{uid}';")
  91. return cur is not None
  92. def create_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t,
  93. manager: bool, db: DB) -> Optional[User]:
  94. if name is None:
  95. name = f'User-{phone[-6:]}'
  96. if passwd is None:
  97. passwd = randomPassword()
  98. uid = create_uid(name, passwd)
  99. if is_user_exists(uid, db):
  100. return None
  101. is_manager = '1' if manager else '0'
  102. cur = db.done(f"INSERT INTO user(UserID, Name, IsManager, Phone, Score, Reputation, CreateTime) "
  103. f"VALUES ('{uid}', '{name}', {is_manager}, '{phone}', {conf.default_score}, "
  104. f"{conf.default_reputation}, {mysql_time()});")
  105. if cur is None:
  106. return None
  107. if is_manager:
  108. return ManagerUser(name, uid)
  109. return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
  110. def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
  111. cur = db.done(f"SELECT Phone FROM user WHERE UserID = '{uid}';")
  112. if cur is None or cur.rowcount == 0:
  113. return None
  114. assert cur.rowcount == 1
  115. return cur.fetchall()[0]
  116. def del_user(uid: uid_t, db: DB) -> bool:
  117. cur = db.search(f"SELECT GarbageID FROM garbage_time WHERE UserID = '{uid}';") # 确保没有引用
  118. if cur is None or cur.rowcount != 0:
  119. return False
  120. cur = db.done(f"DELETE FROM user WHERE UserID = '{uid}';")
  121. if cur is None or cur.rowcount == 0:
  122. return False
  123. assert cur.rowcount == 1
  124. return True
  125. def del_user_from_where_scan(where: str, db: DB) -> int:
  126. cur = db.search(f"SELECT UserID FROM user WHERE {where};")
  127. print(f"SELECT UserID FROM user WHERE {where};")
  128. if cur is None:
  129. return -1
  130. return cur.rowcount
  131. def del_user_from_where(where: str, db: DB) -> int:
  132. cur = db.search(f"SELECT GarbageID FROM garbage_time WHERE {where};") # 确保没有引用
  133. if cur is None or cur.rowcount != 0:
  134. return False
  135. cur = db.done(f"DELETE FROM user WHERE {where};")
  136. if cur is None:
  137. return -1
  138. return cur.rowcount
  139. if __name__ == '__main__':
  140. mysql_db = DB()
  141. name_ = 'Huan12'
  142. usr = find_user_by_name(name_, "123", mysql_db)
  143. if usr is None:
  144. usr = create_new_user(name_, "123", "12345678900", False, mysql_db)
  145. print(usr)
  146. for i in range(9):
  147. usr.evaluate(False)
  148. print(usr)
  149. for i in range(1):
  150. usr.evaluate(True)
  151. print(usr)
  152. update_user(usr, mysql_db)
  153. usr = find_user_by_name(name_, "123", mysql_db)
  154. print(usr)