123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- from sql import db
- from typing import Optional
- def read_msg_list(limit: Optional[int] = None, offset: Optional[int] = None, show_secret: bool = False):
- if show_secret:
- if limit is not None and offset is not None:
- cur = db.search("SELECT MsgID "
- "FROM message_user "
- "ORDER BY UpdateTime DESC "
- "LIMIT %s "
- "OFFSET %s", limit, offset)
- else:
- cur = db.search("SELECT MsgID "
- "FROM message_user "
- "ORDER BY UpdateTime DESC")
- else:
- if limit is not None and offset is not None:
- cur = db.search("SELECT MsgID "
- "FROM message_user "
- "WHERE Secret=0 "
- "ORDER BY UpdateTime DESC "
- "LIMIT %s "
- "OFFSET %s", limit, offset)
- else:
- cur = db.search("SELECT MsgID "
- "FROM message_user "
- "WHERE Secret=0 "
- "ORDER BY UpdateTime DESC")
- if cur is None or cur.rowcount == 0:
- return []
- return [i[0] for i in cur.fetchall()]
- def create_msg(auth: int, content: str, secret: bool = False):
- content = content.replace("'", "''")
- cur = db.insert(table="message",
- columns=["Auth", "Content", "Secret"],
- values=f"{auth}, '{content}', {1 if secret else 0}")
- if cur is None or cur.rowcount != 1:
- return None
- return cur.lastrowid
- def read_msg(msg_id: int):
- cur = db.search("SELECT Email, Content, UpdateTime, Secret "
- "FROM message_user "
- "WHERE MsgID=%s", msg_id)
- if cur is None or cur.rowcount == 0:
- return ["", "", 0, False]
- return cur.fetchone()
- def delete_msg(msg_id: int):
- cur = db.delete(table="message", where=f"ID={msg_id}")
- if cur is None or cur.rowcount == 0:
- return False
- return True
- def get_msg_count():
- cur = db.search("SELECT COUNT(*) FROM message")
- if cur is None or cur.rowcount == 0:
- return 0
- return cur.fetchone()[0]
- def get_user_msg_count(user_id: int):
- cur = db.search("SELECT COUNT(*) FROM message WHERE Auth=%s", user_id)
- if cur is None or cur.rowcount == 0:
- return 0
- return cur.fetchone()[0]
|