# Source code for core.utils.fetchers.ExchangeFetcher

# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - an open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Module with the :class:`ExchangeFetcher` class."""

from __future__ import annotations

import os
from typing import TYPE_CHECKING, override

import exchangelib
import exchangelib.errors
from django.utils.translation import gettext as _
from requests.adapters import HTTPAdapter

from core.constants import (
    EmailFetchingCriterionChoices,
    EmailProtocolChoices,
)
from core.utils.fetchers.exceptions import MailAccountError, MailboxError
from core.utils.iterations import slices
from eonvelope.utils.workarounds import get_config

from .BaseFetcher import BaseFetcher

if TYPE_CHECKING:
    from collections.abc import Generator

    from core.models.Account import Account
    from core.models.Email import Email
    from core.models.Mailbox import Mailbox
    from core.utils import FetchingCriterion


class ExchangeFetcher(BaseFetcher):
    """Maintains a connection to the Exchange server and fetches data using :mod:`exchangelib`.

    Opens a connection to the Exchange server on construction
    and is preferably used in a 'with' environment.
    Allows fetching of mails and mailboxes from an account on an Exchange host.
    """

    PROTOCOL = EmailProtocolChoices.EXCHANGE
    """Name of the used protocol, refers to :attr:`EmailProtocolChoices.EXCHANGE`."""

    AVAILABLE_FETCHING_CRITERIA = (
        EmailFetchingCriterionChoices.ALL,
        EmailFetchingCriterionChoices.SEEN,
        EmailFetchingCriterionChoices.UNSEEN,
        EmailFetchingCriterionChoices.DRAFT,
        EmailFetchingCriterionChoices.UNDRAFT,
        EmailFetchingCriterionChoices.DAILY,
        EmailFetchingCriterionChoices.WEEKLY,
        EmailFetchingCriterionChoices.MONTHLY,
        EmailFetchingCriterionChoices.ANNUALLY,
        EmailFetchingCriterionChoices.SUBJECT,
        EmailFetchingCriterionChoices.BODY,
    )
    """Tuple of all criteria available for fetching.

    Refers to :class:`EmailFetchingCriterionChoices`.
    Constructed analogous to the IMAP4 criteria.
    Must be immutable!
    """

    EMAIL_FETCH_BATCH_SIZE = 20
    """Number of emails requested per query slice in :meth:`fetch_emails`."""

    @override
    def __init__(self, account: Account) -> None:
        """Constructor, starts the Exchange connection and logs into the account.

        Args:
            account: The model of the account to be fetched from.

        Raises:
            MailAccountError: If :meth:`connect_to_host` fails.
        """
        super().__init__(account)
        self.connect_to_host()

    @override
    def connect_to_host(self) -> None:
        """Opens the connection to the Exchange server using the credentials from :attr:`account`.

        Note:
            The timeout and the HTTP adapter are set as class attributes of
            :class:`exchangelib.BaseProtocol`, so they apply to the whole process,
            not only to this fetcher.

        Raises:
            MailAccountError: If the account configuration is rejected
                or an error occurs accessing the msg_folder_root.
        """
        self.logger.debug("Setting up connection to %s ...", self.account)

        credentials = exchangelib.Credentials(
            self.account.mail_address, self.account.password
        )
        retry_policy = exchangelib.FaultTolerance(max_wait=self.account.timeout)

        # manually set fallback timeout; https://github.com/ecederstrand/exchangelib/issues/1375
        exchangelib.BaseProtocol.TIMEOUT = self.account.timeout
        # exchangelib only offers a class-level hook for the adapter, so it is (re)set on every connect;
        # https://ecederstrand.github.io/exchangelib/#proxies-and-custom-tls-validation
        # TLS verification is only disabled if both the server config and the account allow it.
        exchangelib.BaseProtocol.HTTP_ADAPTER_CLS = (
            exchangelib.NoVerifyHTTPAdapter
            if (
                get_config("ALLOW_INSECURE_CONNECTIONS")
                and self.account.allow_insecure_connection
            )
            else HTTPAdapter
        )

        # A full URL is used as the EWS service endpoint, anything else as a server address.
        if self.account.mail_host.startswith(("http://", "https://")):
            config = exchangelib.Configuration(
                service_endpoint=self.account.mail_host,
                credentials=credentials,
                retry_policy=retry_policy,
            )
        else:
            config = exchangelib.Configuration(
                server=self.account.mail_host_address,
                credentials=credentials,
                retry_policy=retry_policy,
            )

        try:
            exchange_account = exchangelib.Account(
                primary_smtp_address=self.account.mail_address,
                config=config,
                access_type=exchangelib.DELEGATE,
                autodiscover=False,
                default_timezone=exchangelib.EWSTimeZone(
                    "UTC"
                ),  # for consistency with celery and django settings
            )
        except ValueError as error:
            self.logger.exception(
                "Error in configuration of %s!",
                self.account,
            )
            # translated like the operation names of all other errors raised by this fetcher
            raise MailAccountError(error, _("connecting")) from error

        try:
            self._mail_client = exchange_account.msg_folder_root
        except exchangelib.errors.EWSError as error:
            self.logger.exception(
                "Error connecting to %s!",
                self.account,
            )
            raise MailAccountError(error, _("connecting")) from error

        self.logger.info("Successfully set up connection to %s.", self.account)

    @override
    def test(self, mailbox: Mailbox | None = None) -> None:
        """Tests the connection to the mailserver and, if a mailbox is provided, whether it can be opened and listed.

        Args:
            mailbox: The mailbox to be tested. Default is None.

        Raises:
            ValueError: If the :attr:`mailbox` does not belong to :attr:`self.account`.
            MailAccountError: If the account test fails because an error occurs or a bad response is returned.
            MailboxError: If the mailbox test fails because an error occurs or a bad response is returned testing the mailbox.
        """
        super().test(mailbox)

        self.logger.debug("Testing %s ...", self.account)
        try:
            # refreshing the message root serves as the account-level check
            self._mail_client.refresh()
        except exchangelib.errors.EWSError as error:
            self.logger.exception("Error during refresh of message_root!")
            raise MailAccountError(error, _("refresh")) from error
        self.logger.debug("Successfully tested %s.", self.account)

        if mailbox is not None:
            self.logger.debug("Testing %s ...", mailbox)
            try:
                self.open_mailbox(mailbox).refresh()
            except exchangelib.errors.EWSError as error:
                self.logger.exception(
                    "Error during refresh of %s!",
                    mailbox.name,
                )
                raise MailboxError(error, _("refresh")) from error
            self.logger.debug("Successfully tested %s.", mailbox)

    @override
    def fetch_emails(
        self,
        mailbox: Mailbox,
        criterion: FetchingCriterion = BaseFetcher.DEFAULT_FETCHING_CRITERION,
    ) -> Generator[bytes]:
        """Fetches and yields maildata from a mailbox based on a given criterion.

        Note:
            This is a generator: nothing is validated or fetched
            until the returned generator is iterated.

        Args:
            mailbox: Database model of the mailbox to fetch data from.
            criterion: Formatted criterion to filter mails in the Exchange server.
                Defaults to :attr:`BaseFetcher.DEFAULT_FETCHING_CRITERION`.

        Yields:
            Mails in the mailbox matching the criterion as :class:`bytes`.

        Raises:
            ValueError: If the :attr:`mailbox` does not belong to :attr:`self.account`.
                If :attr:`criterion` is not in :attr:`ExchangeFetcher.AVAILABLE_FETCHING_CRITERIA`.
            MailboxError: If an error occurs or a bad response is returned during an action on the mailbox.
        """
        super().fetch_emails(mailbox, criterion)

        self.logger.debug(
            "Searching and fetching %s messages in %s...",
            criterion,
            mailbox,
        )
        try:
            mailbox_folder = self.open_mailbox(mailbox)
            # Page through the results in slices of EMAIL_FETCH_BATCH_SIZE.
            # NOTE(review): the query is rebuilt for every slice; presumably it could be
            # built once before the loop - confirm against `as_exchange_queryset`.
            for query_slice in slices(0, self.EMAIL_FETCH_BATCH_SIZE):
                mail_query_result = list(
                    criterion.as_exchange_queryset(
                        mailbox_folder.all().order_by("datetime_received")
                    )[query_slice]
                )
                for mail in mail_query_result:
                    yield mail.mime_content
                # a slice that is not full means there is nothing left to fetch
                if len(mail_query_result) < self.EMAIL_FETCH_BATCH_SIZE:
                    break
        except exchangelib.errors.EWSError as error:
            self.logger.exception("Error during fetching of mail contents!")
            raise MailboxError(error, _("fetching of mail contents")) from error
        self.logger.info(
            "Successfully searched and fetched %s messages in %s.",
            criterion,
            mailbox,
        )

    @override
    def fetch_mailboxes(self) -> list[tuple[str, str]]:
        """Retrieves and returns the data of the mailboxes in the account.

        Todo:
            Rewrite this into a generator.

        Note:
            Considers only children of the msg_folder_root
            that are :class:`exchangelib.Folder` instances with folder class ``IPF.Note``.

        Returns:
            List with one tuple per mailbox in the account. Empty if none are found.
            The first element is the path of the mailbox relative to the msg_folder_root.
            The second element is the id from ``folder.to_id()`` for distinguished folders,
            otherwise an empty string.

        Raises:
            MailAccountError: If an error occurs or a bad response is returned.
        """
        self.logger.debug("Fetching mailboxes in %s ...", self.account)
        try:
            mail_root_path = self._mail_client.absolute
            # NOTE(review): os.path follows the separator of the host OS while
            # open_mailbox splits on "/"; presumably only POSIX hosts are expected - confirm.
            mailboxes = [
                (
                    os.path.relpath(folder.absolute, mail_root_path),
                    (str(folder.to_id().id) if folder.is_distinguished else ""),
                )
                for folder in self._mail_client.walk()
                if isinstance(folder, exchangelib.Folder)
                and folder.folder_class == "IPF.Note"
            ]
        except exchangelib.errors.EWSError as error:
            self.logger.exception("Error during scan of message_root!")
            raise MailAccountError(error, _("scan for mailboxes")) from error
        self.logger.debug("Successfully fetched mailboxes in %s.", self.account)
        return mailboxes

    @override
    def restore(self, email: Email) -> None:
        """Places an email in its mailbox.

        Args:
            email: The email to restore.

        Raises:
            ValueError: If the emails mailbox is not in this fetchers account.
            FileNotFoundError: If the email has no eml file in storage.
            MailboxError: If uploading the email to the mailserver fails or returns a bad response.
        """
        super().restore(email)
        self.logger.debug("Restoring email %s to its mailbox ...", email)
        with email.open_file() as email_file:
            try:
                # the stored file content is uploaded unchanged as the MIME content of a new message
                exchangelib.Message(
                    folder=self.open_mailbox(email.mailbox),
                    mime_content=email_file.read(),
                ).save()
            except exchangelib.errors.EWSError as error:
                self.logger.exception("Error during restoring of email!")
                raise MailboxError(error, _("restoring of email")) from error
        self.logger.debug("Successfully restored email.")

    @override
    def close(self) -> None:
        """No cleanup of :class:`exchangelib.Account` required."""

    def open_mailbox(self, mailbox: Mailbox) -> exchangelib.Folder:
        """Helper method to correctly open a mailbox folder.

        Descends from the message root of the account,
        one folder per ``/``-separated part of the mailbox name.

        Note:
            This may cause problems with mailboxes that have / in their name (should be very uncommon).

        Args:
            mailbox: The mailbox whose folder is to be opened.

        Returns:
            The mailbox folder instance.
        """
        mailbox_folder = self._mail_client
        for folder_name in mailbox.name.split("/"):
            mailbox_folder = mailbox_folder / folder_name
        return mailbox_folder