imap.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import imaplib
  2. import re
  3. from typing import List
  4. from .email import Mail
  5. imaplib.Commands['ID'] = 'AUTH'
  6. class Imap:
  7. def __init__(self, user: str, passwd: str, host="localhost", port=993, ssl=True, start_ssl=False):
  8. self.host = host
  9. self.port = port
  10. self.user = user
  11. self.passwd = passwd
  12. self.ssl = ssl
  13. self.start_ssl = False if ssl else start_ssl
  14. self.server: None | imaplib.IMAP4 = None
  15. self.__mailbox = {}
  16. self.__inbox = "INBOX"
  17. @property
  18. def inbox(self):
  19. return self.__inbox
  20. @inbox.setter
  21. def inbox(self, inbox):
  22. self.__inbox = inbox
  23. self.connect() # 测试连接
  24. self.disconnect()
  25. self.__mailbox = {}
  26. def connect(self):
  27. if not self.server:
  28. if self.ssl:
  29. self.server = imaplib.IMAP4_SSL(self.host, port=self.port)
  30. else:
  31. self.server = imaplib.IMAP4(self.host, port=self.port)
  32. if self.start_ssl:
  33. self.server.starttls()
  34. self.server.login(self.user, self.passwd)
  35. args = ("name", "HuanMail", "contact", "songzihuan@song-zh.com", "version", "1.0.0", "vendor", "HuanMail")
  36. self.server._simple_command('ID', '("' + '" "'.join(args) + '")')
  37. self.server.select(self.__inbox.encode("utf-7").replace(b"+", b"&").decode("utf-8"))
  38. def disconnect(self):
  39. if self.server:
  40. self.server.logout()
  41. self.server = None
  42. def search(self):
  43. self.connect()
  44. res = self.__search()
  45. self.disconnect()
  46. return res
  47. def __search(self, opt="ALL"):
  48. _, data = self.server.search(None, opt)
  49. return data[0].decode("utf-8").split()
  50. def list(self):
  51. self.connect()
  52. res = self.__list()
  53. self.disconnect()
  54. return res
  55. def __list(self):
  56. res = []
  57. mailbox_pattern = re.compile(r"\(.*\) \".*\" \"(.+)\"")
  58. for i in self.server.list()[1]:
  59. i: bytes
  60. mailbox = re.match(mailbox_pattern, i.replace(b"&", b"+").decode("utf-7"))
  61. if mailbox:
  62. res.append(mailbox.groups()[0])
  63. return res
  64. def fetch(self, num: str) -> Mail:
  65. self.connect()
  66. res = self.__fetch(num)
  67. self.disconnect()
  68. return res
  69. def __fetch(self, num: str):
  70. mail = self.__mailbox.get(num)
  71. if mail:
  72. return mail
  73. _, data = self.server.fetch(num, '(RFC822)')
  74. mail = Mail(num, data[0][1])
  75. self.__mailbox[num] = mail
  76. return mail
  77. def fetch_all(self, opt="ALL"):
  78. self.connect()
  79. for i in self.__search(opt):
  80. self.__fetch(i)
  81. self.disconnect()
  82. @property
  83. def mailbox(self) -> List[Mail]:
  84. return sorted(self.__mailbox.values(), reverse=True)
  85. @mailbox.setter
  86. def mailbox(self, args: dict):
  87. if type(args) is dict and len(args) == 0:
  88. self.__mailbox = {}
  89. else:
  90. raise ValueError
  91. def add_mail(self, num: str, data: bytes):
  92. self.__mailbox[num] = Mail(num, data)
  93. def get_mail(self, num: str):
  94. return self.__mailbox.get(num, None)
  95. def check_login(self):
  96. try:
  97. self.connect()
  98. self.disconnect()
  99. except imaplib.IMAP4.error:
  100. return False
  101. return True