Source code for core.models.Email

# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - an open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Module with the :class:`Email` model class."""

from __future__ import annotations

import contextlib
import email
import logging
import os
import re
import shutil
from email import policy
from functools import cached_property
from hashlib import md5
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import TYPE_CHECKING, Any, ClassVar, override
from zipfile import ZipFile

from django.db import connection, models, transaction
from django.template import engines
from django.utils.translation import gettext as __
from django.utils.translation import gettext_lazy as _
from django_prometheus.models import ExportModelOperationsMixin

from core.constants import (
    PROTOCOLS_SUPPORTING_RESTORE,
    HeaderFields,
    SupportedEmailDownloadFormats,
    file_format_parsers,
)
from core.mixins import (
    DownloadMixin,
    FavoriteModelMixin,
    FilePathModelMixin,
    ThumbnailMixin,
    TimestampModelMixin,
    URLMixin,
)
from core.utils.fetchers.exceptions import MailboxError
from core.utils.mail_parsing import (
    get_bodytexts,
    get_header,
    is_x_spam,
    parse_datetime_header,
)
from eonvelope.utils.workarounds import get_config

from .Attachment import Attachment
from .EmailCorrespondent import EmailCorrespondent

if TYPE_CHECKING:
    from tempfile import _TemporaryFileWrapper

    from django.db.models import QuerySet

    from .Correspondent import Correspondent
    from .Mailbox import Mailbox


# Module-level logger; it is named after this module via ``__name__``.
logger = logging.getLogger(__name__)
"""The logger instance for this module."""


class Email(
    ExportModelOperationsMixin("email"),
    DownloadMixin,
    ThumbnailMixin,
    URLMixin,
    FavoriteModelMixin,
    FilePathModelMixin,
    TimestampModelMixin,
    models.Model,
):
    """Database model for an email.

    An email belongs to exactly one :attr:`mailbox` and is unique within it by
    its :attr:`message_id`. It can be linked to other emails through
    :attr:`in_reply_to` and :attr:`references` and to correspondents through
    :class:`core.models.EmailCorrespondent`.
    """

    BASENAME = "email"

    DELETE_NOTICE = _(
        "This will delete the records of this email and all its attachments but not its correspondents."
    )

    DELETE_NOTICE_PLURAL = _(
        "This will delete the records of these emails and all their attachments but not their correspondents."
    )

    message_id = models.CharField(
        max_length=255,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("message-ID"),
    )
    """The messageID header of the mail. Unique together with :attr:`mailbox`."""

    datetime = models.DateTimeField(
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("time received"),
    )
    """The Date header of the mail."""

    subject = models.TextField(
        blank=True,
        default="",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("subject"),
    )
    """The subject header of the mail."""

    plain_bodytext = models.TextField(
        blank=True,
        default="",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("plain bodytext"),
    )
    """The plain bodytext of the mail. Can be blank."""

    html_bodytext = models.TextField(
        blank=True,
        default="",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("HTML bodytext"),
    )
    """The html bodytext of the mail. Can be blank."""

    in_reply_to: models.ManyToManyField[Email, Email] = models.ManyToManyField(
        "self",
        symmetrical=False,
        related_name="replies",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("in reply to email"),
    )
    """The mails that this mail is a response to.
    Technically just a single mail, but as a mail can exist in multiple mailboxes, this needs to be able to reference multiples."""

    references: models.ManyToManyField[Email, Email] = models.ManyToManyField(
        "self",
        symmetrical=False,
        related_name="referenced_by",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("referencing emails"),
    )
    """The mails that this email references."""

    datasize = models.PositiveIntegerField(
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("datasize"),
    )
    """The bytes size of the mail."""

    correspondents: models.ManyToManyField[Correspondent, Correspondent] = (
        models.ManyToManyField(
            "Correspondent",
            through="EmailCorrespondent",
            related_name="emails",
            # Translators: Do not capitalize the very first letter unless your language requires it.
            verbose_name=_("correspondents"),
        )
    )
    """The correspondents that are mentioned in this mail. Bridges through :class:`core.models.EmailCorrespondent`."""

    mailbox: models.ForeignKey[Mailbox] = models.ForeignKey(
        "Mailbox",
        related_name="emails",
        on_delete=models.CASCADE,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("mailbox"),
    )
    """The mailbox that this mail has been found in. Unique together with :attr:`message_id`. Deletion of that `mailbox` deletes this mail."""

    headers = models.JSONField(
        null=True,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("headers"),
    )
    """All other header fields of the mail. Can be null."""

    x_spam_flag = models.BooleanField(
        null=True,
        blank=True,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("X-Spam Flag"),
    )
    """The x_spam header of this mail.
    Can be null."""

    class Meta:
        """Metadata class for the model."""

        db_table = "emails"
        """The name of the database table for the emails."""

        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name = _("email")
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name_plural = _("emails")
        get_latest_by = "datetime"

        constraints: ClassVar[list[models.BaseConstraint]] = [
            models.UniqueConstraint(
                fields=["message_id", "mailbox"],
                name="email_unique_together_message_id_mailbox",
            )
        ]
        """:attr:`message_id` and :attr:`mailbox` in combination are unique."""

    @override
    def __str__(self) -> str:
        """Returns a string representation of the model data.

        Returns:
            The string representation of the email, using :attr:`message_id`, :attr:`datetime` and :attr:`mailbox`.
        """
        return _(
            "Email with ID %(message_id)s, received on %(datetime)s from %(mailbox)s"
        ) % {
            "message_id": self.message_id,
            "datetime": self.datetime,
            "mailbox": self.mailbox,
        }

    @override
    def save(self, *args: Any, **kwargs: Any) -> None:
        """Extended :django::func:`django.models.Model.save` method.

        Saves the data to eml if configured: when the mailbox has
        ``save_to_eml`` disabled, the ``file_payload`` keyword argument is
        removed before delegating to the parent ``save``.

        Args:
            *args: Positional arguments passed on to the parent ``save``.
            **kwargs: Keyword arguments passed on to the parent ``save``.
        """
        if not self.mailbox.save_to_eml:
            kwargs.pop("file_payload", None)
        super().save(*args, **kwargs)

    @override
    def _get_storage_file_name(self) -> str:
        """Create the filename for the stored eml.

        Returns:
            The name in the form ``<pk>_<message_id>.eml``.
        """
        return str(self.pk) + "_" + self.message_id + ".eml"

    def fill_from_email_bytes(self, email_bytes: bytes) -> Email:
        """Fills the :class:`core.models.Email` with data from an email in bytes form.

        Nothing is saved to the database by this method.

        Args:
            email_bytes: The email bytes data.

        Returns:
            The :class:`core.models.Email` instance with data from the bytes.
        """
        email_message = email.message_from_bytes(email_bytes, policy=policy.default)
        # Header names are stored lowercased; for a header that occurs more
        # than once, the last occurrence wins.
        header_dict: dict[str, str | None] = {
            header_name.lower(): get_header(email_message, header_name)
            for header_name in email_message
        }
        bodytexts = get_bodytexts(email_message)

        self.headers = header_dict
        # Mails without a Message-ID are identified by a digest of their bytes.
        self.message_id = (
            header_dict.get(HeaderFields.MESSAGE_ID)
            or md5(email_bytes).hexdigest()  # noqa: S324  # no safe hash required here
        )
        self.datetime = parse_datetime_header(header_dict.get(HeaderFields.DATE))
        self.subject = header_dict.get(HeaderFields.SUBJECT) or __("No subject")
        self.x_spam_flag = is_x_spam(header_dict.get(HeaderFields.X_SPAM))
        self.datasize = len(email_bytes)
        self.plain_bodytext = bodytexts.get("plain", "")
        self.html_bodytext = bodytexts.get("html", "")
        return self

    def add_correspondents(self) -> None:
        """Adds the correspondents from the headerfields to the model.

        The correspondents created from the ``From`` header additionally get
        their mailing list attributes set from the list headers of this mail.
        A missing list header sets the matching attribute to an empty string.
        """
        if not self.headers:
            return

        # These values are the same for every correspondent, so look them up once.
        mailinglist_values = {
            "list_id": self.headers.get(HeaderFields.MailingList.ID, ""),
            "list_help": self.headers.get(HeaderFields.MailingList.HELP, ""),
            "list_archive": self.headers.get(HeaderFields.MailingList.ARCHIVE, ""),
            "list_subscribe": self.headers.get(
                HeaderFields.MailingList.SUBSCRIBE, ""
            ),
            "list_unsubscribe": self.headers.get(
                HeaderFields.MailingList.UNSUBSCRIBE, ""
            ),
            "list_unsubscribe_post": self.headers.get(
                HeaderFields.MailingList.UNSUBSCRIBE_POST, ""
            ),
            "list_post": self.headers.get(HeaderFields.MailingList.POST, ""),
            "list_owner": self.headers.get(HeaderFields.MailingList.OWNER, ""),
        }

        for mention in HeaderFields.Correspondents.values:
            correspondent_header = self.headers.get(mention)
            if not correspondent_header:
                continue
            new_emailcorrespondents = EmailCorrespondent.create_from_header(
                correspondent_header, mention, self
            )
            if (
                mention != HeaderFields.Correspondents.FROM
                or new_emailcorrespondents is None
            ):
                continue
            for new_emailcorrespondent in new_emailcorrespondents:
                correspondent = new_emailcorrespondent.correspondent
                for attribute_name, header_value in mailinglist_values.items():
                    setattr(correspondent, attribute_name, header_value)
                correspondent.save()

    def add_in_reply_to(self) -> None:
        """Adds the in-reply-to emails from the headerfields to the model.

        Only emails in mailboxes of the same user are linked.
        """
        if not self.headers:
            return
        in_reply_to_message_id = self.headers.get(HeaderFields.IN_REPLY_TO)
        if not in_reply_to_message_id:
            return
        for in_reply_to_email in Email.objects.filter(
            message_id=in_reply_to_message_id.strip(),
            mailbox__account__user=self.mailbox.account.user,
        ):
            self.in_reply_to.add(in_reply_to_email)

    def add_references(self) -> None:
        """Adds the references from the headerfields to the model.

        The ``References`` header is split on commas and on any whitespace,
        so message IDs separated by tabs or line breaks are found as well.
        Only emails in mailboxes of the same user are linked.
        """
        if not self.headers:
            return
        references_header = self.headers.get(HeaderFields.REFERENCES)
        if not references_header:
            return
        # re.split may produce empty strings at the edges, these are dropped.
        referenced_message_ids = {
            message_id
            for message_id in re.split(r"[\s,]+", references_header)
            if message_id
        }
        if not referenced_message_ids:
            return
        # One query for all referenced IDs instead of one query per ID.
        self.references.add(
            *Email.objects.filter(
                message_id__in=referenced_message_ids,
                mailbox__account__user=self.mailbox.account.user,
            )
        )

    def reprocess(self) -> None:
        """Reprocesses the mails connections to other emails in the database.

        If the stored eml file can be opened, the model data is refreshed from
        it first. A missing file is ignored. Afterwards the email is saved and
        its in-reply-to and references links are rebuilt from its headers.
        """
        with contextlib.suppress(FileNotFoundError):
            with self.open_file() as email_file:
                email_bytes = email_file.read()
            self.fill_from_email_bytes(email_bytes)
        # NOTE(review): assumed to run even when the eml file is missing - confirm.
        with transaction.atomic():
            self.save()
            self.in_reply_to.clear()
            self.add_in_reply_to()
            self.references.clear()
            self.add_references()

    def restore_to_mailbox(self) -> None:
        """Restores the email to its mailbox.

        If restoring fails with a :class:`MailboxError`, the mailbox is marked
        as unhealthy before the error is raised again.

        Raises:
            NotImplementedError: If the emails account does not allow restoring.
            FileNotFoundError: If there is no eml file for the email.
            MailAccountError: If there was an error connected to the account.
            MailboxError: If there was an error with the mailbox.
        """
        logger.debug("Restoring %s to its mailbox.", self)
        with self.mailbox.account.get_fetcher() as fetcher:
            try:
                fetcher.restore(self)
            except MailboxError as error:
                logger.exception("Restoring of email %s to its mailbox failed!", self)
                self.mailbox.set_unhealthy(error)
                raise
        logger.debug("Successfully restored email.")

    @cached_property
    def conversation(self) -> QuerySet[Email]:
        """Recursively gets all emails that are part of this emails conversation, connected through references or in_reply_to.

        The query first follows the links from this email to the emails it
        replies to or references. From those emails that have no such link
        themselves, it then follows the links in the opposite direction.

        Returns:
            Queryset of all mails in the conversation of the emails user, ordered by :attr:`datetime`.
        """
        conversation_sql = """
            WITH RECURSIVE emails_links AS (
                SELECT from_email_id, to_email_id FROM emails_in_reply_to
                UNION ALL
                SELECT from_email_id, to_email_id FROM emails_references
            ),
            conversation_root AS (
                SELECT e.id FROM emails e WHERE e.id = %s
                UNION ALL
                SELECT DISTINCT linked.id
                FROM emails linked
                JOIN emails_links l ON l.to_email_id = linked.id
                JOIN conversation_root cr ON cr.id = l.from_email_id
            ),
            conversation_thread AS (
                SELECT e.id FROM emails e
                JOIN conversation_root cr ON e.id = cr.id
                WHERE NOT EXISTS (
                    SELECT 1 FROM emails_links l WHERE l.from_email_id = e.id
                )
                UNION ALL
                SELECT DISTINCT linking.id
                FROM emails linking
                JOIN emails_links l ON l.from_email_id = linking.id
                JOIN conversation_thread ct ON l.to_email_id = ct.id
            )
            SELECT DISTINCT id FROM conversation_thread;
        """
        with connection.cursor() as cursor:
            cursor.execute(conversation_sql, [self.id])
            conversation_rows = cursor.fetchall()
        conversation_ids = [
            conversation_row[0] for conversation_row in conversation_rows
        ]
        # The raw query is not restricted by user, so filter by owner here.
        return Email.objects.filter(
            id__in=conversation_ids, mailbox__account__user=self.mailbox.account.user
        ).order_by("datetime")

    @property
    @override
    def has_thumbnail(self) -> bool:
        """Whether the email has a thumbnail.

        Returns:
            `True` unless the email is spam according to :attr:`is_spam`.
        """
        return not self.is_spam

    @property
    def can_be_restored(self) -> bool:
        """Checks if the email can be restored to its mailbox.

        Returns:
            Whether the email can be restored.
        """
        return (
            self.file_path is not None
            and self.mailbox.account.protocol in PROTOCOLS_SUPPORTING_RESTORE
            and self.mailbox.is_healthy
        )

    @cached_property
    def html_version(self) -> str:
        """Renders a html version of this email.

        Uses the template and css from constance settings.

        Returns:
            The emails html version.
        """
        engine = engines["django"]
        template = engine.from_string(get_config("EMAIL_HTML_TEMPLATE"))
        from_emailcorrespondents = self.emailcorrespondents.filter(
            mention=HeaderFields.Correspondents.FROM
        ).select_related("correspondent")
        to_emailcorrespondents = self.emailcorrespondents.filter(
            mention=HeaderFields.Correspondents.TO
        ).select_related("correspondent")
        cc_emailcorrespondents = self.emailcorrespondents.filter(
            mention=HeaderFields.Correspondents.CC
        ).select_related("correspondent")
        bcc_emailcorrespondents = self.emailcorrespondents.filter(
            mention=HeaderFields.Correspondents.BCC
        ).select_related("correspondent")
        return template.render(
            context={
                "email": self,
                "email_css": get_config("EMAIL_CSS"),
                "from_emailcorrespondents": from_emailcorrespondents,
                "to_emailcorrespondents": to_emailcorrespondents,
                "cc_emailcorrespondents": cc_emailcorrespondents,
                "bcc_emailcorrespondents": bcc_emailcorrespondents,
            }
        )

    @property
    def is_spam(self) -> bool:
        """Checks the spam headers to decide whether the mail is spam.

        Returns:
            Whether the mail is considered spam. `False` if :attr:`x_spam_flag` is null.
        """
        return bool(self.x_spam_flag)

    @classmethod
    def create_from_email_bytes(
        cls, email_bytes: bytes, mailbox: Mailbox
    ) -> Email | None:
        """Creates an :class:`core.models.Email` from an email in bytes form.

        If the mail has no Message-ID header, the md5 hexdigest of
        :attr:`email_bytes` is used as its message ID instead.

        Args:
            email_bytes: The email bytes to parse the emaildata from.
            mailbox: The mailbox the email is in.

        Returns:
            The :class:`core.models.Email` instance with data from the bytes.
            None if the mail is spam and is supposed to be thrown out,
            if the mail already exists in the db
            or if saving the mail to the db failed.
        """
        email_message = email.message_from_bytes(email_bytes, policy=policy.default)
        message_id = (
            get_header(
                email_message,
                HeaderFields.MESSAGE_ID,
            )
            or md5(email_bytes).hexdigest()  # noqa: S324  # no safe hash required here
        )
        logger.debug("Parsed email %s ...", message_id)

        x_spam = is_x_spam(get_header(email_message, HeaderFields.X_SPAM))
        if x_spam and get_config("THROW_OUT_SPAM"):
            logger.debug(
                "Skipping email with Message-ID %s in %s, it is flagged as spam.",
                message_id,
                mailbox,
            )
            return None
        if cls.objects.filter(message_id=message_id, mailbox=mailbox).exists():
            logger.debug(
                "Skipping email with Message-ID %s in %s, it already exists in the db.",
                message_id,
                mailbox,
            )
            return None

        new_email = cls(mailbox=mailbox).fill_from_email_bytes(email_bytes=email_bytes)
        logger.debug("Successfully parsed email.")
        logger.debug("Saving email %s to db...", message_id)
        try:
            # The email and all records derived from it are saved together or not at all.
            with transaction.atomic():
                new_email.save(file_payload=email_bytes)
                new_email.add_correspondents()
                new_email.add_in_reply_to()
                new_email.add_references()
                Attachment.create_from_email_message(email_message, new_email)
        except Exception:
            # Deliberately broad: a failure is logged and reported as None.
            logger.exception(
                "Failed creating email from bytes: Error while saving email to db!"
            )
            return None
        logger.debug("Successfully saved email to db.")
        return new_email

    @staticmethod
    def _queryset_as_zip_eml(queryset: QuerySet[Email]) -> _TemporaryFileWrapper:
        """Parses a queryset of emails into a zip of eml files.

        Emails without an eml file are skipped.

        Note:
            Does not validate args! This has to be done beforehand.

        Args:
            queryset: The emails to put into the zip.

        Returns:
            The open temporary file holding the zip archive.
        """
        temp_file = (
            NamedTemporaryFile()  # noqa: SIM115  # pylint: disable=consider-using-with
        )  # the file must not be closed as it is returned later
        try:
            with ZipFile(temp_file.name, "w") as zip_archive:
                for email_item in queryset:
                    try:
                        eml_file = email_item.open_file()
                    except FileNotFoundError:
                        continue
                    with (
                        eml_file,
                        zip_archive.open(
                            os.path.basename(email_item.file_path), "w"
                        ) as zipped_file,
                    ):
                        # Copy in chunks, the mail is never fully held in memory.
                        shutil.copyfileobj(eml_file, zipped_file)
        except BaseException:
            # Nobody receives the file on failure, so it is closed here.
            temp_file.close()
            raise
        return temp_file

    @staticmethod
    def _queryset_as_mailbox_file(
        queryset: QuerySet[Email], file_format: str
    ) -> _TemporaryFileWrapper:
        """Parses a queryset of emails into a mailbox file.

        Emails without an eml file are skipped.

        Note:
            Does not validate args! This has to be done beforehand.

        Args:
            queryset: The emails to put into the mailbox file.
            file_format: The key of the parser class in :attr:`core.constants.file_format_parsers`.

        Returns:
            The open temporary file holding the mailbox.
        """
        temp_file = (
            NamedTemporaryFile()  # noqa: SIM115  # pylint: disable=consider-using-with
        )  # the file must not be closed as it is returned later
        try:
            parser_class = file_format_parsers[file_format]
            parser = parser_class(temp_file.name, create=True)
            parser.lock()
            try:
                for email_item in queryset:
                    try:
                        eml_file = email_item.open_file()
                    except FileNotFoundError:
                        continue
                    with eml_file:
                        parser.add(eml_file)
            finally:
                # Also closes the locked mailbox if adding a mail fails.
                parser.close()
        except BaseException:
            # Nobody receives the file on failure, so it is closed here.
            temp_file.close()
            raise
        return temp_file

    @staticmethod
    def _queryset_as_mailbox_zip(
        queryset: QuerySet[Email], file_format: str
    ) -> _TemporaryFileWrapper:
        """Parses a queryset of emails into a zipped mailbox dir.

        Emails without an eml file are skipped.

        Note:
            Does not validate args! This has to be done beforehand.

        Args:
            queryset: The emails to put into the mailbox directory.
            file_format: The key of the parser class in :attr:`core.constants.file_format_parsers`.
                Also used as the name of the mailbox directory in the zip.

        Returns:
            The open temporary file holding the zip archive.
        """
        temp_file = (
            NamedTemporaryFile(  # noqa: SIM115  # pylint: disable=consider-using-with
                suffix=".zip"  # the suffix allows zipping to this file with shutil
            )
        )  # the file must not be closed as it is returned later
        try:
            with TemporaryDirectory() as tempdirpath:
                mailbox_path = os.path.join(tempdirpath, file_format)
                parser_class = file_format_parsers[file_format]
                parser = parser_class(mailbox_path, create=True)
                parser.lock()
                try:
                    for email_item in queryset:
                        # this construction is strictly necessary as Maildir.add can also raise FileNotFound
                        # if the directory is incorrectly structured; that warning must not be blocked
                        try:
                            eml_file = email_item.open_file()
                        except FileNotFoundError:
                            continue
                        with eml_file:
                            parser.add(eml_file)
                finally:
                    # Also closes the locked mailbox if adding a mail fails.
                    parser.close()
                # make_archive appends ".zip" itself, so the suffix is cut off here.
                shutil.make_archive(
                    os.path.splitext(temp_file.name)[0], "zip", tempdirpath
                )
        except BaseException:
            # Nobody receives the file on failure, so it is closed here.
            temp_file.close()
            raise
        return temp_file

    @staticmethod
    def queryset_as_file(
        queryset: QuerySet[Email], file_format: str
    ) -> _TemporaryFileWrapper:
        """Processes the files of the emails in the queryset into a temporary file.

        Args:
            queryset: The email queryset to compile into a file.
            file_format: The desired format of the file.
                Must be one of :class:`core.constants.SupportedEmailDownloadFormats`.
                Case-insensitive.

        Returns:
            The temporary file wrapper.

        Raises:
            ValueError: If the given :attr:`file_format` is not supported.
            Email.DoesNotExist: If the :attr:`queryset` is empty.
        """
        if not queryset.exists():
            raise Email.DoesNotExist("The queryset is empty!")
        file_format = file_format.lower()
        if file_format == SupportedEmailDownloadFormats.ZIP_EML:
            return Email._queryset_as_zip_eml(queryset)
        if file_format in (
            SupportedEmailDownloadFormats.MBOX,
            SupportedEmailDownloadFormats.BABYL,
            SupportedEmailDownloadFormats.MMDF,
        ):
            return Email._queryset_as_mailbox_file(queryset, file_format)
        if file_format in (
            SupportedEmailDownloadFormats.MAILDIR,
            SupportedEmailDownloadFormats.MH,
        ):
            return Email._queryset_as_mailbox_zip(queryset, file_format)
        raise ValueError(
            _("The file format %(file_format)s is not supported.")
            % {"file_format": file_format}
        )