user.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. import csv
  2. from . import DBBit
  3. from .db import DB
  4. from tool.type_ import *
  5. from tool.login import create_uid, randomPassword
  6. from tool.time_ import mysql_time
  7. from core.user import NormalUser, ManagerUser, User
  8. from conf import Config
  9. from . import garbage
  10. def update_user_score(where: str, score: score_t, db: DB) -> int:
  11. if len(where) == 0 or score < 0:
  12. return -1
  13. cur = db.update(table="user", kw={"score": score}, where=where)
  14. if cur is None:
  15. return -1
  16. return cur.rowcount
  17. def update_user_reputation(where: str, reputation: score_t, db: DB) -> int:
  18. if len(where) == 0 or reputation <= 1 or reputation >= 1000:
  19. return -1
  20. cur = db.update(table="user", kw={"Reputation": reputation}, where=where)
  21. if cur is None:
  22. return -1
  23. return cur.rowcount
  24. def search_user_by_fields(columns, uid: uid_t, name: uname_t, phone: phone_t, db: DB):
  25. where = ""
  26. if uid is not None:
  27. where += f"UserID=‘{uid}’ AND "
  28. if name is not None:
  29. where += f"Name='{name}' AND "
  30. if phone is not None:
  31. where += f"Phone='{phone}' AND "
  32. if len(where) != 0:
  33. where = where[0:-4] # 去除末尾的AND
  34. return search_from_user_view(columns, where, db)
  35. def search_from_user_view(columns, where: str, db: DB):
  36. cur = db.search(columns=columns,
  37. table="user",
  38. where=where)
  39. if cur is None:
  40. return None
  41. result = cur.fetchall()
  42. re = []
  43. for res in result:
  44. n = [res[a] for a in range(5)]
  45. n.append("True" if res[5] == DBBit.BIT_1 else "False")
  46. re.append(n)
  47. return re
  48. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  49. cur = db.search(columns=["UserID", "Name", "IsManager", "Score", "Reputation"],
  50. table="user",
  51. where=f"UserID = '{uid}'")
  52. if cur is None or cur.rowcount == 0:
  53. return None
  54. assert cur.rowcount == 1
  55. res = cur.fetchone()
  56. assert len(res) == 5
  57. uid: uid_t = res[0]
  58. name: uname_t = str(res[1])
  59. manager: bool = res[2] == DBBit.BIT_1
  60. if manager:
  61. return ManagerUser(name, uid)
  62. else:
  63. score: score_t = res[3]
  64. reputation: score_t = res[4]
  65. rubbish: count_t = garbage.count_garbage_by_time(uid, db)
  66. return NormalUser(name, uid, reputation, rubbish, score) # rubbish 实际计算
  67. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  68. uid = create_uid(name, passwd)
  69. return find_user_by_id(uid, db)
  70. def is_user_exists(uid: uid_t, db: DB) -> bool:
  71. cur = db.search(columns=["UserID"],
  72. table="user",
  73. where=f"UserID = '{uid}'")
  74. if cur is None or cur.rowcount == 0:
  75. return False
  76. assert cur.rowcount == 1
  77. return True
  78. def update_user(user: User, db: DB) -> bool:
  79. if not is_user_exists(user.get_uid(), db):
  80. return False
  81. uid = user.get_uid()
  82. info: Dict[str, str] = user.get_info()
  83. is_manager = info['manager']
  84. if is_manager == '1':
  85. cur = db.update(table="user",
  86. kw={"IsManager": {is_manager}},
  87. where=f"UserID = '{uid}'")
  88. else:
  89. score = info['score']
  90. reputation = info['reputation']
  91. cur = db.update(table="user",
  92. kw={"IsManager": {is_manager},
  93. "Score": {score},
  94. "Reputation": {reputation}},
  95. where=f"UserID = '{uid}'")
  96. return cur is not None
  97. def create_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t,
  98. manager: bool, db: DB) -> Optional[User]:
  99. if name is None:
  100. name = f'User-{phone[-6:]}'
  101. if passwd is None:
  102. passwd = randomPassword()
  103. if len(phone) != 11:
  104. return None
  105. uid = create_uid(name, passwd)
  106. if is_user_exists(uid, db):
  107. return None
  108. is_manager = '1' if manager else '0'
  109. cur = db.insert(table="user",
  110. columns=["UserID", "Name", "IsManager", "Phone", "Score", "Reputation", "CreateTime"],
  111. values=f"'{uid}', '{name}', {is_manager}, '{phone}', {Config.default_score}, "
  112. f"{Config.default_reputation}, {mysql_time()}")
  113. if cur is None:
  114. return None
  115. if is_manager:
  116. return ManagerUser(name, uid)
  117. return NormalUser(name, uid, Config.default_reputation, 0, Config.default_score)
  118. def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
  119. cur = db.search(columns=["Phone"], table="user", where=f"UserID = '{uid}'")
  120. if cur is None or cur.rowcount == 0:
  121. return None
  122. assert cur.rowcount == 1
  123. return cur.fetchall()[0]
  124. def del_user(uid: uid_t, db: DB) -> bool:
  125. cur = db.search(columns=["GarbageID"], table="garbage_time", where=f"UserID = '{uid}'") # 确保没有引用
  126. if cur is None or cur.rowcount != 0:
  127. return False
  128. cur = db.delete(table="user", where=f"UserID = '{uid}'")
  129. if cur is None or cur.rowcount == 0:
  130. return False
  131. assert cur.rowcount == 1
  132. return True
  133. def del_user_from_where_scan(where: str, db: DB) -> int:
  134. cur = db.search(columns=["UserID"], table="user", where=where)
  135. if cur is None:
  136. return -1
  137. return cur.rowcount
  138. def del_user_from_where(where: str, db: DB) -> int:
  139. cur = db.search(columns=["GarbageID"], table="garbage_time", where=where) # 确保没有引用
  140. if cur is None or cur.rowcount != 0:
  141. return False
  142. cur = db.delete(table="user", where=where)
  143. if cur is None:
  144. return -1
  145. return cur.rowcount
  146. def creat_user_from_csv(path, db: DB) -> List[User]:
  147. res = []
  148. with open(path, "r") as f:
  149. reader = csv.reader(f)
  150. first = True
  151. name_index = 0
  152. passwd_index = 0
  153. phone_index = 0
  154. manager_index = 0
  155. for item in reader:
  156. if first:
  157. try:
  158. name_index = item.index('Name')
  159. passwd_index = item.index('Passwd')
  160. phone_index = item.index('Phone')
  161. manager_index = item.index('Manager')
  162. except (ValueError, TypeError):
  163. return []
  164. first = False
  165. continue
  166. name = item[name_index]
  167. passwd = item[passwd_index]
  168. phone = item[phone_index]
  169. if item[manager_index].upper() == "TRUE":
  170. is_manager = True
  171. elif item[manager_index].upper() == "FALSE":
  172. is_manager = False
  173. else:
  174. continue
  175. user = create_new_user(name, passwd, phone, is_manager, db)
  176. if user is not None:
  177. res.append(user)
  178. return res
  179. def creat_auto_user_from_csv(path, db: DB) -> List[User]:
  180. res = []
  181. with open(path, "r") as f:
  182. reader = csv.reader(f)
  183. first = True
  184. phone_index = 0
  185. for item in reader:
  186. if first:
  187. try:
  188. phone_index = item.index('Phone')
  189. except (ValueError, TypeError):
  190. return []
  191. first = False
  192. continue
  193. phone = item[phone_index]
  194. user = create_new_user(None, None, phone, False, db)
  195. if user is not None:
  196. res.append(user)
  197. return res