app.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import os
  2. import sys
  3. from flask import Flask, url_for, request, current_app, render_template, Response
  4. from flask_mail import Mail
  5. from flask_login import LoginManager, current_user
  6. from flask.logging import default_handler
  7. from flask_caching import Cache
  8. from typing import Optional, Union
  9. import logging.handlers
  10. import logging
  11. from bs4 import BeautifulSoup
  12. from configure import conf
  13. from object.user import AnonymousUser, User
  14. if conf["DEBUG_PROFILE"]:
  15. from werkzeug.middleware.profiler import ProfilerMiddleware
  class HBlogFlask(Flask):
      """Flask application subclass that wires up the blog's services.

      Construction loads ``conf`` into the Flask config, optionally wraps the
      WSGI app in werkzeug's ``ProfilerMiddleware``, and sets up flask-login,
      flask-mail, a Redis-backed flask-caching cache, the log handlers and the
      HTML error handlers. The class also provides pagination helpers and
      logging helpers that append details of the current request.
      """

      def __init__(self, import_name: str, *args, **kwargs):
          """Create the application and initialise every extension it uses.

          :param import_name: forwarded to ``Flask.__init__``.
          :param args: extra positional arguments forwarded to ``Flask.__init__``.
          :param kwargs: extra keyword arguments forwarded to ``Flask.__init__``.
          """
          super().__init__(import_name, *args, **kwargs)
          # Default value; update_configure() overwrites it when an about-me
          # page is configured and present on disk.
          self.about_me = ""
          self.update_configure()

          if conf["DEBUG_PROFILE"]:
              self.wsgi_app = ProfilerMiddleware(self.wsgi_app, sort_by=("cumtime",))

          self.login_manager = LoginManager()
          self.login_manager.init_app(self)
          # Object handed out as ``current_user`` for visitors who are not logged in.
          self.login_manager.anonymous_user = AnonymousUser
          self.login_manager.login_view = "auth.login_page"

          self.mail = Mail(self)

          # NOTE(review): the credentials are interpolated into the URL verbatim,
          # so a password containing characters such as '@' or '/' presumably has
          # to be percent-encoded in the configuration -- confirm.
          self.cache = Cache(config={
              'CACHE_TYPE': 'RedisCache',
              'CACHE_KEY_PREFIX': 'flask_cache:',
              'CACHE_REDIS_URL': f'redis://{conf["CACHE_REDIS_NAME"]}:{conf["CACHE_REDIS_PASSWD"]}@'
                                 f'{conf["CACHE_REDIS_HOST"]}:{conf["CACHE_REDIS_PORT"]}/{conf["CACHE_REDIS_DATABASE"]}'
          })
          self.cache.init_app(self)

          # Replace Flask's default handler with the handlers selected in ``conf``.
          self.logger.removeHandler(default_handler)
          self.logger.setLevel(conf["LOG_LEVEL"])
          self.logger.propagate = False
          if conf["LOG_HOME"]:
              # No ``when``/``interval`` is passed, so the handler's default
              # rotation schedule applies; at most 10 rotated files are kept.
              file_handler = logging.handlers.TimedRotatingFileHandler(
                  os.path.join(conf["LOG_HOME"], "flask.log"), backupCount=10)
              file_handler.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
              self.logger.addHandler(file_handler)
          if conf["LOG_STDERR"]:
              stderr_handler = logging.StreamHandler(sys.stderr)
              stderr_handler.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
              self.logger.addHandler(stderr_handler)

          @self.login_manager.user_loader
          def user_loader(email: str):
              """Return the ``User`` for *email*, or ``None`` when its id is -1."""
              user = User(email)
              if user.info.id == -1:
                  return None
              return user

          def make_error_handler(status: int):
              """Build the handler that renders ``error.html`` for *status*.

              A factory is used so that every handler closes over its own
              status code instead of the loop variable.
              """
              code = str(status)  # the log helper and the template receive the code as text

              def error_handler(e):
                  self.print_load_page_log(code)
                  data = render_template('error.html', error_code=code, error_info=e)
                  return Response(response=data, status=status)

              error_handler.__name__ = f"error_{status}"
              return error_handler

          for status in [400, 401, 403, 404, 405, 408, 410, 413, 414, 423, 500, 501, 502]:
              self.errorhandler(status)(make_error_handler(status))

      def register_all_blueprint(self):
          """Import every blueprint of the package and mount it under its URL prefix.

          The imports are done here rather than at module level, presumably to
          avoid circular imports with the blueprint modules -- confirm.
          """
          from .index import index
          from .archive import archive
          from .docx import docx
          from .msg import msg
          from .oss import oss
          from .auth import auth
          from .about_me import about_me

          self.register_blueprint(index, url_prefix="/")
          self.register_blueprint(archive, url_prefix="/archive")
          self.register_blueprint(docx, url_prefix="/docx")
          self.register_blueprint(msg, url_prefix="/msg")
          self.register_blueprint(auth, url_prefix="/auth")
          self.register_blueprint(about_me, url_prefix="/about")
          self.register_blueprint(oss, url_prefix="/oss")

      def update_configure(self):
          """Reload ``conf`` into the Flask config and refresh the about-me HTML.

          When ``conf["ABOUT_ME_PAGE"]`` names an existing file, the first
          ``<div class="about-me">`` element is extracted and stored, as a
          string, in ``self.about_me``. The element is searched inside
          ``<body>`` when the page has one and in the whole document otherwise.
          If the element is missing, ``self.about_me`` becomes ``""``.
          """
          self.config.update(conf)

          about_me_page = conf["ABOUT_ME_PAGE"]
          if not about_me_page or not os.path.exists(about_me_page):
              return

          with open(about_me_page, "r", encoding='utf-8') as f:
              bs = BeautifulSoup(f.read(), "html.parser")

          # ``find`` returns None when nothing matches; guard both lookups so a
          # page without <body> does not crash start-up and a missing section
          # does not end up as the literal text "None".
          body = bs.find("body")
          root = bs if body is None else body
          section = root.find("div", class_="about-me")  # extract the about-me part
          self.about_me = "" if section is None else str(section)

      @staticmethod
      def get_max_page(count: int, count_page: int) -> int:
          """Return the number of pages needed for *count* items.

          :param count: total number of items.
          :param count_page: number of items shown on one page.
          :return: ``count / count_page`` rounded up.
          :raises ZeroDivisionError: if *count_page* is 0.
          """
          full_pages, remainder = divmod(count, count_page)
          return full_pages + (1 if remainder else 0)

      @staticmethod
      def get_page(url, page: int, count: int) -> list:
          """Build the list of pagination buttons.

          :param url: endpoint name handed to ``url_for`` together with ``page=<n>``.
          :param page: the page currently displayed.
          :param count: total number of pages.
          :return: a list whose items are ``[page_number, link]`` pairs, with
              ``None`` marking a gap ("...") between non-adjacent buttons.
          """
          def button(number: int) -> list:
              return [number, url_for(url, page=number)]

          if count <= 9:
              # Few enough pages to show a button for each of them.
              return [button(i + 1) for i in range(count)]

          if page <= 5:
              # [1][2][3][4][5][6][...][count - 1][count]
              page_list = [button(i + 1) for i in range(6)]
              page_list += [None, button(count - 1), button(count)]
          elif page >= count - 5:
              # [1][2][...][count - 5][count - 4][count - 3][count - 2][count - 1][count]
              page_list = [button(1), button(2), None]
              page_list += [button(count - 5 + i) for i in range(6)]
          else:
              # [1][2][...][page - 2][page - 1][page][page + 1][page + 2][...][count - 1][count]
              page_list = [button(1), button(2), None]
              page_list += [button(page - 2 + i) for i in range(5)]
              page_list += [None, button(count - 1), button(count)]
          return page_list

      @staticmethod
      def __get_log_request_info():
          """Return a one-line description of the current user and request.

          Reads ``current_user`` and ``request``, so it has to be called while a
          request is being handled.
          """
          return (f"user: '{current_user.email}' "
                  f"url: '{request.url}' blueprint: '{request.blueprint}' "
                  f"args: {request.args} form: {request.form} "
                  f"accept_encodings: '{request.accept_encodings}' "
                  f"accept_charsets: '{request.accept_charsets}' "
                  f"accept_mimetypes: '{request.accept_mimetypes}' "
                  f"accept_languages: '{request.accept_languages}'")

      @staticmethod
      def print_load_page_log(page: str):
          """Log at DEBUG level that *page* was loaded."""
          current_app.logger.debug(
              f"[{request.method}] Load - '{page}' " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_form_error_log(opt: str):
          """Log at WARNING level that operation *opt* received a bad form."""
          current_app.logger.warning(
              f"[{request.method}] '{opt}' - Bad form " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_sys_opt_fail_log(opt: str):
          """Log at ERROR level that system operation *opt* failed."""
          current_app.logger.error(
              f"[{request.method}] System {opt} - fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_sys_opt_success_log(opt: str):
          """Log at WARNING level that system operation *opt* succeeded."""
          current_app.logger.warning(
              f"[{request.method}] System {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_fail_log(opt: str):
          """Log at DEBUG level that user operation *opt* failed."""
          current_app.logger.debug(
              f"[{request.method}] User {opt} - fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_success_log(opt: str):
          """Log at DEBUG level that user operation *opt* succeeded."""
          current_app.logger.debug(
              f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_error_log(opt: str):
          """Log at WARNING level that user operation *opt* hit a system failure."""
          current_app.logger.warning(
              f"[{request.method}] User {opt} - system fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_import_user_opt_success_log(opt: str):
          """Log at INFO level that user operation *opt* succeeded."""
          current_app.logger.info(
              f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_not_allow_opt_log(opt: str):
          """Log at INFO level that user operation *opt* was rejected."""
          current_app.logger.info(
              f"[{request.method}] User '{opt}' - reject " + HBlogFlask.__get_log_request_info())
  # Type alias for annotating values that hold the application object, either
  # as the project subclass or as a plain Flask instance.
  Hblog = Union[HBlogFlask, Flask]