Source code for core.utils.fetchers.IMAP4Fetcher

# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - a open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Module with the :class:`IMAP4Fetcher` class."""

from __future__ import annotations

import imaplib
from itertools import batched
from typing import TYPE_CHECKING, override

from django.utils.translation import gettext_lazy as _
from imap_tools.imap_utf7 import utf7_encode

from core.constants import (
    EmailFetchingCriterionChoices,
    EmailProtocolChoices,
)
from core.utils.fetchers.exceptions import FetcherError, MailAccountError
from core.utils.fetchers.SafeIMAPMixin import SafeIMAPMixin
from core.utils.mail_parsing import parse_IMAP_mailbox_data

from .BaseFetcher import BaseFetcher

if TYPE_CHECKING:
    from collections.abc import Generator

    from core.models.Account import Account
    from core.models.Email import Email
    from core.models.Mailbox import Mailbox
    from core.utils import FetchingCriterion


[docs] class IMAP4Fetcher(BaseFetcher, SafeIMAPMixin): """Maintains a connection to the IMAP server and fetches data using :mod:`imaplib`. Opens a connection to the IMAP server on construction and is preferably used in a 'with' environment. Allows fetching of mails and mailboxes from an account on an IMAP host. """ PROTOCOL = EmailProtocolChoices.IMAP4 """Name of the used protocol, refers to :attr:`MailFetchingProtocols.IMAP`.""" AVAILABLE_FETCHING_CRITERIA = ( EmailFetchingCriterionChoices.ALL, EmailFetchingCriterionChoices.UNSEEN, EmailFetchingCriterionChoices.SEEN, EmailFetchingCriterionChoices.RECENT, EmailFetchingCriterionChoices.NEW, EmailFetchingCriterionChoices.OLD, EmailFetchingCriterionChoices.FLAGGED, EmailFetchingCriterionChoices.UNFLAGGED, EmailFetchingCriterionChoices.DRAFT, EmailFetchingCriterionChoices.UNDRAFT, EmailFetchingCriterionChoices.ANSWERED, EmailFetchingCriterionChoices.UNANSWERED, EmailFetchingCriterionChoices.DELETED, EmailFetchingCriterionChoices.UNDELETED, EmailFetchingCriterionChoices.DAILY, EmailFetchingCriterionChoices.WEEKLY, EmailFetchingCriterionChoices.MONTHLY, EmailFetchingCriterionChoices.ANNUALLY, EmailFetchingCriterionChoices.SENTSINCE, EmailFetchingCriterionChoices.SUBJECT, EmailFetchingCriterionChoices.BODY, EmailFetchingCriterionChoices.FROM, EmailFetchingCriterionChoices.KEYWORD, EmailFetchingCriterionChoices.UNKEYWORD, EmailFetchingCriterionChoices.LARGER, EmailFetchingCriterionChoices.SMALLER, ) """Tuple of all criteria available for fetching. Refers to :class:`MailFetchingCriteria`. Must be immutable! IMAP4 does not accept time lookups, only date based. For a list of all existing IMAP criteria see https://datatracker.ietf.org/doc/html/rfc3501.html#section-6.4.4. """ EMAIL_FETCH_BATCH_SIZE = 100
[docs] @override def __init__(self, account: Account) -> None: """Constructor, starts the IMAP connection and logs into the account. Args: account: The model of the account to be fetched from. """ super().__init__(account) self.connect_to_host() self.safe_login( # dont use kwargs here, this would kill the utf-8 fallback! self.account.mail_address, self.account.password )
[docs] @override def connect_to_host(self) -> None: """Opens the connection to the IMAP server using the credentials from :attr:`account`. Raises: MailAccountError: If an error occurs or a bad response is returned. """ self.logger.debug("Connecting to %s ...", self.account) mail_host = self.account.mail_host mail_host_port = self.account.mail_host_port timeout = self.account.timeout try: if mail_host_port: self._mail_client = imaplib.IMAP4( host=mail_host, port=mail_host_port, timeout=timeout ) else: self._mail_client = imaplib.IMAP4(host=mail_host, timeout=timeout) except Exception as error: self.logger.exception("Error connecting to %s!", self.account) raise MailAccountError(error, _("connecting")) from error self.logger.info("Successfully connected to %s.", self.account)
[docs] @override def test(self, mailbox: Mailbox | None = None) -> None: """Tests the connection to the mailserver and, if a mailbox is provided, whether it can be opened and listed. Args: mailbox: The mailbox to be tested. Default is None. Raises: ValueError: If the :attr:`mailbox` does not belong to :attr:`self.account`. MailAccountError: If the account test fails because an error occurs or a bad response is returned. MailboxError: If the mailbox test fails because an error occurs or a bad response is returned testing the mailbox. """ super().test(mailbox) self.logger.debug("Testing %s ...", self.account) self.safe_noop() self.logger.debug("Successfully tested %s.", self.account) if mailbox is not None: self.logger.debug("Testing %s ...", mailbox) self.safe_select(utf7_encode(mailbox.name), readonly=True) self.safe_check() self.safe_unselect() self.logger.debug("Successfully tested %s.", mailbox)
[docs] @override def fetch_emails( self, mailbox: Mailbox, criterion: FetchingCriterion = BaseFetcher.DEFAULT_FETCHING_CRITERION, ) -> Generator[bytes]: """Fetches and returns maildata from a mailbox based on a given criterion. Args: mailbox: Database model of the mailbox to fetch data from. criterion: Formatted criterion to filter mails in the IMAP request. Defaults to :attr:`eonvelope.MailFetchingCriteria.ALL`. Yields: Mails in the mailbox matching the criterion as :class:`bytes`. Raises: ValueError: If the :attr:`mailbox` does not belong to :attr:`self.account`. If :attr:`criterion` is not in :attr:`IMAP4Fetcher.AVAILABLE_FETCHING_CRITERIA`. MailboxError: If an error occurs or a bad response is returned during an action on the mailbox. """ super().fetch_emails(mailbox, criterion) search_criterion = criterion.as_imap_criterion() self.logger.debug( "Searching and fetching %s messages in %s...", search_criterion, mailbox, ) self.logger.debug("Opening mailbox %s ...", mailbox) self.safe_select(utf7_encode(mailbox.name), readonly=True) self.logger.debug("Successfully opened mailbox.") self.logger.debug("Searching %s messages in %s ...", search_criterion, mailbox) if "SORT" in self._mail_client.capabilities: _, message_uids = self.safe_uid("SORT", "(DATE)", "UTF-8", search_criterion) else: _, message_uids = self.safe_uid("SEARCH", search_criterion) self.logger.info( "Found %s messages with uIDs %s in %s.", search_criterion, message_uids, mailbox, ) self.logger.debug("Fetching %s messages in %s ...", search_criterion, mailbox) message_uid_list = message_uids[0].split() for uids in batched( message_uid_list, self.EMAIL_FETCH_BATCH_SIZE, strict=False ): try: _, message_data = self.safe_uid("FETCH", b",".join(uids), "(RFC822)") except FetcherError: self.logger.warning( "Failed to fetch messages %s from %s!", uids, mailbox, exc_info=True, ) continue for _, message in message_data[::2]: yield message self.logger.debug( "Successfully fetched %s messages from %s.", search_criterion, mailbox, ) self.logger.debug("Leaving mailbox %s ...", mailbox) self.safe_unselect() self.logger.debug("Successfully left mailbox.") self.logger.debug( "Successfully searched and fetched %s messages in %s.", search_criterion, mailbox, )
[docs] @override def fetch_mailboxes(self) -> list[tuple[str, str]]: """Retrieves and returns the data of the mailboxes in the account. Todo: Rewrite this into a generator. Returns: List of data of all mailboxes in the account. Empty if none are found. Raises: MailAccountError: If an error occurs or a bad response is returned. """ self.logger.debug("Fetching mailboxes in %s ...", self.account) _, mailboxes_data = self.safe_list() mailboxes = [ parse_IMAP_mailbox_data(mailbox_data) for mailbox_data in mailboxes_data if isinstance(mailbox_data, bytes | str) ] self.logger.debug("Successfully fetched mailboxes in %s.", self.account) return mailboxes
[docs] @override def restore(self, email: Email) -> None: """Places an email in its mailbox. Args: email: The email to restore. Raises: ValueError: If the emails mailbox is not in this fetchers account. FileNotFoundError: If the email has no eml file in storage. MailboxError: If uploading the email to the mailserver fails or returns a bad response. """ super().restore(email) self.logger.debug("Restoring %s to its mailbox ...", email) with email.open_file() as email_file: self.safe_append(email.mailbox.name, None, None, email_file.read()) self.logger.debug("Successfully restored email.")
[docs] @override def close(self) -> None: """Logs out of the account and closes the connection to the IMAP server if it is open.""" self.logger.debug("Closing connection to %s ...", self.account) self.safe_logout() self.logger.info("Successfully closed connection to %s.", self.account)