test.py 7.0 KB


  1. from flask import blueprints, render_template, current_app, abort, redirect, url_for, flash, make_response, request
  2. from flask import send_file
  3. from flask_login import current_user, login_required, logout_user
  4. from flask_wtf import FlaskForm
  5. from wtforms import StringField, SubmitField, BooleanField, PasswordField
  6. from wtforms.validators import DataRequired, Length
  7. from app.user import UserWordDataBase
  8. from itsdangerous import URLSafeTimedSerializer
  9. from itsdangerous.exc import BadData
  10. from threading import Thread
  11. from typing import Optional
  12. from core.word import Word
  13. import io
# Blueprint that groups every view defined in this module; the endpoints are
# referenced elsewhere in this file as "test.<view name>" (e.g. "test.question").
test = blueprints.Blueprint("test", __name__)
class SearchForm(FlaskForm):
    """Form used to look up a word.

    The ``search`` view in this module reads ``search``, ``from_internet`` and
    ``add_to_db`` from a submitted form and forwards them as query parameters.
    """

    # The word to look up: required, between 1 and 50 characters.
    search = StringField("Word", validators=[DataRequired(), Length(1, 50)])
    # Checkbox labelled "Internet"; forwarded as the ``internet`` query flag.
    from_internet = BooleanField("Internet")
    # Checkbox labelled "Add"; forwarded as the ``add`` query flag.
    add_to_db = BooleanField("Add")
    submit = SubmitField("Search")
class ResetDeleteForm(FlaskForm):
    """Confirmation form shared by the ``reset_user`` and ``delete_user`` views.

    Both views only read ``passwd`` from it; ``name`` is validated for presence
    and length but is not otherwise inspected in this module.
    """

    # User name: required, between 1 and 32 characters.
    name = StringField("User name", validators=[DataRequired(), Length(1, 32)])
    # Password: required, between 4 and 32 characters.
    passwd = PasswordField("Passwd", validators=[DataRequired(), Length(4, 32)])
    submit = SubmitField("Submit")
  24. def __load_word(word):
  25. user: UserWordDataBase = current_user
  26. box, box_sum = user.get_box_count()
  27. search_from = SearchForm()
  28. reset_delete_form = ResetDeleteForm()
  29. if word is None:
  30. return render_template("test.html", word=word, len=len,
  31. box=box, box_sum=box_sum,
  32. search=search_from, have_word=False, reset_delete=reset_delete_form)
  33. serializer = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
  34. word_id = serializer.dumps({"word": word.name})
  35. return render_template("test.html", word=word, len=len,
  36. word_id=word_id, box=box, box_sum=box_sum,
  37. search=search_from, have_word=True, reset_delete=reset_delete_form)
  38. @test.route("/")
  39. @login_required
  40. def question():
  41. word = current_user.rand_word()
  42. return __load_word(word)
  43. @test.route("/right/<string:word_id>")
  44. @login_required
  45. def right(word_id: str):
  46. serializer = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
  47. try:
  48. word: dict = serializer.loads(word_id, max_age=120) # 120s内生效
  49. user: UserWordDataBase = current_user
  50. user.right_word(word["word"])
  51. except BadData:
  52. flash(f"Timeout for confirm word")
  53. abort(408)
  54. except KeyError:
  55. abort(404)
  56. return redirect(url_for("test.question"))
  57. @test.route("/wrong/<string:word_id>")
  58. @login_required
  59. def wrong(word_id: str):
  60. serializer = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
  61. try:
  62. word: dict = serializer.loads(word_id, max_age=120) # 120s内生效
  63. user: UserWordDataBase = current_user
  64. user.wrong_word(word["word"])
  65. except BadData:
  66. flash(f"Timeout for confirm word")
  67. abort(408)
  68. except KeyError:
  69. abort(404)
  70. return redirect(url_for("test.question"))
  71. @test.route("/delete/<string:word>")
  72. @login_required
  73. def delete(word: str):
  74. user: UserWordDataBase = current_user
  75. user.delete_txt(word)
  76. flash(f"Word '{word}' is deleted.")
  77. return redirect(url_for("test.question"))
  78. @test.route("/download/<string:word>")
  79. @login_required
  80. def download(word: str):
  81. user: UserWordDataBase = current_user
  82. w = user.find_word(word, False)
  83. if w is None:
  84. abort(404)
  85. w_str = f"{w.name}\n"
  86. for i in w.comment:
  87. comment = w.comment[i]
  88. w_str += f" {comment.part}\n {comment.english}\n {comment.chinese}\n"
  89. for a in comment.eg:
  90. e, c = a.split("##")
  91. w_str += f" {e}\n {c}\n"
  92. response = make_response(w_str)
  93. response.headers["Content-Disposition"] = f"attachment;filename={word}.henglish.txt"
  94. return response
  95. class Search(Thread):
  96. def __init__(self, user: UserWordDataBase, word: str, internet: bool, add: bool):
  97. super(Search, self).__init__()
  98. self.word: Optional[Word] = None
  99. self.word_str = word
  100. self.internet = internet
  101. self.add = add
  102. self.user = user
  103. self.daemon = True
  104. def run(self):
  105. self.word = self.user.find_word(self.word_str, self.internet, self.add)
  106. def wait_event(self) -> Optional[Word]:
  107. self.join(timeout=5)
  108. return self.word
  109. @test.route("/search", methods=["GET", "POST"])
  110. @login_required
  111. def search():
  112. form = SearchForm()
  113. if not form.validate_on_submit():
  114. word = request.args.get("word", "")
  115. if len(word) == 0:
  116. abort(400)
  117. user = current_user._get_current_object()
  118. th = Search(user, word, request.args.get("internet", 0) != '0', request.args.get("add", 0) != '0')
  119. th.start()
  120. word = th.wait_event()
  121. if th.is_alive():
  122. flash("Search timeout")
  123. return __load_word(word)
  124. return redirect(url_for("test.search",
  125. word=form.search.data, internet=int(form.from_internet.data), add=int(form.add_to_db.data)))
  126. @test.route("/download_table/<string:file_type>")
  127. @login_required
  128. def download_table(file_type: str):
  129. user: UserWordDataBase = current_user
  130. try:
  131. max_eg = int(request.args.get("max_eg", -1))
  132. except (ValueError, TypeError):
  133. return abort(400)
  134. df = user.export_frame(max_eg, file_type == "html")
  135. if file_type == "csv":
  136. df_io = io.BytesIO()
  137. df.to_csv(df_io)
  138. elif file_type == "xlsx":
  139. df_io = io.BytesIO()
  140. df.to_excel(df_io)
  141. elif file_type == "html":
  142. df_io = io.StringIO()
  143. df.to_html(df_io, escape=False)
  144. df_io = io.BytesIO(df_io.getvalue().encode('utf-8'))
  145. elif file_type == "json":
  146. df_io = io.BytesIO()
  147. df.to_json(df_io)
  148. elif file_type == "markdown":
  149. df_io = io.StringIO()
  150. df.to_markdown(df_io)
  151. df_io = io.BytesIO(df_io.getvalue().encode('utf-8'))
  152. elif file_type == "latex":
  153. df_io = io.StringIO()
  154. df.to_latex(df_io)
  155. df_io = io.BytesIO(df_io.getvalue().encode('utf-8'))
  156. else:
  157. return abort(400)
  158. df_io.seek(0, io.SEEK_SET)
  159. return send_file(df_io, attachment_filename=f"{user.user}.henglish.{file_type}", as_attachment=True)
  160. @test.route("/reset_user", methods=["POST"])
  161. @login_required
  162. def reset_user():
  163. reset_form = ResetDeleteForm()
  164. if reset_form.validate_on_submit():
  165. user: UserWordDataBase = current_user
  166. if not user.check_passwd(reset_form.passwd.data):
  167. flash("Passwd error.")
  168. else:
  169. flash("User reset")
  170. user.reset()
  171. return redirect(url_for("test.question"))
  172. abort(400)
  173. @test.route("/delete_user", methods=["POST"])
  174. @login_required
  175. def delete_user():
  176. delete_form = ResetDeleteForm()
  177. if delete_form.validate_on_submit():
  178. user: UserWordDataBase = current_user
  179. if not user.check_passwd(delete_form.passwd.data):
  180. flash("Passwd error.")
  181. else:
  182. flash("User reset")
  183. logout_user()
  184. user.delete_user()
  185. return redirect(url_for("test.question"))
  186. abort(400)