Source code for core.models.Attachment

# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Eonvelope - a open-source self-hostable email archiving server
# Copyright (C) 2024 David Aderbauer & The Eonvelope Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Module with the :class:`Attachment` model class."""

from __future__ import annotations

import logging
import os
from functools import cached_property
from hashlib import md5
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, override
from zipfile import ZipFile

import httpcore
import httpx
from django.db import models
from django.template import engines
from django.utils.html import format_html
from django.utils.text import get_valid_filename
from django.utils.translation import gettext_lazy as _
from django_prometheus.models import ExportModelOperationsMixin
from rest_framework import status

from core.constants import (
    HTML_SUPPORTED_AUDIO_TYPE,
    HTML_SUPPORTED_VIDEO_TYPES,
    ICALENDAR_TEMPLATE,
    IMMICH_SUPPORTED_APPLICATION_TYPES,
    IMMICH_SUPPORTED_IMAGE_TYPES,
    IMMICH_SUPPORTED_VIDEO_TYPES,
    PAPERLESS_SUPPORTED_IMAGE_TYPES,
    PAPERLESS_TIKA_SUPPORTED_MIMETYPES,
    VCARD_TEMPLATE,
    HeaderFields,
)
from core.mixins import (
    DownloadMixin,
    FavoriteModelMixin,
    FilePathModelMixin,
    ThumbnailMixin,
    TimestampModelMixin,
    URLMixin,
)
from core.utils.mail_parsing import make_icalendar_readout, make_vcard_readout
from eonvelope.utils.workarounds import get_config

if TYPE_CHECKING:
    from email.message import EmailMessage
    from tempfile import _TemporaryFileWrapper

    from django.db.models import QuerySet

    from .Email import Email


logger = logging.getLogger(__name__)


[docs] class Attachment( ExportModelOperationsMixin("attachment"), DownloadMixin, ThumbnailMixin, URLMixin, FavoriteModelMixin, FilePathModelMixin, TimestampModelMixin, models.Model, ): """Database model for an attachment file in a mail.""" BASENAME = "attachment" DELETE_NOTICE = _( "This will only delete the record of this attachment, not of the email." ) DELETE_NOTICE_PLURAL = _( "This will only delete the records of these attachments, not of their emails." ) file_name = models.CharField( max_length=255, # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("filename"), ) """The filename of the attachment.""" content_disposition = models.CharField( blank=True, default="", max_length=255, # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("content disposition"), ) """The disposition of the file. Typically 'attachment', 'inline' or ''.""" content_id = models.CharField( max_length=255, default="", # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("content ID"), ) """The MIME subtype of the file.""" content_maintype = models.CharField( max_length=255, default="", # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("content maintype"), ) """The MIME maintype of the file.""" content_subtype = models.CharField( max_length=255, default="", # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("content subtype"), ) """The MIME subtype of the file.""" datasize = models.PositiveIntegerField( # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("datasize"), ) """The filesize of the attachment.""" email: models.ForeignKey[Email] = models.ForeignKey( "Email", related_name="attachments", on_delete=models.CASCADE, # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name=_("email"), ) """The mail that the attachment was found in. Deletion of that `email` deletes this attachment.""" class Meta: """Metadata class for the model.""" db_table = "attachments" """The name of the database table for the attachments.""" # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name = _("attachment") # Translators: Do not capitalize the very first letter unless your language requires it. verbose_name_plural = _("attachments") get_latest_by = "email__datetime"
[docs] @override def __str__(self) -> str: """Returns a string representation of the model data. Returns: The string representation of the attachment, using :attr:`file_name` and :attr:`email`. """ return _("Attachment %(file_name)s from %(email)s") % { "file_name": self.file_name, "email": self.email, }
[docs] @override def save(self, *args: Any, **kwargs: Any) -> None: """Extended :django::func:`django.models.Model.save` method. Saves the data to storage if configured. """ self.file_name = get_valid_filename(self.file_name) if not self.email.mailbox.save_attachments: kwargs.pop("file_payload", None) super().save(*args, **kwargs)
[docs] @override def _get_storage_file_name(self) -> str: """Create the filename for the stored attachment.""" return str(self.pk) + "_" + self.file_name
[docs] def share_to_paperless(self) -> str: """Sends this attachment to the Paperless server of its user. Returns: The uuid string of the Paperless consumer task for the document. Raises: FileNotFoundError: If the attachment file was not found in the storage. RuntimeError: If the users Paperless URL is not or improperly set. ConnectionError: If connecting to Paperless failed. PermissionError: If authentication to Paperless failed. ValueError: If uploading the file to Paperless resulted in a bad response. """ paperless_baseurl = ( self.email.mailbox.account.user.profile.paperless_url.rstrip("/") ) logger.debug( "Sending %s to Paperless server at %s ...", str(self), paperless_baseurl ) post_document_url = paperless_baseurl + "/api/documents/post_document/" headers = { "Accept": "application/json", "Authorization": f"Token {self.email.mailbox.account.user.profile.paperless_api_key}".strip(), } # stripping the entire string to create a valid header even if the token is empty try: with self.open_file() as document: response = httpx.post( post_document_url, headers=headers, data={"title": self.file_name, "created": str(self.created)}, files={"document": (self.file_name, document, self.content_type)}, ) except ( httpcore.UnsupportedProtocol, httpx.UnsupportedProtocol, httpx.InvalidURL, ) as error: logger.info( "Failed to send attachment to Paperless.", exc_info=True, ) raise RuntimeError( # Translators: Paperless is a brand name. _("Paperless URL is malformed: %(error)s") % {"error": error} ) from error except httpx.RequestError as error: logger.info("Failed to send attachment to Paperless.", exc_info=True) raise ConnectionError( # Translators: Paperless is a brand name. _("Error connecting to the Paperless server: %(error)s") % {"error": error} ) from error try: response.raise_for_status() except httpx.HTTPStatusError as error: logger.info("Failed to send attachment to Paperless.", exc_info=True) if error.response.status_code in [ status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, ]: raise PermissionError( # Translators: Paperless is a brand name. _("Authentication to Paperless failed: %(response)s") % {"response": error.response.json()} ) from error raise ValueError( # Translators: Paperless is a brand name. _("Uploading to Paperless failed: %(response)s") % {"response": error.response.json()} ) from error logger.debug("Successfully sent attachment to Paperless.") return response.json()
[docs] def share_to_immich(self) -> str: """Sends this attachment to the Immich server of its user. Returns: The response by Immich with the id string of the stored immich image. Raises: FileNotFoundError: If the attachment file was not found in the storage. RuntimeError: If the users Immich URL is not or improperly set. ConnectionError: If connecting to Immich failed. PermissionError: If authentication to Immich failed. ValueError: If uploading the file to Immich resulted in a bad response. """ immich_baseurl = self.email.mailbox.account.user.profile.immich_url.rstrip("/") logger.debug("Sending %s to Immich server at %s ...", str(self), immich_baseurl) post_document_url = immich_baseurl + "/api/assets" headers = { "Accept": "application/json", "x-api-key": self.email.mailbox.account.user.profile.immich_api_key.strip(), } try: with self.open_file() as image_file: response = httpx.post( post_document_url, headers=headers, data={ "assetId": self.file_name, "deviceAssetId": "eonvelope", "deviceId": "eonvelope", "fileCreatedAt": str(self.created.date()), "fileModifiedAt": str(self.created.date()), "metadata": [], }, files={ "assetData": (self.file_name, image_file, self.content_type) }, ) except ( httpcore.UnsupportedProtocol, httpx.UnsupportedProtocol, httpx.InvalidURL, ) as error: logger.info( "Failed to send attachment to Immich.", exc_info=True, ) raise RuntimeError( # Translators: Immich is a brand name. _("Immich URL is malformed: %(error)s") % {"error": error} ) from error except httpx.RequestError as error: logger.info("Failed to send attachment to Immich.", exc_info=True) raise ConnectionError( # Translators: Immich is a brand name. _("Error connecting to the Immich server: %(error)s") % {"error": error} ) from error try: response.raise_for_status() except httpx.HTTPStatusError as error: logger.info("Failed to send attachment to Immich.", exc_info=True) if error.response.status_code in [ status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, ]: raise PermissionError( # Translators: Immich is a brand name. _("Authentication to Immich failed: %(response)s") % {"response": error.response.json()} ) from error raise ValueError( # Translators: Immich is a brand name. _("Uploading to Immich failed: %(response)s") % {"response": error.response.json()} ) from error logger.debug("Successfully sent attachment to Immich.") return response.json()
@property @override def has_thumbnail(self) -> bool: """Whether the attachment has a mimetype that can be embedded into html. References: https://stackoverflow.com/questions/51107683/which-mime-types-can-be-displayed-in-browser """ return ( super().has_thumbnail and not self.email.is_spam and (self.datasize <= get_config("WEB_THUMBNAIL_MAX_DATASIZE")) and ( self.content_maintype in ["image", "font"] or self.content_maintype == "text" or ( self.content_maintype == "audio" and self.content_subtype in HTML_SUPPORTED_AUDIO_TYPE ) or ( self.content_maintype == "video" and self.content_subtype in HTML_SUPPORTED_VIDEO_TYPES ) or ( self.content_maintype == "application" and ( self.content_subtype in ["pdf", "json"] or self.content_subtype.endswith(("xml", "script")) ) ) ) )
[docs] @cached_property def thumbnail(self) -> str: """Builds the html thumbnail for the attachment. Returns: The html for the thumbnail. The empty string if the attachment has no thumbnail. """ if self.content_maintype == "audio": return format_html( """<div class="d-flex justify-content-center m-5"> <audio controls src="{src}"></audio> </div> """, src=self.get_absolute_thumbnail_url(), ) if self.content_maintype == "video": return format_html( """<video controls preload="metadata" class="img-thumbnail" src="{src}"> </video> """, src=self.get_absolute_thumbnail_url(), ) if self.content_maintype == "image": return format_html( """<img src="{src}" class="img-thumbnail" alt="{alt}" /> """, src=self.get_absolute_thumbnail_url(), alt=_("Attachment image"), ) if self.content_maintype == "text": if self.content_subtype.endswith("calendar"): try: icalendar_file = self.open_file(mode="r") except FileNotFoundError: return "" with icalendar_file: engine = engines["django"] template = engine.from_string(ICALENDAR_TEMPLATE) return template.render( context={ "icalendar_readout": make_icalendar_readout(icalendar_file) } ) if self.content_subtype in ["vcard", "vcf", "x-vcard"]: try: vcard_file = self.open_file(mode="r") except FileNotFoundError: return "" with vcard_file: engine = engines["django"] template = engine.from_string(VCARD_TEMPLATE) return template.render( context={"vcard_readout": make_vcard_readout(vcard_file)} ) return format_html( """<iframe sandbox class="w-100 h-100 p-1 rounded" title="{alt}" src="{src}"></iframe> """, src=self.get_absolute_thumbnail_url(), alt=_("Attachment text"), ) if self.content_maintype == "application": return format_html( """<embed class="w-100 h-100 p-1 rounded" title={title}" src="{src}" /> """, src=self.get_absolute_thumbnail_url(), title=_("Attachment file content"), ) if self.content_maintype == "font": return format_html( """<style> @font-face {{ font-family: attachment_{id}; src: url({src}); }} </style> <h1 class="text-center p-2" style="font-family: attachment_{id}"> Aa <br /> Zz <br /> 12 </h1> """, src=self.get_absolute_thumbnail_url(), id=self.id, ) return ""
@property def content_type(self) -> str: """Reconstructs the full MIME content type of the attachment. Returns: The attachments content type if known, else "". """ if self.content_maintype and self.content_subtype: return self.content_maintype + "/" + self.content_subtype return "" @property def is_shareable_to_paperless(self) -> bool: """Whether the attachment has a mimetype that can be processed by a paperless server. References: https://docs.paperless-ngx.com/faq/#what-file-types-does-paperless-ngx-support """ return self.file_path is not None and ( (self.content_maintype == "text" and self.content_subtype == "plain") or ( self.content_maintype == "image" and self.content_subtype in PAPERLESS_SUPPORTED_IMAGE_TYPES ) or ( self.content_maintype == "application" and ( self.content_subtype == "pdf" or ( self.email.mailbox.account.user.profile.paperless_tika_enabled and self.content_subtype in PAPERLESS_TIKA_SUPPORTED_MIMETYPES ) ) ) ) @property def is_shareable_to_immich(self) -> bool: """Whether the attachment has a mimetype that can be processed by a Immich server. References: https://immich.app/docs/features/supported-formats/ """ return self.file_path is not None and ( ( self.content_maintype == "image" and self.content_subtype in IMMICH_SUPPORTED_IMAGE_TYPES ) or ( self.content_maintype == "video" and self.content_subtype in IMMICH_SUPPORTED_VIDEO_TYPES ) or ( self.content_maintype == "application" and self.content_subtype in IMMICH_SUPPORTED_APPLICATION_TYPES ) )
[docs] @classmethod def create_from_email_message( cls, email_message: EmailMessage, email: Email ) -> list[Attachment]: """Creates :class:`core.models.Attachment`s from an email message. Args: email_message: The email_message to get and create all attachments from. email: The email model created from the email_message. Returns: A list of :class:`core.models.Attachment` in the email message. """ if email.pk is None: raise ValueError("Email is not in db!") logger.debug("Parsing and saving attachments in email %s ...", email.message_id) ignore_maintypes = get_config("DONT_PARSE_CONTENT_MAINTYPES") ignore_subtypes = get_config("DONT_PARSE_CONTENT_SUBTYPES") new_attachments = [] for part in email_message.walk(): if part.is_multipart(): # for safe get_payload continue content_disposition = part.get_content_disposition() content_maintype = part.get_content_maintype() content_subtype = part.get_content_subtype() # first part of the condition checks whether the part qualifies as attachment in general, # the second one whether the parts contenttype is blacklisted if ( content_disposition or ( content_maintype != "text" or content_subtype not in ["plain", "html"] ) ) and ( content_maintype not in ignore_maintypes and content_subtype not in ignore_subtypes ): part_payload = part.get_payload(decode=True) if isinstance(part_payload, bytes): new_attachment = cls( file_name=( part.get_filename() or md5( # noqa: S324 # no safe hash required here part_payload ).hexdigest() + f".{content_subtype}" ), content_disposition=content_disposition or "", content_id=part.get(HeaderFields.CONTENT_ID, ""), content_maintype=content_maintype, content_subtype=content_subtype, datasize=len(part_payload), email=email, ) logger.debug("Saving attachment %s to db ...", part.get_filename()) new_attachment.save(file_payload=part_payload) new_attachments.append(new_attachment) logger.debug("Successfully parsed and saved attachments.") return new_attachments
[docs] @staticmethod def queryset_as_file(queryset: QuerySet[Attachment]) -> _TemporaryFileWrapper: """Processes the files of the emails in the queryset into a temporary file. Args: queryset: The email queryset to compile into a file. Returns: The temporary file wrapper. Raises: Attachment.DoesNotExist: If the :attr:`queryset` is empty. """ if not queryset.exists(): raise Attachment.DoesNotExist("The queryset is empty!") tempfile = ( NamedTemporaryFile() # noqa: SIM115 # pylint: disable=consider-using-with ) # the file must not be closed as it is returned later with ZipFile(tempfile.name, "w") as zipfile: for attachment_item in queryset: try: attachment_file = attachment_item.open_file() except FileNotFoundError: continue with ( attachment_file, zipfile.open( os.path.basename(attachment_item.file_path), "w" ) as zipped_file, ): zipped_file.write(attachment_file.read()) return tempfile