imap.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
import base64
import imaplib
import re
from typing import List

from .email import Mail
  5. imaplib.Commands['ID'] = 'AUTH'
  6. class Imap:
  7. def __init__(self, user: str, passwd: str, host="localhost", port=993, ssl=True, start_ssl=False):
  8. self.host = host
  9. self.port = port
  10. self.user = user
  11. self.passwd = passwd
  12. self.ssl = ssl
  13. self.start_ssl = False if ssl else start_ssl
  14. self.server: None | imaplib.IMAP4 = None
  15. self.__mailbox = {}
  16. self.__inbox = "INBOX"
  17. @property
  18. def inbox(self):
  19. return self.__inbox
  20. @inbox.setter
  21. def inbox(self, inbox):
  22. self.__inbox = inbox
  23. self.__mailbox = {}
  24. def connect(self):
  25. if not self.server:
  26. if self.ssl:
  27. self.server = imaplib.IMAP4_SSL(self.host, port=self.port)
  28. else:
  29. self.server = imaplib.IMAP4(self.host, port=self.port)
  30. if self.start_ssl:
  31. self.server.starttls()
  32. self.server.login(self.user, self.passwd)
  33. args = ("name", "HuanMail", "contact", "songzihuan@song-zh.com", "version", "1.0.0", "vendor", "HuanMail")
  34. self.server._simple_command('ID', '("' + '" "'.join(args) + '")')
  35. self.server.select(self.__inbox.encode("utf-7").replace(b"+", b"&").decode("utf-8"))
  36. def disconnect(self):
  37. if self.server:
  38. self.server.logout()
  39. self.server = None
  40. def search(self):
  41. self.connect()
  42. res = self.__search()
  43. self.disconnect()
  44. return res
  45. def __search(self, opt="ALL"):
  46. _, data = self.server.search(None, opt)
  47. return data[0].decode("utf-8").split()
  48. def list(self):
  49. self.connect()
  50. res = self.__list()
  51. self.disconnect()
  52. return res
  53. def __list(self):
  54. res = []
  55. mailbox_pattern = re.compile(r"\(.*\) \".*\" \"(.+)\"")
  56. for i in self.server.list()[1]:
  57. i: bytes
  58. mailbox = re.match(mailbox_pattern, i.replace(b"&", b"+").decode("utf-7"))
  59. if mailbox:
  60. res.append(mailbox.groups()[0])
  61. return res
  62. def fetch(self, num: str) -> Mail:
  63. self.connect()
  64. res = self.__fetch(num)
  65. self.disconnect()
  66. return res
  67. def __fetch(self, num: str):
  68. mail = self.__mailbox.get(num)
  69. if mail:
  70. return mail
  71. _, data = self.server.fetch(num, '(RFC822)')
  72. mail = Mail(num, data[0][1])
  73. self.__mailbox[num] = mail
  74. return mail
  75. def fetch_all(self, opt="ALL"):
  76. self.connect()
  77. for i in self.__search(opt):
  78. self.__fetch(i)
  79. self.disconnect()
  80. def fetch_remote_count(self, opt="ALL"):
  81. self.connect()
  82. res = len(set(self.__search(opt)) - set(self.__mailbox.keys()))
  83. self.disconnect()
  84. return res
  85. @property
  86. def mailbox(self) -> List[Mail]:
  87. return sorted(self.__mailbox.values(), reverse=True)
  88. @mailbox.setter
  89. def mailbox(self, args: dict):
  90. if type(args) is dict and len(args) == 0:
  91. self.__mailbox = {}
  92. else:
  93. raise ValueError
  94. def add_mail(self, num: str, data: bytes):
  95. self.__mailbox[num] = Mail(num, data)
  96. def get_mail(self, num: str):
  97. return self.__mailbox.get(num, None)
  98. def check_login(self):
  99. try:
  100. self.connect()
  101. self.disconnect()
  102. except imaplib.IMAP4.error:
  103. return False
  104. return True