Source code for granite_storage.contracts

from __future__ import annotations

from collections.abc import Iterator
from typing import Any, BinaryIO, Protocol

from granite_storage.models import StoredObjectRef


class StorageBackend(Protocol):
    """Structural interface that storage backend implementations satisfy.

    This is a ``typing.Protocol``: a class conforms by exposing the members
    below with compatible signatures; it does not need to inherit from this
    class. Every method body here is a bare ``...`` placeholder, so the
    behaviour notes on each member are inferred from names and signatures
    only -- concrete backends define the actual semantics.
    """

    # Name identifying the backend implementation.
    # NOTE(review): format and uniqueness rules are not visible here.
    backend_name: str

    def put_bytes(
        self,
        *,
        key: str,
        content: bytes,
        content_type: str | None = None,
        original_filename: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> StoredObjectRef:
        """Store an in-memory payload and return a reference to it.

        All arguments are keyword-only.

        Args:
            key: Key under which the object is stored.
            content: The payload as ``bytes``.
            content_type: Optional content type; presumably a MIME type --
                confirm with implementers.
            original_filename: Optional original file name of the payload.
            extra: Optional mapping of additional data; how it is used is
                backend-specific and not visible here.

        Returns:
            A ``StoredObjectRef`` for the stored object.
        """
        ...

    def put_stream(
        self,
        *,
        key: str,
        stream: BinaryIO,
        size: int | None = None,
        checksum: str | None = None,
        content_type: str | None = None,
        original_filename: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> StoredObjectRef:
        """Store a payload read from a binary stream and return a reference.

        All arguments are keyword-only.

        Args:
            key: Key under which the object is stored.
            stream: Binary file-like object supplying the payload.
            size: Optional size of the payload; presumably in bytes --
                confirm with implementers.
            checksum: Optional checksum string.
                NOTE(review): algorithm and encoding are not specified here.
            content_type: Optional content type; presumably a MIME type.
            original_filename: Optional original file name of the payload.
            extra: Optional mapping of additional data; backend-specific.

        Returns:
            A ``StoredObjectRef`` for the stored object.
        """
        ...

    def get(self, ref: StoredObjectRef) -> bytes:
        """Return the object identified by ``ref`` as ``bytes``."""
        ...

    def open(self, ref: StoredObjectRef) -> BinaryIO:
        """Return a binary file-like object for the object identified by ``ref``.

        NOTE(review): who closes the returned stream is not specified here;
        presumably the caller.
        """
        ...

    def delete(self, ref: StoredObjectRef) -> None:
        """Delete the object identified by ``ref``.

        NOTE(review): behaviour for a missing object is not specified here.
        """
        ...

    def exists(self, ref: StoredObjectRef) -> bool:
        """Return whether the object identified by ``ref`` exists."""
        ...

    def iter_locations(self, prefix: str | None = None) -> Iterator[str]:
        """Iterate over location strings known to the backend.

        Args:
            prefix: Optional prefix; presumably restricts the results to
                locations starting with it -- confirm with implementers.

        Returns:
            An iterator of ``str`` locations.
        """
        ...