Implementing a New Backend

Granite Storage uses the StorageBackend protocol (granite_storage.contracts) as its extension point. Any object that implements the protocol methods can be used as a backend — no inheritance required.

The Protocol

from typing import Any, BinaryIO, Iterator, Protocol
from granite_storage.models import StoredObjectRef

class StorageBackend(Protocol):
    backend_name: str  # unique identifier, e.g. "gcs" or "azure_blob"

    def put_bytes(
        self,
        *,
        key: str,
        content: bytes,
        content_type: str | None = None,
        original_filename: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> StoredObjectRef: ...

    def put_stream(
        self,
        *,
        key: str,
        stream: BinaryIO,
        size: int | None = None,
        checksum: str | None = None,
        content_type: str | None = None,
        original_filename: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> StoredObjectRef: ...

    def get(self, ref: StoredObjectRef) -> bytes: ...
    def open(self, ref: StoredObjectRef) -> BinaryIO: ...
    def delete(self, ref: StoredObjectRef) -> None: ...
    def exists(self, ref: StoredObjectRef) -> bool: ...
    def iter_locations(self, prefix: str | None = None) -> Iterator[str]: ...

Method Contracts

put_bytes(*, key, content, ...)

Write content (bytes) at the logical path key inside the backend. Return a StoredObjectRef with every other field populated; leave storage_key as an empty string, because StorageManager fills it in after the call.

put_stream(*, key, stream, ...)

Write from a file-like object. The stream is wrapped by SizeLimitedStream before it reaches the backend, so max_size enforcement is handled by the manager and the backend does not need to re-check it. If the caller did not supply size or checksum, compute them during the write.
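
If you want to avoid buffering the whole payload in memory, one option is a chunked copy that hashes as it writes. A minimal sketch (the _copy_with_checksum helper is hypothetical, not part of granite_storage):

import hashlib
from typing import BinaryIO


def _copy_with_checksum(stream: BinaryIO, sink: BinaryIO, chunk_size: int = 1024 * 1024) -> tuple[str, int]:
    """Hypothetical helper: copy stream into sink in chunks, returning
    ("sha256:<hex>", byte_count) computed during the write itself."""
    digest = hashlib.sha256()
    total = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        sink.write(chunk)
        digest.update(chunk)
        total += len(chunk)
    return f"sha256:{digest.hexdigest()}", total

A backend would call this with whatever writable destination its SDK exposes and put the returned pair into the StoredObjectRef.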

get(ref)

Return the full content as bytes. Raise StorageError if the object does not exist.

open(ref)

Return a readable, binary file-like object. The caller is responsible for closing it. Raise StorageError if the object does not exist.
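
For callers, contextlib.closing is a convenient way to honour that responsibility regardless of whether the backend hands back an io.BytesIO, a real file, or any other object with close(). A short usage sketch (backend and ref are assumed to exist from the surrounding code):

from contextlib import closing

with closing(backend.open(ref)) as fh:
    header = fh.read(16)  # read as much or as little as needed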

delete(ref)

Remove the object from the backend. Should be idempotent — do not raise if the object is already gone.

exists(ref)

Return True if the object is present in the backend.

iter_locations(prefix)

Yield every location (string key) stored in the backend, optionally filtered by prefix. Used by the garbage collector.
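
As an illustration of the consumer this serves, here is a hypothetical orphan sweep built only on the protocol methods. It is not the actual garbage collector, and live_locations stands in for whatever reference index your application keeps:

from granite_storage.models import StoredObjectRef


def sweep_orphans(backend, live_locations: set[str], prefix: str | None = None) -> int:
    """Hypothetical sweep: delete every stored object whose location is no
    longer referenced anywhere. Returns the number of objects removed."""
    removed = 0
    for location in backend.iter_locations(prefix):
        if location in live_locations:
            continue
        # Build a minimal ref; the GCS backend below only needs .location,
        # but other backends may rely on more fields.
        ref = StoredObjectRef(
            storage_key="", backend=backend.backend_name, location=location,
            size=0, checksum="",
        )
        backend.delete(ref)
        removed += 1
    return removed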

Example — Google Cloud Storage Backend

from __future__ import annotations

import io
from typing import Any, BinaryIO, Iterator

from google.cloud import storage as gcs

from granite_storage.exceptions import StorageError
from granite_storage.models import StoredObjectRef
from granite_storage.utils import sha256_bytes, utcnow_iso


class GCSStorageBackend:
    """Google Cloud Storage backend for Granite Storage."""

    backend_name = "gcs"

    def __init__(self, *, bucket: str, prefix: str = "", client=None):
        self.bucket_name = bucket
        self.prefix = prefix.strip("/")
        self._client = client or gcs.Client()
        self._bucket = self._client.bucket(bucket)

    def _full_key(self, location: str) -> str:
        return f"{self.prefix}/{location}" if self.prefix else location

    def put_bytes(
        self,
        *,
        key: str,
        content: bytes,
        content_type: str | None = None,
        original_filename: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> StoredObjectRef:
        blob = self._bucket.blob(self._full_key(key))
        blob.upload_from_string(content, content_type=content_type or "application/octet-stream")
        return StoredObjectRef(
            storage_key="",
            backend=self.backend_name,
            location=key,
            size=len(content),
            checksum=sha256_bytes(content),
            content_type=content_type,
            original_filename=original_filename,
            created_at=utcnow_iso(),
            extra={"bucket": self.bucket_name, **(extra or {})},
        )

    def put_stream(
        self,
        *,
        key: str,
        stream: BinaryIO,
        size: int | None = None,
        checksum: str | None = None,
        content_type: str | None = None,
        original_filename: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> StoredObjectRef:
        # The manager has already wrapped the stream in SizeLimitedStream, so
        # reading it fully here cannot exceed the policy's max_size. A production
        # backend could instead hand the stream to blob.upload_from_file and hash
        # chunks incrementally to avoid buffering the whole object in memory.
        data = stream.read()
        blob = self._bucket.blob(self._full_key(key))
        blob.upload_from_string(data, content_type=content_type or "application/octet-stream")
        return StoredObjectRef(
            storage_key="",
            backend=self.backend_name,
            location=key,
            size=size if size is not None else len(data),
            checksum=checksum or sha256_bytes(data),
            content_type=content_type,
            original_filename=original_filename,
            created_at=utcnow_iso(),
            extra={"bucket": self.bucket_name, **(extra or {})},
        )

    def get(self, ref: StoredObjectRef) -> bytes:
        blob = self._bucket.blob(self._full_key(ref.location))
        if not blob.exists():
            raise StorageError(f"GCS object not found: {ref.location}")
        return blob.download_as_bytes()

    def open(self, ref: StoredObjectRef) -> BinaryIO:
        return io.BytesIO(self.get(ref))

    def delete(self, ref: StoredObjectRef) -> None:
        blob = self._bucket.blob(self._full_key(ref.location))
        if blob.exists():
            blob.delete()

    def exists(self, ref: StoredObjectRef) -> bool:
        return self._bucket.blob(self._full_key(ref.location)).exists()

    def iter_locations(self, prefix: str | None = None) -> Iterator[str]:
        blobs = self._bucket.list_blobs(prefix=self._full_key(prefix or ""))
        strip = f"{self.prefix}/" if self.prefix else ""
        for blob in blobs:
            yield blob.name[len(strip):]

Registering the New Backend

Pass your backend instance to StorageManager under a unique key:

from granite_storage import StorageManager, StoragePolicy

manager = StorageManager(
    backends={"gcs": GCSStorageBackend(bucket="my-bucket")},
    policies={
        "uploads": StoragePolicy("uploads", backend_key="gcs", max_size=50*1024*1024),
    },
)
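
Several policies may point at the same backend key, and the example backend's prefix argument keeps those objects under a common path. A sketch, assuming StoragePolicy accepts the same arguments shown above:

manager = StorageManager(
    backends={"gcs": GCSStorageBackend(bucket="my-bucket", prefix="prod")},
    policies={
        "uploads": StoragePolicy("uploads", backend_key="gcs", max_size=50 * 1024 * 1024),
        "exports": StoragePolicy("exports", backend_key="gcs", max_size=500 * 1024 * 1024),
    },
)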

Testing Your Backend

The built-in test suite uses moto to mock S3 and a temporary directory for local storage. For a new backend, mock the external client in your tests:

from unittest.mock import MagicMock
import pytest
from my_project.backends.gcs import GCSStorageBackend

@pytest.fixture
def mock_bucket():
    bucket = MagicMock()
    blob = MagicMock()
    bucket.blob.return_value = blob
    blob.exists.return_value = True
    blob.download_as_bytes.return_value = b"hello"
    return bucket

def test_get_returns_bytes(mock_bucket):
    # Inject a mock client so the test never needs real GCS credentials.
    client = MagicMock()
    client.bucket.return_value = mock_bucket
    backend = GCSStorageBackend(bucket="test", client=client)

    from granite_storage.models import StoredObjectRef
    ref = StoredObjectRef(
        storage_key="uploads", backend="gcs", location="user/1/avatar/photo.jpg",
        size=5, checksum="sha256:abc",
    )
    assert backend.get(ref) == b"hello"
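
The remaining contract points can be covered in the same style; for example, a sketch of a delete-idempotency test that reuses the mock_bucket fixture:

def test_delete_ignores_missing_object(mock_bucket):
    client = MagicMock()
    client.bucket.return_value = mock_bucket
    backend = GCSStorageBackend(bucket="test", client=client)

    # Simulate an object that is already gone.
    mock_bucket.blob.return_value.exists.return_value = False

    from granite_storage.models import StoredObjectRef
    ref = StoredObjectRef(
        storage_key="uploads", backend="gcs", location="user/1/avatar/photo.jpg",
        size=5, checksum="sha256:abc",
    )
    backend.delete(ref)  # must not raise
    mock_bucket.blob.return_value.delete.assert_not_called()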

Checklist

Before publishing a new backend, verify:

  • put_bytes and put_stream return a StoredObjectRef with storage_key="" (the manager fills it in).

  • delete is idempotent — does not raise if the object is absent.

  • iter_locations yields bare locations (without the backend prefix).

  • open returns a file-like object that the caller can close normally.

  • The backend_name class attribute is unique.
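
Finally, a cheap way to catch protocol drift (a renamed parameter, a missing method) is a type-annotated assignment that mypy or pyright will check against StorageBackend. A minimal sketch, using the GCS backend from this page:

from granite_storage.contracts import StorageBackend
from my_project.backends.gcs import GCSStorageBackend


def _assert_protocol_conformance() -> None:
    # Never called at runtime; the annotated assignment alone lets a static
    # type checker verify GCSStorageBackend structurally satisfies the protocol.
    backend: StorageBackend = GCSStorageBackend(bucket="example-bucket")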