# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - a open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Module with the :class:`JMAPFetcher` class."""
from __future__ import annotations
from typing import TYPE_CHECKING, override
import jmapc
import requests
import urllib3.exceptions
from core.constants import (
EmailFetchingCriterionChoices,
EmailProtocolChoices,
)
from eonvelope.utils.workarounds import get_config
from .BaseFetcher import BaseFetcher
from .exceptions import BadServerResponseError, MailAccountError, MailboxError
if TYPE_CHECKING:
from collections.abc import Generator
from core.models.Account import Account
from core.models.Email import Email
from core.models.Mailbox import Mailbox
from core.utils import FetchingCriterion
[docs]
class JMAPFetcher(BaseFetcher):
"""Maintains a connection to the JMAP server and fetches data using :mod:`jmapc`.
Opens a connection to the JMAP server on construction and is preferably used in a 'with' environment.
Allows fetching of mails and mailboxes from an account on an JMAP host.
"""
PROTOCOL = EmailProtocolChoices.JMAP
"""Name of the used protocol, refers to :attr:`MailFetchingProtocols.JMAP`."""
AVAILABLE_FETCHING_CRITERIA = (
EmailFetchingCriterionChoices.ALL,
EmailFetchingCriterionChoices.SEEN,
EmailFetchingCriterionChoices.UNSEEN,
EmailFetchingCriterionChoices.DRAFT,
EmailFetchingCriterionChoices.UNDRAFT,
EmailFetchingCriterionChoices.ANSWERED,
EmailFetchingCriterionChoices.UNANSWERED,
EmailFetchingCriterionChoices.DAILY,
EmailFetchingCriterionChoices.WEEKLY,
EmailFetchingCriterionChoices.MONTHLY,
EmailFetchingCriterionChoices.ANNUALLY,
EmailFetchingCriterionChoices.BODY,
EmailFetchingCriterionChoices.FROM,
EmailFetchingCriterionChoices.SENTSINCE,
EmailFetchingCriterionChoices.LARGER,
EmailFetchingCriterionChoices.SMALLER,
)
"""Tuple of all criteria available for fetching. Refers to :class:`MailFetchingCriteria`.
Must be immutable!
"""
[docs]
@override
def __init__(self, account: Account) -> None:
super().__init__(account)
self.connect_to_host()
[docs]
@override
def connect_to_host(self) -> None:
try:
if self.account.mail_address:
self._mail_client = jmapc.Client.create_with_password(
host=self.account.mail_host_address,
user=self.account.mail_address,
password=self.account.password,
)
else:
self._mail_client = jmapc.Client.create_with_api_token(
host=self.account.mail_host_address,
api_token=self.account.password,
)
except (requests.RequestException, urllib3.exceptions.HTTPError) as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error, "login") from error
if (
get_config("ALLOW_INSECURE_CONNECTIONS")
and self.account.allow_insecure_connection
):
self._mail_client.requests_session.verify = False
self.logger.info("Successfully connected to %s.", self.account)
[docs]
@override
def test(self, mailbox: Mailbox | None = None) -> None:
super().test(mailbox)
method = jmapc.methods.IdentityGet()
self.logger.debug("Testing %s ...", self.account)
try:
result = self._mail_client.request(method)
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error) from error
except jmapc.ClientError as error:
self.logger.exception(
"Wrong number of responses for request to %s!", self.account
)
raise MailAccountError(
BadServerResponseError(str(error)), method.jmap_method_name
) from error
if not isinstance(result, jmapc.methods.IdentityGetResponse):
self.logger.error("Error in response from %s!", self.account)
raise MailAccountError(
BadServerResponseError(result.to_json()), method.jmap_method_name
)
self.logger.debug("Successfully tested %s.", self.account)
if mailbox is not None:
methods = (
jmapc.methods.MailboxQuery(
filter=jmapc.MailboxQueryFilterCondition(name=mailbox.name)
),
jmapc.methods.MailboxGet(ids=jmapc.Ref("/ids")),
)
self.logger.debug("Testing %s ...", mailbox)
try:
results = self._mail_client.request(methods)
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailboxError(error) from error
if not isinstance(results[1].response, jmapc.methods.MailboxGetResponse):
self.logger.error("Error in response from %s!", self.account)
raise MailboxError(
BadServerResponseError(results[1].response.to_json()),
methods[1].jmap_method_name,
)
self.logger.debug("Successfully tested %s.", mailbox)
[docs]
@override
def fetch_emails(
self,
mailbox: Mailbox,
criterion: FetchingCriterion = BaseFetcher.DEFAULT_FETCHING_CRITERION,
) -> Generator[bytes]:
super().fetch_emails(mailbox, criterion)
self.logger.debug(
"Searching and fetching %s messages in %s...",
criterion,
mailbox,
)
method = jmapc.methods.MailboxQuery(
filter=jmapc.MailboxQueryFilterCondition(name=mailbox.name)
)
self.logger.debug("Querying for mailbox %s ...", mailbox)
try:
result = self._mail_client.request(method)
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error) from error
except jmapc.ClientError as error:
self.logger.exception(
"Wrong number of responses for request to %s!", self.account
)
raise MailAccountError(
BadServerResponseError(str(error)), method.jmap_method_name
) from error
if not isinstance(result, jmapc.methods.MailboxQueryResponse):
self.logger.error("Error in response from %s!", self.account)
raise MailAccountError(BadServerResponseError(result.to_json()), "")
if not result.ids or not isinstance(result.ids, list):
raise MailboxError(IndexError("Mailbox not found"))
self.logger.debug("Successfully queried mailbox.")
criterion_filter = criterion.as_jmap_filter()
criterion_filter.in_mailbox = result.ids[0]
methods = (
jmapc.methods.EmailQuery(
sort=[jmapc.Comparator(property="receivedAt", is_ascending=True)],
filter=criterion_filter,
),
jmapc.methods.EmailGet(
ids=jmapc.Ref("/ids"),
properties=[
"blobId",
],
),
)
self.logger.debug("Querying %s messages in %s ...", criterion, mailbox)
try:
results = self._mail_client.request(methods)
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error) from error
if not isinstance(results[1].response, jmapc.methods.EmailGetResponse):
self.logger.error("Error in response from %s!", self.account)
raise MailboxError(
BadServerResponseError(results[1].response.to_json()),
methods[1].jmap_method_name,
)
self.logger.info(
"Found %s messages matching %s in %s.",
len(results[1].response.data),
criterion,
mailbox,
)
self.logger.debug("Downloading matching message blobs from %s ...", mailbox)
for email in results[1].response.data:
blob_url = self._mail_client.jmap_session.download_url.format(
accountId=self._mail_client.account_id,
blobId=email.blob_id,
name="",
type="message/rfc822",
)
try:
response = self._mail_client.requests_session.get(
blob_url, stream=True, timeout=self.account.timeout
)
response.raise_for_status()
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error) from error
yield response.raw.data
self.logger.debug("Successfully downloaded message blobs.")
[docs]
@override
def fetch_mailboxes(self) -> list[tuple[str, str]]:
method = jmapc.methods.MailboxGet(ids=None, properties=["name", "role"])
self.logger.debug("Fetching mailboxes in %s ...", self.account)
try:
result = self._mail_client.request(method)
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error) from error
except jmapc.ClientError as error:
self.logger.exception(
"Wrong number of responses for request to %s!", self.account
)
raise MailAccountError(
BadServerResponseError(str(error)), method.jmap_method_name
) from error
if not isinstance(result, jmapc.methods.MailboxGetResponse):
self.logger.error("Error in response from %s!", self.account)
raise MailAccountError(
BadServerResponseError(result.to_json()), method.jmap_method_name
)
self.logger.debug("Successfully fetched mailboxes in %s.", self.account)
return [
(mailbox.name, mailbox.role or "")
for mailbox in result.data
if mailbox.name is not None
]
[docs]
@override
def restore(self, email: Email) -> None:
super().restore(email)
if not email.file_path:
raise FileNotFoundError("This email has no stored eml file.")
self.logger.debug("Uploading blob for %s ...", email)
try:
blob = self._mail_client.upload_blob(file_name=email.absolute_filepath)
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error) from error
self.logger.debug("Successfully uploaded email blob.")
methods = (
jmapc.methods.MailboxQuery(
filter=jmapc.MailboxQueryFilterCondition(name=email.mailbox.name)
),
jmapc.methods.CustomMethod(
{
"emails": {
1: {
"blobId": blob.id,
"#mailboxIds": {
"path": "/ids",
"resultOf": "0.Mailbox/query",
"name": "Mailbox/query",
},
}
}
}
),
)
methods[1].jmap_method = "Email/import"
self.logger.debug("Importing %s to its mailbox ...", email)
try:
results = self._mail_client.request(methods)
except requests.RequestException as error:
self.logger.exception("Error connecting to %s!", self.account)
raise MailAccountError(error) from error
if not isinstance(results[1].response, jmapc.methods.CustomResponse):
self.logger.error("Error in response from %s!", self.account)
raise MailboxError(
BadServerResponseError(results[1].response.to_json()),
methods[1].jmap_method_name,
)
self.logger.debug("Successfully imported email.")
[docs]
@override
def close(self) -> None:
"""No cleanup of :class:`jmapc.Client` required."""