app.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import os
  2. import sys
  3. from flask import Flask, url_for, request, current_app, g, render_template
  4. from flask_mail import Mail
  5. from flask_login import LoginManager, current_user
  6. from flask.logging import default_handler
  7. from typing import Optional, Union
  8. import logging.handlers
  9. import logging
  10. from bs4 import BeautifulSoup
  11. from configure import conf
  12. from object.user import AnonymousUser, load_user_by_email
  13. from .index import index
  14. from .archive import archive
  15. from .docx import docx
  16. from .msg import msg
  17. from .oss import oss
  18. from .auth import auth
  19. from .about_me import about_me
  20. class HBlogFlask(Flask):
  21. def __init__(self, import_name: str, *args, **kwargs):
  22. super(HBlogFlask, self).__init__(import_name, *args, **kwargs)
  23. self.about_me = ""
  24. self.update_configure()
  25. self.register_blueprint(index, url_prefix="/")
  26. self.register_blueprint(archive, url_prefix="/archive")
  27. self.register_blueprint(docx, url_prefix="/docx")
  28. self.register_blueprint(msg, url_prefix="/msg")
  29. self.register_blueprint(auth, url_prefix="/auth")
  30. self.register_blueprint(about_me, url_prefix="/about-me")
  31. self.register_blueprint(oss, url_prefix="/oss")
  32. self.login_manager = LoginManager()
  33. self.login_manager.init_app(self)
  34. self.login_manager.anonymous_user = AnonymousUser # 设置未登录的匿名对象
  35. self.login_manager.login_view = "auth.login_page"
  36. self.mail = Mail(self)
  37. self.logger.removeHandler(default_handler)
  38. self.logger.setLevel(conf["LOG_LEVEL"])
  39. self.logger.propagate = False
  40. if len(conf["LOG_HOME"]) > 0:
  41. handle = logging.handlers.TimedRotatingFileHandler(
  42. os.path.join(conf["LOG_HOME"], f"flask-{os.getpid()}.log"))
  43. handle.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
  44. self.logger.addHandler(handle)
  45. if conf["LOG_STDERR"]:
  46. handle = logging.StreamHandler(sys.stderr)
  47. handle.setFormatter(logging.Formatter(conf["LOG_FORMAT"]))
  48. self.logger.addHandler(handle)
  49. @self.login_manager.user_loader
  50. def user_loader(email: str):
  51. return load_user_by_email(email)
  52. func = {"render_template": render_template, "self": self}
  53. for i in [400, 401, 403, 404, 405, 408, 410, 413, 414, 423, 500, 501, 502]:
  54. exec(f"def error_{i}(e):\n"
  55. f"\tself.print_load_page_log('{i}')\n"
  56. f"\treturn render_template('error.html', error_code='{i}', error_info=e)", func)
  57. self.errorhandler(i)(func[f"error_{i}"])
  58. def update_configure(self):
  59. self.config.update(conf)
  60. about_me_page = conf["ABOUT_ME_PAGE"]
  61. if len(about_me_page) > 0 and os.path.exists(about_me_page):
  62. with open(about_me_page, "r", encoding='utf-8') as f:
  63. bs = BeautifulSoup(f.read(), "html.parser")
  64. self.about_me = str(bs.find("body").find("div", class_="about-me"))
  65. @staticmethod
  66. def get_max_page(count: int, count_page: int):
  67. return (count // count_page) + (0 if count % count_page == 0 else 1)
  68. @staticmethod
  69. def get_page(url, page: int, count: int):
  70. if count <= 9:
  71. page_list = [[f"{i + 1}", url_for(url, page=i + 1)] for i in range(count)]
  72. elif page <= 5:
  73. """
  74. [1][2][3][4][5][6][...][count - 1][count]
  75. """
  76. page_list = [[f"{i + 1}", url_for(url, page=i + 1)] for i in range(6)]
  77. page_list += [None,
  78. [f"{count - 1}", url_for(url, page=count - 1)],
  79. [f"{count}", url_for(url, page=count)]]
  80. elif page >= count - 5:
  81. """
  82. [1][2][...][count - 5][count - 4][count - 3][count - 2][count - 1][count]
  83. """
  84. page_list: Optional[list] = [["1", url_for(url, page=1)],
  85. ["2", url_for(url, page=2)],
  86. None]
  87. page_list += [[f"{count - 5 + i}", url_for(url, page=count - 5 + i), False] for i in range(6)]
  88. else:
  89. """
  90. [1][2][...][page - 2][page - 1][page][page + 1][page + 2][...][count - 1][count]
  91. """
  92. page_list: Optional[list] = [["1", url_for(url, page=1)],
  93. ["2", url_for(url, page=2)],
  94. None]
  95. page_list += [[f"{page - 2 + i}", url_for(url, page=page - 2 + i)] for i in range(5)]
  96. page_list += [None,
  97. [f"{count - 1}", url_for(url, page=count - 1)],
  98. [f"{count}", url_for(url, page=count)]]
  99. return page_list
  100. @staticmethod
  101. def __get_log_request_info():
  102. return (f"user: '{current_user.email}' "
  103. f"url: '{request.url}' blueprint: '{request.blueprint}' "
  104. f"args: {request.args} form: {request.form} "
  105. f"accept_encodings: '{request.accept_encodings}' "
  106. f"accept_charsets: '{request.accept_charsets}' "
  107. f"accept_mimetypes: '{request.accept_mimetypes}' "
  108. f"accept_languages: '{request.accept_languages}'")
  109. @staticmethod
  110. def print_load_page_log(page: str):
  111. current_app.logger.debug(
  112. f"[{request.method}] Load - '{page}' " + HBlogFlask.__get_log_request_info())
  113. @staticmethod
  114. def print_form_error_log(opt: str):
  115. current_app.logger.warning(
  116. f"[{request.method}] '{opt}' - Bad form " + HBlogFlask.__get_log_request_info())
  117. @staticmethod
  118. def print_sys_opt_fail_log(opt: str):
  119. current_app.logger.error(
  120. f"[{request.method}] System {opt} - fail " + HBlogFlask.__get_log_request_info())
  121. @staticmethod
  122. def print_sys_opt_success_log(opt: str):
  123. current_app.logger.warning(
  124. f"[{request.method}] System {opt} - success " + HBlogFlask.__get_log_request_info())
  125. @staticmethod
  126. def print_user_opt_fail_log(opt: str):
  127. current_app.logger.debug(
  128. f"[{request.method}] User {opt} - fail " + HBlogFlask.__get_log_request_info())
  129. @staticmethod
  130. def print_user_opt_success_log(opt: str):
  131. current_app.logger.debug(
  132. f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())
  133. @staticmethod
  134. def print_user_opt_error_log(opt: str):
  135. current_app.logger.warning(
  136. f"[{request.method}] User {opt} - system fail " + HBlogFlask.__get_log_request_info())
  137. @staticmethod
  138. def print_import_user_opt_success_log(opt: str):
  139. current_app.logger.info(
  140. f"[{request.method}] User {opt} - success " + HBlogFlask.__get_log_request_info())
  141. @staticmethod
  142. def print_user_not_allow_opt_log(opt: str):
  143. current_app.logger.info(
  144. f"[{request.method}] User '{opt}' - reject " + HBlogFlask.__get_log_request_info())
# Type alias for annotations that accept the application object, either as
# the project's HBlogFlask subclass or as a plain Flask instance.
Hblog = Union[HBlogFlask, Flask]