user.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. import csv
  2. from . import DBBit
  3. from .db import DB
  4. from tool.type_ import *
  5. from tool.login import create_uid, randomPassword
  6. from tool.time_ import mysql_time
  7. from core.user import NormalUser, ManagerUser, User
  8. import conf
  9. from . import garbage
  10. def update_user_score(where: str, score: score_t, db: DB) -> int:
  11. if len(where) == 0 or score < 0:
  12. return -1
  13. cur = db.done(f"UPDATE user SET Score={score} WHERE {where};")
  14. if cur is None:
  15. return -1
  16. return cur.rowcount
  17. def update_user_reputation(where: str, reputation: score_t, db: DB) -> int:
  18. if len(where) == 0 or reputation <= 1 or reputation >= 1000:
  19. return -1
  20. cur = db.done(f"UPDATE user SET Reputation={reputation} WHERE {where};")
  21. if cur is None:
  22. return -1
  23. return cur.rowcount
  24. def search_user_by_fields(columns, uid: uid_t, name: uname_t, phone: phone_t, db: DB):
  25. where = ""
  26. if uid is not None:
  27. where += f"UserID=‘{uid}’ AND "
  28. if name is not None:
  29. where += f"Name='{name}' AND "
  30. if phone is not None:
  31. where += f"Phone='{phone}' AND "
  32. if len(where) != 0:
  33. where = where[0:-4] # 去除末尾的AND
  34. return search_from_user_view(columns, where, db)
  35. def search_from_user_view(columns, where: str, db: DB):
  36. if len(where) > 0:
  37. where = f"WHERE {where} "
  38. column = ", ".join(columns)
  39. cur = db.search(f"SELECT {column} FROM user {where};")
  40. if cur is None:
  41. return None
  42. result = cur.fetchall()
  43. re = []
  44. for res in result:
  45. n = [res[a] for a in range(5)]
  46. n.append("True" if res[5] == DBBit.BIT_1 else "False")
  47. re.append(n)
  48. return re
  49. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  50. cur = db.search(f"SELECT UserID, Name, IsManager, Score, Reputation FROM user WHERE UserID = '{uid}';")
  51. if cur is None or cur.rowcount == 0:
  52. return None
  53. assert cur.rowcount == 1
  54. res = cur.fetchone()
  55. assert len(res) == 5
  56. uid: uid_t = res[0]
  57. name: uname_t = str(res[1])
  58. manager: bool = res[2] == DBBit.BIT_1
  59. if manager:
  60. return ManagerUser(name, uid)
  61. else:
  62. score: score_t = res[3]
  63. reputation: score_t = res[4]
  64. rubbish: count_t = garbage.count_garbage_by_time(uid, db)
  65. return NormalUser(name, uid, reputation, rubbish, score) # rubbish 实际计算
  66. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  67. uid = create_uid(name, passwd)
  68. return find_user_by_id(uid, db)
  69. def is_user_exists(uid: uid_t, db: DB) -> bool:
  70. cur = db.search(f"SELECT UserID FROM user WHERE UserID = '{uid}';")
  71. if cur is None or cur.rowcount == 0:
  72. return False
  73. assert cur.rowcount == 1
  74. return True
  75. def update_user(user: User, db: DB) -> bool:
  76. if not is_user_exists(user.get_uid(), db):
  77. return False
  78. uid = user.get_uid()
  79. info: dict = user.get_info()
  80. is_manager = info['manager']
  81. if is_manager == '1':
  82. cur = db.done(f"UPDATE user "
  83. f"SET IsManager = {is_manager} "
  84. f"WHERE UserID = '{uid}';")
  85. else:
  86. score = info['score']
  87. reputation = info['reputation']
  88. cur = db.done(f"UPDATE user "
  89. f"SET IsManager = {is_manager},"
  90. f" Score = {score},"
  91. f" Reputation = {reputation} "
  92. f"WHERE UserID = '{uid}';")
  93. return cur is not None
  94. def create_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t,
  95. manager: bool, db: DB) -> Optional[User]:
  96. if name is None:
  97. name = f'User-{phone[-6:]}'
  98. if passwd is None:
  99. passwd = randomPassword()
  100. if len(phone) != 11:
  101. return None
  102. uid = create_uid(name, passwd)
  103. if is_user_exists(uid, db):
  104. return None
  105. is_manager = '1' if manager else '0'
  106. cur = db.done(f"INSERT INTO user(UserID, Name, IsManager, Phone, Score, Reputation, CreateTime) "
  107. f"VALUES ('{uid}', '{name}', {is_manager}, '{phone}', {conf.default_score}, "
  108. f"{conf.default_reputation}, {mysql_time()});")
  109. if cur is None:
  110. return None
  111. if is_manager:
  112. return ManagerUser(name, uid)
  113. return NormalUser(name, uid, conf.default_reputation, 0, conf.default_score)
  114. def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
  115. cur = db.done(f"SELECT Phone FROM user WHERE UserID = '{uid}';")
  116. if cur is None or cur.rowcount == 0:
  117. return None
  118. assert cur.rowcount == 1
  119. return cur.fetchall()[0]
  120. def del_user(uid: uid_t, db: DB) -> bool:
  121. cur = db.search(f"SELECT GarbageID FROM garbage_time WHERE UserID = '{uid}';") # 确保没有引用
  122. if cur is None or cur.rowcount != 0:
  123. return False
  124. cur = db.done(f"DELETE FROM user WHERE UserID = '{uid}';")
  125. if cur is None or cur.rowcount == 0:
  126. return False
  127. assert cur.rowcount == 1
  128. return True
  129. def del_user_from_where_scan(where: str, db: DB) -> int:
  130. cur = db.search(f"SELECT UserID FROM user WHERE {where};")
  131. print(f"SELECT UserID FROM user WHERE {where};")
  132. if cur is None:
  133. return -1
  134. return cur.rowcount
  135. def del_user_from_where(where: str, db: DB) -> int:
  136. cur = db.search(f"SELECT GarbageID FROM garbage_time WHERE {where};") # 确保没有引用
  137. if cur is None or cur.rowcount != 0:
  138. return False
  139. cur = db.done(f"DELETE FROM user WHERE {where};")
  140. if cur is None:
  141. return -1
  142. return cur.rowcount
  143. def creat_user_from_csv(path, db: DB) -> List[User]:
  144. res = []
  145. with open(path, "r") as f:
  146. reader = csv.reader(f)
  147. first = True
  148. name_index = 0
  149. passwd_index = 0
  150. phone_index = 0
  151. manager_index = 0
  152. for item in reader:
  153. if first:
  154. try:
  155. name_index = item.index('Name')
  156. passwd_index = item.index('Passwd')
  157. phone_index = item.index('Phone')
  158. manager_index = item.index('Manager')
  159. except (ValueError, TypeError):
  160. return []
  161. first = False
  162. continue
  163. name = item[name_index]
  164. passwd = item[passwd_index]
  165. phone = item[phone_index]
  166. if item[manager_index].upper() == "TRUE":
  167. is_manager = True
  168. elif item[manager_index].upper() == "FALSE":
  169. is_manager = False
  170. else:
  171. continue
  172. user = create_new_user(name, passwd, phone, is_manager, db)
  173. if user is not None:
  174. res.append(user)
  175. return res
  176. def creat_auto_user_from_csv(path, db: DB) -> List[User]:
  177. res = []
  178. with open(path, "r") as f:
  179. reader = csv.reader(f)
  180. first = True
  181. phone_index = 0
  182. for item in reader:
  183. if first:
  184. try:
  185. phone_index = item.index('Phone')
  186. except (ValueError, TypeError):
  187. return []
  188. first = False
  189. continue
  190. phone = item[phone_index]
  191. user = create_new_user(None, None, phone, False, db)
  192. if user is not None:
  193. res.append(user)
  194. return res