Garbage Collection
Over time, stored objects can become orphaned — they exist in the backend but no database row holds a reference to them. This happens when:
A row is deleted without calling
manager.delete(ref)first.An upload succeeded but the database transaction was rolled back.
Content was replaced and the old
StoredObjectRefwas overwritten without deleting the old object from the backend.
StorageGarbageCollector scans all objects in a backend and deletes those
that are not referenced in the database.
How It Works
Collect all
StoredObjectRefvalues currently stored in the database (the referenced set).Ask the backend to list all objects with
iter_locations().Any object whose
locationis not in the referenced set is orphaned.In
dry_run=Truemode (default) only count them. Indry_run=Falsemode delete them.
Basic Usage
from sqlalchemy.orm import Session
from granite_storage.gc import StorageGarbageCollector
from granite_storage.integrations.sqlalchemy import iter_model_storage_refs
def collect_referenced_refs(session: Session):
"""Yield every StoredObjectRef currently stored in the DB."""
yield from iter_model_storage_refs(session, Document, "_file_ref")
yield from iter_model_storage_refs(session, UserProfile, "avatar")
with Session(engine) as session:
gc = StorageGarbageCollector(
manager=manager,
iter_references=lambda: collect_referenced_refs(session),
)
# Dry run — just count orphans
report = gc.collect(storage_key="documents", dry_run=True)
print(f"Orphaned: {report.orphaned} / {report.scanned} objects scanned")
# Real run — delete orphans
report = gc.collect(storage_key="documents", dry_run=False)
print(f"Deleted {report.deleted} orphaned objects")
GarbageCollectionReport
@dataclass
class GarbageCollectionReport:
scanned: int # total objects found in the backend
referenced: int # objects with a live DB reference
orphaned: int # objects with no DB reference
deleted: int # objects actually deleted (0 if dry_run=True)
Scheduling
Run garbage collection as a periodic background task. With ARQ:
async def run_gc(ctx):
async with AsyncSession(engine) as session:
gc = StorageGarbageCollector(
manager=manager,
iter_references=lambda: collect_referenced_refs(session),
)
report = gc.collect(storage_key="documents", dry_run=False)
print(report)
Or as a simple cron job that calls a management CLI command.
Warning
Always run garbage collection after confirming that all in-flight transactions have committed. Running it during a high-write period may delete objects that are about to be referenced.
iter_model_storage_refs Helper
granite_storage.integrations.sqlalchemy.iter_model_storage_refs is a
convenience function that queries a SQLAlchemy column for non-null values:
from granite_storage.integrations.sqlalchemy import iter_model_storage_refs
for ref in iter_model_storage_refs(session, MyModel, "file_ref_column"):
print(ref.location)