Skip to content

Store Configuration

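Truthound provides multiple storage backends for validation results, along with enterprise features such as connection pooling, versioning, retention, caching, replication, backpressure, and observability.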

Quick Start

from truthound.stores import get_store

# Filesystem (default)
store = get_store("filesystem", base_path=".truthound/results")

# S3
store = get_store("s3", bucket="my-bucket", region="us-east-1")

# Database
store = get_store("database", connection_url="postgresql://localhost/db")

StoreConfig

Base configuration for all stores.

from truthound.stores.base import StoreConfig

config = StoreConfig(
    namespace="default",           # Logical grouping
    prefix="",                     # Path prefix
    serialization_format="json",   # json, yaml, pickle
    compression=None,              # gzip, lz4, zstd
    metadata={},                   # Custom metadata
)

Backend Configurations

FileSystem Store

from truthound.stores.backends.filesystem import (
    FileSystemStore,
    FileSystemConfig,
)

config = FileSystemConfig(
    base_path=".truthound/store",  # Base directory
    file_extension=".json",        # File extension
    create_dirs=True,              # Auto-create directories
    pretty_print=True,             # Pretty JSON output
    use_compression=False,         # Enable gzip compression
)

store = FileSystemStore(config)
| Parameter | Default | Description |
|---|---|---|
| base_path | .truthound/store | Base storage directory |
| file_extension | .json | File extension |
| create_dirs | True | Auto-create directories |
| pretty_print | True | Human-readable JSON |
| use_compression | False | Enable gzip |

S3 Store

from truthound.stores.backends.s3 import S3Store, S3Config

config = S3Config(
    bucket="my-bucket",
    prefix="truthound/",
    region="us-east-1",
    endpoint_url=None,                  # Custom endpoint (MinIO, etc.)
    use_compression=True,
    storage_class="STANDARD",           # STANDARD, INTELLIGENT_TIERING, etc.
    server_side_encryption=None,        # AES256, aws:kms
    kms_key_id=None,                    # KMS key ARN
    tags={},                            # S3 object tags
)

store = S3Store(config)
| Parameter | Default | Description |
|---|---|---|
| bucket | "" | S3 bucket name |
| prefix | truthound/ | Object key prefix |
| region | None | AWS region |
| use_compression | True | Compress objects |
| storage_class | STANDARD | S3 storage class |
| server_side_encryption | None | SSE type |

GCS Store

from truthound.stores.backends.gcs import GCSStore, GCSConfig

config = GCSConfig(
    bucket="my-bucket",
    prefix="truthound/",
    project=None,                       # GCP project ID
    credentials_path=None,              # Service account JSON path
    use_compression=True,
)

store = GCSStore(config)

Azure Blob Store

from truthound.stores.backends.azure_blob import (
    AzureBlobStore,
    AzureBlobConfig,
)

config = AzureBlobConfig(
    container="my-container",
    prefix="truthound/",
    # Authentication (choose one)
    connection_string=None,             # Full connection string
    account_url=None,                   # Account URL
    account_name=None,                  # Account name
    account_key=None,                   # Account key
    sas_token=None,                     # SAS token
    # Options
    use_compression=True,
    content_type="application/json",
    access_tier=None,                   # Hot, Cool, Archive
    metadata={},                        # Blob metadata
)

store = AzureBlobStore(config)

Authentication Methods:

# Connection string (development)
config = AzureBlobConfig(
    container="my-container",
    connection_string="DefaultEndpointsProtocol=https;..."
)

# Account URL + SAS token
config = AzureBlobConfig(
    container="my-container",
    account_url="https://myaccount.blob.core.windows.net",
    sas_token="sv=2021-06-08&..."
)

# Managed identity (production)
config = AzureBlobConfig(
    container="my-container",
    account_url="https://myaccount.blob.core.windows.net",
    # Uses DefaultAzureCredential automatically
)

Database Store

from truthound.stores.backends.database import (
    DatabaseStore,
    DatabaseConfig,
    PoolingConfig,
)

config = DatabaseConfig(
    connection_url="postgresql://user:pass@localhost/db",
    table_prefix="",                    # Table name prefix
    pool_size=5,                        # Connection pool size
    max_overflow=10,                    # Additional connections
    echo=False,                         # SQLAlchemy echo
    create_tables=True,                 # Auto-create tables
    pooling=PoolingConfig(),
    use_pool_manager=True,              # Enterprise pool manager
)

store = DatabaseStore(config)

Connection Pool Configuration

Advanced connection pooling for database stores.

from truthound.stores.backends.connection_pool import (
    ConnectionPoolConfig,
    PoolConfig,
    PoolStrategy,
    RetryConfig,
    CircuitBreakerConfig,
    HealthCheckConfig,
)

# Pool settings
pool = PoolConfig(
    strategy=PoolStrategy.QUEUE_POOL,   # Pool strategy
    pool_size=5,                        # Max connections
    max_overflow=10,                    # Extra connections
    pool_timeout=30.0,                  # Acquire timeout
    pool_recycle=3600,                  # Recycle connections (1hr)
    pool_pre_ping=True,                 # Ping before use
    echo_pool=False,                    # Log pool events
    reset_on_return="rollback",         # Reset strategy
)

# Retry settings
retry = RetryConfig(
    max_retries=3,
    base_delay=0.1,
    max_delay=30.0,
    exponential_base=2.0,
    jitter=True,
)

# Circuit breaker
circuit_breaker = CircuitBreakerConfig(
    failure_threshold=5,
    success_threshold=3,
    timeout=60.0,
    half_open_max_calls=3,
)

# Health checks
health_check = HealthCheckConfig(
    enabled=True,
    interval=30.0,
    timeout=5.0,
    query="SELECT 1",
)

# Full configuration
config = ConnectionPoolConfig(
    connection_url="postgresql://localhost/db",
    pool=pool,
    retry=retry,
    circuit_breaker=circuit_breaker,
    health_check=health_check,
)

Pool Strategies:

| Strategy | Description |
|---|---|
| QUEUE_POOL | Standard pool with overflow (default) |
| NULL_POOL | No pooling, new connection each time |
| STATIC_POOL | Single connection for all requests |
| SINGLETON_THREAD | One connection per thread |
| ASYNC_QUEUE | Async-compatible queue pool |

Versioning Configuration

Enable version control for stored results.

from truthound.stores.versioning.base import (
    VersioningConfig,
    VersioningMode,
)

config = VersioningConfig(
    mode=VersioningMode.INCREMENTAL,    # Versioning strategy
    max_versions=0,                     # 0 = unlimited
    auto_cleanup=True,                  # Auto-remove old versions
    track_changes=True,                 # Track change history
    require_message=False,              # Require commit message
    enable_branching=False,             # Enable branches
    checksum_algorithm="sha256",        # Integrity checksum
)

Versioning Modes:

| Mode | Format | Example |
|---|---|---|
| INCREMENTAL | v{n} | v1, v2, v3 |
| SEMANTIC | X.Y.Z | 1.0.0, 1.1.0 |
| TIMESTAMP | ISO timestamp | 2024-01-15T10:30:45 |
| GIT_LIKE | Short hash | abc1234 |
from truthound.stores.versioning import VersionedStore

# Wrap any store with versioning
versioned_store = VersionedStore(
    store=base_store,
    config=VersioningConfig(mode=VersioningMode.SEMANTIC),
)

# Save with version
versioned_store.save(result, message="Initial validation")

# List versions
versions = versioned_store.list_versions(item_id)

# Get specific version
result = versioned_store.get_version(item_id, version=2)

# Rollback
versioned_store.rollback(item_id, version=1)

Retention Configuration

Configure retention policies for automatic cleanup.

from truthound.stores.retention.policies import (
    TimeBasedPolicy,
    CountBasedPolicy,
    SizeBasedPolicy,
    StatusBasedPolicy,
    TagBasedPolicy,
    CompositePolicy,
    RetentionAction,
)

# Time-based: Delete after 90 days
time_policy = TimeBasedPolicy(
    max_age_days=90,
    action=RetentionAction.DELETE,
)

# Count-based: Keep last 100 per asset
count_policy = CountBasedPolicy(
    max_count=100,
    per_asset=True,
    action=RetentionAction.ARCHIVE,
)

# Size-based: Max 1GB
size_policy = SizeBasedPolicy(
    max_size_bytes=1_000_000_000,
    action=RetentionAction.DELETE,
)

# Status-based: Delete failed after 7 days
status_policy = StatusBasedPolicy(
    status="failed",
    max_age_days=7,
    action=RetentionAction.DELETE,
)

# Tag-based: Keep production results longer
tag_policy = TagBasedPolicy(
    tag_key="environment",
    tag_value="production",
    max_age_days=365,
    action=RetentionAction.ARCHIVE,
)

# Composite: Combine multiple policies
composite = CompositePolicy(
    policies=[time_policy, count_policy],
    mode="any",  # "any" or "all"
)

Retention Actions:

| Action | Description |
|---|---|
| DELETE | Permanently remove |
| ARCHIVE | Move to archive tier |
| QUARANTINE | Move to quarantine |

Caching Configuration

Add a caching layer for improved performance.

from truthound.stores.caching.base import (
    CacheConfig,
    EvictionPolicy,
)

config = CacheConfig(
    max_size=10000,                     # Max cache entries
    max_memory_mb=100.0,                # Max memory usage
    ttl_seconds=3600.0,                 # TTL (1 hour)
    eviction_policy=EvictionPolicy.LRU, # Eviction strategy
    eviction_batch_size=100,            # Batch eviction size
    enable_statistics=True,             # Track cache stats
    warm_on_startup=False,              # Warm cache on startup
    background_refresh=False,           # Background refresh
    refresh_threshold_percent=20.0,     # Refresh threshold
)

Eviction Policies:

| Policy | Description |
|---|---|
| LRU | Least Recently Used |
| LFU | Least Frequently Used |
| TTL | Time To Live |
| FIFO | First In First Out |
| RANDOM | Random eviction |
from truthound.stores.caching import CachedStore

cached_store = CachedStore(
    store=base_store,
    config=config,
)

# Cache statistics
stats = cached_store.get_stats()
print(f"Hit rate: {stats.hits / (stats.hits + stats.misses):.2%}")

Replication Configuration

Configure cross-region replication.

from truthound.stores.replication.base import (
    ReplicationConfig,
    ReplicationMode,
    ReadPreference,
    ConflictResolution,
    ReplicaTarget,
)

config = ReplicationConfig(
    mode=ReplicationMode.ASYNC,
    targets=[],
    read_preference=ReadPreference.PRIMARY,
    conflict_resolution=ConflictResolution.LAST_WRITE_WINS,
    min_sync_replicas=1,
    enable_health_checks=True,
    health_check_interval_seconds=30.0,
    max_replication_lag_ms=5000.0,
    enable_failover=True,
    enable_metrics=True,
)

# Add replica target
target = ReplicaTarget(
    name="us-west-replica",
    store=west_store,
    region="us-west-2",
    priority=1,
    is_read_replica=True,
    sync_timeout_seconds=30.0,
    max_retry_attempts=3,
)

Replication Modes:

| Mode | Description |
|---|---|
| SYNC | Wait for all replicas |
| ASYNC | Fire and forget |
| SEMI_SYNC | Wait for at least one |

Read Preferences:

| Preference | Description |
|---|---|
| PRIMARY | Always read from primary |
| SECONDARY | Prefer secondary replicas |
| NEAREST | Read from nearest |
| ANY | Read from any available |

Backpressure Configuration

Manage system load with backpressure.

from truthound.stores.backpressure.base import BackpressureConfig

config = BackpressureConfig(
    enabled=True,
    memory_threshold_percent=80.0,      # Memory pressure threshold
    queue_depth_threshold=10000,        # Queue depth threshold
    latency_threshold_ms=100.0,         # Latency threshold
    min_pause_ms=10.0,                  # Minimum pause
    max_pause_ms=5000.0,                # Maximum pause
    base_rate=10000.0,                  # Base ops/sec
    min_rate=100.0,                     # Minimum ops/sec
    adaptive_window_size=100,           # Adaptive window
    recovery_rate=0.1,                  # Recovery rate
    sampling_interval_ms=100.0,         # Sampling interval
    pressure_decay_factor=0.95,         # Decay factor
)

Observability Configuration

Configure audit logging, metrics, and tracing.

from truthound.stores.observability.config import (
    ObservabilityConfig,
    AuditConfig,
    AuditLogLevel,
    MetricsConfig,
    TracingConfig,
)

# Audit logging
audit = AuditConfig(
    enabled=True,
    level=AuditLogLevel.STANDARD,       # MINIMAL, STANDARD, VERBOSE, DEBUG
    backend="json",                     # memory, file, json, elasticsearch, kafka
    file_path="./audit.log",
    include_data_preview=False,
    max_data_preview_size=1024,
    redact_sensitive=True,
    sensitive_fields=["password", "secret", "token", "api_key"],
    retention_days=90,
    batch_size=100,
    flush_interval_seconds=5.0,
)

# Prometheus metrics
metrics = MetricsConfig(
    enabled=True,
    prefix="truthound_store",
    labels={"service": "data-quality"},
    histogram_buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
    enable_http_server=True,
    http_port=9090,
    http_path="/metrics",
    push_gateway_url=None,
    push_interval_seconds=10.0,
)

# Distributed tracing
tracing = TracingConfig(
    enabled=True,
    service_name="truthound-store",
    sampler="parent_based",             # parent_based, always_on, always_off
    sample_ratio=1.0,
    exporter="otlp",                    # otlp, jaeger, zipkin, console
    endpoint=None,
    propagators=["tracecontext", "baggage"],
    record_exceptions=True,
)

# Combined configuration
observability = ObservabilityConfig(
    audit=audit,
    metrics=metrics,
    tracing=tracing,
    correlation_id_header="X-Correlation-ID",
    environment="production",
)

# Presets
config = ObservabilityConfig.production()  # Production defaults
config = ObservabilityConfig.minimal()     # Minimal overhead
config = ObservabilityConfig.disabled()    # All disabled

Factory Functions

from truthound.stores import get_store, list_available_backends

# List available backends
backends = list_available_backends()
# ['filesystem', 's3', 'gcs', 'azure', 'database', 'memory']

# Create store by backend name
store = get_store("filesystem", base_path=".truthound/results")
store = get_store("s3", bucket="my-bucket", prefix="results/")
store = get_store("database", connection_url="postgresql://localhost/db")
store = get_store("azure", container="my-container", connection_string="...")
store = get_store("memory")  # For testing

# Check backend availability
from truthound.stores import is_backend_available

if is_backend_available("s3"):
    store = get_store("s3", bucket="my-bucket")

Store Operations

from truthound.stores.base import StoreQuery

# Save result
result_id = store.save(validation_result)

# Get result
result = store.get(result_id)

# Check existence
if store.exists(result_id):
    store.delete(result_id)

# Query results
query = StoreQuery(
    data_asset="users_table",
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 12, 31),
    status="failure",
    tags={"environment": "production"},
    limit=100,
    offset=0,
    order_by="run_time",
    ascending=False,
)

results = store.query(query)

# Iterate with batching
for result in store.iter_query(query, batch_size=100):
    process(result)