Implementing a Custom Repository
Granite Assets uses a Protocol-based contract (structural subtyping, PEP 544),
so you can implement a new backend without inheriting from any base class.
If your class provides all the required methods with compatible signatures, it
satisfies IAssetRepository — including at runtime, because the protocol is
decorated with @runtime_checkable.
The Interface
from granite_assets import IAssetRepository
# Runtime check (optional)
assert isinstance(my_repo, IAssetRepository)
Methods you must implement:
| Method | Description |
|---|---|
| `save(request)` | Persist an asset; return `AssetSaveResult`. |
| `delete(key)` | Remove an asset; raise `AssetNotFoundError` if it is missing. |
| `copy(source_key, dest_key, *, overwrite=True)` | Server-side copy without download. |
| `move(source_key, dest_key, *, overwrite=True)` | Server-side move (copy + delete source). |
| `exists(key)` | Return `True` if the asset exists. |
| `get_descriptor(key)` | Return an `AssetDescriptor` with the asset's metadata. |
| `build_public_url(key)` | Return a permanent public `AssetAccessUrl`. |
| `build_download_url(key, ttl_seconds=None)` | Return a time-limited `AssetAccessUrl`. |
| `build_upload_url(key, content_type, ttl_seconds=None)` | Return a presigned `UploadUrlResult`. |
Example: Azure Blob Storage Backend
Below is a minimal skeleton for an Azure Blob Storage backend. It illustrates the method signatures and exception mapping you need to implement.
from __future__ import annotations

import hashlib
import io
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import TYPE_CHECKING

from granite_assets import (
    IAssetRepository,
    AssetSaveRequest,
    AssetSaveResult,
    AssetDescriptor,
    AssetAccessUrl,
    UploadUrlResult,
    AssetVisibility,
    AssetNotFoundError,
    AssetAccessNotSupportedError,
)

if TYPE_CHECKING:
    from azure.storage.blob import BlobServiceClient  # pip install azure-storage-blob
class AzureBlobAssetRepositoryConfig:
def __init__(
self,
connection_string: str,
container: str,
public_base_url: str | None = None,
presign_ttl_seconds: int = 3600,
) -> None:
self.connection_string = connection_string
self.container = container
self.public_base_url = public_base_url
self.presign_ttl_seconds = presign_ttl_seconds
class AzureBlobAssetRepository:
    """Azure Blob Storage implementation of the ``IAssetRepository`` protocol.

    Minimal skeleton: the write and query operations are functional, while the
    SAS-based URL builders are left as ``NotImplementedError`` stubs to be
    filled in by a real implementation.
    """

    def __init__(self, config: AzureBlobAssetRepositoryConfig) -> None:
        # Imported lazily so the module can be imported even when the optional
        # azure-storage-blob dependency is not installed.
        from azure.storage.blob import BlobServiceClient

        self._client: BlobServiceClient = BlobServiceClient.from_connection_string(
            config.connection_string
        )
        self._container = config.container
        self._public_base_url = config.public_base_url
        self._presign_ttl = config.presign_ttl_seconds

    def _blob(self, key: str):
        """Return a blob client for *key* in the configured container."""
        return self._client.get_blob_client(container=self._container, blob=key)

    # ------------------------------------------------------------------
    # Write operations
    # ------------------------------------------------------------------
    def save(self, request: AssetSaveRequest) -> AssetSaveResult:
        """Persist the asset described by *request* and return the result.

        Respects ``request.overwrite``; a ``None`` value falls back to ``True``
        (per the checklist, a fuller implementation could read the default
        from its configuration instead).
        """
        source = request.open_source()
        data = source.read()
        blob = self._blob(request.key)
        # BUG FIX: the original passed ``overwrite=request.overwrite or True``,
        # which is always True — an explicit ``overwrite=False`` was silently
        # ignored. Only a None value should fall back to the default.
        overwrite = True if request.overwrite is None else request.overwrite
        blob.upload_blob(data, overwrite=overwrite)
        return AssetSaveResult(
            key=request.key,
            backend_ref=blob.url,
            content_length=len(data),
            # MD5 serves as a non-cryptographic content checksum only.
            checksum=hashlib.md5(data).hexdigest(),
            visibility=request.visibility or AssetVisibility.PRIVATE,
        )

    def delete(self, key: str) -> None:
        """Remove the asset; raise ``AssetNotFoundError`` if it is missing.

        NOTE(review): exists-then-delete is not atomic — a concurrent delete
        between the two calls can still make ``delete_blob`` fail.
        """
        if not self.exists(key):
            raise AssetNotFoundError(key)
        self._blob(key).delete_blob()

    def copy(self, source_key: str, dest_key: str, *, overwrite: bool = True) -> None:
        """Server-side copy of *source_key* to *dest_key* (no download).

        Raises ``AssetNotFoundError`` if the source is missing, and
        ``FileExistsError`` if the destination exists and *overwrite* is False.
        """
        if not self.exists(source_key):
            raise AssetNotFoundError(source_key)
        # BUG FIX: the original accepted ``overwrite`` but never honoured it;
        # an overwrite=False copy onto an existing key clobbered it anyway.
        if not overwrite and self.exists(dest_key):
            raise FileExistsError(dest_key)
        src_url = self._blob(source_key).url
        # NOTE(review): start_copy_from_url is asynchronous on the server; a
        # production implementation should poll the copy status to completion.
        self._blob(dest_key).start_copy_from_url(src_url)

    def move(self, source_key: str, dest_key: str, *, overwrite: bool = True) -> None:
        """Move = copy then delete; not atomic on Azure Blob Storage."""
        self.copy(source_key, dest_key, overwrite=overwrite)
        self.delete(source_key)

    # ------------------------------------------------------------------
    # Query operations
    # ------------------------------------------------------------------
    def exists(self, key: str) -> bool:
        """Return True if a blob with *key* exists in the container."""
        return self._blob(key).exists()

    def get_descriptor(self, key: str) -> AssetDescriptor:
        """Return asset metadata; raise ``AssetNotFoundError`` if absent."""
        if not self.exists(key):
            raise AssetNotFoundError(key)
        props = self._blob(key).get_blob_properties()
        return AssetDescriptor(
            key=key,
            content_type=props.content_settings.content_type or "application/octet-stream",
            content_length=props.size,
            visibility=AssetVisibility.PUBLIC,  # inspect ACLs for a real impl
            last_modified=props.last_modified,
            checksum=props.etag,
            metadata=dict(props.metadata or {}),
        )

    # ------------------------------------------------------------------
    # URL construction
    # ------------------------------------------------------------------
    def build_public_url(self, key: str) -> AssetAccessUrl:
        """Return a permanent public URL (``expires_at=None`` => permanent)."""
        if self._public_base_url:
            url = f"{self._public_base_url.rstrip('/')}/{key}"
        else:
            url = self._blob(key).url
        return AssetAccessUrl(url=url, expires_at=None)

    def build_download_url(
        self, key: str, ttl_seconds: int | None = None
    ) -> AssetAccessUrl:
        """Return a time-limited download URL via a SAS token. Stub."""
        from azure.storage.blob import generate_blob_sas, BlobSasPermissions

        ttl = ttl_seconds or self._presign_ttl
        expiry = datetime.now(timezone.utc) + timedelta(seconds=ttl)
        # Real implementation: generate SAS token here
        raise NotImplementedError("generate SAS token and return AssetAccessUrl")

    def build_upload_url(
        self,
        key: str,
        content_type: str,
        ttl_seconds: int | None = None,
    ) -> UploadUrlResult:
        """Return a presigned upload URL. Stub.

        Azure uses SAS tokens as the presigned-PUT equivalent.
        """
        raise NotImplementedError("generate SAS upload URL here")
# ---------------------------------------------------------------------------
# Register with the factory (optional)
# ---------------------------------------------------------------------------
# Rather than monkey-patching build_asset_repository, wrap it in your own
# factory function (or wire the repository directly in your DI container):
#
# from granite_assets import build_asset_repository
#
# def my_build_asset_repository(config):
#     if isinstance(config, AzureBlobAssetRepositoryConfig):
#         return AzureBlobAssetRepository(config)
#     return build_asset_repository(config)
Implementation Checklist
Use this checklist when building a new backend:
- [ ] Implement all 9 methods of `IAssetRepository`.
- [ ] Raise `AssetNotFoundError(key)` when an asset is missing.
- [ ] Raise `AssetAccessNotSupportedError(backend, operation)` for unsupported URL features (e.g. a presigned upload on a backend that does not support it).
- [ ] Raise `AssetConfigurationError` from `__init__` if the configuration is invalid (missing credentials, unreachable endpoint, etc.).
- [ ] `save()` must respect the `overwrite` field from `AssetSaveRequest` (fall back to the config default if `None`).
- [ ] `move()` should be atomic where the backend supports it; otherwise it is acceptable to copy then delete.
- [ ] Return `AssetAccessUrl(url=..., expires_at=None)` for permanent public URLs so that `is_permanent` returns `True`.
- [ ] Add a configuration dataclass (preferably a `@dataclass(slots=True)`) that holds all backend-specific settings.
- [ ] Write unit tests using mocks for the backend client.
- [ ] Verify `isinstance(repo, IAssetRepository)` passes at runtime.