test.py 10 KB


  1. from flask import (blueprints, render_template, current_app,
  2. abort, redirect, url_for, flash, make_response, request, g, session)
  3. from flask import send_file
  4. from flask_login import current_user, login_required, logout_user
  5. from flask_wtf import FlaskForm
  6. from wtforms import SearchField, SubmitField, BooleanField, PasswordField, FileField
  7. from wtforms.validators import DataRequired, Length
  8. from app.user import UserWordDataBase
  9. from itsdangerous import URLSafeTimedSerializer
  10. from itsdangerous.exc import BadData
  11. from threading import Thread
  12. from typing import Optional, List
  13. from core.word import Word
  14. import io
  15. from app.tool import AuthForm, form_required
  16. test = blueprints.Blueprint("test", __name__)
  17. class SearchForm(FlaskForm):
  18. search = SearchField("Word", description="Search word",
  19. validators=[DataRequired(message="Must enter word"),
  20. Length(1, 20, message="Length: 1- 20")])
  21. from_internet = BooleanField("Internet")
  22. add_to_db = BooleanField("Add")
  23. submit = SubmitField("Search")
  24. class ResetDeleteForm(AuthForm):
  25. new_passwd = PasswordField("New Password", description="new password",
  26. validators=[Length(0, 32, "Length: 0 - 32")])
  27. submit = SubmitField("Submit")
  28. class UploadFileForm(FlaskForm):
  29. file = FileField("File", description="Upload file", validators=[DataRequired("Must upload file")])
  30. submit = SubmitField("Upload")
  31. def __load_word(word, search_from: SearchForm, reset_delete_form: ResetDeleteForm, upload_form: UploadFileForm):
  32. user: UserWordDataBase = current_user
  33. box, box_distinct, box_sum, box_sum_distinct = user.get_box_count()
  34. right_count, wrong_count, history_list = user.get_history_info()
  35. job: Upload = Upload.upload.get(user.user)
  36. if Upload.upload.get(user.user) is not None:
  37. if job.is_alive():
  38. have_job = True
  39. else:
  40. flash("Upload finished")
  41. have_job = False
  42. del Upload.upload[user.user]
  43. else:
  44. have_job = False
  45. history_len = len(history_list)
  46. if history_len == 0:
  47. history = ""
  48. else:
  49. if history_len > 3:
  50. history_list = history_list[:3]
  51. history = f"Last word is: {', '.join(history_list)}"
  52. template_var = dict(word=word, len=len, right_count=right_count, wrong_count=wrong_count, history=history,
  53. box=box, box_distinct=box_distinct, box_sum=box_sum, box_sum_distinct=box_sum_distinct,
  54. have_job=have_job, search=search_from, reset_delete=reset_delete_form, upload=upload_form)
  55. if word is None:
  56. return render_template("test.html", **template_var, have_word=False)
  57. serializer = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
  58. session["word"] = serializer.dumps({"word": word.name})
  59. return render_template("test.html", **template_var, have_word=True)
  60. def __load_question(search_from: SearchForm, reset_delete_form: ResetDeleteForm, upload_form: UploadFileForm):
  61. word = current_user.rand_word()
  62. return __load_word(word, search_from, reset_delete_form, upload_form)
  63. @test.route("/")
  64. @login_required
  65. def question():
  66. return __load_question(SearchForm(), ResetDeleteForm(), UploadFileForm())
  67. @test.route("/right")
  68. @login_required
  69. def right():
  70. word_id = session.get("word", "")
  71. session["word"] = ""
  72. if len(word_id) == 0:
  73. abort(404)
  74. serializer = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
  75. try:
  76. word: dict = serializer.loads(word_id, max_age=600) # 600s内生效
  77. user: UserWordDataBase = current_user
  78. user.right_word(word["word"])
  79. except BadData:
  80. flash(f"Timeout for confirm word")
  81. abort(408)
  82. except KeyError:
  83. abort(404)
  84. return redirect(url_for("test.question"))
  85. @test.route("/wrong")
  86. @login_required
  87. def wrong():
  88. word_id = session.get("word", "")
  89. session["word"] = ""
  90. if len(word_id) == 0:
  91. abort(404)
  92. serializer = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
  93. try:
  94. word: dict = serializer.loads(word_id, max_age=600) # 600s内生效
  95. user: UserWordDataBase = current_user
  96. user.wrong_word(word["word"])
  97. except BadData:
  98. flash(f"Timeout for confirm word")
  99. abort(408)
  100. except KeyError:
  101. abort(404)
  102. return redirect(url_for("test.question"))
  103. @test.route("/delete/word/<string:word>")
  104. @login_required
  105. def delete(word: str):
  106. user: UserWordDataBase = current_user
  107. user.delete_txt(word)
  108. flash(f"Word '{word}' is deleted.")
  109. return redirect(url_for("test.question"))
  110. @test.route("/download/word/<string:word>")
  111. @login_required
  112. def download(word: str):
  113. user: UserWordDataBase = current_user
  114. w = user.find_word(word, False)
  115. if w is None:
  116. abort(404)
  117. response = make_response(render_template("download.html", w=w))
  118. response.headers["Content-Disposition"] = f"attachment;filename={word}.html"
  119. return response
  120. class Search(Thread):
  121. def __init__(self, user: UserWordDataBase, word: str, internet: bool, add: bool):
  122. super(Search, self).__init__()
  123. self.word: Optional[Word] = None
  124. self.word_str = word
  125. self.internet = internet
  126. self.add = add
  127. self.user = user
  128. self.daemon = True
  129. def run(self):
  130. self.word = self.user.find_word(self.word_str, self.internet, self.add)
  131. def wait_event(self) -> Optional[Word]:
  132. self.join(timeout=5)
  133. return self.word
  134. @test.route("/search", methods=["GET", "POST"])
  135. @login_required
  136. def search():
  137. form = SearchForm()
  138. if not form.validate_on_submit():
  139. if request.method == "POST":
  140. return __load_question(form, ResetDeleteForm(), UploadFileForm())
  141. word = request.args.get("word", "")
  142. if len(word) == 0:
  143. abort(400)
  144. user = current_user._get_current_object()
  145. th = Search(user, word, request.args.get("internet", 0) != '0', request.args.get("add", 0) != '0')
  146. th.start()
  147. word = th.wait_event()
  148. if th.is_alive():
  149. flash("Search timeout")
  150. return __load_word(word, SearchForm(), ResetDeleteForm(), UploadFileForm())
  151. return redirect(url_for("test.search",
  152. word=form.search.data, internet=int(form.from_internet.data), add=int(form.add_to_db.data)))
  153. @test.route("/download/table/<string:file_type>")
  154. @login_required
  155. def download_table(file_type: str):
  156. user: UserWordDataBase = current_user
  157. try:
  158. max_eg = int(request.args.get("max_eg", -1))
  159. except (ValueError, TypeError):
  160. return abort(400)
  161. df = user.export_frame(max_eg, file_type == "html")
  162. if file_type == "csv":
  163. df_io = io.BytesIO()
  164. df.to_csv(df_io)
  165. elif file_type == "xlsx":
  166. df_io = io.BytesIO()
  167. df.to_excel(df_io)
  168. elif file_type == "html":
  169. df_io = io.StringIO()
  170. df.to_html(df_io, escape=False)
  171. df_io = io.BytesIO(df_io.getvalue().encode('utf-8'))
  172. elif file_type == "json":
  173. df_io = io.BytesIO()
  174. df.to_json(df_io)
  175. elif file_type == "markdown":
  176. df_io = io.StringIO()
  177. df.to_markdown(df_io)
  178. df_io = io.BytesIO(df_io.getvalue().encode('utf-8'))
  179. elif file_type == "latex":
  180. df_io = io.StringIO()
  181. df.to_latex(df_io)
  182. df_io = io.BytesIO(df_io.getvalue().encode('utf-8'))
  183. else:
  184. return abort(400)
  185. df_io.seek(0, io.SEEK_SET)
  186. return send_file(df_io, attachment_filename=f"{user.user}.henglish.{file_type}", as_attachment=True)
  187. @test.route("/reset/user", methods=["POST"])
  188. @login_required
  189. @form_required(ResetDeleteForm, lambda form: __load_question(SearchForm(), form, UploadFileForm()))
  190. def reset_user():
  191. user: UserWordDataBase = current_user
  192. if not user.check_passwd(g.form.passwd.data):
  193. flash("Passwd error.")
  194. else:
  195. flash("User reset")
  196. user.reset()
  197. return redirect(url_for("test.question"))
  198. @test.route("/delete/user", methods=["POST"])
  199. @login_required
  200. @form_required(ResetDeleteForm, lambda form: __load_question(SearchForm(), form, UploadFileForm()))
  201. def delete_user():
  202. delete_form: ResetDeleteForm = g.form
  203. user: UserWordDataBase = current_user
  204. if not user.check_passwd(delete_form.passwd.data):
  205. flash("Passwd error.")
  206. else:
  207. flash("User delete")
  208. user.delete_user()
  209. logout_user()
  210. return redirect(url_for("home.index"))
  211. return redirect(url_for("test.question"))
  212. @test.route("/reset/passwd", methods=["POST"])
  213. @login_required
  214. @form_required(ResetDeleteForm, lambda form: __load_question(SearchForm(), form, UploadFileForm()))
  215. def reset_passwd():
  216. reset_form: ResetDeleteForm = g.form
  217. if len(reset_form.new_passwd.data) < 4 or len(reset_form.new_passwd.data) > 32:
  218. flash("Please enter a password of length 4-32")
  219. else:
  220. user: UserWordDataBase = current_user
  221. if not user.check_passwd(reset_form.passwd.data):
  222. flash("Passwd error.")
  223. else:
  224. flash("User passwd reset")
  225. user.set_passwd(reset_form.new_passwd.data)
  226. logout_user()
  227. return redirect(url_for("home.index"))
  228. return redirect(url_for("test.question"))
  229. class Upload(Thread):
  230. upload = {}
  231. def __init__(self, user: UserWordDataBase, file: List[str]):
  232. super(Upload, self).__init__()
  233. self.user: UserWordDataBase = user
  234. self.file = file
  235. self.upload[user.user] = self
  236. self.daemon = True
  237. def run(self):
  238. for i in self.file:
  239. self.user.import_txt(i)
  240. @test.route("/upload", methods=["POST"])
  241. @login_required
  242. @form_required(UploadFileForm, lambda form: __load_question(SearchForm(), ResetDeleteForm(), form))
  243. def upload():
  244. file = request.files["file"]
  245. user = current_user._get_current_object()
  246. job: Upload = Upload.upload.get(user.user)
  247. if job is not None and job.is_alive():
  248. flash("Please wait for the current task to complete")
  249. return abort(423)
  250. Upload(user, file.stream.read().decode('utf-8').split('\n')).start()
  251. flash("File is being processed")
  252. return redirect(url_for("test.question"))
  253. @test.route("/introduce")
  254. def introduce():
  255. return render_template("introduce.html")