user.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. import csv
  2. from .db import DB, DBBit
  3. from tool.type_ import *
  4. from tool.login import create_uid, randomPassword
  5. from tool.time_ import mysql_time
  6. from core.user import NormalUser, ManagerUser, User
  7. import conf
  8. from . import garbage
  9. def update_user_score(where: str, score: score_t, db: DB) -> int:
  10. if len(where) == 0 or score < 0:
  11. return -1
  12. cur = db.done(f"UPDATE user SET Score={score} WHERE {where};")
  13. if cur is None:
  14. return -1
  15. return cur.rowcount
  16. def update_user_reputation(where: str, reputation: score_t, db: DB) -> int:
  17. if len(where) == 0 or reputation <= 1 or reputation >= 1000:
  18. return -1
  19. cur = db.done(f"UPDATE user SET Reputation={reputation} WHERE {where};")
  20. if cur is None:
  21. return -1
  22. return cur.rowcount
  23. def search_user_by_fields(columns, uid: uid_t, name: uname_t, phone: phone_t, db: DB):
  24. where = ""
  25. if uid is not None:
  26. where += f"UserID=‘{uid}’ AND "
  27. if name is not None:
  28. where += f"Name='{name}' AND "
  29. if phone is not None:
  30. where += f"Phone='{phone}' AND "
  31. if len(where) != 0:
  32. where = where[0:-4] # 去除末尾的AND
  33. return search_from_user_view(columns, where, db)
  34. def search_from_user_view(columns, where: str, db: DB):
  35. if len(where) > 0:
  36. where = f"WHERE {where} "
  37. column = ", ".join(columns)
  38. cur = db.search(f"SELECT {column} FROM user {where};")
  39. if cur is None:
  40. return None
  41. result = cur.fetchall()
  42. re = []
  43. for res in result:
  44. n = [res[a] for a in range(5)]
  45. n.append("True" if res[5] == DBBit.BIT_1 else "False")
  46. re.append(n)
  47. return re
  48. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  49. cur = db.search(f"SELECT UserID, Name, IsManager, Score, Reputation FROM user WHERE UserID = '{uid}';")
  50. if cur is None or cur.rowcount == 0:
  51. return None
  52. assert cur.rowcount == 1
  53. res = cur.fetchone()
  54. assert len(res) == 5
  55. uid: uid_t = res[0]
  56. name: uname_t = str(res[1])
  57. manager: bool = res[2] == DBBit.BIT_1
  58. if manager:
  59. return ManagerUser(name, uid)
  60. else:
  61. score: score_t = res[3]
  62. reputation: score_t = res[4]
  63. rubbish: count_t = garbage.count_garbage_by_time(uid, db)
  64. return NormalUser(name, uid, reputation, rubbish, score) # rubbish 实际计算
  65. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  66. uid = create_uid(name, passwd)
  67. return find_user_by_id(uid, db)
  68. def is_user_exists(uid: uid_t, db: DB) -> bool:
  69. cur = db.search(f"SELECT UserID FROM user WHERE UserID = '{uid}';")
  70. if cur is None or cur.rowcount == 0:
  71. return False
  72. assert cur.rowcount == 1
  73. return True
  74. def update_user(user: User, db: DB) -> bool:
  75. if not is_user_exists(user.get_uid(), db):
  76. return False
  77. uid = user.get_uid()
  78. info: dict = user.get_info()
  79. is_manager = info['manager']
  80. if is_manager == '1':
  81. cur = db.done(f"UPDATE user "
  82. f"SET IsManager = {is_manager} "
  83. f"WHERE UserID = '{uid}';")
  84. else:
  85. score = info['score']
  86. reputation = info['reputation']
  87. cur = db.done(f"UPDATE user "
  88. f"SET IsManager = {is_manager},"
  89. f" Score = {score},"
  90. f" Reputation = {reputation} "
  91. f"WHERE UserID = '{uid}';")
  92. return cur is not None
  93. def create_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t,
  94. manager: bool, db: DB) -> Optional[User]:
  95. if name is None:
  96. name = f'User-{phone[-6:]}'
  97. if passwd is None:
  98. passwd = randomPassword()
  99. if len(phone) != 11:
  100. return None
  101. uid = create_uid(name, passwd)
  102. if is_user_exists(uid, db):
  103. return None
  104. is_manager = '1' if manager else '0'
  105. cur = db.done(f"INSERT INTO user(UserID, Name, IsManager, Phone, Score, Reputation, CreateTime) "
  106. f"VALUES ('{uid}', '{name}', {is_manager}, '{phone}', {conf.default_score}, "
  107. f"{conf.default_reputation}, {mysql_time()});")
  108. if cur is None:
  109. return None
  110. if is_manager:
  111. return ManagerUser(name, uid)
  112. return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
  113. def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
  114. cur = db.done(f"SELECT Phone FROM user WHERE UserID = '{uid}';")
  115. if cur is None or cur.rowcount == 0:
  116. return None
  117. assert cur.rowcount == 1
  118. return cur.fetchall()[0]
  119. def del_user(uid: uid_t, db: DB) -> bool:
  120. cur = db.search(f"SELECT GarbageID FROM garbage_time WHERE UserID = '{uid}';") # 确保没有引用
  121. if cur is None or cur.rowcount != 0:
  122. return False
  123. cur = db.done(f"DELETE FROM user WHERE UserID = '{uid}';")
  124. if cur is None or cur.rowcount == 0:
  125. return False
  126. assert cur.rowcount == 1
  127. return True
  128. def del_user_from_where_scan(where: str, db: DB) -> int:
  129. cur = db.search(f"SELECT UserID FROM user WHERE {where};")
  130. print(f"SELECT UserID FROM user WHERE {where};")
  131. if cur is None:
  132. return -1
  133. return cur.rowcount
  134. def del_user_from_where(where: str, db: DB) -> int:
  135. cur = db.search(f"SELECT GarbageID FROM garbage_time WHERE {where};") # 确保没有引用
  136. if cur is None or cur.rowcount != 0:
  137. return False
  138. cur = db.done(f"DELETE FROM user WHERE {where};")
  139. if cur is None:
  140. return -1
  141. return cur.rowcount
  142. def creat_user_from_csv(path, db: DB) -> List[User]:
  143. res = []
  144. with open(path, "r") as f:
  145. reader = csv.reader(f)
  146. first = True
  147. name_index = 0
  148. passwd_index = 0
  149. phone_index = 0
  150. manager_index = 0
  151. for item in reader:
  152. if first:
  153. try:
  154. name_index = item.index('Name')
  155. passwd_index = item.index('Passwd')
  156. phone_index = item.index('Phone')
  157. manager_index = item.index('Manager')
  158. except (ValueError, TypeError):
  159. return []
  160. first = False
  161. continue
  162. name = item[name_index]
  163. passwd = item[passwd_index]
  164. phone = item[phone_index]
  165. if item[manager_index].upper() == "TRUE":
  166. is_manager = True
  167. elif item[manager_index].upper() == "FALSE":
  168. is_manager = False
  169. else:
  170. continue
  171. user = create_new_user(name, passwd, phone, is_manager, db)
  172. if user is not None:
  173. res.append(user)
  174. return res
  175. def creat_auto_user_from_csv(path, db: DB) -> List[User]:
  176. res = []
  177. with open(path, "r") as f:
  178. reader = csv.reader(f)
  179. first = True
  180. phone_index = 0
  181. for item in reader:
  182. if first:
  183. try:
  184. phone_index = item.index('Phone')
  185. except (ValueError, TypeError):
  186. return []
  187. first = False
  188. continue
  189. phone = item[phone_index]
  190. user = create_new_user(None, None, phone, False, db)
  191. if user is not None:
  192. res.append(user)
  193. return res
  194. if __name__ == '__main__':
  195. mysql_db = DB()
  196. name_ = 'Huan12'
  197. usr = find_user_by_name(name_, "123", mysql_db)
  198. if usr is None:
  199. usr = create_new_user(name_, "123", "12345678900", False, mysql_db)
  200. print(usr)
  201. for i in range(9):
  202. usr.evaluate(False)
  203. print(usr)
  204. for i in range(1):
  205. usr.evaluate(True)
  206. print(usr)
  207. update_user(usr, mysql_db)
  208. usr = find_user_by_name(name_, "123", mysql_db)
  209. print(usr)