app.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. import os
  2. import sys
  3. from flask import Flask, url_for, request, current_app, render_template, Response
  4. from flask_mail import Mail
  5. from flask_login import LoginManager, current_user
  6. from flask_moment import Moment
  7. from flask.logging import default_handler
  8. from typing import Optional, Union
  9. import logging.handlers
  10. import logging
  11. from bs4 import BeautifulSoup
  12. from configure import conf
  13. from object.user import AnonymousUser, User
  14. from app.cache import cache
  15. if conf["DEBUG_PROFILE"]:
  16. from werkzeug.middleware.profiler import ProfilerMiddleware
  17. class HBlogFlask(Flask):
  18. def __init__(self, import_name: str, *args, **kwargs):
  19. super(HBlogFlask, self).__init__(import_name, *args, **kwargs)
  20. self.about_me = ""
  21. self.update_configure()
  22. if conf["DEBUG_PROFILE"]:
  23. self.wsgi_app = ProfilerMiddleware(self.wsgi_app, sort_by=("cumtime",))
  24. self.login_manager = LoginManager()
  25. self.login_manager.init_app(self)
  26. self.login_manager.anonymous_user = AnonymousUser # 设置未登录的匿名对象
  27. self.login_manager.login_view = "auth.login_page"
  28. self.mail = Mail(self)
  29. self.moment = Moment(self)
  30. self.cache = cache
  31. self.cache.init_app(self)
  32. self.logger.removeHandler(default_handler)
  33. self.logger.setLevel(conf["LOG_LEVEL"])
  34. self.logger.propagate = False
  35. if len(conf["LOG_HOME"]) > 0:
  36. handle = logging.handlers.TimedRotatingFileHandler(
  37. os.path.join(conf["LOG_HOME"], f"flask.log"), backupCount=10)
  38. handle.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
  39. self.logger.addHandler(handle)
  40. if conf["LOG_STDERR"]:
  41. handle = logging.StreamHandler(sys.stderr)
  42. handle.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
  43. self.logger.addHandler(handle)
  44. @self.login_manager.user_loader
  45. def user_loader(email: str):
  46. user = User(email)
  47. if user.info.id == -1:
  48. return None
  49. return user
  50. func = {"render_template": render_template, "Response": Response, "self": self}
  51. for i in [400, 401, 403, 404, 405, 408, 410, 413, 414, 423, 500, 501, 502]:
  52. exec(f"def error_{i}(e):\n"
  53. f"\tself.print_load_page_log('{i}')\n"
  54. f"\tdata = render_template('error.html', error_code='{i}', error_info=e)\n"
  55. f"\treturn Response(response=data, status={i})", func)
  56. self.errorhandler(i)(func[f"error_{i}"])
  57. def register_all_blueprint(self):
  58. import app.index as index
  59. import app.archive as archive
  60. import app.docx as docx
  61. import app.msg as msg
  62. import app.oss as oss
  63. import app.auth as auth
  64. import app.about_me as about_me
  65. self.register_blueprint(index.index, url_prefix="/")
  66. self.register_blueprint(archive.archive, url_prefix="/archive")
  67. self.register_blueprint(docx.docx, url_prefix="/docx")
  68. self.register_blueprint(msg.msg, url_prefix="/msg")
  69. self.register_blueprint(auth.auth, url_prefix="/auth")
  70. self.register_blueprint(about_me.about_me, url_prefix="/about")
  71. self.register_blueprint(oss.oss, url_prefix="/oss")
  72. def update_configure(self):
  73. """ 更新配置 """
  74. self.config.update(conf)
  75. about_me_page = conf["ABOUT_ME_PAGE"]
  76. if len(about_me_page) > 0 and os.path.exists(about_me_page):
  77. with open(about_me_page, "r", encoding='utf-8') as f:
  78. bs = BeautifulSoup(f.read(), "html.parser")
  79. self.about_me = str(bs.find("body").find("div", class_="about-me")) # 提取about-me部分的内容
  80. @staticmethod
  81. def get_max_page(count: int, count_page: int):
  82. """ 计算页码数 (共计count个元素, 每页count_page个元素) """
  83. return (count // count_page) + (0 if count % count_page == 0 else 1)
  84. @staticmethod
  85. def get_page(url, page: int, count: int):
  86. """ 计算页码的按钮 """
  87. if count <= 9:
  88. page_list = [[i + 1, url_for(url, page=i + 1)] for i in range(count)]
  89. elif page <= 5:
  90. """
  91. [1][2][3][4][5][6][...][count - 1][count]
  92. """
  93. page_list = [[i + 1, url_for(url, page=i + 1)] for i in range(6)]
  94. page_list += [None, [count - 1, url_for(url, page=count - 1)], [count, url_for(url, page=count)]]
  95. elif page >= count - 5:
  96. """
  97. [1][2][...][count - 5][count - 4][count - 3][count - 2][count - 1][count]
  98. """
  99. page_list: Optional[list] = [[1, url_for(url, page=1)], [2, url_for(url, page=2)], None]
  100. page_list += [[count - 5 + i, url_for(url, page=count - 5 + i)] for i in range(6)]
  101. else:
  102. """
  103. [1][2][...][page - 2][page - 1][page][page + 1][page + 2][...][count - 1][count]
  104. """
  105. page_list: Optional[list] = [[1, url_for(url, page=1)], [2, url_for(url, page=2)], None]
  106. page_list += [[page - 2 + i, url_for(url, page=page - 2 + i)] for i in range(5)]
  107. page_list += [None, [count - 1, url_for(url, page=count - 1)], [count, url_for(url, page=count)]]
  108. return page_list
  109. @staticmethod
  110. def __get_log_request_info():
  111. return (f"user: '{current_user.email}' "
  112. f"url: '{request.url}' blueprint: '{request.blueprint}' "
  113. f"args: {request.args} form: {request.form} "
  114. f"accept_encodings: '{request.accept_encodings}' "
  115. f"accept_charsets: '{request.accept_charsets}' "
  116. f"accept_mimetypes: '{request.accept_mimetypes}' "
  117. f"accept_languages: '{request.accept_languages}'")
  118. @staticmethod
  119. def print_load_page_log(page: str):
  120. current_app.logger.debug(
  121. f"[{request.method}] Load - '{page}' " + HBlogFlask.__get_log_request_info())
  122. @staticmethod
  123. def print_form_error_log(opt: str):
  124. current_app.logger.warning(
  125. f"[{request.method}] '{opt}' - Bad form " + HBlogFlask.__get_log_request_info())
  126. @staticmethod
  127. def print_sys_opt_fail_log(opt: str):
  128. current_app.logger.error(
  129. f"[{request.method}] System {opt} - fail " + HBlogFlask.__get_log_request_info())
  130. @staticmethod
  131. def print_sys_opt_success_log(opt: str):
  132. current_app.logger.warning(
  133. f"[{request.method}] System {opt} - success " + HBlogFlask.__get_log_request_info())
  134. @staticmethod
  135. def print_user_opt_fail_log(opt: str):
  136. current_app.logger.debug(
  137. f"[{request.method}] User {opt} - fail " + HBlogFlask.__get_log_request_info())
  138. @staticmethod
  139. def print_user_opt_success_log(opt: str):
  140. current_app.logger.debug(
  141. f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())
  142. @staticmethod
  143. def print_user_opt_error_log(opt: str):
  144. current_app.logger.warning(
  145. f"[{request.method}] User {opt} - system fail " + HBlogFlask.__get_log_request_info())
  146. @staticmethod
  147. def print_import_user_opt_success_log(opt: str):
  148. current_app.logger.info(
  149. f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())
  150. @staticmethod
  151. def print_user_not_allow_opt_log(opt: str):
  152. current_app.logger.info(
  153. f"[{request.method}] User '{opt}' - reject " + HBlogFlask.__get_log_request_info())
  154. Hblog = Union[HBlogFlask, Flask]