Skip to content

Engines API Reference

engines/base.py

DataQualityEngine Protocol

@runtime_checkable
class DataQualityEngine(Protocol):
    @property
    def engine_name(self) -> str: ...

    @property
    def engine_version(self) -> str: ...

    def check(
        self,
        data: Any,
        rules: Sequence[Mapping[str, Any]] | None = None,
        **kwargs: Any,
    ) -> CheckResult: ...

    def profile(self, data: Any, **kwargs: Any) -> ProfileResult: ...

    def learn(self, data: Any, **kwargs: Any) -> LearnResult: ...

EngineCapabilities

@dataclass(frozen=True)
class EngineCapabilities:
    supports_check: bool = True
    supports_profile: bool = True
    supports_learn: bool = True
    supports_async: bool = False
    supports_streaming: bool = False
    supported_data_types: tuple[str, ...] = ()
    supported_rule_types: tuple[str, ...] = ()

EngineInfo

@dataclass(frozen=True)
class EngineInfo:
    name: str
    version: str
    description: str | None = None
    capabilities: EngineCapabilities | None = None

engines/lifecycle.py

EngineState

class EngineState(Enum):
    CREATED = "created"
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"
    FAILED = "failed"

ManagedEngine Protocol

@runtime_checkable
class ManagedEngine(Protocol):
    def start(self) -> None: ...
    def stop(self) -> None: ...
    def health_check(self) -> HealthCheckResult: ...
    def get_state(self) -> EngineState: ...

EngineStateSnapshot

@dataclass(frozen=True)
class EngineStateSnapshot:
    state: EngineState
    uptime_seconds: float
    error_count: int
    last_error: Exception | None
    health_status: HealthStatus | None

engines/batch.py

BatchConfig

@dataclass(frozen=True)
class BatchConfig:
    batch_size: int = 1000
    max_workers: int = 4
    execution_strategy: ExecutionStrategy = ExecutionStrategy.PARALLEL
    aggregation_strategy: AggregationStrategy = AggregationStrategy.MERGE
    fail_fast: bool = False
    timeout_seconds: float | None = None

ExecutionStrategy

class ExecutionStrategy(Enum):
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    ADAPTIVE = "adaptive"

BatchResult

@dataclass(frozen=True)
class BatchResult:
    status: CheckStatus
    total_chunks: int
    passed_chunks: int
    failed_chunks: int
    duration_seconds: float
    results: tuple[CheckResult, ...] = ()

engines/chain.py

FallbackConfig

@dataclass(frozen=True)
class FallbackConfig:
    strategy: FallbackStrategy = FallbackStrategy.SEQUENTIAL
    retry_count: int = 1
    retry_delay_seconds: float = 0.0
    check_health: bool = False
    skip_unhealthy: bool = False
    timeout_seconds: float | None = None
    weights: Mapping[str, float] = field(default_factory=dict)

FallbackStrategy

class FallbackStrategy(Enum):
    SEQUENTIAL = "sequential"
    FIRST_AVAILABLE = "first_available"
    ROUND_ROBIN = "round_robin"
    RANDOM = "random"
    PRIORITY = "priority"
    WEIGHTED = "weighted"

ChainExecutionResult

@dataclass(frozen=True)
class ChainExecutionResult:
    success: bool
    final_engine: str | None
    result: CheckResult | None
    attempts: tuple[ChainExecutionAttempt, ...]
    total_duration_ms: float

engines/context.py

ContextConfig

@dataclass(frozen=True)
class ContextConfig:
    auto_start_engine: bool = True
    auto_stop_engine: bool = True
    cleanup_strategy: CleanupStrategy = CleanupStrategy.ALWAYS
    track_resources: bool = True
    enable_savepoints: bool = False
    propagate_exceptions: bool = True
    cleanup_timeout_seconds: float = 30.0

CleanupStrategy

class CleanupStrategy(Enum):
    ALWAYS = "always"
    ON_SUCCESS = "on_success"
    ON_FAILURE = "on_failure"
    NEVER = "never"

engines/aggregation.py

AggregationConfig

@dataclass(frozen=True)
class AggregationConfig:
    strategy: ResultAggregationStrategy = ResultAggregationStrategy.MERGE
    conflict_resolution: ConflictResolution = ConflictResolution.PREFER_FAILURE
    consensus_threshold: float = 0.5
    weights: Mapping[str, float] = field(default_factory=dict)
    preserve_individual_results: bool = False

ResultAggregationStrategy

class ResultAggregationStrategy(Enum):
    MERGE = "merge"
    WORST = "worst"
    BEST = "best"
    MAJORITY = "majority"
    FIRST_FAILURE = "first_failure"
    WEIGHTED = "weighted"
    CONSENSUS = "consensus"
    STRICT_ALL = "strict_all"
    LENIENT_ANY = "lenient_any"

engines/version.py

SemanticVersion

@dataclass(frozen=True)
class SemanticVersion:
    major: int
    minor: int
    patch: int
    prerelease: str | None = None
    build: str | None = None

    def __lt__(self, other): ...
    def __le__(self, other): ...
    def __gt__(self, other): ...
    def __ge__(self, other): ...

VersionConstraint

class VersionConstraint:
    def __init__(self, operator: VersionOperator, version: SemanticVersion): ...
    def satisfies(self, version: SemanticVersion) -> bool: ...

engines/plugin.py

PluginSpec

@dataclass(frozen=True)
class PluginSpec:
    name: str
    module_path: str
    class_name: str
    plugin_type: PluginType = PluginType.ENGINE
    priority: int = 100
    enabled: bool = True
    aliases: tuple[str, ...] = ()
    config: Mapping[str, Any] = field(default_factory=dict)

PluginType

class PluginType(Enum):
    ENGINE = "engine"
    VALIDATOR = "validator"
    HOOK = "hook"
    SOURCE = "source"

PluginState

class PluginState(Enum):
    DISCOVERED = "discovered"
    LOADING = "loading"
    LOADED = "loaded"
    FAILED = "failed"