# Source code for granite_assets.repositories.s3

"""S3 asset repository backed by AWS S3 (or any S3-compatible store).

Design decisions
----------------
* **Presigned PUT** is used for upload URLs instead of presigned POST.  POST
  allows richer server-side validation (file-size limits, content-type
  enforcement) but requires a multipart form submission which complicates
  client-side HTTP libraries.  PUT is a plain binary body, trivially consumed
  by ``fetch``, ``axios``, ``requests``, ``curl``, and native mobile SDKs.

* **boto3** is loaded lazily so that importing ``granite_assets`` in a project
  without the ``s3`` extra does *not* raise ``ImportError`` at module level.
  Only instantiating ``S3AssetRepository`` triggers the import.

* Object keys in S3 are prefixed with ``config.key_prefix`` when non-empty.
  The logical *key* exposed to callers never includes this prefix; the mapping
  is transparent.

* Public vs private is implemented via S3 object ACLs when the bucket allows
  it, *or* purely by bucket policy.  To keep the library simple we set
  ``ACL='public-read'`` for public objects only when ``config.use_object_acl``
  is enabled, and never set an ACL for private objects.  Callers must ensure
  their bucket policy is compatible.  If you rely on a bucket policy instead
  of ACLs, disable ``use_object_acl``, set ``public_base_url``, and manage
  access externally.
"""

from __future__ import annotations

import os
import uuid as _uuid_mod
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Any

from granite_assets.enums import AssetVisibility, CfSigningMethod
from granite_assets.exceptions import (
    AssetConfigurationError,
    AssetError,
    AssetNotFoundError,
)
from granite_assets.models import (
    AssetAccessUrl,
    AssetDescriptor,
    AssetSaveRequest,
    AssetSaveResult,
    CfSignedCookies,
    S3AssetRepositoryConfig,
    UploadUrlResult,
)

if TYPE_CHECKING:
    import boto3  # noqa: F401 – type-checking only
    from mypy_boto3_s3 import S3Client  # noqa: F401

_BACKEND_NAME = "S3AssetRepository"


def _cf_url_safe_base64(data: bytes) -> str:
    """CloudFront-specific URL-safe base64 encoding.

    Standard base64, then substitute: ``+`` → ``-``, ``=`` → ``_``, ``/`` → ``~``.
    """
    import base64

    b64 = base64.b64encode(data).decode()
    return b64.replace("+", "-").replace("=", "_").replace("/", "~")


def _require_boto3() -> Any:
    try:
        import boto3  # noqa: PLC0415
        return boto3
    except ImportError as exc:
        raise ImportError(
            "boto3 is required for S3AssetRepository. "
            "Install it with: pip install granite-assets[s3]"
        ) from exc


def _assert_no_leading_slash(key: str) -> None:
    if key.startswith("/"):
        raise AssetError(f"Asset key must not start with '/': {key!r}")


def _resolve_asset_key(key: str | None, filename: str | None) -> str:
    """Return the final storage key.

    Three cases:
    * *key* is ``None``  →  auto-generate ``<uuid>/<uuid>.<ext>``.
    * *key* has no file extension  →  treat it as a folder prefix and append
      ``/<last_segment><ext>`` so callers can pass ``visibility/uuid`` and get
      back ``visibility/uuid/uuid.ext``.
    * *key* has a file extension  →  use it unchanged (backward-compatible).
    """
    ext = os.path.splitext(filename or "")[1].lower()
    if key is None:
        asset_id = str(_uuid_mod.uuid4())
        return f"{asset_id}/{asset_id}{ext}"
    _, key_ext = os.path.splitext(key)
    if not key_ext:
        last_segment = key.rstrip("/").rsplit("/", 1)[-1]
        return f"{key}/{last_segment}{ext}"
    return key


[docs] class S3AssetRepository: """Asset repository backed by AWS S3. Instantiation is cheap; the boto3 session is created once and reused. Example:: config = S3AssetRepositoryConfig( bucket="my-assets", region="eu-west-1", public_base_url="https://cdn.example.com", presign_ttl_seconds=3600, ) repo = S3AssetRepository(config) """
[docs] def __init__(self, config: S3AssetRepositoryConfig) -> None: self._cfg = config self._validate_config() self._s3: Any = self._build_client()
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _validate_config(self) -> None: if not self._cfg.bucket: raise AssetConfigurationError("bucket must not be empty") if not self._cfg.region: raise AssetConfigurationError("region must not be empty") def _build_client(self) -> Any: boto3 = _require_boto3() kwargs: dict[str, Any] = { "region_name": self._cfg.region, } if self._cfg.endpoint_url: kwargs["endpoint_url"] = self._cfg.endpoint_url if self._cfg.access_key_id and self._cfg.secret_access_key: kwargs["aws_access_key_id"] = self._cfg.access_key_id kwargs["aws_secret_access_key"] = self._cfg.secret_access_key if self._cfg.session_token: kwargs["aws_session_token"] = self._cfg.session_token return boto3.client("s3", **kwargs) def _s3_key(self, key: str) -> str: """Map a logical key to the actual S3 object key.""" prefix = self._cfg.key_prefix.rstrip("/") return f"{prefix}/{key}" if prefix else key def _logical_key(self, s3_key: str) -> str: """Strip the configured prefix to get the logical key back.""" prefix = self._cfg.key_prefix.rstrip("/") if prefix and s3_key.startswith(f"{prefix}/"): return s3_key[len(prefix) + 1:] return s3_key def _effective_ttl(self, ttl_seconds: int | None) -> int: return ttl_seconds if ttl_seconds is not None else self._cfg.presign_ttl_seconds def _expires_at(self, ttl_seconds: int) -> datetime: return datetime.now(tz=UTC) + timedelta(seconds=ttl_seconds) def _public_url_for_key(self, s3_key: str) -> str: if self._cfg.public_base_url: base = self._cfg.public_base_url.rstrip("/") return f"{base}/{s3_key}" # Fall back to virtual-hosted-style URL bucket = self._cfg.bucket region = self._cfg.region return f"https://{bucket}.s3.{region}.amazonaws.com/{s3_key}" def _cf_base_url(self) -> str: """Return the CloudFront base URL, raising if not configured.""" if not self._cfg.public_base_url: raise AssetConfigurationError( 
"public_base_url must be set to use CloudFront signed URLs" ) return self._cfg.public_base_url.rstrip("/") def _build_cf_signed_url(self, s3_key: str, ttl: int) -> str: """Generate a CloudFront signed URL (canned policy) for *s3_key*. Uses ``Expires`` / ``Signature`` / ``Key-Pair-Id`` query params. """ try: from botocore.signers import CloudFrontSigner except ImportError as exc: raise AssetError( "botocore is required for CloudFront signed URLs (install boto3)" ) from exc try: from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding except ImportError as exc: raise AssetError( "cryptography is required for CloudFront signed URLs: pip install cryptography" ) from exc pem: str = self._cfg.cf_private_key # type: ignore[assignment] private_key = serialization.load_pem_private_key(pem.encode(), password=None) def _rsa_sign(message: bytes) -> bytes: return private_key.sign(message, padding.PKCS1v15(), hashes.SHA1()) # noqa: S303 signer = CloudFrontSigner(self._cfg.cf_key_id, _rsa_sign) # type: ignore[arg-type] base = self._cf_base_url() resource_url = f"{base}/{s3_key}" expires_at = self._expires_at(ttl) return signer.generate_presigned_url( resource_url, date_less_than=expires_at, ) def _get_rsa_signer(self): """Return ``(rsa_sign_fn, private_key)`` for CloudFront custom-policy operations. Lazily imports ``cryptography``; raises :exc:`AssetError` if missing. 
""" try: from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding except ImportError as exc: raise AssetError( "cryptography is required for CloudFront signed operations: " "pip install cryptography" ) from exc pem: str = self._cfg.cf_private_key # type: ignore[assignment] private_key = serialization.load_pem_private_key(pem.encode(), password=None) def _rsa_sign(message: bytes) -> bytes: return private_key.sign(message, padding.PKCS1v15(), hashes.SHA1()) # noqa: S303 return _rsa_sign def _build_cf_custom_policy_params( self, resource_pattern: str, ttl: int ) -> tuple[str, str, str, int]: """Compute CloudFront custom-policy signing params for *resource_pattern*. *resource_pattern* may contain a trailing wildcard (``https://cdn.example.com/assets/private/videos/uuid/*``) so that a single set of credentials authorises every segment of an HLS stream. Returns ``(policy_b64, signature_b64, key_pair_id, unix_expires)``. """ import json import time _rsa_sign = self._get_rsa_signer() unix_expires = int(time.time()) + ttl policy_dict = { "Statement": [ { "Resource": resource_pattern, "Condition": {"DateLessThan": {"AWS:EpochTime": unix_expires}}, } ] } policy_json = json.dumps(policy_dict, separators=(",", ":")) policy_b64 = _cf_url_safe_base64(policy_json.encode()) signature_b64 = _cf_url_safe_base64(_rsa_sign(policy_json.encode())) return policy_b64, signature_b64, self._cfg.cf_key_id, unix_expires # type: ignore[return-value] # ------------------------------------------------------------------ # Write operations # ------------------------------------------------------------------
[docs] def save(self, request: AssetSaveRequest) -> AssetSaveResult: """Upload an asset to S3. Sets ``ACL='public-read'`` for PUBLIC assets. Metadata and checksum are forwarded as S3 object metadata. """ key = _resolve_asset_key(request.key, request.filename) _assert_no_leading_slash(key) s3_key = self._s3_key(key) stream = request.open_source() put_kwargs: dict[str, Any] = { "Bucket": self._cfg.bucket, "Key": s3_key, "Body": stream, "ContentType": request.content_type, } if request.visibility == AssetVisibility.PUBLIC and self._cfg.use_object_acl: put_kwargs["ACL"] = "public-read" if request.content_length is not None: put_kwargs["ContentLength"] = request.content_length # Always store visibility so that get_descriptor / resolve_access can # reconstruct it without an extra get_object_acl call. put_kwargs["Metadata"] = { **(request.metadata or {}), "x-visibility": request.visibility.value, } if request.checksum: put_kwargs["Metadata"]["x-checksum"] = request.checksum try: response = self._s3.put_object(**put_kwargs) except Exception as exc: # botocore.exceptions.ClientError raise AssetError(f"Failed to save asset {key!r} to S3: {exc}") from exc etag: str = response.get("ETag", "").strip('"') return AssetSaveResult( key=key, backend_ref=f"s3://{self._cfg.bucket}/{s3_key}", checksum=f"etag:{etag}" if etag else None, visibility=request.visibility, )
[docs] def delete(self, key: str) -> None: """Delete an S3 object. Raises: AssetNotFoundError: If the key does not exist. """ _assert_no_leading_slash(key) if not self.exists(key): raise AssetNotFoundError(key) s3_key = self._s3_key(key) try: self._s3.delete_object(Bucket=self._cfg.bucket, Key=s3_key) except Exception as exc: raise AssetError(f"Failed to delete asset {key!r}: {exc}") from exc
[docs] def copy(self, source_key: str, dest_key: str, *, overwrite: bool = True) -> None: """Server-side S3 copy (no data transfer to/from this process).""" _assert_no_leading_slash(source_key) _assert_no_leading_slash(dest_key) if not self.exists(source_key): raise AssetNotFoundError(source_key) if not overwrite and self.exists(dest_key): raise AssetError(f"Destination key already exists: {dest_key!r}") src_s3_key = self._s3_key(source_key) dst_s3_key = self._s3_key(dest_key) copy_source = {"Bucket": self._cfg.bucket, "Key": src_s3_key} try: self._s3.copy_object( CopySource=copy_source, Bucket=self._cfg.bucket, Key=dst_s3_key, ) except Exception as exc: raise AssetError( f"Failed to copy asset {source_key!r} -> {dest_key!r}: {exc}" ) from exc
[docs] def move(self, source_key: str, dest_key: str, *, overwrite: bool = True) -> None: """Copy then delete (S3 has no native move operation).""" self.copy(source_key, dest_key, overwrite=overwrite) try: self._s3.delete_object( Bucket=self._cfg.bucket, Key=self._s3_key(source_key) ) except Exception as exc: raise AssetError( f"Failed to remove source after move {source_key!r}: {exc}" ) from exc
# ------------------------------------------------------------------ # Query operations # ------------------------------------------------------------------
[docs] def exists(self, key: str) -> bool: """Check object existence using a lightweight ``head_object`` call.""" _assert_no_leading_slash(key) try: self._s3.head_object(Bucket=self._cfg.bucket, Key=self._s3_key(key)) return True except Exception as exc: # botocore raises ClientError with 404 or NoSuchKey error_code = getattr(getattr(exc, "response", {}), "get", lambda *_: None)( "Error", {} ).get("Code", "") if error_code in ("404", "NoSuchKey"): return False # For moto/real boto3 we check the string representation if "404" in str(exc) or "NoSuchKey" in str(exc) or "Not Found" in str(exc): return False raise AssetError(f"Failed to check existence of {key!r}: {exc}") from exc
[docs] def get_descriptor(self, key: str) -> AssetDescriptor: """Return S3 object metadata via ``head_object``.""" _assert_no_leading_slash(key) s3_key = self._s3_key(key) try: response = self._s3.head_object(Bucket=self._cfg.bucket, Key=s3_key) except Exception as exc: if "404" in str(exc) or "NoSuchKey" in str(exc) or "Not Found" in str(exc): raise AssetNotFoundError(key) from exc raise AssetError(f"Failed to get descriptor for {key!r}: {exc}") from exc raw_meta: dict[str, str] = response.get("Metadata") or {} visibility_str = raw_meta.get("x-visibility", "private") visibility = ( AssetVisibility.PUBLIC if visibility_str == "public" else AssetVisibility.PRIVATE ) return AssetDescriptor( key=key, content_type=response.get("ContentType"), content_length=response.get("ContentLength"), last_modified=response.get("LastModified"), checksum=f"etag:{response.get('ETag', '').strip('\"')}", visibility=visibility, metadata=raw_meta, )
# ------------------------------------------------------------------ # URL construction # ------------------------------------------------------------------
[docs] def build_public_url(self, key: str) -> AssetAccessUrl: """Return the permanent public URL for a PUBLIC asset. If ``public_base_url`` is configured, uses that as base (CDN URL). Otherwise builds a standard virtual-hosted S3 URL. Raises: AssetAccessNotSupportedError: If called for a PRIVATE asset key that is known to be private (best-effort; requires a head_object call not performed here for performance). """ _assert_no_leading_slash(key) s3_key = self._s3_key(key) url = self._public_url_for_key(s3_key) return AssetAccessUrl(url=url, expires_at=None)
[docs] def build_download_url(self, key: str, ttl_seconds: int | None = None) -> AssetAccessUrl: """Generate a download URL for a private asset. Priority order: 1. ``cf_key_id`` + ``cf_private_key`` set **and** ``cf_signing_method=URL`` → **CloudFront signed URL** (canned policy, query-param credentials). 2. ``cf_key_id`` + ``cf_private_key`` set **and** ``cf_signing_method=COOKIE`` → **plain CloudFront URL** (no signature). The browser must already hold the signed cookies obtained via :meth:`build_signed_cookies`. 3. ``cf_unsigned_urls=True`` + ``public_base_url`` set → **plain CloudFront URL** (permanent, no signature). 4. Fallback → **S3 presigned URL** (time-limited, exposes S3 domain). """ _assert_no_leading_slash(key) s3_key = self._s3_key(key) try: if self._cfg.has_cf_signing(): if self._cfg.cf_signing_method == CfSigningMethod.COOKIE: # Credentials are in browser cookies — return plain CF URL. base = self._cf_base_url() return AssetAccessUrl(url=f"{base}/{s3_key}", expires_at=None) # Default: URL-based signed URL (custom policy, directory wildcard). 
ttl = self._effective_ttl(ttl_seconds) base = self._cf_base_url() directory = s3_key.rsplit("/", 1)[0] if "/" in s3_key else s3_key resource_pattern = f"{base}/{directory}/*" policy_b64, sig_b64, kp_id, _ = self._build_cf_custom_policy_params( resource_pattern, ttl ) url = ( f"{base}/{s3_key}" f"?Policy={policy_b64}&Signature={sig_b64}&Key-Pair-Id={kp_id}" ) return AssetAccessUrl(url=url, expires_at=self._expires_at(ttl)) if self._cfg.has_cf_unsigned_url(): base = self._cfg.public_base_url.rstrip("/") # type: ignore[union-attr] url = f"{base}/{s3_key}" return AssetAccessUrl(url=url, expires_at=None) ttl = self._effective_ttl(ttl_seconds) url = self._s3.generate_presigned_url( "get_object", Params={"Bucket": self._cfg.bucket, "Key": s3_key}, ExpiresIn=ttl, ) except AssetError: raise except Exception as exc: raise AssetError( f"Failed to generate signed download URL for {key!r}: {exc}" ) from exc return AssetAccessUrl(url=url, expires_at=self._expires_at(ttl))
[docs] def build_path_signed_url( self, key: str, *, path_pattern: str | None = None, ttl_seconds: int | None = None, ) -> AssetAccessUrl: """Generate a CloudFront URL with **custom-policy** signing for *key*. Unlike the canned-policy URL returned by :meth:`build_download_url`, the custom policy can authorise a **wildcard path** so that a single set of query-param credentials is valid for every file under a directory. This is the recommended approach for HLS/DASH video streaming where the player autonomously fetches dozens of segment files. Args: key: Logical key of the file whose URL is returned (e.g. ``"private/videos/uuid/master.m3u8"``). Must not start with ``/``. path_pattern: CloudFront resource pattern to embed in the policy. Accepts a trailing ``*`` wildcard (e.g. ``"private/videos/uuid/*"``). Defaults to the directory of *key* + ``/*``. ttl_seconds: URL lifetime in seconds (default: configured TTL). Returns: :class:`AssetAccessUrl` whose ``url`` carries ``?Policy=…&Signature=…&Key-Pair-Id=…`` query params. Raises: :exc:`AssetConfigurationError`: if CloudFront signing is not configured. """ if not self._cfg.has_cf_signing(): raise AssetConfigurationError( "cf_key_id and cf_private_key must be set to use build_path_signed_url" ) _assert_no_leading_slash(key) ttl = self._effective_ttl(ttl_seconds) base = self._cf_base_url() s3_key = self._s3_key(key) if path_pattern is None: # Derive: strip filename, append /* directory = s3_key.rsplit("/", 1)[0] if "/" in s3_key else s3_key resource_pattern = f"{base}/{directory}/*" else: _assert_no_leading_slash(path_pattern.lstrip("*").lstrip("/")) resource_pattern = f"{base}/{self._s3_key(path_pattern)}" policy_b64, sig_b64, kp_id, _ = self._build_cf_custom_policy_params( resource_pattern, ttl ) url = ( f"{base}/{s3_key}" f"?Policy={policy_b64}&Signature={sig_b64}&Key-Pair-Id={kp_id}" ) return AssetAccessUrl(url=url, expires_at=self._expires_at(ttl))
[docs] def build_folder_signed_url( self, key: str, *, entry_filename: str, ttl_seconds: int | None = None, ) -> AssetAccessUrl: """Generate a CloudFront URL with **wildcard custom-policy** for the folder of *key*, pointing to *entry_filename* within that folder. This is the recommended method for composite assets such as HLS/DASH video streams. The source asset (e.g. a transcoded ``.mp4``) and the player entry point (e.g. ``master.m3u8``) live in the same S3 folder. A single set of credentials authorises the manifest **and** all segment files that the player fetches from relative paths. Usage example:: # key layout in S3: # assets/<uuid>/<uuid>.mp4 ← original source # assets/<uuid>/master.m3u8 ← HLS manifest # assets/<uuid>/1080p/index.m3u8 # assets/<uuid>/1080p/seg000.ts … seg009.ts url = repo.build_folder_signed_url( "assets/<uuid>/<uuid>.mp4", entry_filename="master.m3u8", ttl_seconds=7200, ) # Returns: # https://<cf>/assets/<uuid>/master.m3u8 # ?Policy=<wildcard over assets/<uuid>/*> # &Signature=...&Key-Pair-Id=... Pass ``url.url`` directly to HLS.js as the source — the query-string credentials are inherited by all relative segment requests. Args: key: Logical key of **any** file that belongs to the target folder. Used solely to derive the folder path; the URL will **not** point to this file. Example: ``"assets/<uuid>/<uuid>.mp4"``. entry_filename: Filename of the player entry point within the same folder. Example: ``"master.m3u8"``. ttl_seconds: Lifetime of the signing credentials in seconds (default: configured ``presign_ttl_seconds``). A new URL with a new expiry is generated on every call; credentials are not cached. Returns: :class:`AssetAccessUrl` whose ``url`` is ``https://<cf>/<folder>/<entry_filename>?Policy=…&Signature=…&Key-Pair-Id=…`` and whose ``expires_at`` reflects the policy expiry. Raises: :exc:`AssetConfigurationError`: CloudFront signing is not configured (``cf_key_id`` or ``cf_private_key`` not set). 
:exc:`AssetError`: *key* has no directory component (it is a root-level key with no ``/`` separator). """ if not self._cfg.has_cf_signing(): raise AssetConfigurationError( "cf_key_id and cf_private_key must be set to use build_folder_signed_url" ) _assert_no_leading_slash(key) s3_key = self._s3_key(key) if "/" not in s3_key: raise AssetError( f"Cannot derive folder from root-level key {key!r}; " "key must contain at least one '/' separator." ) folder = s3_key.rsplit("/", 1)[0] # e.g. "assets/<uuid>" base = self._cf_base_url() resource_pattern = f"{base}/{folder}/*" ttl = self._effective_ttl(ttl_seconds) policy_b64, sig_b64, kp_id, _ = self._build_cf_custom_policy_params( resource_pattern, ttl ) entry_s3_key = f"{folder}/{entry_filename}" url = ( f"{base}/{entry_s3_key}" f"?Policy={policy_b64}&Signature={sig_b64}&Key-Pair-Id={kp_id}" ) return AssetAccessUrl(url=url, expires_at=self._expires_at(ttl))
[docs] def build_signed_cookies( self, key_pattern: str, ttl_seconds: int | None = None, ) -> CfSignedCookies: """Generate CloudFront signed-cookie values for *key_pattern*. Call this once per session (or per resource group) and set the returned values as ``HttpOnly; Secure; SameSite=None`` cookies on the response. The browser will then include them automatically on every CloudFront request that matches the policy path. Args: key_pattern: Logical key pattern (may include a trailing ``*`` wildcard) relative to the configured key prefix. Example: ``"private/videos/uuid/*"``. ttl_seconds: Cookie lifetime in seconds (default: configured TTL). Returns: :class:`CfSignedCookies` with ``policy``, ``signature``, and ``key_pair_id`` values ready to set as cookies. Raises: :exc:`AssetConfigurationError`: if CloudFront signing is not configured. """ if not self._cfg.has_cf_signing(): raise AssetConfigurationError( "cf_key_id and cf_private_key must be set to use build_signed_cookies" ) ttl = self._effective_ttl(ttl_seconds) base = self._cf_base_url() s3_pattern = self._s3_key(key_pattern) resource_pattern = f"{base}/{s3_pattern}" policy_b64, sig_b64, kp_id, unix_expires = self._build_cf_custom_policy_params( resource_pattern, ttl ) from datetime import UTC return CfSignedCookies( policy=policy_b64, signature=sig_b64, key_pair_id=kp_id, expires_at=datetime.fromtimestamp(unix_expires, tz=UTC), )
[docs] def build_upload_url( self, key: str, content_type: str, ttl_seconds: int | None = None, ) -> UploadUrlResult: """Generate a presigned PUT URL for client-side upload. The client must send the file as an HTTP PUT with the ``Content-Type`` header set to exactly the value provided here. No other headers are required by default. Example (using ``requests``):: result = repo.build_upload_url("images/photo.jpg", "image/jpeg") with open("photo.jpg", "rb") as f: requests.put(result.url, data=f, headers=result.headers) """ _assert_no_leading_slash(key) ttl = self._effective_ttl(ttl_seconds) s3_key = self._s3_key(key) try: url = self._s3.generate_presigned_url( "put_object", Params={ "Bucket": self._cfg.bucket, "Key": s3_key, "ContentType": content_type, }, ExpiresIn=ttl, ) except Exception as exc: raise AssetError( f"Failed to generate presigned upload URL for {key!r}: {exc}" ) from exc return UploadUrlResult( url=url, method="PUT", headers={"Content-Type": content_type}, expires_at=self._expires_at(ttl), key=key, )
[docs] def resolve_access(self, key: str, ttl_seconds: int | None = None) -> AssetAccessUrl: """Return public URL for public assets, signed download URL for private.""" _assert_no_leading_slash(key) descriptor = self.get_descriptor(key) if descriptor.visibility == AssetVisibility.PUBLIC: return self.build_public_url(key) return self.build_download_url(key, ttl_seconds=ttl_seconds)
# Verify structural compatibility at import time assert isinstance(S3AssetRepository, type)