email.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. from email import message_from_bytes
  2. import email.header
  3. import os
  4. import re
  5. import datetime
  6. import calendar
  7. class Mail:
  8. date_pattern = re.compile(
  9. r"[A-Za-z]+, "
  10. r"([0-9]{1,2}) "
  11. r"([A-Za-z]+) "
  12. r"([0-9]{4}) "
  13. r"([0-9]{1,2}):([0-9]{1,2}):([0-9]{1,2}) "
  14. r"([\s\S]*)"
  15. )
  16. time_zone_pattern = re.compile(r"([+-])([0-9]{2})00")
  17. def __init__(self, num: str, data: bytes):
  18. self.__date_ = None
  19. self.byte = data
  20. self.num = num
  21. @property
  22. def msg_data(self): # 有需要的时候才加载
  23. if self.__date_:
  24. return self.__date_
  25. self.__date_ = message_from_bytes(self.byte)
  26. return self.__date_
  27. @property
  28. def from_addr(self):
  29. if not self.msg_data['From']:
  30. return ""
  31. return str(email.header.make_header(email.header.decode_header(self.msg_data['From'])))
  32. @property
  33. def date(self):
  34. if not self.msg_data['Date']:
  35. return datetime.datetime(2022, 1, 1)
  36. date = str(email.header.make_header(email.header.decode_header(self.msg_data['Date'])))
  37. res = self.date_pattern.match(str(date)).groups()
  38. time = datetime.datetime(int(res[2]),
  39. list(calendar.month_abbr).index(res[1]),
  40. int(res[0]),
  41. int(res[3]),
  42. int(res[4]),
  43. int(res[5]))
  44. timezone = self.time_zone_pattern.match(res[6])
  45. if timezone:
  46. if timezone.groups()[0] == '-':
  47. time += datetime.timedelta(hours=int(timezone.groups()[1]))
  48. else:
  49. time -= datetime.timedelta(hours=int(timezone.groups()[1]))
  50. time += datetime.timedelta(hours=8) # 转换为北京时间
  51. return time
  52. @property
  53. def title(self):
  54. if not self.msg_data['Subject']:
  55. return ""
  56. return (str(email.header.make_header(email.header.decode_header(self.msg_data['Subject'])))
  57. .replace('\n', '')
  58. .replace('\r', ''))
  59. @property
  60. def body(self):
  61. return self.__get_body(self.msg_data)
  62. def __get_body(self, msg):
  63. if msg.is_multipart():
  64. res = ""
  65. for i in msg.get_payload():
  66. res += self.__get_body(i)
  67. return res
  68. else:
  69. msg_type = msg.get_content_type()
  70. if msg_type == "text/plain":
  71. return "text/plain:\n" + msg.get_payload(decode=True).decode('utf-8') + "\n"
  72. elif msg_type == "text/html":
  73. return "text/html:\n" + msg.get_payload(decode=True).decode('utf-8') + "\n"
  74. else:
  75. return ""
  76. def save_file(self, file_dir: str):
  77. return self.__get_files(self.msg_data, file_dir)
  78. @staticmethod
  79. def __get_files(msg, file_dir: str):
  80. create = False
  81. for part in msg.walk():
  82. if not create:
  83. os.makedirs(file_dir, exist_ok=True)
  84. create = True
  85. if part.get_content_maintype() == 'multipart':
  86. continue
  87. if part.get('Content-Disposition') is None:
  88. continue
  89. filename = part.get_filename()
  90. if filename:
  91. filepath = os.path.join(file_dir, filename)
  92. with open(filepath, 'wb') as f:
  93. f.write(part.get_payload(decode=True))
  94. def __lt__(self, other: "Mail"):
  95. return self.date < other.date
  96. def __eq__(self, other: "Mail"):
  97. return self.date == other.date
  98. def __le__(self, other: "Mail"):
  99. return self.date <= other.date
  100. def __str__(self):
  101. return f"{self.num} {self.title} {self.from_addr} {self.date}"