app.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import os
  2. import sys
  3. from flask import Flask, url_for, request, current_app, render_template, Response
  4. from flask_mail import Mail
  5. from flask_login import LoginManager, current_user
  6. from flask.logging import default_handler
  7. from typing import Optional, Union
  8. import logging.handlers
  9. import logging
  10. from bs4 import BeautifulSoup
  11. from configure import conf
  12. from object.user import AnonymousUser, User
  13. from .index import index
  14. from .archive import archive
  15. from .docx import docx
  16. from .msg import msg
  17. from .oss import oss
  18. from .auth import auth
  19. from .about_me import about_me
  class HBlogFlask(Flask):
      """Flask application that wires up the blog's blueprints, login, mail and logging.

      Construction loads ``conf`` into ``self.config``, registers every
      blueprint, attaches Flask-Login and Flask-Mail, replaces Flask's default
      log handler with the handlers selected in ``conf`` and installs one error
      handler for each status code in ``_ERROR_CODES``.
      """

      # Status codes that are answered by rendering ``error.html``.
      _ERROR_CODES = (400, 401, 403, 404, 405, 408, 410, 413, 414, 423, 500, 501, 502)

      def __init__(self, import_name: str, *args, **kwargs):
          """Build and fully configure the application.

          Args:
              import_name: Forwarded to ``Flask.__init__`` together with any
                  additional positional and keyword arguments.
          """
          super().__init__(import_name, *args, **kwargs)

          # HTML of the "about me" block; filled in by update_configure().
          self.about_me = ""
          self.update_configure()

          # Registration order is kept exactly as it was written originally.
          for blueprint, prefix in (
              (index, "/"),
              (archive, "/archive"),
              (docx, "/docx"),
              (msg, "/msg"),
              (auth, "/auth"),
              (about_me, "/about"),
              (oss, "/oss"),
          ):
              self.register_blueprint(blueprint, url_prefix=prefix)

          self.login_manager = LoginManager()
          self.login_manager.init_app(self)
          # Object handed out as ``current_user`` for visitors who are not logged in.
          self.login_manager.anonymous_user = AnonymousUser
          self.login_manager.login_view = "auth.login_page"

          self.mail = Mail(self)

          self._setup_logger()

          @self.login_manager.user_loader
          def user_loader(email: str):
              user = User(email)
              # NOTE(review): ``info[2] == -1`` is treated as "no such user";
              # the layout of ``info`` is defined in object.user - confirm there.
              if user.info[2] == -1:
                  return None
              return user

          for code in self._ERROR_CODES:
              self.errorhandler(code)(self._make_error_handler(code))

      def _setup_logger(self):
          """Replace Flask's default log handler with the ones enabled in ``conf``."""
          self.logger.removeHandler(default_handler)
          self.logger.setLevel(conf["LOG_LEVEL"])
          self.logger.propagate = False

          if conf["LOG_HOME"]:
              # Rotating log file ``flask.log`` inside LOG_HOME, keeping 10 backups.
              file_handler = logging.handlers.TimedRotatingFileHandler(
                  os.path.join(conf["LOG_HOME"], "flask.log"), backupCount=10)
              file_handler.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
              self.logger.addHandler(file_handler)

          if conf["LOG_STDERR"]:
              stderr_handler = logging.StreamHandler(sys.stderr)
              stderr_handler.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
              self.logger.addHandler(stderr_handler)

      def _make_error_handler(self, code: int):
          """Return the error handler that renders ``error.html`` for ``code``.

          A factory is used so that every handler closes over its own ``code``
          value; this replaces generating the handlers' source text with
          ``exec``.
          """
          code_text = str(code)

          def handler(e):
              self.print_load_page_log(code_text)
              data = render_template("error.html", error_code=code_text, error_info=e)
              return Response(response=data, status=code)

          handler.__name__ = f"error_{code}"
          return handler

      def update_configure(self):
          """Load ``conf`` into ``self.config`` and refresh ``self.about_me``.

          When ``conf["ABOUT_ME_PAGE"]`` names an existing file, the file is
          parsed as HTML and the first ``<div class="about-me">`` found inside
          ``<body>`` is stored, as markup, in ``self.about_me``. If the file has
          no ``<body>`` or no such div, ``self.about_me`` becomes ``""`` (rather
          than failing or storing the text ``"None"``). If the path is empty or
          the file does not exist, ``self.about_me`` is left untouched.
          """
          self.config.update(conf)

          about_me_page = conf["ABOUT_ME_PAGE"]
          if not about_me_page or not os.path.exists(about_me_page):
              return

          with open(about_me_page, "r", encoding="utf-8") as f:
              bs = BeautifulSoup(f.read(), "html.parser")

          # find() returns None when nothing matches, so both lookups are guarded.
          body = bs.find("body")
          block = body.find("div", class_="about-me") if body is not None else None
          self.about_me = str(block) if block is not None else ""

      @staticmethod
      def get_max_page(count: int, count_page: int) -> int:
          """Return how many pages are needed for ``count`` items.

          Args:
              count: Total number of items.
              count_page: Number of items per page.

          Returns:
              ``count / count_page`` rounded up.

          Raises:
              ZeroDivisionError: If ``count_page`` is 0.
          """
          full_pages, remainder = divmod(count, count_page)
          return full_pages + (1 if remainder else 0)

      @staticmethod
      def get_page(url, page: int, count: int):
          """Build the pagination bar for page ``page`` out of ``count`` pages.

          Args:
              url: Endpoint name handed to ``url_for``; the page number is passed
                  to it as the ``page`` argument.
              page: Number of the page being displayed.
              count: Total number of pages.

          Returns:
              A list whose items are either ``[label, link]`` (a page button) or
              ``None`` (an ellipsis). Every button has exactly two elements.
          """
          def button(number: int) -> list:
              return [f"{number}", url_for(url, page=number)]

          def buttons(first: int, last: int) -> list:
              return [button(number) for number in range(first, last + 1)]

          if count <= 9:
              # Few enough pages to list every one of them.
              return buttons(1, count)

          if page <= 5:
              # [1][2][3][4][5][6][...][count - 1][count]
              return buttons(1, 6) + [None] + buttons(count - 1, count)

          if page >= count - 5:
              # [1][2][...][count - 5][count - 4][count - 3][count - 2][count - 1][count]
              return buttons(1, 2) + [None] + buttons(count - 5, count)

          # [1][2][...][page - 2][page - 1][page][page + 1][page + 2][...][count - 1][count]
          return (buttons(1, 2) + [None]
                  + buttons(page - 2, page + 2)
                  + [None] + buttons(count - 1, count))

      @staticmethod
      def __get_log_request_info():
          """Return a one-line description of the current user and request for log messages."""
          # NOTE(review): ``request.form`` is written to the log verbatim. If any
          # form carries a password or another secret it will end up in the log
          # file - confirm, and mask such fields if so.
          return (f"user: '{current_user.email}' "
                  f"url: '{request.url}' blueprint: '{request.blueprint}' "
                  f"args: {request.args} form: {request.form} "
                  f"accept_encodings: '{request.accept_encodings}' "
                  f"accept_charsets: '{request.accept_charsets}' "
                  f"accept_mimetypes: '{request.accept_mimetypes}' "
                  f"accept_languages: '{request.accept_languages}'")

      @staticmethod
      def print_load_page_log(page: str):
          """Log, at DEBUG level, that ``page`` was loaded."""
          current_app.logger.debug(
              f"[{request.method}] Load - '{page}' " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_form_error_log(opt: str):
          """Log, at WARNING level, that operation ``opt`` received a bad form."""
          current_app.logger.warning(
              f"[{request.method}] '{opt}' - Bad form " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_sys_opt_fail_log(opt: str):
          """Log, at ERROR level, that system operation ``opt`` failed."""
          current_app.logger.error(
              f"[{request.method}] System {opt} - fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_sys_opt_success_log(opt: str):
          """Log, at WARNING level, that system operation ``opt`` succeeded."""
          current_app.logger.warning(
              f"[{request.method}] System {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_fail_log(opt: str):
          """Log, at DEBUG level, that user operation ``opt`` failed."""
          current_app.logger.debug(
              f"[{request.method}] User {opt} - fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_success_log(opt: str):
          """Log, at DEBUG level, that user operation ``opt`` succeeded."""
          current_app.logger.debug(
              f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_opt_error_log(opt: str):
          """Log, at WARNING level, that user operation ``opt`` hit a system failure."""
          current_app.logger.warning(
              f"[{request.method}] User {opt} - system fail " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_import_user_opt_success_log(opt: str):
          """Log, at INFO level, that user operation ``opt`` succeeded."""
          current_app.logger.info(
              f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())

      @staticmethod
      def print_user_not_allow_opt_log(opt: str):
          """Log, at INFO level, that user operation ``opt`` was rejected."""
          current_app.logger.info(
              f"[{request.method}] User '{opt}' - reject " + HBlogFlask.__get_log_request_info())
  # Annotation alias: accepts either the blog's own application class or a plain Flask app.
  Hblog = Union[HBlogFlask, Flask]