1
0

user.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. import csv
  2. from . import DBBit
  3. from .db import DB
  4. from tool.typing import *
  5. from tool.login import create_uid, randomPassword
  6. from tool.time import mysql_time
  7. from core.user import NormalUser, ManagerUser, User
  8. from conf import Config
  9. from . import garbage
  10. def get_rank_for_user(db: DB, limit, offset, order_by: str = "DESC"):
  11. cur = db.search(columns=['UserID', 'Name', 'Score', 'Reputation'],
  12. table='user',
  13. where='IsManager=0',
  14. order_by=[('Reputation', order_by), ('Score', order_by), ('UserID', order_by)],
  15. limit=limit,
  16. offset=offset)
  17. if cur is None:
  18. return None, None
  19. res = []
  20. for index in range(cur.rowcount):
  21. i = cur.fetchone()
  22. res.append((f"{offset + index + 1}", i[1], i[0][:Config.show_uid_len], str(i[3]), str(i[2])))
  23. return res
  24. def update_user_score(where: str, score: score_t, db: DB) -> int:
  25. if len(where) == 0 or score < 0:
  26. return -1
  27. cur = db.update(table="user", kw={"score": score}, where=where)
  28. if cur is None:
  29. return -1
  30. return cur.rowcount
  31. def update_user_reputation(where: str, reputation: score_t, db: DB) -> int:
  32. if len(where) == 0 or reputation <= 1 or reputation >= 1000:
  33. return -1
  34. cur = db.update(table="user", kw={"Reputation": reputation}, where=where)
  35. if cur is None:
  36. return -1
  37. return cur.rowcount
  38. def set_where_(ex: Optional[str], column: str):
  39. if ex is None:
  40. return ""
  41. if ex.startswith("LIKE ") or ex.startswith("REGEXP "):
  42. where = f"{column} {ex} AND "
  43. else:
  44. where = f"{column}='{ex}' AND "
  45. return where
  46. def search_user_by_fields(columns, uid: uid_t, name: uname_t, phone: phone_t, db: DB):
  47. where = ""
  48. where += set_where_(uid, "UserID")
  49. where += set_where_(name, "Name")
  50. where += set_where_(phone, "Phone")
  51. if len(where) != 0:
  52. where = where[0:-4] # 去除末尾的AND
  53. return search_from_user_view(columns, where, db)
  54. def search_from_user_view(columns, where: str, db: DB):
  55. cur = db.search(columns=columns,
  56. table="user",
  57. where=where)
  58. if cur is None:
  59. return None
  60. return cur.fetchall()
  61. def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
  62. cur = db.search(columns=["UserID", "Name", "IsManager", "Score", "Reputation", "UserLock"],
  63. table="user",
  64. where=f"UserID = '{uid}'")
  65. if cur is None or cur.rowcount == 0:
  66. return None
  67. assert cur.rowcount == 1
  68. res = cur.fetchone()
  69. assert len(res) == 6
  70. uid: uid_t = res[0]
  71. name: uname_t = str(res[1])
  72. manager: bool = res[2] == DBBit.BIT_1
  73. lock: bool = res[5] == DBBit.BIT_1
  74. if lock:
  75. db.commit()
  76. return None
  77. else:
  78. cur = db.update(table="user",
  79. kw={"UserLock": "1"},
  80. where=f"UserID = '{uid}'")
  81. if cur is None or cur.rowcount == 0:
  82. db.commit()
  83. return None
  84. def user_destruct(*args, **kwargs):
  85. db.update(table="user",
  86. kw={"UserLock": "0"},
  87. where=f"UserID = '{uid}'")
  88. if manager:
  89. return ManagerUser(name, uid, user_destruct)
  90. else:
  91. score: score_t = res[3]
  92. reputation: score_t = res[4]
  93. rubbish: count_t = garbage.count_garbage_by_uid(uid, db)
  94. return NormalUser(name, uid, reputation, rubbish, score, user_destruct) # rubbish 实际计算
  95. def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
  96. uid = create_uid(name, passwd)
  97. return find_user_by_id(uid, db)
  98. def is_user_exists(uid: uid_t, db: DB) -> bool:
  99. cur = db.search(columns=["UserID"],
  100. table="user",
  101. where=f"UserID = '{uid}'")
  102. if cur is None or cur.rowcount == 0:
  103. return False
  104. assert cur.rowcount == 1
  105. return True
  106. def update_user(user: User, db: DB, not_commit: bool = False) -> bool:
  107. if not is_user_exists(user.get_uid(), db):
  108. return False
  109. uid = user.get_uid()
  110. info: Dict[str, str] = user.get_info()
  111. is_manager = info['manager']
  112. if is_manager == '1':
  113. cur = db.update(table="user",
  114. kw={"IsManager": is_manager},
  115. where=f"UserID = '{uid}'", not_commit=not_commit)
  116. else:
  117. score = info['score']
  118. reputation = info['reputation']
  119. cur = db.update(table="user",
  120. kw={"IsManager": is_manager,
  121. "Score": f"{score}",
  122. "Reputation": f"{reputation}"},
  123. where=f"UserID = '{uid}'", not_commit=not_commit)
  124. return cur is not None
  125. def create_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t,
  126. manager: bool, db: DB) -> Optional[User]:
  127. if name is None:
  128. name = f'User-{phone[-6:]}'
  129. if passwd is None:
  130. passwd = randomPassword()
  131. if len(phone) != 11:
  132. return None
  133. uid = create_uid(name, passwd)
  134. if is_user_exists(uid, db):
  135. return None
  136. is_manager = '1' if manager else '0'
  137. cur = db.insert(table="user",
  138. columns=["UserID", "Name", "IsManager", "Phone", "Score", "Reputation", "CreateTime", "UserLock"],
  139. values=f"'{uid}', '{name}', {is_manager}, '{phone}', {Config.default_score}, "
  140. f"{Config.default_reputation}, {mysql_time()}, 1")
  141. if cur is None:
  142. return None
  143. def user_destruct(*args, **kwargs):
  144. db.update(table="user",
  145. kw={"UserLock": "0"},
  146. where=f"UserID = '{uid}'")
  147. if is_manager:
  148. return ManagerUser(name, uid, user_destruct)
  149. return NormalUser(name, uid, Config.default_reputation, 0, Config.default_score, user_destruct)
  150. def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
  151. cur = db.search(columns=["Phone"], table="user", where=f"UserID = '{uid}'")
  152. if cur is None or cur.rowcount == 0:
  153. return None
  154. assert cur.rowcount == 1
  155. return cur.fetchall()[0]
  156. def del_user(uid: uid_t, db: DB) -> bool:
  157. cur = db.search(columns=["GarbageID"], table="garbage_time", where=f"UserID = '{uid}'") # 确保没有引用
  158. if cur is None or cur.rowcount == 0:
  159. return False
  160. cur = db.delete(table="user", where=f"UserID = '{uid}'")
  161. if cur is None or cur.rowcount == 0:
  162. return False
  163. assert cur.rowcount == 1
  164. return True
  165. def del_user_from_where_scan(where: str, db: DB) -> int:
  166. cur = db.search(columns=["UserID"], table="user", where=where)
  167. if cur is None:
  168. return -1
  169. return cur.rowcount
  170. def del_user_from_where(where: str, db: DB) -> int:
  171. cur = db.search(columns=["UserID"], table="user", where=where) # 确保没有引用
  172. if cur is None or cur.rowcount == 0:
  173. return 0
  174. cur = db.delete(table="user", where=where)
  175. if cur is None:
  176. return -1
  177. return cur.rowcount
  178. def creat_user_from_csv(path, db: DB) -> List[User]:
  179. res = []
  180. with open(path, "r") as f:
  181. reader = csv.reader(f)
  182. first = True
  183. name_index = 0
  184. passwd_index = 0
  185. phone_index = 0
  186. manager_index = 0
  187. for item in reader:
  188. if first:
  189. try:
  190. name_index = item.index('Name')
  191. passwd_index = item.index('Passwd')
  192. phone_index = item.index('Phone')
  193. manager_index = item.index('Manager')
  194. except (ValueError, TypeError):
  195. return []
  196. first = False
  197. continue
  198. name = item[name_index]
  199. passwd = item[passwd_index]
  200. phone = item[phone_index]
  201. if item[manager_index].upper() == "TRUE":
  202. is_manager = True
  203. elif item[manager_index].upper() == "FALSE":
  204. is_manager = False
  205. else:
  206. continue
  207. user = create_new_user(name, passwd, phone, is_manager, db)
  208. if user is not None:
  209. res.append(user)
  210. return res
  211. def creat_auto_user_from_csv(path, db: DB) -> List[User]:
  212. res = []
  213. with open(path, "r") as f:
  214. reader = csv.reader(f)
  215. first = True
  216. phone_index = 0
  217. for item in reader:
  218. if first:
  219. try:
  220. phone_index = item.index('Phone')
  221. except (ValueError, TypeError):
  222. return []
  223. first = False
  224. continue
  225. phone = item[phone_index]
  226. user = create_new_user(None, None, phone, False, db)
  227. if user is not None:
  228. res.append(user)
  229. return res
  230. def count_all_user(db: DB):
  231. cur = db.search(columns=['count(UserID)'], table='user')
  232. if cur is None:
  233. return 0
  234. assert cur.rowcount == 1
  235. return int(cur.fetchone()[0])