Resilience Patterns Configuration
Truthound provides resilience patterns to protect against failures in external dependencies like databases, APIs, and message queues.
Overview
| Pattern | Purpose | Use Case |
|---|---|---|
| Circuit Breaker | Prevent cascading failures | External APIs, databases |
| Retry | Recover from transient failures | Network issues, timeouts |
| Bulkhead | Isolate resources | Connection pools, thread pools |
| Rate Limiter | Control request rate | API rate limits, quotas |
CircuitBreakerConfig
Prevents cascading failures by monitoring error rates and temporarily stopping requests to failing services.
Configuration
from truthound.common.resilience import CircuitBreakerConfig

config = CircuitBreakerConfig(
    failure_threshold=5,             # Failures to open circuit
    success_threshold=3,             # Successes to close circuit
    timeout_seconds=30.0,            # Time before half-open
    half_open_max_calls=3,           # Test calls in half-open state
    failure_rate_threshold=50.0,     # Failure rate % to open
    slow_call_threshold_ms=1000.0,   # Slow call definition
    slow_call_rate_threshold=50.0,   # Slow call % to open
    window_size=100,                 # Measurement window
    excluded_exceptions=(),          # Exceptions that don't count as failures
    record_slow_calls=True,          # Whether to track slow calls
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| failure_threshold | int | 5 | Number of failures before opening circuit |
| success_threshold | int | 3 | Number of successes in half-open to close circuit |
| timeout_seconds | float | 30.0 | Time in open state before transitioning to half-open |
| half_open_max_calls | int | 3 | Maximum calls allowed in half-open state |
| failure_rate_threshold | float | 50.0 | Failure rate percentage (0-100) to trigger open |
| slow_call_threshold_ms | float | 1000.0 | Latency threshold in ms for slow calls |
| slow_call_rate_threshold | float | 50.0 | Slow call rate percentage (0-100) to trigger open |
| window_size | int | 100 | Number of calls to track for rate calculations |
| excluded_exceptions | tuple | () | Exceptions that don't count as failures |
| record_slow_calls | bool | True | Whether to track slow calls |
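The rate-based parameters (window_size and failure_rate_threshold) can be pictured as a sliding window over recent call outcomes. The following is a minimal sketch, not Truthound's implementation: the class name FailureRateWindow is hypothetical, and comparing with >= (rather than >) is an assumption, since the source does not state how the boundary is treated.

```python
from collections import deque


class FailureRateWindow:
    """Hypothetical helper: tracks outcomes of the last `window_size` calls."""

    def __init__(self, window_size=100, failure_rate_threshold=50.0):
        self.failure_rate_threshold = failure_rate_threshold
        # A deque with maxlen drops the oldest outcome automatically.
        self._outcomes = deque(maxlen=window_size)

    def record(self, success):
        self._outcomes.append(bool(success))

    @property
    def failure_rate(self):
        """Failure percentage (0-100) over the current window."""
        if not self._outcomes:
            return 0.0
        failures = sum(1 for ok in self._outcomes if not ok)
        return 100.0 * failures / len(self._outcomes)

    def should_open(self):
        # Assumption: reaching the threshold exactly is enough to open.
        return self.failure_rate >= self.failure_rate_threshold
```

A slow-call rate could be tracked the same way by recording whether each call exceeded slow_call_threshold_ms instead of whether it failed.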
Circuit States
         ┌──────────────────────────────────────┐
         │                                      │
         ▼                                      │
    ┌────────┐      failure_threshold      ┌────────┐
    │ CLOSED │ ──────────────────────────► │  OPEN  │
    └────────┘                             └────────┘
         ▲                                      │
         │                                      │ timeout_seconds
         │                                      ▼
         │       success_threshold        ┌───────────┐
         └─────────────────────────────── │ HALF-OPEN │
                                          └───────────┘
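The labelled transitions in the diagram can be sketched as a small state machine. This is an illustrative model only: SketchCircuitBreaker and its methods are hypothetical names, counting consecutive failures is an assumption, and reopening on any failure in the half-open state is a common convention rather than something the source confirms.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"


class SketchCircuitBreaker:
    """Hypothetical model of the state diagram; not Truthound's class."""

    def __init__(self, failure_threshold=5, success_threshold=3,
                 timeout_seconds=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout_seconds = timeout_seconds
        self._clock = clock
        self._state = State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    @property
    def state(self):
        # OPEN -> HALF_OPEN once timeout_seconds has elapsed.
        if (self._state is State.OPEN
                and self._clock() - self._opened_at >= self.timeout_seconds):
            self._state = State.HALF_OPEN
            self._successes = 0
        return self._state

    def record_failure(self):
        current = self.state
        if current is State.OPEN:
            return  # calls are rejected while open; nothing to count
        if current is State.HALF_OPEN:
            self._open()  # assumption: any half-open failure reopens
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._open()  # CLOSED -> OPEN

    def record_success(self):
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self._state = State.CLOSED  # HALF_OPEN -> CLOSED
                self._failures = 0
        else:
            self._failures = 0  # assumption: failures are consecutive

    def _open(self):
        self._state = State.OPEN
        self._opened_at = self._clock()
        self._failures = 0
```

Passing the clock in as a parameter keeps the sketch testable without sleeping for the full timeout.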
Presets
# Aggressive - opens quickly, recovers slowly
# failure_threshold=3, success_threshold=3, timeout_seconds=60.0, failure_rate_threshold=30.0
config = CircuitBreakerConfig.aggressive()

# Lenient - tolerates more failures
# failure_threshold=10, success_threshold=1, timeout_seconds=15.0, failure_rate_threshold=80.0
config = CircuitBreakerConfig.lenient()

# Disabled - threshold set so high that the circuit effectively never opens
# failure_threshold=1_000_000, timeout_seconds=0.1
config = CircuitBreakerConfig.disabled()

# Database optimized
# failure_threshold=5, success_threshold=2, timeout_seconds=30.0, slow_call_threshold_ms=5000.0
config = CircuitBreakerConfig.for_database()

# External API optimized
# failure_threshold=3, success_threshold=2, timeout_seconds=60.0, slow_call_threshold_ms=2000.0
config = CircuitBreakerConfig.for_external_api()
RetryConfig
Automatically retries failed operations with configurable backoff strategies.
Configuration
from truthound.common.resilience import RetryConfig

config = RetryConfig(
    max_attempts=3,         # Maximum number of attempts (1 = no retry)
    base_delay=0.1,         # Base delay in seconds
    max_delay=30.0,         # Maximum delay cap in seconds
    exponential_base=2.0,   # Multiplier for exponential backoff
    jitter=True,            # Whether to add random jitter
    jitter_factor=0.5,      # Maximum jitter as a fraction (0.0-1.0)
    retryable_exceptions=(ConnectionError, TimeoutError, OSError),
    non_retryable_exceptions=(ValueError, TypeError, KeyError),
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_attempts | int | 3 | Maximum number of attempts (1 = no retry) |
| base_delay | float | 0.1 | Base delay in seconds |
| max_delay | float | 30.0 | Maximum delay cap in seconds |
| exponential_base | float | 2.0 | Multiplier for exponential backoff |
| jitter | bool | True | Whether to add random jitter to delays |
| jitter_factor | float | 0.5 | Maximum jitter as a fraction (0.0-1.0) |
| retryable_exceptions | tuple | (ConnectionError, TimeoutError, OSError) | Exceptions that trigger retry |
| non_retryable_exceptions | tuple | (ValueError, TypeError, KeyError) | Exceptions that should not be retried |
Delay Calculation
The delay for attempt n (0-indexed) is calculated as:
delay = min(base_delay * (exponential_base ** n), max_delay)
# With jitter:
jitter_range = delay * jitter_factor
delay = delay + random.uniform(-jitter_range, jitter_range)
Example with default settings (nominal values; with jitter_factor=0.5, jitter shifts each delay by up to ±50%):
- Attempt 0: ~0.1s
- Attempt 1: ~0.2s
- Attempt 2: ~0.4s
- Attempt 3: ~0.8s
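The formula above can be written as a standalone function to check the numbers. This is a sketch of the stated formula, not the library's calculate_delay method; the function signature and the rng parameter are illustrative additions.

```python
import random


def calculate_delay(attempt, base_delay=0.1, max_delay=30.0,
                    exponential_base=2.0, jitter=True, jitter_factor=0.5,
                    rng=random):
    """Delay in seconds for a 0-indexed attempt, following the formula above."""
    # Exponential growth, capped at max_delay.
    delay = min(base_delay * (exponential_base ** attempt), max_delay)
    if jitter:
        # Symmetric jitter: up to +/- jitter_factor of the capped delay.
        jitter_range = delay * jitter_factor
        delay += rng.uniform(-jitter_range, jitter_range)
    return delay


# Nominal delays for attempts 0 to 3 with jitter switched off.
nominal = [calculate_delay(n, jitter=False) for n in range(4)]
```

Because jitter is applied after the cap, a jittered delay can exceed max_delay by up to jitter_factor × max_delay under this formula.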
Presets
# No retry - fail immediately
# max_attempts=1
config = RetryConfig.no_retry()

# Quick retry for transient failures
# max_attempts=3, base_delay=0.05, max_delay=1.0
config = RetryConfig.quick()

# Persistent retry for important operations
# max_attempts=5, base_delay=0.5, max_delay=30.0
config = RetryConfig.persistent()

# Standard exponential backoff
# max_attempts=4, base_delay=0.1, max_delay=10.0, exponential_base=2.0
config = RetryConfig.exponential()
Helper Methods
config = RetryConfig()

# Calculate delay for a specific attempt
delay = config.calculate_delay(attempt=2)  # Returns delay in seconds

# Check if an exception should trigger retry
should_retry = config.is_retryable(ConnectionError())  # True
should_retry = config.is_retryable(ValueError())  # False
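One plausible way such a check could work is sketched below. The precedence shown (the non-retryable list wins on overlap, and unlisted exceptions are not retried) is an assumption; the source does not specify it. Note that in Python 3, ConnectionError and TimeoutError are both subclasses of OSError, so listing OSError also covers them and other subclasses such as FileNotFoundError.

```python
RETRYABLE = (ConnectionError, TimeoutError, OSError)
NON_RETRYABLE = (ValueError, TypeError, KeyError)


def is_retryable(exc, retryable=RETRYABLE, non_retryable=NON_RETRYABLE):
    """Hypothetical stand-in for the helper; precedence is assumed."""
    # Assumption: the non-retryable list is checked first and wins.
    if isinstance(exc, non_retryable):
        return False
    # isinstance also matches subclasses of the listed types.
    return isinstance(exc, retryable)
```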
BulkheadConfig
Isolates resources to prevent one component from consuming all available resources.
Configuration
from truthound.common.resilience import BulkheadConfig

config = BulkheadConfig(
    max_concurrent=10,   # Maximum concurrent executions
    max_wait_time=0.0,   # Maximum time to wait for a slot (0 = fail immediately)
    fairness=True,       # FIFO ordering for waiting requests
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_concurrent | int | 10 | Maximum concurrent executions |
| max_wait_time | float | 0.0 | Maximum time to wait for a slot in seconds (0 = fail immediately) |
| fairness | bool | True | Whether to use fair (FIFO) ordering for waiting requests |
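The effect of max_concurrent and max_wait_time can be illustrated with a semaphore. This is a minimal sketch under stated assumptions: SketchBulkhead is a hypothetical class, raising RuntimeError on rejection is a placeholder for whatever error the library uses, and fairness is not modelled because threading.BoundedSemaphore makes no FIFO guarantee.

```python
import threading


class SketchBulkhead:
    """Hypothetical semaphore-based bulkhead; not Truthound's class."""

    def __init__(self, max_concurrent=10, max_wait_time=0.0):
        self.max_wait_time = max_wait_time
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def execute(self, func, *args, **kwargs):
        if self.max_wait_time > 0:
            # Wait up to max_wait_time seconds for a free slot.
            acquired = self._slots.acquire(timeout=self.max_wait_time)
        else:
            # max_wait_time == 0: try once and fail immediately.
            acquired = self._slots.acquire(blocking=False)
        if not acquired:
            raise RuntimeError("bulkhead full")  # placeholder error type
        try:
            return func(*args, **kwargs)
        finally:
            # Always free the slot, even if func raised.
            self._slots.release()
```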
Presets
# Small bulkhead for limited resources
# max_concurrent=5
config = BulkheadConfig.small()

# Medium bulkhead for moderate concurrency
# max_concurrent=20
config = BulkheadConfig.medium()

# Large bulkhead for high concurrency
# max_concurrent=50
config = BulkheadConfig.large()

# Database optimized (with wait time)
# max_concurrent=10, max_wait_time=5.0
config = BulkheadConfig.for_database()
RateLimiterConfig
Controls the rate of requests to prevent overwhelming services or exceeding quotas.
Configuration
from truthound.common.resilience import RateLimiterConfig

config = RateLimiterConfig(
    rate=100,                   # Number of permits per period
    period_seconds=1.0,         # Period duration in seconds
    burst_size=None,            # Maximum burst size (defaults to rate)
    algorithm="token_bucket",   # Rate limiting algorithm
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| rate | int | 100 | Number of permits per period |
| period_seconds | float | 1.0 | Period duration in seconds |
| burst_size | int \| None | None | Maximum burst size (defaults to rate if not set) |
| algorithm | str | "token_bucket" | Rate limiting algorithm |
Algorithms
| Algorithm | Description |
|---|---|
| token_bucket | Tokens refill at a steady rate, allows bursting up to burst_size |
| sliding_window | Counts requests in a sliding time window |
| fixed_window | Counts requests in fixed time intervals |
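To make the token_bucket row concrete, here is a minimal token bucket sketch. It is a generic illustration of the algorithm, not Truthound's limiter: SketchTokenBucket, try_acquire, and the injectable clock are hypothetical, and starting with a full bucket is an assumption.

```python
import time


class SketchTokenBucket:
    """Hypothetical token bucket; parameter names follow RateLimiterConfig."""

    def __init__(self, rate=100, period_seconds=1.0, burst_size=None,
                 clock=time.monotonic):
        self.refill_per_second = rate / period_seconds
        # burst_size falls back to rate when it is not set.
        self.capacity = burst_size if burst_size is not None else rate
        self._clock = clock
        self._tokens = float(self.capacity)  # assumption: start full
        self._last = clock()

    def try_acquire(self, permits=1):
        """Take `permits` tokens if available; return True on success."""
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill for the elapsed time, never beyond capacity.
        self._tokens = min(self.capacity,
                           self._tokens + elapsed * self.refill_per_second)
        if self._tokens >= permits:
            self._tokens -= permits
            return True
        return False
```

With rate=2 and burst_size=3, three calls succeed back to back, after which one further call is admitted roughly every half second.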
Presets
# N requests per second
config = RateLimiterConfig.per_second(rate=100, burst=150)

# N requests per minute
config = RateLimiterConfig.per_minute(rate=1000, burst=1200)

# N requests per hour
config = RateLimiterConfig.per_hour(rate=10000, burst=12000)
Properties
config = RateLimiterConfig(rate=100, burst_size=None)

# Get effective burst size (defaults to rate if not set)
burst = config.effective_burst_size  # Returns 100
Combining Patterns
Use the ResilienceBuilder to combine multiple patterns:
from truthound.common.resilience import (
    ResilienceBuilder,
    CircuitBreakerConfig,
    RetryConfig,
    BulkheadConfig,
    RateLimiterConfig,
)

wrapper = (
    ResilienceBuilder("my-service")
    .with_circuit_breaker(CircuitBreakerConfig.for_external_api())
    .with_retry(RetryConfig.exponential())
    .with_bulkhead(BulkheadConfig.medium())
    .with_rate_limit(RateLimiterConfig.per_second(100))
    .build()
)

# Execute with all resilience patterns
result = wrapper.execute(my_function, args)

# Or use as decorator
@wrapper
def risky_operation():
    return external_service.call()
Pattern Execution Order
When combined, patterns are applied in this order (outer to inner):
1. Rate Limiter - Controls request rate
2. Bulkhead - Limits concurrent executions
3. Circuit Breaker - Monitors failures and opens circuit
4. Retry - Retries failed operations
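The outer-to-inner ordering can be visualised by nesting plain decorators and logging when each layer is entered and exited. The helpers layer and compose are hypothetical and only trace the call path; they do not implement any of the patterns.

```python
def layer(name, log):
    """Return a decorator that logs when its layer is entered and exited."""
    def decorator(func):
        def wrapped(*args, **kwargs):
            log.append(f"enter {name}")
            try:
                return func(*args, **kwargs)
            finally:
                log.append(f"exit {name}")
        return wrapped
    return decorator


def compose(func, layers):
    """Wrap func so that layers[0] is outermost and layers[-1] innermost."""
    for decorator in reversed(layers):
        func = decorator(func)
    return func


log = []
order = ["rate_limiter", "bulkhead", "circuit_breaker", "retry"]
call = compose(lambda: "ok", [layer(name, log) for name in order])
result = call()
# log now lists the four "enter" entries in the documented order,
# followed by the matching "exit" entries in reverse.
```

If the wrapper really nests the patterns this way, one consequence is that retries happen inside a single rate-limit permit and bulkhead slot, and the circuit breaker sees one outcome per call rather than one per attempt.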
Use Case Examples
Database Connection
db_config = (
    ResilienceBuilder("database")
    .with_circuit_breaker(CircuitBreakerConfig.for_database())
    .with_retry(RetryConfig(
        max_attempts=3,
        base_delay=0.5,
        retryable_exceptions=(ConnectionError, TimeoutError),
    ))
    .with_bulkhead(BulkheadConfig.for_database())
    .build()
)
External API
api_config = (
    ResilienceBuilder("external-api")
    .with_circuit_breaker(CircuitBreakerConfig.for_external_api())
    .with_retry(RetryConfig.exponential())
    .with_rate_limit(RateLimiterConfig.per_second(100))
    .build()
)
Message Queue Consumer
queue_config = (
    ResilienceBuilder("message-queue")
    .with_circuit_breaker(CircuitBreakerConfig.lenient())
    .with_retry(RetryConfig.persistent())
    .with_bulkhead(BulkheadConfig(max_concurrent=50, max_wait_time=10.0))
    .build()
)
Validation Pipeline Integration (VE-5)
The ValidationResiliencePolicy bridges Truthound's validation engine with the resilience patterns above, providing per-validator circuit breakers and retry logic.
ValidationResiliencePolicy
import logging

from truthound.common.resilience import CircuitBreakerConfig
from truthound.validators.resilience_bridge import (
    ValidationResiliencePolicy,
    create_default_policy,
    create_strict_policy,
)

logger = logging.getLogger(__name__)

# Default: lenient circuit breaker (10 failures to open, 15s timeout)
policy = create_default_policy(max_retries=2)

# Strict: no retries, aggressive circuit breaker
policy = create_strict_policy()

# Custom policy
policy = ValidationResiliencePolicy(
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,
        success_threshold=2,
        timeout_seconds=30.0,
    ),
    max_retries=3,
    on_retry=lambda attempt, exc, delay: logger.warning(
        f"Retry {attempt} after {delay:.1f}s: {exc}"
    ),
)
Execution Flow
policy.execute(validator, lf)
│
├── Circuit breaker check → OPEN? → return SKIPPED result
│
├── _validate_safe(validator, lf, max_retries=policy.max_retries)
│   │
│   ├── Attempt 1 → success? → record success → return
│   ├── Attempt 2 → exponential backoff (0.1s × 2^n, cap 5s)
│   └── Attempt N → max_retries exceeded → record failure
│
└── Circuit breaker update → failure count → maybe OPEN
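The retry branch of the flow above can be sketched as a loop with the stated backoff (0.1s × 2^n, capped at 5s). This is an illustration, not the body of _validate_safe: the function names are hypothetical, and treating max_retries as the number of extra attempts after the first is an assumption.

```python
import time

# Builtin exception types treated as transient in this sketch.
TRANSIENT = (TimeoutError, ConnectionError, OSError)


def backoff_delay(n, base=0.1, cap=5.0):
    """Backoff before retry n (0-indexed): 0.1s x 2^n, capped at 5s."""
    return min(base * (2 ** n), cap)


def run_with_retries(func, max_retries=2, sleep=time.sleep):
    """Call func, retrying transient errors up to max_retries extra times."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except TRANSIENT:
            if attempt == max_retries:
                raise  # retries exhausted: the caller records the failure
            sleep(backoff_delay(attempt))
```

Non-transient exceptions are not caught, so they propagate on the first attempt without any delay.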
Per-Validator Circuit States
Each validator gets its own circuit breaker instance:
# Check circuit state for a specific validator
state = policy.get_circuit_state("null_check")  # "CLOSED" | "OPEN" | "HALF_OPEN"

# Reset a specific validator's circuit
policy.reset("null_check")

# Reset all circuits
policy.reset()
Exception Classification
The ExceptionInfo.from_exception() factory automatically classifies exceptions:
| Category | Retryable | Exception Types |
|---|---|---|
| transient | Yes | TimeoutError, ConnectionError, OSError, ValidationTimeoutError |
| configuration | No | ValueError, TypeError, KeyError, ColumnNotFoundError |
| data | No | Polars ComputeError, SchemaError |
| permanent | No | All other exceptions |
Only exceptions in the transient category are retried.
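The classification table can be approximated with isinstance checks. The sketch below covers only Python builtins; the Truthound-specific and Polars exception types from the table are left out so that the example stays self-contained, which means the data category never appears here. The functions classify and summarize are hypothetical and are not the ExceptionInfo API.

```python
from collections import Counter


def classify(exc):
    """Hypothetical classifier mirroring the table (builtins only)."""
    # Specific categories are checked before the catch-all.
    if isinstance(exc, (TimeoutError, ConnectionError, OSError)):
        return "transient"
    if isinstance(exc, (ValueError, TypeError, KeyError)):
        return "configuration"
    return "permanent"


def summarize(exceptions):
    """Count exceptions per category, similar in spirit to a by-category summary."""
    return Counter(classify(exc) for exc in exceptions)


summary = summarize([TimeoutError(), KeyError("col"), RuntimeError(), OSError()])
```

Because OSError is in the transient list, its subclasses (for example FileNotFoundError) are classified as transient too under this sketch.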
Integration with th.check()
The catch_exceptions and max_retries parameters on th.check() are propagated to all execution paths:
import truthound as th

# All validators wrapped with exception isolation and retry
report = th.check(
    "data.csv",
    catch_exceptions=True,  # Default: True
    max_retries=3,          # Retry transient errors
)

# Access exception summary on the report
if report.exception_summary:
    print(f"Exceptions: {report.exception_summary.total_count}")
    print(f"By category: {report.exception_summary.by_category}")
Validation
All configuration classes validate their parameters on initialization:
# Raises ValueError: failure_threshold must be positive
CircuitBreakerConfig(failure_threshold=0)

# Raises ValueError: max_attempts must be at least 1
RetryConfig(max_attempts=0)

# Raises ValueError: max_concurrent must be positive
BulkheadConfig(max_concurrent=0)

# Raises ValueError: algorithm must be one of {'token_bucket', 'sliding_window', 'fixed_window'}
RateLimiterConfig(algorithm="invalid")
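Validation at construction time is commonly implemented with a dataclass __post_init__ hook. The sketch below shows the idea for a bulkhead-style config; SketchBulkheadConfig is hypothetical, the frozen dataclass is a design assumption, and the max_wait_time check is an added assumption that the source does not mention.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchBulkheadConfig:
    """Hypothetical config that rejects invalid values on construction."""

    max_concurrent: int = 10
    max_wait_time: float = 0.0
    fairness: bool = True

    def __post_init__(self):
        # Runs right after the generated __init__ assigns the fields.
        if self.max_concurrent <= 0:
            raise ValueError("max_concurrent must be positive")
        # Assumption: negative wait times are rejected as well.
        if self.max_wait_time < 0:
            raise ValueError("max_wait_time must be non-negative")


try:
    SketchBulkheadConfig(max_concurrent=0)
except ValueError as exc:
    message = str(exc)
```

Failing at construction means a bad setting surfaces when the configuration is built rather than on the first guarded call.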