123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- import csv
- from . import DBBit
- from .db import DB
- from tool.typing import *
- from tool.login import create_uid, randomPassword
- from tool.time import mysql_time
- from core.user import NormalUser, ManagerUser, User
- from conf import Config
- from . import garbage
- def get_rank_for_user(db: DB, limit, offset, order_by: str = "DESC"):
- cur = db.search(columns=['UserID', 'Name', 'Score', 'Reputation'],
- table='user',
- where='IsManager=0',
- order_by=[('Reputation', order_by), ('Score', order_by), ('UserID', order_by)],
- limit=limit,
- offset=offset)
- if cur is None:
- return None, None
- res = []
- for index in range(cur.rowcount):
- i = cur.fetchone()
- res.append((f"{offset + index + 1}", i[1], i[0][:Config.show_uid_len], str(i[3]), str(i[2])))
- return res
- def update_user_score(where: str, score: score_t, db: DB) -> int:
- if len(where) == 0 or score < 0:
- return -1
- cur = db.update(table="user", kw={"score": score}, where=where)
- if cur is None:
- return -1
- return cur.rowcount
- def update_user_reputation(where: str, reputation: score_t, db: DB) -> int:
- if len(where) == 0 or reputation <= 1 or reputation >= 1000:
- return -1
- cur = db.update(table="user", kw={"Reputation": reputation}, where=where)
- if cur is None:
- return -1
- return cur.rowcount
- def set_where_(ex: Optional[str], column: str):
- if ex is None:
- return ""
- if ex.startswith("LIKE ") or ex.startswith("REGEXP "):
- where = f"{column} {ex} AND "
- else:
- where = f"{column}='{ex}' AND "
- return where
- def search_user_by_fields(columns, uid: uid_t, name: uname_t, phone: phone_t, db: DB):
- where = ""
- where += set_where_(uid, "UserID")
- where += set_where_(name, "Name")
- where += set_where_(phone, "Phone")
- if len(where) != 0:
- where = where[0:-4] # 去除末尾的AND
- return search_from_user_view(columns, where, db)
- def search_from_user_view(columns, where: str, db: DB):
- cur = db.search(columns=columns,
- table="user",
- where=where)
- if cur is None:
- return None
- return cur.fetchall()
- def find_user_by_id(uid: uid_t, db: DB) -> Optional[User]:
- cur = db.search(columns=["UserID", "Name", "IsManager", "Score", "Reputation", "UserLock"],
- table="user",
- where=f"UserID = '{uid}'")
- if cur is None or cur.rowcount == 0:
- return None
- assert cur.rowcount == 1
- res = cur.fetchone()
- assert len(res) == 6
- uid: uid_t = res[0]
- name: uname_t = str(res[1])
- manager: bool = res[2] == DBBit.BIT_1
- lock: bool = res[5] == DBBit.BIT_1
- if lock:
- db.commit()
- return None
- else:
- cur = db.update(table="user",
- kw={"UserLock": "1"},
- where=f"UserID = '{uid}'")
- if cur is None or cur.rowcount == 0:
- db.commit()
- return None
- def user_destruct(*args, **kwargs):
- db.update(table="user",
- kw={"UserLock": "0"},
- where=f"UserID = '{uid}'")
- if manager:
- return ManagerUser(name, uid, user_destruct)
- else:
- score: score_t = res[3]
- reputation: score_t = res[4]
- rubbish: count_t = garbage.count_garbage_by_uid(uid, db)
- return NormalUser(name, uid, reputation, rubbish, score, user_destruct) # rubbish 实际计算
- def find_user_by_name(name: uname_t, passwd: passwd_t, db: DB) -> Optional[User]:
- uid = create_uid(name, passwd)
- return find_user_by_id(uid, db)
- def is_user_exists(uid: uid_t, db: DB) -> bool:
- cur = db.search(columns=["UserID"],
- table="user",
- where=f"UserID = '{uid}'")
- if cur is None or cur.rowcount == 0:
- return False
- assert cur.rowcount == 1
- return True
- def update_user(user: User, db: DB, not_commit: bool = False) -> bool:
- if not is_user_exists(user.get_uid(), db):
- return False
- uid = user.get_uid()
- info: Dict[str, str] = user.get_info()
- is_manager = info['manager']
- if is_manager == '1':
- cur = db.update(table="user",
- kw={"IsManager": is_manager},
- where=f"UserID = '{uid}'", not_commit=not_commit)
- else:
- score = info['score']
- reputation = info['reputation']
- cur = db.update(table="user",
- kw={"IsManager": is_manager,
- "Score": f"{score}",
- "Reputation": f"{reputation}"},
- where=f"UserID = '{uid}'", not_commit=not_commit)
- return cur is not None
- def create_new_user(name: Optional[uname_t], passwd: Optional[passwd_t], phone: phone_t,
- manager: bool, db: DB) -> Optional[User]:
- if name is None:
- name = f'User-{phone[-6:]}'
- if passwd is None:
- passwd = randomPassword()
- if len(phone) != 11:
- return None
- uid = create_uid(name, passwd)
- if is_user_exists(uid, db):
- return None
- is_manager = '1' if manager else '0'
- cur = db.insert(table="user",
- columns=["UserID", "Name", "IsManager", "Phone", "Score", "Reputation", "CreateTime", "UserLock"],
- values=f"'{uid}', '{name}', {is_manager}, '{phone}', {Config.default_score}, "
- f"{Config.default_reputation}, {mysql_time()}, 1")
- if cur is None:
- return None
- def user_destruct(*args, **kwargs):
- db.update(table="user",
- kw={"UserLock": "0"},
- where=f"UserID = '{uid}'")
- if is_manager:
- return ManagerUser(name, uid, user_destruct)
- return NormalUser(name, uid, Config.default_reputation, 0, Config.default_score, user_destruct)
- def get_user_phone(uid: uid_t, db: DB) -> Optional[str]:
- cur = db.search(columns=["Phone"], table="user", where=f"UserID = '{uid}'")
- if cur is None or cur.rowcount == 0:
- return None
- assert cur.rowcount == 1
- return cur.fetchall()[0]
- def del_user(uid: uid_t, db: DB) -> bool:
- cur = db.search(columns=["GarbageID"], table="garbage_time", where=f"UserID = '{uid}'") # 确保没有引用
- if cur is None or cur.rowcount == 0:
- return False
- cur = db.delete(table="user", where=f"UserID = '{uid}'")
- if cur is None or cur.rowcount == 0:
- return False
- assert cur.rowcount == 1
- return True
- def del_user_from_where_scan(where: str, db: DB) -> int:
- cur = db.search(columns=["UserID"], table="user", where=where)
- if cur is None:
- return -1
- return cur.rowcount
- def del_user_from_where(where: str, db: DB) -> int:
- cur = db.search(columns=["UserID"], table="user", where=where) # 确保没有引用
- if cur is None or cur.rowcount == 0:
- return 0
- cur = db.delete(table="user", where=where)
- if cur is None:
- return -1
- return cur.rowcount
- def creat_user_from_csv(path, db: DB) -> List[User]:
- res = []
- with open(path, "r") as f:
- reader = csv.reader(f)
- first = True
- name_index = 0
- passwd_index = 0
- phone_index = 0
- manager_index = 0
- for item in reader:
- if first:
- try:
- name_index = item.index('Name')
- passwd_index = item.index('Passwd')
- phone_index = item.index('Phone')
- manager_index = item.index('Manager')
- except (ValueError, TypeError):
- return []
- first = False
- continue
- name = item[name_index]
- passwd = item[passwd_index]
- phone = item[phone_index]
- if item[manager_index].upper() == "TRUE":
- is_manager = True
- elif item[manager_index].upper() == "FALSE":
- is_manager = False
- else:
- continue
- user = create_new_user(name, passwd, phone, is_manager, db)
- if user is not None:
- res.append(user)
- return res
- def creat_auto_user_from_csv(path, db: DB) -> List[User]:
- res = []
- with open(path, "r") as f:
- reader = csv.reader(f)
- first = True
- phone_index = 0
- for item in reader:
- if first:
- try:
- phone_index = item.index('Phone')
- except (ValueError, TypeError):
- return []
- first = False
- continue
- phone = item[phone_index]
- user = create_new_user(None, None, phone, False, db)
- if user is not None:
- res.append(user)
- return res
- def count_all_user(db: DB):
- cur = db.search(columns=['count(UserID)'], table='user')
- if cur is None:
- return 0
- assert cur.rowcount == 1
- return int(cur.fetchone()[0])
|