app.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. import os
  2. import sys
  3. from flask import Flask, url_for, request, current_app, render_template, Response, jsonify
  4. from flask_mail import Mail
  5. from flask_login import LoginManager, current_user
  6. from flask_moment import Moment
  7. from flask.logging import default_handler
  8. from typing import Optional, Union
  9. import logging.handlers
  10. import logging
  11. from bs4 import BeautifulSoup
  12. from configure import conf
  13. from object.user import AnonymousUser, User
  14. from app.cache import cache
  15. from app.http_auth import http_auth
  16. if conf["DEBUG_PROFILE"]:
  17. from werkzeug.middleware.profiler import ProfilerMiddleware
  18. class HBlogFlask(Flask):
  19. def __init__(self, import_name: str, *args, **kwargs):
  20. super(HBlogFlask, self).__init__(import_name, *args, **kwargs)
  21. self.about_me = ""
  22. self.update_configure()
  23. if conf["DEBUG_PROFILE"]:
  24. self.wsgi_app = ProfilerMiddleware(self.wsgi_app, sort_by=("cumtime",))
  25. self.login_manager = LoginManager()
  26. self.login_manager.init_app(self)
  27. self.login_manager.anonymous_user = AnonymousUser # 设置未登录的匿名对象
  28. self.login_manager.login_view = "auth.login_page"
  29. self.mail = Mail(self)
  30. self.moment = Moment(self)
  31. self.cache = cache
  32. self.cache.init_app(self)
  33. self.http_auth = http_auth
  34. self.logger.removeHandler(default_handler)
  35. self.logger.setLevel(conf["LOG_LEVEL"])
  36. self.logger.propagate = False
  37. if len(conf["LOG_HOME"]) > 0:
  38. handle = logging.handlers.TimedRotatingFileHandler(
  39. os.path.join(conf["LOG_HOME"], f"flask.log"), backupCount=10)
  40. handle.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
  41. self.logger.addHandler(handle)
  42. if conf["LOG_STDERR"]:
  43. handle = logging.StreamHandler(sys.stderr)
  44. handle.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
  45. self.logger.addHandler(handle)
  46. @self.login_manager.user_loader
  47. def user_loader(email: str):
  48. user = User(email)
  49. if user.info.id == -1:
  50. return None
  51. return user
  52. for i in [400, 401, 403, 404, 405, 408, 410, 413, 414, 423, 500, 501, 502]:
  53. def create_error_handle(status):
  54. def error_handle(e):
  55. self.print_load_page_log(status)
  56. if "/api" in request.base_url:
  57. rsp = jsonify({"status": status, "error": str(e)})
  58. rsp.status_code = status
  59. return rsp
  60. data = render_template('error.html', error_code=status, error_info=e)
  61. return Response(response=data, status=status)
  62. return error_handle
  63. self.errorhandler(i)(create_error_handle(i))
  64. def register_all_blueprint(self):
  65. import app.index as index
  66. import app.archive as archive
  67. import app.docx as docx
  68. import app.msg as msg
  69. import app.oss as oss
  70. import app.auth as auth
  71. import app.about_me as about_me
  72. import app.api as api
  73. self.register_blueprint(index.index, url_prefix="/")
  74. self.register_blueprint(archive.archive, url_prefix="/archive")
  75. self.register_blueprint(docx.docx, url_prefix="/docx")
  76. self.register_blueprint(msg.msg, url_prefix="/msg")
  77. self.register_blueprint(auth.auth, url_prefix="/auth")
  78. self.register_blueprint(about_me.about_me, url_prefix="/about")
  79. self.register_blueprint(oss.oss, url_prefix="/oss")
  80. self.register_blueprint(api.api, url_prefix="/api")
  81. def update_configure(self):
  82. """ 更新配置 """
  83. self.config.update(conf)
  84. about_me_page = conf["ABOUT_ME_PAGE"]
  85. if len(about_me_page) > 0 and os.path.exists(about_me_page):
  86. with open(about_me_page, "r", encoding='utf-8') as f:
  87. bs = BeautifulSoup(f.read(), "html.parser")
  88. self.about_me = str(bs.find("body").find("div", class_="about-me")) # 提取about-me部分的内容
  89. @staticmethod
  90. def get_max_page(count: int, count_page: int):
  91. """ 计算页码数 (共计count个元素, 每页count_page个元素) """
  92. return (count // count_page) + (0 if count % count_page == 0 else 1)
  93. @staticmethod
  94. def get_page(url, page: int, count: int):
  95. """ 计算页码的按钮 """
  96. if count <= 9:
  97. page_list = [[i + 1, url_for(url, page=i + 1)] for i in range(count)]
  98. elif page <= 5:
  99. """
  100. [1][2][3][4][5][6][...][count - 1][count]
  101. """
  102. page_list = [[i + 1, url_for(url, page=i + 1)] for i in range(6)]
  103. page_list += [None, [count - 1, url_for(url, page=count - 1)], [count, url_for(url, page=count)]]
  104. elif page >= count - 5:
  105. """
  106. [1][2][...][count - 5][count - 4][count - 3][count - 2][count - 1][count]
  107. """
  108. page_list: Optional[list] = [[1, url_for(url, page=1)], [2, url_for(url, page=2)], None]
  109. page_list += [[count - 5 + i, url_for(url, page=count - 5 + i)] for i in range(6)]
  110. else:
  111. """
  112. [1][2][...][page - 2][page - 1][page][page + 1][page + 2][...][count - 1][count]
  113. """
  114. page_list: Optional[list] = [[1, url_for(url, page=1)], [2, url_for(url, page=2)], None]
  115. page_list += [[page - 2 + i, url_for(url, page=page - 2 + i)] for i in range(5)]
  116. page_list += [None, [count - 1, url_for(url, page=count - 1)], [count, url_for(url, page=count)]]
  117. return page_list
  118. @staticmethod
  119. def __get_log_request_info():
  120. return (f"user: '{current_user.email}' "
  121. f"url: '{request.url}' blueprint: '{request.blueprint}' "
  122. f"args: {request.args} form: {request.form} "
  123. f"accept_encodings: '{request.accept_encodings}' "
  124. f"accept_charsets: '{request.accept_charsets}' "
  125. f"accept_mimetypes: '{request.accept_mimetypes}' "
  126. f"accept_languages: '{request.accept_languages}'")
  127. @staticmethod
  128. def print_load_page_log(page: str):
  129. current_app.logger.debug(
  130. f"[{request.method}] Load - '{page}' " + HBlogFlask.__get_log_request_info())
  131. @staticmethod
  132. def print_form_error_log(opt: str):
  133. current_app.logger.warning(
  134. f"[{request.method}] '{opt}' - Bad form " + HBlogFlask.__get_log_request_info())
  135. @staticmethod
  136. def print_sys_opt_fail_log(opt: str):
  137. current_app.logger.error(
  138. f"[{request.method}] System {opt} - fail " + HBlogFlask.__get_log_request_info())
  139. @staticmethod
  140. def print_sys_opt_success_log(opt: str):
  141. current_app.logger.warning(
  142. f"[{request.method}] System {opt} - success " + HBlogFlask.__get_log_request_info())
  143. @staticmethod
  144. def print_user_opt_fail_log(opt: str):
  145. current_app.logger.debug(
  146. f"[{request.method}] User {opt} - fail " + HBlogFlask.__get_log_request_info())
  147. @staticmethod
  148. def print_user_opt_success_log(opt: str):
  149. current_app.logger.debug(
  150. f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())
  151. @staticmethod
  152. def print_user_opt_error_log(opt: str):
  153. current_app.logger.warning(
  154. f"[{request.method}] User {opt} - system fail " + HBlogFlask.__get_log_request_info())
  155. @staticmethod
  156. def print_import_user_opt_success_log(opt: str):
  157. current_app.logger.info(
  158. f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())
  159. @staticmethod
  160. def print_user_not_allow_opt_log(opt: str):
  161. current_app.logger.info(
  162. f"[{request.method}] User '{opt}' - reject " + HBlogFlask.__get_log_request_info())
  163. Hblog = Union[HBlogFlask, Flask]