app.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import os
  2. import sys
  3. from flask import Flask, url_for, request, current_app, render_template, Response
  4. from flask_mail import Mail
  5. from flask_login import LoginManager, current_user
  6. from flask.logging import default_handler
  7. from typing import Optional, Union
  8. import logging.handlers
  9. import logging
  10. from bs4 import BeautifulSoup
  11. from configure import conf
  12. from object.user import AnonymousUser, User
  13. from .index import index
  14. from .archive import archive
  15. from .docx import docx
  16. from .msg import msg
  17. from .oss import oss
  18. from .auth import auth
  19. from .about_me import about_me
  # The profiler middleware is only needed (and only imported) when profiling
  # is switched on in the configuration.
  if conf["DEBUG_PROFILE"]:
      from werkzeug.middleware.profiler import ProfilerMiddleware
  class HBlogFlask(Flask):
      """Flask application for the blog.

      On construction the app copies ``conf`` into its config, optionally wraps
      itself in the werkzeug profiler, registers the site blueprints, and wires
      up flask-login, flask-mail, logging and the HTTP error pages.
      """

      # Status codes that are rendered through the shared error.html template.
      _ERROR_CODES = (400, 401, 403, 404, 405, 408, 410, 413, 414, 423, 500, 501, 502)

      def __init__(self, import_name: str, *args, **kwargs):
          """Create the application and run all one-time setup.

          :param import_name: Passed through to ``Flask.__init__``.
          :param args: Extra positional arguments for ``Flask.__init__``.
          :param kwargs: Extra keyword arguments for ``Flask.__init__``.
          """
          super().__init__(import_name, *args, **kwargs)
          self.about_me = ""  # HTML of the about-me section, set by update_configure()
          self.update_configure()

          if conf["DEBUG_PROFILE"]:
              self.wsgi_app = ProfilerMiddleware(self.wsgi_app)

          self._setup_blueprints()
          self._setup_login()
          self.mail = Mail(self)
          self._setup_logger()
          self._setup_error_handlers()

      def _setup_blueprints(self):
          """Mount every site blueprint under its URL prefix."""
          for blueprint, prefix in (
              (index, "/"),
              (archive, "/archive"),
              (docx, "/docx"),
              (msg, "/msg"),
              (auth, "/auth"),
              (about_me, "/about"),
              (oss, "/oss"),
          ):
              self.register_blueprint(blueprint, url_prefix=prefix)

      def _setup_login(self):
          """Attach flask-login to the app and register the user loader."""
          self.login_manager = LoginManager()
          self.login_manager.init_app(self)
          # Object used as the current user when nobody is logged in.
          self.login_manager.anonymous_user = AnonymousUser
          self.login_manager.login_view = "auth.login_page"

          @self.login_manager.user_loader
          def user_loader(email: str):
              user = User(email)
              # An id of -1 is answered with None, i.e. no user is loaded.
              if user.info.id == -1:
                  return None
              return user

      def _setup_logger(self):
          """Swap Flask's default log handler for the ones selected in ``conf``."""
          self.logger.removeHandler(default_handler)
          self.logger.setLevel(conf["LOG_LEVEL"])
          self.logger.propagate = False

          if conf["LOG_HOME"]:
              file_handler = logging.handlers.TimedRotatingFileHandler(
                  os.path.join(conf["LOG_HOME"], "flask.log"), backupCount=10)
              file_handler.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
              self.logger.addHandler(file_handler)

          if conf["LOG_STDERR"]:
              stderr_handler = logging.StreamHandler(sys.stderr)
              stderr_handler.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
              self.logger.addHandler(stderr_handler)

      def _setup_error_handlers(self):
          """Register one error handler per status code in ``_ERROR_CODES``."""
          for code in self._ERROR_CODES:
              self.errorhandler(code)(self._make_error_handler(code))

      def _make_error_handler(self, code: int):
          """Build the handler that renders error.html for ``code``.

          A factory is used (instead of generating source text with ``exec``)
          so that each handler closes over its own status code.

          :param code: HTTP status code the handler answers with.
          :return: A callable suitable for ``Flask.errorhandler``.
          """
          code_text = str(code)

          def handler(e):
              self.print_load_page_log(code_text)
              data = render_template("error.html", error_code=code_text, error_info=e)
              return Response(response=data, status=code)

          handler.__name__ = f"error_{code}"
          return handler

      def update_configure(self):
          """Reload ``conf`` into the app config and refresh ``about_me``.

          When ``conf["ABOUT_ME_PAGE"]`` names an existing file, the
          ``<div class="about-me">`` found inside its ``<body>`` is stored as
          HTML text in ``self.about_me``. If the file has no such element the
          current value of ``self.about_me`` is left untouched.
          """
          self.config.update(conf)
          about_me_page = conf["ABOUT_ME_PAGE"]
          if not about_me_page or not os.path.exists(about_me_page):
              return

          with open(about_me_page, "r", encoding="utf-8") as f:
              soup = BeautifulSoup(f.read(), "html.parser")

          # find() returns None when nothing matches; guard both lookups so a
          # malformed page neither raises nor stores the text "None".
          body = soup.find("body")
          section = body.find("div", class_="about-me") if body is not None else None
          if section is not None:
              self.about_me = str(section)  # keep only the about-me section

      @staticmethod
      def get_max_page(count: int, count_page: int):
          """Return the number of pages needed to show ``count`` items.

          :param count: Total number of items.
          :param count_page: Number of items per page.
          :return: ``count / count_page`` rounded up to a whole number.
          :raises ZeroDivisionError: If ``count_page`` is 0.
          """
          full_pages, remainder = divmod(count, count_page)
          return full_pages + (1 if remainder else 0)

      @staticmethod
      def get_page(url, page: int, count: int):
          """Build the list of pagination buttons.

          :param url: Endpoint name handed to ``url_for``.
          :param page: Current page number.
          :param count: Total number of pages.
          :return: A list whose entries are ``[page_number, link]`` pairs;
              ``None`` entries stand for the ``[...]`` gap between groups.
          """
          def button(number: int) -> list:
              return [number, url_for(url, page=number)]

          if count <= 9:
              # Few enough pages: show every one of them.
              return [button(number) for number in range(1, count + 1)]

          if page <= 5:
              # [1][2][3][4][5][6][...][count - 1][count]
              head = [button(number) for number in range(1, 7)]
              return head + [None, button(count - 1), button(count)]

          if page >= count - 5:
              # [1][2][...][count - 5][count - 4][count - 3][count - 2][count - 1][count]
              tail = [button(number) for number in range(count - 5, count + 1)]
              return [button(1), button(2), None] + tail

          # [1][2][...][page - 2][page - 1][page][page + 1][page + 2][...][count - 1][count]
          window = [button(number) for number in range(page - 2, page + 3)]
          return ([button(1), button(2), None]
                  + window
                  + [None, button(count - 1), button(count)])

      @staticmethod
      def __get_log_request_info():
          """Return a description of the current user and request for log lines."""
          return (f"user: '{current_user.email}' "
                  f"url: '{request.url}' blueprint: '{request.blueprint}' "
                  f"args: {request.args} form: {request.form} "
                  f"accept_encodings: '{request.accept_encodings}' "
                  f"accept_charsets: '{request.accept_charsets}' "
                  f"accept_mimetypes: '{request.accept_mimetypes}' "
                  f"accept_languages: '{request.accept_languages}'")

      @staticmethod
      def print_load_page_log(page: str):
          """Log at DEBUG level that ``page`` was loaded."""
          current_app.logger.debug(
              f"[{request.method}] Load - '{page}' " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_form_error_log(opt: str):
          """Log at WARNING level that operation ``opt`` received a bad form."""
          current_app.logger.warning(
              f"[{request.method}] '{opt}' - Bad form " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_sys_opt_fail_log(opt: str):
          """Log at ERROR level that system operation ``opt`` failed."""
          current_app.logger.error(
              f"[{request.method}] System {opt} - fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_sys_opt_success_log(opt: str):
          """Log at WARNING level that system operation ``opt`` succeeded."""
          current_app.logger.warning(
              f"[{request.method}] System {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_fail_log(opt: str):
          """Log at DEBUG level that user operation ``opt`` failed."""
          current_app.logger.debug(
              f"[{request.method}] User {opt} - fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_success_log(opt: str):
          """Log at DEBUG level that user operation ``opt`` succeeded."""
          current_app.logger.debug(
              f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_error_log(opt: str):
          """Log at WARNING level that user operation ``opt`` hit a system failure."""
          current_app.logger.warning(
              f"[{request.method}] User {opt} - system fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_import_user_opt_success_log(opt: str):
          """Log at INFO level that user operation ``opt`` succeeded."""
          current_app.logger.info(
              f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_not_allow_opt_log(opt: str):
          """Log at INFO level that user operation ``opt`` was rejected."""
          current_app.logger.info(
              f"[{request.method}] User '{opt}' - reject " + HBlogFlask.__get_log_request_info())
  # Annotation alias covering both the HBlogFlask application and a plain Flask app.
  Hblog = Union[HBlogFlask, Flask]