# email.py
import calendar
import datetime
import email.header
import email.utils
import os
import re
from email import message_from_bytes
  7. class HTML:
  8. def __init__(self, body):
  9. self.body = body
  10. self.type = "text/html"
  11. class PLAIN:
  12. def __init__(self, body):
  13. self.body = body
  14. self.type = "text/plain"
  15. class Mail:
  16. date_pattern = re.compile(
  17. r"[A-Za-z]+, "
  18. r"([0-9]{1,2}) "
  19. r"([A-Za-z]+) "
  20. r"([0-9]{4}) "
  21. r"([0-9]{1,2}):([0-9]{1,2}):([0-9]{1,2}) "
  22. r"([\s\S]*)"
  23. )
  24. time_zone_pattern = re.compile(r"([+-])([0-9]{2})00")
  25. def __init__(self, num: str, data: bytes):
  26. self.__date_ = None
  27. self.byte = data
  28. self.num = num
  29. @property
  30. def msg_data(self): # 有需要的时候才加载
  31. if self.__date_:
  32. return self.__date_
  33. self.__date_ = message_from_bytes(self.byte)
  34. return self.__date_
  35. @property
  36. def from_addr(self):
  37. if not self.msg_data['From']:
  38. return ""
  39. return str(email.header.make_header(email.header.decode_header(self.msg_data['From'])))
  40. @property
  41. def date(self):
  42. if not self.msg_data['Date']:
  43. return datetime.datetime(2022, 1, 1)
  44. date = str(email.header.make_header(email.header.decode_header(self.msg_data['Date'])))
  45. res = self.date_pattern.match(str(date)).groups()
  46. time = datetime.datetime(int(res[2]),
  47. list(calendar.month_abbr).index(res[1]),
  48. int(res[0]),
  49. int(res[3]),
  50. int(res[4]),
  51. int(res[5]))
  52. timezone = self.time_zone_pattern.match(res[6])
  53. if timezone:
  54. if timezone.groups()[0] == '-':
  55. time += datetime.timedelta(hours=int(timezone.groups()[1]))
  56. else:
  57. time -= datetime.timedelta(hours=int(timezone.groups()[1]))
  58. time += datetime.timedelta(hours=8) # 转换为北京时间
  59. return time
  60. @property
  61. def title(self):
  62. if not self.msg_data['Subject']:
  63. return ""
  64. return (str(email.header.make_header(email.header.decode_header(self.msg_data['Subject'])))
  65. .replace('\n', '')
  66. .replace('\r', ''))
  67. @property
  68. def body(self):
  69. return self.__get_body(self.msg_data)
  70. def __get_body(self, msg):
  71. if msg.is_multipart():
  72. res = ""
  73. for i in msg.get_payload():
  74. res += self.__get_body(i)
  75. return res
  76. else:
  77. msg_type = msg.get_content_type()
  78. if msg_type == "text/plain":
  79. return "text/plain:\n" + msg.get_payload(decode=True).decode('utf-8') + "\n"
  80. elif msg_type == "text/html":
  81. return "text/html:\n" + msg.get_payload(decode=True).decode('utf-8') + "\n"
  82. else:
  83. return ""
  84. @property
  85. def body_list(self):
  86. return self.__get_body_list(self.msg_data)
  87. def __get_body_list(self, msg):
  88. if msg.is_multipart():
  89. res = []
  90. for i in msg.get_payload():
  91. son = self.__get_body_list(i)
  92. if son is not None:
  93. res += son
  94. return res
  95. else:
  96. msg_type = msg.get_content_type()
  97. if msg_type == "text/plain":
  98. return [PLAIN(msg.get_payload(decode=True).decode('utf-8'))]
  99. elif msg_type == "text/html":
  100. return [HTML(msg.get_payload(decode=True).decode('utf-8'))]
  101. else:
  102. return None
  103. def save_file(self, file_dir: str):
  104. return self.__get_files(self.msg_data, file_dir)
  105. @staticmethod
  106. def __get_files(msg, file_dir: str):
  107. create = False
  108. for part in msg.walk():
  109. if not create:
  110. os.makedirs(file_dir, exist_ok=True)
  111. create = True
  112. if part.get_content_maintype() == 'multipart':
  113. continue
  114. if part.get('Content-Disposition') is None:
  115. continue
  116. filename = part.get_filename()
  117. if filename:
  118. filepath = os.path.join(file_dir, filename)
  119. with open(filepath, 'wb') as f:
  120. f.write(part.get_payload(decode=True))
  121. def __lt__(self, other: "Mail"):
  122. return self.date < other.date
  123. def __eq__(self, other: "Mail"):
  124. return self.date == other.date
  125. def __le__(self, other: "Mail"):
  126. return self.date <= other.date
  127. def __str__(self):
  128. return f"{self.num} {self.title} {self.from_addr} {self.date}"