Engine Chain
Engine chains combine multiple engines to implement advanced patterns such as fallback, load balancing, and conditional routing.
Basic Usage
from common.engines import EngineChain, TruthoundEngine, GreatExpectationsAdapter
primary = TruthoundEngine()
backup = GreatExpectationsAdapter()
chain = EngineChain([primary, backup])
# Use like a regular engine (automatically tries backup on primary failure)
result = chain.check(data, rules)
Fallback Strategies
| Strategy |
Description |
SEQUENTIAL |
Try sequentially (default) |
FIRST_AVAILABLE |
Use first healthy engine |
ROUND_ROBIN |
Distribute via round robin |
RANDOM |
Random selection |
PRIORITY |
Priority-based selection |
WEIGHTED |
Weighted random selection |
Execution Modes
| Mode |
Description |
FAIL_FAST |
Stop immediately on first failure |
FALLBACK |
Try next engine on failure (default) |
ALL |
Execute all engines, aggregate results |
FIRST_SUCCESS |
Execute until first success |
QUORUM |
Execute until quorum success |
Preset Configurations
from common.engines import (
DEFAULT_FALLBACK_CONFIG, # Default: sequential fallback
RETRY_FALLBACK_CONFIG, # Retry: 3 attempts
HEALTH_AWARE_FALLBACK_CONFIG, # Health check: healthy engines only
LOAD_BALANCED_CONFIG, # Load balancing: round robin
WEIGHTED_CONFIG, # Weighted: weight-based selection
)
Builder Pattern Configuration
from common.engines import FallbackConfig, FallbackStrategy
config = FallbackConfig()
config = config.with_strategy(FallbackStrategy.ROUND_ROBIN)
config = config.with_retry(count=3, delay_seconds=1.0)
config = config.with_health_check(enabled=True, skip_unhealthy=True)
config = config.with_timeout(30.0)
config = config.with_weights(truthound=2.0, ge=1.0)
chain = EngineChain([engine1, engine2], config=config)
Factory Functions
from common.engines import (
create_fallback_chain,
create_load_balanced_chain,
)
# Simple fallback chain
chain = create_fallback_chain(
primary,
backup1,
backup2,
retry_count=2,
check_health=True,
name="production_chain",
)
# Load-balanced chain
chain = create_load_balanced_chain(
engine1,
engine2,
engine3,
strategy=FallbackStrategy.WEIGHTED,
weights={"engine1": 3.0, "engine2": 2.0, "engine3": 1.0},
)
Conditional Routing
from common.engines import ConditionalEngineChain
chain = ConditionalEngineChain(name="smart_router")
# Add conditions
chain.add_route(
lambda data, rules: len(data) > 1_000_000, # Large data
heavy_engine,
priority=10,
name="large_data",
)
chain.add_route(
lambda data, rules: any(r.get("type") == "regex" for r in rules),
regex_engine,
priority=5,
name="regex_rules",
)
# Default engine
chain.set_default_engine(general_engine)
result = chain.check(data, rules)
Hooks
from common.engines import (
LoggingChainHook,
MetricsChainHook,
CompositeChainHook,
)
logging_hook = LoggingChainHook()
metrics_hook = MetricsChainHook()
composite = CompositeChainHook([logging_hook, metrics_hook])
chain = EngineChain([primary, backup], hooks=[composite])
result = chain.check(data, rules)
# Query metrics
print(metrics_hook.get_chain_success_rate("chain"))
print(metrics_hook.get_fallback_rate("chain"))
print(metrics_hook.get_average_duration_ms("chain"))
Execution Result Query
chain = EngineChain([primary, backup])
result = chain.check(data, rules)
exec_result = chain.last_execution_result
print(f"Success: {exec_result.success}")
print(f"Final engine: {exec_result.final_engine}")
print(f"Attempt count: {exec_result.attempt_count}")
print(f"Total duration: {exec_result.total_duration_ms}ms")
Exception Handling
from common.engines import (
AllEnginesFailedError,
NoEngineSelectedError,
EngineChainConfigError,
)
try:
result = chain.check(data, rules)
except AllEnginesFailedError as e:
print(f"All engines failed: {e.attempted_engines}")
except NoEngineSelectedError as e:
print(f"No engine selected: {e.chain_name}")
Async Chain
from common.engines import AsyncEngineChain, create_async_fallback_chain
chain = AsyncEngineChain([async_engine1, async_engine2])
result = await chain.check(data, rules)