Source code for core.utils.fetchers.SafeIMAPMixin

# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - a open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Module with the SafeIMAPMixin mixin."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Literal, Protocol, Self, TypeVar, overload

from core.utils.fetchers.exceptions import (
    BadServerResponseError,
    FetcherError,
    MailAccountError,
    MailboxError,
)

if TYPE_CHECKING:
    import imaplib
    import logging
    from collections.abc import Callable

type IMAP4Response = tuple[
    str,
    str
    | list[bytes]
    | list[None]
    | list[Any]
    | list[bytes | None]
    | list[bytes | tuple[bytes, bytes]],
]

IMAP4ActionResponse = TypeVar("IMAP4ActionResponse", bound=IMAP4Response)


[docs] class IMAP4FetcherClass(Protocol): """Protocol defining the required attributes of a class implementing this mixin.""" _mail_client: imaplib.IMAP4 logger: logging.Logger
[docs] class SafeIMAPMixin: """Mixin with safe IMAP operations. The implementing class must have an :attr:`_mail_client` that is an :class:`imaplib.IMAP4` or :class:`imaplib.IMAP4_SSL` and an instance logger. Only errors leading up to and during the fetching process raise outside of the class. Otherwise issues with logout etc would destroy the work done with fetching, which makes no sense. """
[docs] def check_response( self, response: IMAP4Response, command_name: str, exception_class: type[FetcherError] | None, expected_status: str = "OK", ) -> None: """Checks the status response of an IMAP action. If it doesn't match the expectation raises an exception. Todo: Safely! extract error message from response for logging and exception. Args: response: The complete response to the IMAP action. command_name: The name of the action. exception_class: The exception to raise if the status doesn't match the expectation. Defaults to :class:`core.utils.fetchers.exceptions.FetcherError`. expected_status: The expected status response. Defaults to `"OK"`. Raises: exception_class: If the response status doesn't match the expectation. """ status = response[0] if status != expected_status: self.logger.error( "Bad server response for %s:\n%s", command_name, response, ) if exception_class is not None: raise exception_class(BadServerResponseError(response), command_name) self.logger.debug( "Server responded %s to %s as expected.", expected_status, command_name, )
@overload @staticmethod def safe( exception_class: type[FetcherError], expected_status: str = "OK", ) -> Callable[ [Callable[..., IMAP4ActionResponse]], Callable[..., IMAP4ActionResponse] ]: ... @overload @staticmethod def safe( exception_class: None, expected_status: str = "OK", ) -> Callable[ [Callable[..., IMAP4ActionResponse]], Callable[..., IMAP4ActionResponse | None] ]: ...
[docs] @staticmethod def safe( exception_class: type[FetcherError] | None, expected_status: str = "OK", ) -> Callable[ [Callable[..., IMAP4ActionResponse]], Callable[..., IMAP4ActionResponse | None] ]: """Wrapper for IMAP actions. Catches expected errors, checks for correct responses and raises an FetcherError in case. Todo: Find a better way to disable the exception and getting rid of the typing mess. Args: exception_class: The exception class to raise if an error occurs or the status doesn't match the expectation. Must be a subclass of :class:`core.utils.fetchers.exceptions.FetcherError`. Defaults to :class:`core.utils.fetchers.exceptions.FetcherError`. If `None` is passed, no exception is raised, all errors are logged nonetheless. expected_status: The expected status response. Defaults to `"OK"`. Returns: The return value of the wrapped action. None if an error occurs and `exception` is `None`. Raises: exception_class: If an error occurs or the status doesn't match the expectation. """ def safe_wrapper( imap_action: Callable[..., IMAP4ActionResponse], ) -> Callable[..., IMAP4ActionResponse | None]: def safe_action( self: Self, *args: Any, **kwargs: Any ) -> IMAP4ActionResponse | None: try: response = imap_action(self, *args, **kwargs) except Exception as error: self.logger.exception( "Error during %s!", imap_action.__name__, ) if exception_class is not None: raise exception_class(error, imap_action.__name__) from error return None else: self.check_response( response, imap_action.__name__, exception_class, expected_status ) return response return safe_action return safe_wrapper
[docs] @safe(exception_class=MailAccountError) def safe_login( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[Literal["OK"], list[bytes]] | tuple[str, str]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.login`. In case the passwords contains utf-8 chars, use authenticate instead. References: https://github.com/ikvk/imap_tools/pull/186/commits/0ce6b47a0019538e22a977516e28df16ea0a7961 """ try: response = self._mail_client.login(*args, **kwargs) except UnicodeEncodeError: credentials = ( b"\0" + args[0].encode("utf-8") + b"\0" + args[1].encode("utf-8") ) response = self._mail_client.authenticate( "PLAIN", lambda x: credentials, # noqa: ARG005 ; authenticate method needs a callable that takes a single arg ) return response
[docs] @safe(exception_class=MailboxError) def safe_select( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[bytes | None]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.select`.""" return self._mail_client.select(*args, **kwargs)
[docs] @safe(exception_class=None) def safe_unselect( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[Any]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.select`.""" return self._mail_client.unselect(*args, **kwargs)
[docs] @safe(exception_class=MailAccountError) def safe_list( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[None] | list[bytes | tuple[bytes, bytes]]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.list`.""" return self._mail_client.list(*args, **kwargs)
[docs] @safe(exception_class=MailboxError) def safe_uid( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[Any]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.uid`.""" return self._mail_client.uid(*args, **kwargs)
[docs] @safe(exception_class=MailAccountError) def safe_noop( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[bytes]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.noop`.""" return self._mail_client.noop(*args, **kwargs)
[docs] @safe(exception_class=MailboxError) def safe_check( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[Any]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.check`.""" return self._mail_client.check(*args, **kwargs)
[docs] @safe(exception_class=MailboxError) def safe_append( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[bytes]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.append`.""" return self._mail_client.append(*args, **kwargs)
[docs] @safe(exception_class=None, expected_status="BYE") def safe_logout( self: IMAP4FetcherClass, *args: Any, **kwargs: Any ) -> tuple[str, list[None] | list[bytes | tuple[bytes, bytes]]]: """The :func:`safe` wrapped version of :func:`imaplib.IMAP4.logout`.""" return self._mail_client.logout(*args, **kwargs)