Source code for core.models.Mailbox

# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - a open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Module with the :class:`Mailbox` model class."""

from __future__ import annotations

import contextlib
import logging
import os
import re
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import TYPE_CHECKING, BinaryIO, ClassVar, override
from zipfile import BadZipFile, ZipFile

from dirtyfields import DirtyFieldsMixin
from django.db import models
from django.utils.translation import gettext_lazy as _
from django_prometheus.models import ExportModelOperationsMixin

from core.constants import (
    EmailFetchingCriterionChoices,
    MailboxTypeChoices,
    SupportedEmailDownloadFormats,
    SupportedEmailUploadFormats,
    file_format_parsers,
)
from core.mixins import (
    DownloadMixin,
    FavoriteModelMixin,
    HealthModelMixin,
    TimestampModelMixin,
    UploadMixin,
    URLMixin,
)
from core.utils.fetchers.exceptions import MailAccountError, MailboxError
from core.utils.mail_parsing import parse_mailbox_type
from eonvelope.utils.workarounds import get_config

from .Email import Email

if TYPE_CHECKING:
    from tempfile import _TemporaryFileWrapper

    from django_stubs_ext import StrOrPromise

    from core.utils import FetchingCriterion

    from .Account import Account


logger = logging.getLogger(__name__)
"""The logger instance for this module."""


[docs] class Mailbox( ExportModelOperationsMixin("mailbox"), DirtyFieldsMixin, URLMixin, UploadMixin, DownloadMixin, FavoriteModelMixin, HealthModelMixin, TimestampModelMixin, models.Model, ): """Database model for a mailbox in a mail account.""" BASENAME = "mailbox" DELETE_NOTICE = _( "This will delete the record of this mailbox and all emails and attachments found in it!" ) DELETE_NOTICE_PLURAL = _( "This will delete the records of these mailboxes and all emails and attachments found in them!" ) name = models.CharField( max_length=255, # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("name"), ) """The mailaccount internal name of the mailbox. Unique together with :attr:`account`.""" type = models.CharField( default=MailboxTypeChoices.CUSTOM, choices=MailboxTypeChoices, max_length=32, verbose_name=_("type"), ) """The mailaccount internal role or distinguished id of the mailbox.""" account: models.ForeignKey[Account] = models.ForeignKey( "Account", related_name="mailboxes", on_delete=models.CASCADE, # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("account"), ) """The mailaccount this mailbox was found in. Unique together with :attr:`name`. Deletion of that `account` deletes this mailbox.""" save_attachments = models.BooleanField( default=True, # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("save attachments"), help_text=_( "Whether the attachments from the emails in this mailbox will be saved." ), ) """Whether to save attachments of the mails found in this mailbox. :attr:`constance.get_config('DEFAULT_SAVE_ATTACHMENTS')` by default.""" save_to_eml = models.BooleanField( default=True, # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("save as .eml"), help_text=_("Whether the emails in this mailbox will be stored in .eml files."), ) """Whether to save the mails found in this mailbox as .eml files. :attr:`constance.get_config('DEFAULT_SAVE_TO_EML')` by default.""" class Meta: """Metadata class for the model.""" db_table = "mailboxes" """The name of the database table for the mailboxes.""" # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name = _("mailbox") # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name_plural = _("mailboxes") get_latest_by = TimestampModelMixin.Meta.get_latest_by constraints: ClassVar[list[models.BaseConstraint]] = [ models.UniqueConstraint( fields=["name", "account"], name="mailbox_unique_together_name_account" ) ] """:attr:`name` and :attr:`account` in combination are unique."""
[docs] @override def __str__(self) -> str: """Returns a string representation of the model data. Returns: The string representation of the mailbox, using :attr:`name` and :attr:`account`. """ return _("Mailbox %(name)s of %(account)s") % { "account": self.account, "name": self.name, }
[docs] def test(self) -> None: """Tests whether the data in the model is correct. Tests connecting and logging in to the mailhost and account. The :attr:`core.models.Mailbox.is_healthy` flag is set accordingly. Relies on the `test` method of the :mod:`core.utils.fetchers` classes. Raises: MailAccountError: If the test is fails due to an issue with the account. MailboxError: If the test is fails due to an issue with the mailbox. """ logger.info("Testing %s ...", self) with self.account.get_fetcher() as fetcher: try: fetcher.test(self) except MailboxError as error: logger.info("Failed testing %s with error: %s.", self, error) self.set_unhealthy(error) raise except MailAccountError as error: logger.info("Failed testing %s with error %s.", self.account, error) self.account.set_unhealthy(error) raise self.set_healthy() logger.info("Successfully tested mailbox")
[docs] def fetch(self, criterion: FetchingCriterion) -> None: """Fetches emails from this mailbox based on :attr:`criterion` and adds them to the db. If successful, marks this mailbox as healthy, otherwise unhealthy. Args: criterion: The criterion used to fetch emails from the mailbox. Raises: MailboxError: Reraised if fetching failed due to a MailboxError. MailAccountError: Reraised if fetching failed due to a MailAccountError. """ logger.info( "Fetching and saving emails with criterion %s from %s ...", criterion, self ) with self.account.get_fetcher() as fetcher: try: for fetched_mail in fetcher.fetch_emails(self, criterion): Email.create_from_email_bytes(fetched_mail, self) except MailboxError as error: logger.info("Failed fetching %s with error: %s.", self, error) self.set_unhealthy(error) raise except MailAccountError as error: logger.info("Failed fetching %s with error: %s.", self, error) self.account.set_unhealthy(error) raise self.set_healthy() logger.info("Successfully fetched and saved emails.")
[docs] def _add_email_from_eml(self, file: BinaryIO) -> None: """Reads emails from a zipped mailbox dir.""" Email.create_from_email_bytes(file.read(), mailbox=self)
[docs] def _add_emails_from_zip_eml(self, file: BinaryIO) -> None: """Reads emails from a zip of eml files.""" try: with ZipFile(file) as zipfile: for zipped_file in zipfile.namelist(): Email.create_from_email_bytes( zipfile.read(zipped_file), mailbox=self ) except BadZipFile as error: logger.exception("Error parsing file as zip!") raise ValueError( _("The given file is not a valid %(file_format)s.") % {"file_format": "zip"} ) from error
[docs] def _add_emails_from_mailbox_file(self, file: BinaryIO, file_format: str) -> None: """Reads emails from a mailbox file. Note: Does not validate file_format! This has to be done beforehand. """ parser_class = file_format_parsers[file_format] with NamedTemporaryFile() as tempfile: tempfile.write(file.read()) tempfile.seek(0) parser = parser_class(tempfile.name, create=False) parser.lock() for key in parser.iterkeys(): with contextlib.suppress( AssertionError ): # Babyl.get_bytes can raise AssertionError for a bad message Email.create_from_email_bytes(parser.get_bytes(key), mailbox=self) parser.close()
[docs] def _add_emails_from_mailbox_zip(self, file: BinaryIO, file_format: str) -> None: """Reads emails from a zipped mailbox dir. Note: Does not validate file_format! This has to be done beforehand. """ parser_class = file_format_parsers[file_format] with TemporaryDirectory() as tempdirpath: try: with ZipFile(file) as zipfile: zipfile.extractall(tempdirpath) except BadZipFile as error: logger.exception("Error parsing file as zip!") raise ValueError( _("The given file is not a valid %(file_format)s.") % {"file_format": "zip"} ) from error for name in os.listdir(tempdirpath): path = os.path.join(tempdirpath, name) if os.path.isdir(path): parser = parser_class(path, create=False) parser.lock() try: for key in parser.iterkeys(): Email.create_from_email_bytes( parser.get_bytes(key), mailbox=self ) except ( FileNotFoundError ) as error: # raised if the given maildir doesn't have the expected structure logger.exception("Error parsing file as %s!", file_format) raise ValueError( _("The given file is not a valid %(file_format)s.") % {"file_format": file_format} ) from error parser.close()
[docs] def add_emails_from_file(self, file: BinaryIO, file_format: str) -> None: """Adds emails from a file to the db. Args: file: The mailbox file. file_format: The format of the mailbox file. Case-insensitive. Raises: ValueError: If the file format is not implemented or the file failed to open. """ file_format = file_format.lower() logger.info("Adding emails from %s file to %s ...", file_format, self) match file_format: case SupportedEmailUploadFormats.EML: self._add_email_from_eml(file) case SupportedEmailUploadFormats.ZIP_EML: self._add_emails_from_zip_eml(file) case ( SupportedEmailUploadFormats.MBOX | SupportedEmailUploadFormats.MMDF | SupportedEmailUploadFormats.BABYL ): self._add_emails_from_mailbox_file(file, file_format) case SupportedEmailUploadFormats.MAILDIR | SupportedEmailUploadFormats.MH: self._add_emails_from_mailbox_zip(file, file_format) case _: logger.error("Unsupported fileformat for uploaded file.") raise ValueError( _("The file format %(file_format)s is not supported.") % {"file_format": file_format} ) logger.info("Successfully added emails from file.")
@property @override def has_download(self) -> bool: return self.emails.exists() @property def available_fetching_criteria(self) -> tuple[str, ...]: """Gets the available fetching criteria based on the mail protocol of this mailbox. Returns: A tuple of all available fetching criteria for this mailbox. Raises: ValueError: If the account has an unimplemented protocol. """ return self.account.get_fetcher_class().AVAILABLE_FETCHING_CRITERIA # type: ignore[no-any-return] # for some reason mypy doesn't get this @property def available_no_arg_fetching_criteria(self) -> tuple[str, ...]: """Gets the available fetching criteria that do not require an argument based on the mail protocol of this mailbox. Returns: A tuple of all available fetching criteria that do not require an argument for this mailbox. Raises: ValueError: If the account has an unimplemented protocol. """ return tuple( criterion for criterion in self.available_fetching_criteria if criterion.format("arg") == criterion ) @property def available_fetching_criterion_choices(self) -> list[tuple[str, StrOrPromise]]: """Gets the available fetching criterion choices based on the mail protocol of this mailbox. Returns: A choices-type tuple of all available fetching criteria for this mailbox. Raises: ValueError: If the account has an unimplemented protocol. """ return [ (criterion, label) for criterion, label in EmailFetchingCriterionChoices.choices if criterion in self.account.get_fetcher_class().AVAILABLE_FETCHING_CRITERIA ] @property def available_no_arg_fetching_criterion_choices( self, ) -> list[tuple[str, StrOrPromise]]: """Gets the available fetching criterion choices that do not require an argumentbased on the mail protocol of this mailbox. Returns: A choices-type tuple of all available fetching criteria that do not require an argument for this mailbox. Raises: ValueError: If the account has an unimplemented protocol. """ return [ (criterion, label) for criterion, label in self.available_fetching_criterion_choices if criterion.format("arg") == criterion ] @property def available_download_formats(self) -> list[tuple[str, StrOrPromise]]: """Get all formats that emails in this mailbox can be downloaded in. Returns: A list of download formats and format names. """ return SupportedEmailDownloadFormats.choices
[docs] @classmethod def create_from_data( cls, mailbox_name: str, mailbox_type: str, account: Account ) -> Mailbox | None: """Creates a :class:`core.models.Mailbox` from the mailboxdata. Note: Mailbox created from data is considered healthy by default. Args: mailbox_name: The name of the mailbox. mailbox_type: The type of the mailbox. account: The account the mailbox is in. Returns: The :class:`core.models.Mailbox` instance with data from the bytes. `None` if the mailbox name is ignored. Raises: ValueError: If the given account is not in the db. """ if account.pk is None: raise ValueError("Account is not in the db!") mailbox_type = parse_mailbox_type(mailbox_type) if get_config("THROW_OUT_SPAM") and (mailbox_type == MailboxTypeChoices.JUNK): logger.debug("%s is a spambox, it is skipped.", mailbox_name) return None if re.compile( get_config("IGNORED_MAILBOXES_REGEX"), flags=re.IGNORECASE ).search(mailbox_name): logger.debug("%s is in the ignorelist, it is skipped.", mailbox_name) return None try: mailbox = cls.objects.get(account=account, name=mailbox_name) mailbox.type = mailbox_type # for migration of old mailbox instances mailbox.save(update_fields=["type"]) mailbox.set_healthy() logger.debug( "%s already exists in db, it has been set to healthy.", mailbox ) except Mailbox.DoesNotExist: mailbox = cls( account=account, name=mailbox_name, type=mailbox_type, save_to_eml=get_config("DEFAULT_SAVE_TO_EML"), save_attachments=get_config("DEFAULT_SAVE_ATTACHMENTS"), is_healthy=True, ) mailbox.save() logger.debug("Successfully saved %s to db.", mailbox_name) return mailbox
[docs] @staticmethod def queryset_as_file( queryset: models.QuerySet[Mailbox], file_format: str ) -> _TemporaryFileWrapper: """Processes the files of the emails in the mailboxes in the queryset into a temporary file. Args: queryset: The mailbox queryset to compile into a file. file_format: The desired format of the mailbox files. Must be one of :class:`core.constants.SupportedEmailDownloadFormats`. Case-insensitive. Returns: The temporary file wrapper. Raises: ValueError: If the given :attr:`file_format` is not supported. Mailbox.DoesNotExist: If the :attr:`queryset` is empty. """ if not queryset.exists(): raise Mailbox.DoesNotExist("The queryset is empty") file_format = file_format.lower() tempfile = ( NamedTemporaryFile() # noqa: SIM115 # pylint: disable=consider-using-with ) # the file must not be closed as it is returned later with ZipFile(tempfile.name, "w") as zipfile: for mailbox in queryset: try: mailbox_file = Email.queryset_as_file( mailbox.emails.all(), file_format ) except Email.DoesNotExist: continue with ( mailbox_file, zipfile.open( mailbox.name + "." + file_format.split("[", maxsplit=1)[0], "w" ) as zipped_file, ): zipped_file.write(mailbox_file.read()) return tempfile