Storage Backends Guide¶
This guide covers persisting validation results with Truthound's Python API. It includes practical workflows for configuring storage backends, combining enterprise features, and managing result lifecycle.
Quick Start¶
from truthound.stores import get_store
from truthound.stores.results import ValidationResult
# Create store
store = get_store("filesystem", base_path=".truthound/store")
# Save validation result
result = ValidationResult.from_report(report, "customers.csv")
run_id = store.save(result)
# Retrieve result
loaded = store.get(run_id)
Common Workflows¶
Workflow 1: Production Storage with S3¶
from truthound.stores import get_store
from truthound.stores.results import ValidationResult
import truthound as th
# Configure S3 store
store = get_store(
"s3",
bucket="company-validation-results",
prefix="prod/daily/",
region="us-east-1",
)
# Run validation
report = th.check("data.csv")
# Save with metadata
result = ValidationResult.from_report(
report,
data_asset="data.csv",
tags={"environment": "production", "team": "data-eng"},
)
run_id = store.save(result)
print(f"Saved as: {run_id}")
Workflow 2: Combining Versioning + Caching + Observability¶
from truthound.stores import get_store
from truthound.stores.versioning import VersionedStore, VersioningConfig
from truthound.stores.caching import CachedStore, CacheMode
from truthound.stores.caching.backends import LRUCache
from truthound.stores.observability import ObservableStore
from truthound.stores.observability.config import (
ObservabilityConfig,
AuditConfig,
MetricsConfig,
)
# Layer 1: Base storage
base = get_store("s3", bucket="validation-results", prefix="prod/")
# Layer 2: Add version history (keep last 10 versions)
versioned = VersionedStore(base, VersioningConfig(max_versions=10))
# Layer 3: Add caching (LRU cache with 1-hour TTL)
cache = LRUCache(max_size=1000, ttl_seconds=3600)
cached = CachedStore(versioned, cache, mode=CacheMode.READ_WRITE)
# Layer 4: Add observability (audit logs + metrics)
store = ObservableStore(
cached,
ObservabilityConfig(
audit=AuditConfig(enabled=True),
metrics=MetricsConfig(enabled=True),
),
)
# Use the fully-featured store
result = store.save(validation_result)
# Version operations
history = versioned.get_history(run_id, limit=5)
versioned.rollback(run_id, version=3)
Workflow 3: Query Historical Results¶
from truthound.stores import get_store
from truthound.stores.base import StoreQuery
from datetime import datetime, timedelta
store = get_store("database", connection_url="postgresql://localhost/truthound")
# Query last 7 days of failures
query = StoreQuery(
data_asset="customers.csv",
start_time=datetime.now() - timedelta(days=7),
status="failure",
limit=100,
order_by="run_time",
ascending=False,
)
results = store.query(query)
for result in results:
print(f"{result.run_time}: {result.issue_count} issues")
Workflow 4: Retention Policy with Auto-Cleanup¶
from truthound.stores import get_store
from truthound.stores.retention import (
RetentionStore,
TimeBasedPolicy,
CountBasedPolicy,
CompositePolicy,
)
from datetime import timedelta
# Base store
base = get_store("filesystem", base_path=".truthound/store")
# Define retention policies
policy = CompositePolicy([
TimeBasedPolicy(max_age=timedelta(days=90)), # Delete after 90 days
CountBasedPolicy(max_count=1000), # Keep max 1000 results
])
# Wrap with retention
store = RetentionStore(base, policy, cleanup_interval_hours=24)
# Results older than 90 days are automatically cleaned up
Full Documentation¶
Truthound provides flexible storage backends for persisting validation results, schemas, and profiles. All stores implement a common interface with support for CRUD operations, querying, and optional features like versioning, caching, and replication.
Backend Comparison¶
| Backend | Use Case | Persistence | Scalability | Dependencies |
|---|---|---|---|---|
| FileSystem | Local development, single-node | File-based | Single machine | None |
| Memory | Testing, ephemeral | None | Single process | None |
| S3 | AWS production | Cloud | Unlimited | boto3 |
| GCS | GCP production | Cloud | Unlimited | google-cloud-storage |
| Azure Blob | Azure production | Cloud | Unlimited | azure-storage-blob |
| Database | SQL-based storage | Database | Depends on DB | sqlalchemy |
Installation¶
# Core (includes FileSystem and Memory stores)
pip install truthound
# Cloud storage backends
pip install truthound[s3] # AWS S3
pip install truthound[gcs] # Google Cloud Storage
pip install truthound[azure] # Azure Blob Storage
pip install truthound[db] # Database (SQLAlchemy)
# All storage backends
pip install truthound[all-stores]
Quick Start¶
Using the Factory¶
from truthound.stores import get_store
# FileSystem (default)
store = get_store("filesystem", base_path=".truthound/store")
# S3
store = get_store(
"s3",
bucket="my-bucket",
prefix="validations/",
region="us-east-1",
)
# Database
store = get_store(
"database",
connection_url="postgresql://user:pass@localhost/db",
)
Direct Instantiation¶
from truthound.stores.backends.filesystem import FileSystemStore, FileSystemConfig
from truthound.stores.backends.s3 import S3Store
# FileSystem with config
config = FileSystemConfig(
base_path=".truthound/store",
use_compression=True,
pretty_print=False,
)
store = FileSystemStore(config=config)
# S3 with direct parameters
store = S3Store(
bucket="my-validation-results",
prefix="prod/",
region="us-west-2",
compression=True,
)
Common Operations¶
All stores implement the ValidationStore protocol:
from truthound.stores.results import ValidationResult
from truthound.stores.base import StoreQuery
# Initialize store (lazy initialization)
store.initialize()
# Save a result
result = ValidationResult.from_report(report, "customers.csv")
run_id = store.save(result)
# Retrieve a result
result = store.get(run_id)
# Check existence
exists = store.exists(run_id)
# Delete a result
deleted = store.delete(run_id)
# List all IDs
ids = store.list_ids()
# Query with filters
query = StoreQuery(
data_asset="customers.csv",
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 12, 31),
status="failure",
limit=100,
)
results = store.query(query)
# Close connection
store.close()
StoreQuery Parameters¶
| Parameter | Type | Description |
|---|---|---|
data_asset |
str \| None |
Filter by data asset name |
start_time |
datetime \| None |
Filter results after this time |
end_time |
datetime \| None |
Filter results before this time |
status |
str \| None |
Filter by status (success, failure, error) |
tags |
dict[str, str] \| None |
Filter by tags |
limit |
int \| None |
Maximum results to return |
offset |
int |
Skip first N results (default: 0) |
order_by |
str |
Sort field (default: run_time) |
ascending |
bool |
Sort order (default: False) |
Enterprise Features¶
Truthound stores support advanced features through wrapper stores:
| Feature | Module | Description |
|---|---|---|
| Versioning | stores/versioning/ |
Version history, diff, rollback |
| Retention | stores/retention/ |
TTL policies, automatic cleanup |
| Tiering | stores/tiering/ |
Hot/Warm/Cold/Archive storage |
| Caching | stores/caching/ |
LRU, LFU, TTL caches |
| Replication | stores/replication/ |
Cross-region sync |
| Observability | stores/observability/ |
Audit, metrics, tracing |
Composing Features¶
from truthound.stores import get_store
from truthound.stores.versioning import VersionedStore, VersioningConfig
from truthound.stores.caching import CachedStore, CacheConfig, CacheMode
from truthound.stores.observability import ObservableStore
from truthound.stores.observability.config import (
ObservabilityConfig,
AuditConfig,
MetricsConfig,
TracingConfig,
)
# Base store
base = get_store("s3", bucket="my-bucket", prefix="results/")
# Add versioning
versioned = VersionedStore(base, VersioningConfig(max_versions=10))
# Add caching
from truthound.stores.caching.backends import LRUCache
cache = LRUCache(max_size=1000, ttl_seconds=3600)
cached = CachedStore(versioned, cache, mode=CacheMode.READ_WRITE)
# Add observability
store = ObservableStore(
cached,
ObservabilityConfig(
audit=AuditConfig(enabled=True),
metrics=MetricsConfig(enabled=True),
tracing=TracingConfig(enabled=True),
),
)
Error Handling¶
All stores raise consistent exceptions:
from truthound.stores.base import (
StoreError, # Base exception
StoreNotFoundError, # Item not found
StoreConnectionError, # Connection failed
StoreWriteError, # Write operation failed
StoreReadError, # Read operation failed
)
try:
result = store.get("nonexistent-id")
except StoreNotFoundError as e:
print(f"Result not found: {e.identifier}")
except StoreConnectionError as e:
print(f"Connection failed to {e.backend}: {e}")
Configuration Reference¶
Base Configuration¶
All stores extend StoreConfig:
from truthound.stores.base import StoreConfig
config = StoreConfig(
namespace="production", # Logical grouping
prefix="validations/", # Path/key prefix
)
Compression¶
Most stores support gzip compression:
# FileSystem
FileSystemConfig(use_compression=True)
# S3
S3Config(use_compression=True)
# Azure
AzureBlobConfig(use_compression=True)
Architecture¶
stores/
├── backends/ # Storage backend implementations
│ ├── filesystem.py # Local filesystem
│ ├── memory.py # In-memory (testing)
│ ├── s3.py # AWS S3
│ ├── gcs.py # Google Cloud Storage
│ ├── azure_blob.py # Azure Blob Storage
│ └── database.py # SQL databases
│
├── versioning/ # Result versioning
├── retention/ # TTL and retention policies
├── tiering/ # Hot/Warm/Cold/Archive
├── caching/ # Result caching layer
├── replication/ # Cross-region replication
├── observability/ # Audit, metrics, tracing
├── batching/ # Batch write optimization
└── backpressure/ # Flow control
Next Steps¶
- FileSystem Store - Local file storage
- Cloud Storage - S3, GCS, Azure Blob
- Versioning - Version control for results
- Caching - In-memory caching layer