# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - an open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Module with the :class:`Email` model class."""
from __future__ import annotations
import contextlib
import email
import logging
import os
import re
import shutil
from email import policy
from functools import cached_property
from hashlib import md5
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import TYPE_CHECKING, Any, ClassVar, override
from zipfile import ZipFile
from django.db import connection, models, transaction
from django.template import engines
from django.utils.translation import gettext as __
from django.utils.translation import gettext_lazy as _
from django_prometheus.models import ExportModelOperationsMixin
from core.constants import (
PROTOCOLS_SUPPORTING_RESTORE,
HeaderFields,
SupportedEmailDownloadFormats,
file_format_parsers,
)
from core.mixins import (
DownloadMixin,
FavoriteModelMixin,
FilePathModelMixin,
ThumbnailMixin,
TimestampModelMixin,
URLMixin,
)
from core.utils.fetchers.exceptions import MailboxError
from core.utils.mail_parsing import (
get_bodytexts,
get_header,
is_x_spam,
parse_datetime_header,
)
from eonvelope.utils.workarounds import get_config
from .Attachment import Attachment
from .EmailCorrespondent import EmailCorrespondent
if TYPE_CHECKING:
    # These names are only needed by annotations; guarding them keeps them out
    # of the runtime import graph.
    from tempfile import _TemporaryFileWrapper

    from django.db.models import QuerySet

    from .Correspondent import Correspondent
    from .Mailbox import Mailbox

logger = logging.getLogger(__name__)
"""The logger instance for this module."""
class Email(
    ExportModelOperationsMixin("email"),
    DownloadMixin,
    ThumbnailMixin,
    URLMixin,
    FavoriteModelMixin,
    FilePathModelMixin,
    TimestampModelMixin,
    models.Model,
):
    """Database model for an email.

    An email belongs to exactly one :attr:`mailbox` and is unique within it by
    its :attr:`message_id`. Besides the parsed header and body data it keeps
    links to the emails it replies to and references.
    """

    BASENAME = "email"

    DELETE_NOTICE = _(
        "This will delete the records of this email and all its attachments but not its correspondents."
    )

    DELETE_NOTICE_PLURAL = _(
        "This will delete the records of these emails and all their attachments but not their correspondents."
    )

    message_id = models.CharField(
        max_length=255,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("message-ID"),
    )
    """The messageID header of the mail. Unique together with :attr:`mailbox`."""

    datetime = models.DateTimeField(
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("time received"),
    )
    """The Date header of the mail."""

    subject = models.TextField(
        blank=True,
        default="",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("subject"),
    )
    """The subject header of the mail."""

    plain_bodytext = models.TextField(
        blank=True,
        default="",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("plain bodytext"),
    )
    """The plain bodytext of the mail. Can be blank."""

    html_bodytext = models.TextField(
        blank=True,
        default="",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("HTML bodytext"),
    )
    """The html bodytext of the mail. Can be blank."""

    in_reply_to: models.ManyToManyField[Email, Email] = models.ManyToManyField(
        "self",
        symmetrical=False,
        related_name="replies",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("in reply to email"),
    )
    """The mails that this mail is a response to.
    Technically just a single mail, but as a mail can exist in multiple mailboxes, this needs to be able to reference multiples."""

    references: models.ManyToManyField[Email, Email] = models.ManyToManyField(
        "self",
        symmetrical=False,
        related_name="referenced_by",
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("referencing emails"),
    )
    """The mails that this email references."""

    datasize = models.PositiveIntegerField(
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("datasize"),
    )
    """The bytes size of the mail."""

    correspondents: models.ManyToManyField[Correspondent, Correspondent] = (
        models.ManyToManyField(
            "Correspondent",
            through="EmailCorrespondent",
            related_name="emails",
            # Translators: Do not capitalize the very first letter unless your language requires it.
            verbose_name=_("correspondents"),
        )
    )
    """The correspondents that are mentioned in this mail. Bridges through :class:`core.models.EmailCorrespondent`."""

    mailbox: models.ForeignKey[Mailbox] = models.ForeignKey(
        "Mailbox",
        related_name="emails",
        on_delete=models.CASCADE,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("mailbox"),
    )
    """The mailbox that this mail has been found in. Unique together with :attr:`message_id`. Deletion of that `mailbox` deletes this mail."""

    headers = models.JSONField(
        null=True,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("headers"),
    )
    """All other header fields of the mail. Can be null."""

    x_spam_flag = models.BooleanField(
        null=True,
        blank=True,
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name=_("X-Spam Flag"),
    )
    """The x_spam header of this mail. Can be null."""

    class Meta:
        """Metadata class for the model."""

        db_table = "emails"
        """The name of the database table for the emails."""
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name = _("email")
        # Translators: Do not capitalize the very first letter unless your language requires it.
        verbose_name_plural = _("emails")
        get_latest_by = "datetime"

        constraints: ClassVar[list[models.BaseConstraint]] = [
            models.UniqueConstraint(
                fields=["message_id", "mailbox"],
                name="email_unique_together_message_id_mailbox",
            )
        ]
        """:attr:`message_id` and :attr:`mailbox` in combination are unique."""

    @override
    def __str__(self) -> str:
        """Returns a string representation of the model data.

        Returns:
            The string representation of the email, using :attr:`message_id`, :attr:`datetime` and :attr:`mailbox`.
        """
        return _(
            "Email with ID %(message_id)s, received on %(datetime)s from %(mailbox)s"
        ) % {
            "message_id": self.message_id,
            "datetime": self.datetime,
            "mailbox": self.mailbox,
        }

    @override
    def save(self, *args: Any, **kwargs: Any) -> None:
        """Extended :django::func:`django.models.Model.save` method.

        Saves the data to eml if configured: when the mailbox has
        ``save_to_eml`` disabled, the ``file_payload`` keyword argument is
        discarded before delegating to the parent ``save``.

        Args:
            *args: Positional arguments passed through to the parent ``save``.
            **kwargs: Keyword arguments passed through to the parent ``save``.
                May contain ``file_payload`` with the raw email bytes.
        """
        if not self.mailbox.save_to_eml:
            kwargs.pop("file_payload", None)
        super().save(*args, **kwargs)

    @override
    def _get_storage_file_name(self) -> str:
        """Create the filename for the stored eml.

        Returns:
            The name in the form ``<pk>_<message_id>.eml``.
        """
        return f"{self.pk}_{self.message_id}.eml"

    def fill_from_email_bytes(self, email_bytes: bytes) -> Email:
        """Fills the :class:`core.models.Email` with data from an email in bytes form.

        The instance is only modified in memory, it is not saved.

        Args:
            email_bytes: The email bytes data.

        Returns:
            The :class:`core.models.Email` instance with data from the bytes.
        """
        email_message = email.message_from_bytes(email_bytes, policy=policy.default)
        # Header names are stored lowercased so later lookups are case-insensitive.
        header_dict: dict[str, str | None] = {
            header_name.lower(): get_header(email_message, header_name)
            for header_name in email_message
        }
        bodytexts = get_bodytexts(email_message)
        self.headers = header_dict
        # Mails without a Message-ID are identified by the digest of their content.
        self.message_id = (
            header_dict.get(HeaderFields.MESSAGE_ID)
            or md5(email_bytes).hexdigest()  # noqa: S324  # no safe hash required here
        )
        self.datetime = parse_datetime_header(header_dict.get(HeaderFields.DATE))
        self.subject = header_dict.get(HeaderFields.SUBJECT) or __("No subject")
        self.x_spam_flag = is_x_spam(header_dict.get(HeaderFields.X_SPAM))
        self.datasize = len(email_bytes)
        self.plain_bodytext = bodytexts.get("plain", "")
        self.html_bodytext = bodytexts.get("html", "")
        return self

    def add_correspondents(self) -> None:
        """Adds the correspondents from the headerfields to the model.

        For correspondents mentioned in the From header, the mailing list
        headers of this mail are additionally copied onto the correspondent,
        which is then saved. Does nothing if :attr:`headers` is empty.
        """
        if not self.headers:
            return
        # Correspondent attribute -> mailing list header it is filled from.
        mailing_list_fields = (
            ("list_id", HeaderFields.MailingList.ID),
            ("list_help", HeaderFields.MailingList.HELP),
            ("list_archive", HeaderFields.MailingList.ARCHIVE),
            ("list_subscribe", HeaderFields.MailingList.SUBSCRIBE),
            ("list_unsubscribe", HeaderFields.MailingList.UNSUBSCRIBE),
            ("list_unsubscribe_post", HeaderFields.MailingList.UNSUBSCRIBE_POST),
            ("list_post", HeaderFields.MailingList.POST),
            ("list_owner", HeaderFields.MailingList.OWNER),
        )
        for mention in HeaderFields.Correspondents.values:
            correspondent_header = self.headers.get(mention)
            if not correspondent_header:
                continue
            new_emailcorrespondents = EmailCorrespondent.create_from_header(
                correspondent_header, mention, self
            )
            if (
                mention != HeaderFields.Correspondents.FROM
                or new_emailcorrespondents is None
            ):
                continue
            for new_emailcorrespondent in new_emailcorrespondents:
                correspondent = new_emailcorrespondent.correspondent
                for attribute_name, header_field in mailing_list_fields:
                    setattr(
                        correspondent,
                        attribute_name,
                        self.headers.get(header_field, ""),
                    )
                correspondent.save()

    def add_in_reply_to(self) -> None:
        """Adds the in-reply-to emails from the headerfields to the model.

        Links every email of the same user whose :attr:`message_id` equals the
        stripped In-Reply-To header. Does nothing if the header is missing.
        """
        if not self.headers:
            return
        in_reply_to_message_id = self.headers.get(HeaderFields.IN_REPLY_TO)
        if not in_reply_to_message_id:
            return
        for in_reply_to_email in Email.objects.filter(
            message_id=in_reply_to_message_id.strip(),
            mailbox__account__user=self.mailbox.account.user,
        ):
            self.in_reply_to.add(in_reply_to_email)

    def add_references(self) -> None:
        """Adds the references from the headerfields to the model.

        The References header is split on commas and any whitespace (spaces,
        tabs and line breaks of folded headers). Every email of the same user
        with a matching :attr:`message_id` is linked.
        """
        if not self.headers:
            return
        references_header = self.headers.get(HeaderFields.REFERENCES)
        if not references_header:
            return
        user = self.mailbox.account.user
        for referenced_message_id in re.split(r"[\s,]+", references_header):
            if not referenced_message_id:
                # re.split produces empty strings for leading/trailing separators
                continue
            for referenced_email in Email.objects.filter(
                message_id=referenced_message_id,
                mailbox__account__user=user,
            ):
                self.references.add(referenced_email)

    def reprocess(self) -> None:
        """Reprocesses the mails connections to other emails in the database.

        If the stored eml file is available, the mail data is re-parsed from it
        first; a missing file is silently skipped. The in-reply-to and
        references links are then rebuilt from :attr:`headers` in one
        transaction.
        """
        with contextlib.suppress(FileNotFoundError):
            with self.open_file() as email_file:
                email_bytes = email_file.read()
            self.fill_from_email_bytes(email_bytes)
        with transaction.atomic():
            self.save()
            self.in_reply_to.clear()
            self.add_in_reply_to()
            self.references.clear()
            self.add_references()

    def restore_to_mailbox(self) -> None:
        """Restores the email to its mailbox.

        Raises:
            NotImplementedError: If the emails account does not allow restoring.
            FileNotFoundError: If there is no eml file for the email.
            MailAccountError: If there was an error connected to the account.
            MailboxError: If there was an error with the mailbox.
        """
        logger.debug("Restoring %s to its mailbox.", self)
        with self.mailbox.account.get_fetcher() as fetcher:
            try:
                fetcher.restore(self)
            except MailboxError as error:
                logger.exception("Restoring of email %s to its mailbox failed!", self)
                # Record the failure on the mailbox before passing the error on.
                self.mailbox.set_unhealthy(error)
                raise
        logger.debug("Successfully restored email.")

    @cached_property
    def conversation(self) -> QuerySet[Email]:
        """Recursively gets all emails that are part of this emails conversation,
        connected through references or in_reply_to.

        The query first follows the links from this email to the emails it
        replies to or references (``conversation_root``). It then starts at
        those reached emails that have no outgoing links themselves and follows
        the links backwards to collect every email linking to them
        (``conversation_thread``).

        Returns:
            Queryset of all mails in the conversation.
        """
        # The recursive CTEs use UNION instead of UNION ALL: rows that were
        # already produced are discarded, so the recursion terminates even if
        # the link tables contain a cycle (e.g. an email referencing itself).
        conversation_sql = """
            WITH RECURSIVE
            emails_links AS (
                SELECT from_email_id, to_email_id FROM emails_in_reply_to
                UNION ALL
                SELECT from_email_id, to_email_id FROM emails_references
            ),
            conversation_root AS (
                SELECT e.id
                FROM emails e
                WHERE e.id = %s
                UNION
                SELECT DISTINCT linked.id
                FROM emails linked
                JOIN emails_links l ON l.to_email_id = linked.id
                JOIN conversation_root cr ON cr.id = l.from_email_id
            ),
            conversation_thread AS (
                SELECT e.id
                FROM emails e
                JOIN conversation_root cr ON e.id = cr.id
                WHERE NOT EXISTS (
                    SELECT 1
                    FROM emails_links l
                    WHERE l.from_email_id = e.id
                )
                UNION
                SELECT DISTINCT linking.id
                FROM emails linking
                JOIN emails_links l ON l.from_email_id = linking.id
                JOIN conversation_thread ct ON l.to_email_id = ct.id
            )
            SELECT DISTINCT id
            FROM conversation_thread;
            """
        with connection.cursor() as cursor:
            cursor.execute(conversation_sql, [self.id])
            conversation_rows = cursor.fetchall()
        conversation_ids = [
            conversation_row[0] for conversation_row in conversation_rows
        ]
        # The raw query is not user-scoped, so the result is restricted to the
        # emails of this emails user here.
        return Email.objects.filter(
            id__in=conversation_ids, mailbox__account__user=self.mailbox.account.user
        ).order_by("datetime")

    @property
    @override
    def has_thumbnail(self) -> bool:
        """Checks whether the email has a thumbnail.

        Returns:
            `False` if the mail is considered spam, else `True`.
        """
        return not self.is_spam

    @property
    def can_be_restored(self) -> bool:
        """Checks if the email can be restored to its mailbox.

        Requires a stored file, an account protocol that supports restoring
        and a healthy mailbox.

        Returns:
            Whether the email can be restored.
        """
        return (
            self.file_path is not None
            and self.mailbox.account.protocol in PROTOCOLS_SUPPORTING_RESTORE
            and self.mailbox.is_healthy
        )

    @cached_property
    def html_version(self) -> str:
        """Renders a html version of this email.

        Uses the template and css from constance settings.

        Returns:
            The emails html version.
        """
        engine = engines["django"]
        template = engine.from_string(get_config("EMAIL_HTML_TEMPLATE"))
        # Template context key -> mention type of the correspondents it holds.
        mention_by_context_key = {
            "from_emailcorrespondents": HeaderFields.Correspondents.FROM,
            "to_emailcorrespondents": HeaderFields.Correspondents.TO,
            "cc_emailcorrespondents": HeaderFields.Correspondents.CC,
            "bcc_emailcorrespondents": HeaderFields.Correspondents.BCC,
        }
        context: dict[str, Any] = {
            context_key: self.emailcorrespondents.filter(
                mention=mention
            ).select_related("correspondent")
            for context_key, mention in mention_by_context_key.items()
        }
        context["email"] = self
        context["email_css"] = get_config("EMAIL_CSS")
        return template.render(context=context)

    @property
    def is_spam(self) -> bool:
        """Checks the spam headers to decide whether the mail is spam.

        Returns:
            Whether the mail is considered spam.
        """
        return bool(self.x_spam_flag)

    @classmethod
    def create_from_email_bytes(
        cls, email_bytes: bytes, mailbox: Mailbox
    ) -> Email | None:
        """Creates an :class:`core.models.Email` from an email in bytes form.

        If the mail has no Message-ID header, the md5 digest of the bytes is
        used as its message ID instead.

        Args:
            email_bytes: The email bytes to parse the emaildata from.
            mailbox: The mailbox the email is in.

        Returns:
            The :class:`core.models.Email` instance with data from the bytes.
            None if the mail is spam and is supposed to be thrown out,
            if the mail already exists in the db or
            if saving the mail to the db failed.
        """
        email_message = email.message_from_bytes(email_bytes, policy=policy.default)
        message_id = (
            get_header(
                email_message,
                HeaderFields.MESSAGE_ID,
            )
            or md5(email_bytes).hexdigest()  # noqa: S324  # no safe hash required here
        )
        logger.debug("Parsed email %s ...", message_id)
        x_spam = is_x_spam(get_header(email_message, HeaderFields.X_SPAM))
        if x_spam and get_config("THROW_OUT_SPAM"):
            logger.debug(
                "Skipping email with Message-ID %s in %s, it is flagged as spam.",
                message_id,
                mailbox,
            )
            return None
        if cls.objects.filter(message_id=message_id, mailbox=mailbox).exists():
            logger.debug(
                "Skipping email with Message-ID %s in %s, it already exists in the db.",
                message_id,
                mailbox,
            )
            return None
        new_email = cls(mailbox=mailbox).fill_from_email_bytes(email_bytes=email_bytes)
        logger.debug("Successfully parsed email.")
        logger.debug("Saving email %s to db...", message_id)
        try:
            # All records belonging to the mail are created together or not at all.
            with transaction.atomic():
                new_email.save(file_payload=email_bytes)
                new_email.add_correspondents()
                new_email.add_in_reply_to()
                new_email.add_references()
                Attachment.create_from_email_message(email_message, new_email)
        except Exception:
            # Deliberately broad: a single broken mail is logged and skipped.
            logger.exception(
                "Failed creating email from bytes: Error while saving email to db!"
            )
            return None
        logger.debug("Successfully saved email to db.")
        return new_email

    @staticmethod
    def _add_emails_to_parser(parser: Any, queryset: QuerySet[Email]) -> None:
        """Locks the mailbox parser, adds the eml files of all emails and closes it.

        Emails without a stored file are skipped. The parser is closed even if
        adding a message fails, so the mailbox is not left open or locked.

        Args:
            parser: The mailbox parser instance to add the emails to.
            queryset: The emails to add.
        """
        try:
            parser.lock()
            for email_item in queryset:
                # this construction is strictly necessary as Maildir.add can also raise FileNotFound
                # if the directory is incorrectly structured; that warning must not be blocked
                try:
                    eml_file = email_item.open_file()
                except FileNotFoundError:
                    continue
                with eml_file:
                    parser.add(eml_file)
        finally:
            parser.close()

    @staticmethod
    def _queryset_as_zip_eml(queryset: QuerySet[Email]) -> _TemporaryFileWrapper:
        """Parses a queryset of emails into a zip of eml files.

        Emails without a stored file are skipped.

        Note:
            Does not validate args! This has to be done beforehand.

        Returns:
            The open temporary file holding the zip archive.
        """
        temp_file = (
            NamedTemporaryFile()  # noqa: SIM115  # pylint: disable=consider-using-with
        )  # the file must not be closed as it is returned later
        try:
            with ZipFile(temp_file.name, "w") as zipfile:
                for email_item in queryset:
                    try:
                        eml_file = email_item.open_file()
                    except FileNotFoundError:
                        continue
                    with (
                        eml_file,
                        zipfile.open(
                            os.path.basename(email_item.file_path), "w"
                        ) as zipped_file,
                    ):
                        # Stream the file instead of loading it into memory at once.
                        shutil.copyfileobj(eml_file, zipped_file)
        except BaseException:
            # Nobody receives the file on failure, so release it here.
            temp_file.close()
            raise
        return temp_file

    @staticmethod
    def _queryset_as_mailbox_file(
        queryset: QuerySet[Email], file_format: str
    ) -> _TemporaryFileWrapper:
        """Parses a queryset of emails into a mailbox file.

        Emails without a stored file are skipped.

        Note:
            Does not validate args! This has to be done beforehand.

        Returns:
            The open temporary file holding the mailbox.
        """
        temp_file = (
            NamedTemporaryFile()  # noqa: SIM115  # pylint: disable=consider-using-with
        )  # the file must not be closed as it is returned later
        try:
            parser_class = file_format_parsers[file_format]
            parser = parser_class(temp_file.name, create=True)
            Email._add_emails_to_parser(parser, queryset)
        except BaseException:
            # Nobody receives the file on failure, so release it here.
            temp_file.close()
            raise
        return temp_file

    @staticmethod
    def _queryset_as_mailbox_zip(
        queryset: QuerySet[Email], file_format: str
    ) -> _TemporaryFileWrapper:
        """Parses a queryset of emails into a zipped mailbox dir.

        Emails without a stored file are skipped.

        Note:
            Does not validate args! This has to be done beforehand.

        Returns:
            The open temporary file holding the zip archive.
        """
        temp_file = (
            NamedTemporaryFile(  # noqa: SIM115  # pylint: disable=consider-using-with
                suffix=".zip"  # the suffix allows zipping to this file with shutil
            )
        )  # the file must not be closed as it is returned later
        try:
            with TemporaryDirectory() as tempdirpath:
                mailbox_path = os.path.join(tempdirpath, file_format)
                parser_class = file_format_parsers[file_format]
                parser = parser_class(mailbox_path, create=True)
                Email._add_emails_to_parser(parser, queryset)
                # make_archive appends ".zip" itself, so the suffix is stripped
                # to make it write to the path of the temporary file.
                shutil.make_archive(
                    os.path.splitext(temp_file.name)[0], "zip", tempdirpath
                )
        except BaseException:
            # Nobody receives the file on failure, so release it here.
            temp_file.close()
            raise
        return temp_file

    @staticmethod
    def queryset_as_file(
        queryset: QuerySet[Email], file_format: str
    ) -> _TemporaryFileWrapper:
        """Processes the files of the emails in the queryset into a temporary file.

        Args:
            queryset: The email queryset to compile into a file.
            file_format: The desired format of the file. Must be one of :class:`core.constants.SupportedEmailDownloadFormats`. Case-insensitive.

        Returns:
            The temporary file wrapper.

        Raises:
            ValueError: If the given :attr:`file_format` is not supported.
            Email.DoesNotExist: If the :attr:`queryset` is empty.
        """
        if not queryset.exists():
            raise Email.DoesNotExist("The queryset is empty!")
        file_format = file_format.lower()
        if file_format == SupportedEmailDownloadFormats.ZIP_EML:
            return Email._queryset_as_zip_eml(queryset)
        # single-file mailbox formats
        if file_format in (
            SupportedEmailDownloadFormats.MBOX,
            SupportedEmailDownloadFormats.BABYL,
            SupportedEmailDownloadFormats.MMDF,
        ):
            return Email._queryset_as_mailbox_file(queryset, file_format)
        # directory-based mailbox formats, delivered zipped
        if file_format in (
            SupportedEmailDownloadFormats.MAILDIR,
            SupportedEmailDownloadFormats.MH,
        ):
            return Email._queryset_as_mailbox_zip(queryset, file_format)
        raise ValueError(
            _("The file format %(file_format)s is not supported.")
            % {"file_format": file_format}
        )