from __future__ import annotations
from dataclasses import dataclass
from typing import Any, BinaryIO, ClassVar
try:
from fastapi import UploadFile
except Exception: # pragma: no cover
class UploadFile: # type: ignore
pass
from granite_storage.exceptions import StorageError
from granite_storage.manager import StorageManager
from granite_storage.models import StoredObjectRef
from granite_storage.utils import guess_content_type
[docs]
@dataclass
class ReplaceContentResult:
previous_ref: StoredObjectRef | None
new_ref: StoredObjectRef
[docs]
class StoredContentMixin:
__stored_content_field_name__: ClassVar[str]
__stored_content_storage_key__: ClassVar[str]
__storage_manager__: ClassVar[StorageManager | None] = None
__tablename__: ClassVar[str]
def _require_storage_manager(self) -> StorageManager:
manager = getattr(self.__class__, "__storage_manager__", None)
if manager is None:
raise StorageError(
f"Storage manager not configured for {self.__class__.__name__}"
)
return manager # type: ignore[return-value]
def _get_existing_ref(self) -> StoredObjectRef | None:
return getattr(self, self.__stored_content_field_name__, None) # type: ignore[return-value]
def _set_ref(self, ref: StoredObjectRef | None) -> None:
setattr(self, self.__stored_content_field_name__, ref)
[docs]
def set_content(
self,
content: bytes | str,
*,
filename: str | None = None,
content_type: str | None = None,
storage_key: str | None = None,
extra: dict[str, Any] | None = None,
) -> StoredObjectRef:
if isinstance(content, str):
payload = content.encode("utf-8")
content_type = content_type or "text/plain; charset=utf-8"
else:
payload = content
content_type = content_type or guess_content_type(filename)
entity_id = getattr(self, "id", None)
if entity_id is None:
raise StorageError("Model instance must have an id before set_content().")
ref = self._require_storage_manager().put_bytes(
storage_key=storage_key or self.__stored_content_storage_key__,
model_name=self.__tablename__,
entity_id=entity_id,
field_name=self.__stored_content_field_name__.lstrip("_"),
content=payload,
content_type=content_type,
original_filename=filename,
extra=extra,
)
self._set_ref(ref)
return ref
[docs]
def set_content_from_stream(
self,
stream: BinaryIO,
*,
filename: str | None = None,
content_type: str | None = None,
storage_key: str | None = None,
extra: dict[str, Any] | None = None,
) -> StoredObjectRef:
entity_id = getattr(self, "id", None)
if entity_id is None:
raise StorageError(
"Model instance must have an id before set_content_from_stream()."
)
ref = self._require_storage_manager().put_stream(
storage_key=storage_key or self.__stored_content_storage_key__,
model_name=self.__tablename__,
entity_id=entity_id,
field_name=self.__stored_content_field_name__.lstrip("_"),
stream=stream,
content_type=content_type or guess_content_type(filename),
original_filename=filename,
extra=extra,
)
self._set_ref(ref)
return ref
[docs]
async def set_content_from_uploadfile(
self,
upload_file: UploadFile,
*,
storage_key: str | None = None,
extra: dict[str, Any] | None = None,
) -> StoredObjectRef:
upload_file.file.seek(0) # type: ignore[attr-defined]
return self.set_content_from_stream(
upload_file.file, # type: ignore[attr-defined]
filename=upload_file.filename, # type: ignore[attr-defined]
content_type=upload_file.content_type, # type: ignore[attr-defined]
storage_key=storage_key,
extra=extra,
)
[docs]
def replace_content(
self,
content: bytes | str,
*,
filename: str | None = None,
content_type: str | None = None,
storage_key: str | None = None,
extra: dict[str, Any] | None = None,
) -> ReplaceContentResult:
previous_ref = self._get_existing_ref()
new_ref = self.set_content(
content,
filename=filename,
content_type=content_type,
storage_key=storage_key,
extra=extra,
)
return ReplaceContentResult(previous_ref=previous_ref, new_ref=new_ref)
[docs]
def get_content(self) -> bytes | None:
ref = self._get_existing_ref()
return None if ref is None else self._require_storage_manager().get(ref)
[docs]
def open_content(self) -> BinaryIO | None:
ref = self._get_existing_ref()
return None if ref is None else self._require_storage_manager().open(ref)
[docs]
def get_content_text(self, encoding: str = "utf-8") -> str | None:
content = self.get_content()
return None if content is None else content.decode(encoding)
[docs]
def clear_content_reference(self) -> StoredObjectRef | None:
previous_ref = self._get_existing_ref()
self._set_ref(None)
return previous_ref
[docs]
@classmethod
def configure_storage_manager(cls, manager: StorageManager) -> None:
cls.__storage_manager__ = manager