Skip to content

Validator Optimization

Truthound's optimization module provides tools to maximize validator execution performance on large-scale datasets.

Overview

The optimization module covers the following areas:

Area Module Key Features
DAG Execution orchestrator Dependency-based execution order, parallel execution
Graph Algorithms graph Cycle detection, topological sort, DFS
Metric Deduplication metrics (VE-3) Cross-validator metric caching via SharedMetricStore
Conditional Execution base (VE-4) SkipCondition, dependency-based skip logic
3-Tier Fallback base (VE-5) Batch → per-validator → per-expression graceful degradation
Covariance Computation covariance Incremental covariance, Woodbury updates
Geographic Operations geo Vectorized Haversine, spatial indexing
Aggregation Optimization aggregation Lazy aggregation, streaming joins
Profiling profiling Execution time, memory, throughput measurement

DAG-Based Execution Orchestration

ValidatorDAG manages dependencies between validators and determines optimal execution order.

Basic Usage

from truthound.validators.optimization import (
    ValidatorDAG,
    ExecutionPlan,
    ParallelExecutionStrategy,
)

# Create DAG
dag = ValidatorDAG()

# Add validators
dag.add_validators([
    NullValidator(),
    DuplicateValidator(),
    RangeValidator(),
])

# Build execution plan
plan = dag.build_execution_plan()

# Execute with parallel strategy
strategy = ParallelExecutionStrategy(max_workers=4)
results = plan.execute(lf, strategy)

Execution Phases (ValidatorPhase)

Validators are automatically assigned to execution phases:

Phase Description Priority
SCHEMA Schema validation (column existence, types) 1
COMPLETENESS Null checks, missing values 2
UNIQUENESS Duplicate detection, key validation 3
FORMAT Pattern matching, format validation 4
RANGE Value range, distribution checks 5
STATISTICAL Statistics, outliers 6
CROSS_TABLE Multi-table validation 7
CUSTOM User-defined 8

ValidatorNode

Metadata can be specified when adding validators to the DAG:

from truthound.validators.optimization import ValidatorNode, ValidatorPhase

node = dag.add_validator(
    validator=MyValidator(),
    dependencies={"null_validator"},  # Explicit dependencies
    provides={"completeness"},         # Provided capabilities
    phase=ValidatorPhase.STATISTICAL,  # Execution phase
    priority=50,                       # Priority within phase (lower = earlier)
    estimated_cost=2.5,                # Estimated cost (for adaptive scheduling)
)

Execution Strategies

SequentialExecutionStrategy

Executes validators sequentially:

from truthound.validators.optimization import SequentialExecutionStrategy

strategy = SequentialExecutionStrategy()
result = plan.execute(lf, strategy)

ParallelExecutionStrategy

Parallel execution using ThreadPoolExecutor:

from truthound.validators.optimization import ParallelExecutionStrategy

strategy = ParallelExecutionStrategy(max_workers=8)
result = plan.execute(lf, strategy)

Parameters:

Parameter Type Default Description
max_workers int \| None None Maximum workers. If None, uses min(32, cpu_count + 4)

AdaptiveExecutionStrategy

Dynamically selects sequential/parallel strategy based on the number of validators per level:

from truthound.validators.optimization import AdaptiveExecutionStrategy

strategy = AdaptiveExecutionStrategy(
    parallel_threshold=3,  # Use parallel when 3 or more validators
    max_workers=4,
)
result = plan.execute(lf, strategy)

ExecutionResult

Execution results provide various metrics:

result = plan.execute(lf, strategy)

# Basic metrics
print(f"Total duration: {result.total_duration_ms}ms")
print(f"Total validators: {result.total_validators}")
print(f"Success: {result.success_count}")
print(f"Failure: {result.failure_count}")
print(f"Total issues: {len(result.all_issues)}")

# Detailed metrics
metrics = result.get_metrics()
# {
#     "total_duration_ms": 150.5,
#     "total_validators": 10,
#     "total_issues": 25,
#     "success_count": 8,
#     "failure_count": 2,
#     "levels": 4,
#     "strategy": "parallel",
#     "parallelism_factor": 2.5,  # Sequential time / actual time
# }

Convenience Functions

from truthound.validators.optimization import (
    create_execution_plan,
    execute_validators,
)

# Create execution plan
plan = create_execution_plan(
    validators=[v1, v2, v3],
    dependencies={"v2": {"v1"}},  # v2 depends on v1
)

# Execute at once
result = execute_validators(
    validators=[v1, v2, v3],
    lf=df.lazy(),
    strategy=ParallelExecutionStrategy(max_workers=4),
)

Metric Deduplication (VE-3)

When multiple validators require the same computed metric (e.g., null_count on column email), the SharedMetricStore ensures each metric is computed exactly once per session.

Architecture

ExpressionBatchExecutor._precompute_shared_metrics()
├── Collects MetricKey lists from all validators via get_required_metrics()
├── Deduplicates into unique MetricKey set
├── Builds single lf.select([expr1, expr2, ...]).collect()
└── Stores results in SharedMetricStore

Declaring Metric Dependencies

from truthound.validators.base import Validator
from truthound.validators.metrics import MetricKey, CommonMetrics

class MyValidator(Validator):
    def get_required_metrics(self, columns: list[str]) -> list[MetricKey]:
        keys = []
        for col in columns:
            key, _ = CommonMetrics.null_count(col)
            keys.append(key)
            key, _ = CommonMetrics.row_count()
            keys.append(key)
        return keys

    def validate_with_metrics(self, lf, metric_store):
        for col in self._get_target_columns(lf):
            key, _ = CommonMetrics.null_count(col)
            null_count = metric_store.get(key)
            # Use cached value instead of recomputing

Available Common Metrics

Metric Scope Expression
row_count() Table pl.len()
null_count(col) Column pl.col(col).is_null().sum()
non_null_count(col) Column pl.col(col).is_not_null().sum()
n_unique(col) Column pl.col(col).n_unique()
mean(col) Column pl.col(col).mean()
std(col) Column pl.col(col).std()
min(col) Column pl.col(col).min()
max(col) Column pl.col(col).max()
sum(col) Column pl.col(col).sum()
quantile(col, q) Column pl.col(col).quantile(q)
median(col) Column pl.col(col).median()

SharedMetricStore API

from truthound.validators.metrics import SharedMetricStore, MetricKey

store = SharedMetricStore()

# Basic operations
store.put(key, value)
value = store.get(key)             # Returns None if missing
value = store.get_or_compute(key, compute_fn)

# Bulk operations
store.put_many({key1: val1, key2: val2})
results = store.get_many([key1, key2])
missing = store.missing_keys([key1, key2, key3])

# Statistics
stats = store.stats  # MetricStoreStats(hits, misses, size)

Conditional Execution via SkipCondition (VE-4)

Validators can declare conditions under which they should be skipped based on prior validator results. This eliminates redundant work when upstream validators have already detected fatal issues.

Priority Hierarchy

Validators are grouped into priority levels for execution ordering:

Priority Range Category Examples
10–30 Schema ColumnExistsValidator, ColumnTypeValidator
50 Completeness NullValidator, NotNullValidator
60 Uniqueness UniqueValidator, DuplicateValidator
70–80 Distribution RangeValidator, BetweenValidator
90 Referential ForeignKeyValidator

Declaring Skip Conditions

from truthound.validators.base import Validator, SkipCondition

class DistributionValidator(Validator):
    dependencies = {"schema_check", "null_check"}
    priority = 75

    def get_skip_conditions(self) -> list[SkipCondition]:
        return [
            # Skip if schema validation failed entirely
            SkipCondition(depends_on="schema_check", skip_when="failed"),
            # Skip if null check found critical-level nulls
            SkipCondition(depends_on="null_check", skip_when="critical"),
        ]

3-Stage Dependency Resolution

  1. Dependency status check: If any dependencies validator ended FAILED/TIMEOUT/SKIPPED → skip.
  2. SkipCondition evaluation: Each condition is evaluated against prior ValidatorExecutionResult.
  3. Column context filtering: _filter_columns_by_context() narrows target columns based on upstream results.

3-Tier Expression Fallback (VE-5)

The ExpressionBatchExecutor implements progressive fallback to maximize result collection:

Tier 1: Batch all validators → single lf.select([...]).collect()
        │ ComputeError / SchemaError
        â–¼
Tier 2: Per-validator execution → individual collect() per validator
        │ failure on specific validator
        â–¼
Tier 3: Per-expression execution → individual collect() per expression
        partial_failure_mode controls behavior:
          "collect" → gather partial results, continue
          "skip"    → discard failing expressions
          "raise"   → re-raise the exception

Graph Traversal Algorithms

Optimized graph algorithms for hierarchy and relationship data validation.

IterativeDFS

Iterative depth-first search to avoid recursion limits:

from truthound.validators.optimization import IterativeDFS

adjacency = {
    "A": ["B", "C"],
    "B": ["D"],
    "C": ["D"],
    "D": [],
}

dfs = IterativeDFS(adjacency)

# Preorder traversal
for node in dfs.traverse(start="A", order="preorder"):
    print(node)  # A, B, D, C

# Find path
path = dfs.find_path("A", "D")  # ["A", "B", "D"]

# Compute depths
depths = dfs.compute_depths(roots=["A"])  # {"A": 0, "B": 1, "C": 1, "D": 2}

TarjanSCC

Tarjan's Strongly Connected Components algorithm for O(V+E) cycle detection:

from truthound.validators.optimization import TarjanSCC, CycleInfo

adjacency = {
    "A": ["B"],
    "B": ["C"],
    "C": ["A", "D"],  # A-B-C cycle
    "D": ["D"],       # Self-loop
}

tarjan = TarjanSCC(adjacency)

# Find all SCCs
sccs = tarjan.find_sccs()
# [["A", "C", "B"], ["D"]]  # SCCs with size > 1 are cycles

# Get cycle info
cycles = tarjan.find_cycles()
for cycle in cycles:
    print(f"Cycle: {cycle.nodes}")
    print(f"Length: {cycle.length}")
    print(f"Self-loop: {cycle.is_self_loop}")

TopologicalSort

Topological sort using Kahn's algorithm:

from truthound.validators.optimization import TopologicalSort

dependencies = {
    "task_a": ["task_b", "task_c"],  # a before b, c
    "task_b": ["task_d"],
    "task_c": ["task_d"],
    "task_d": [],
}

sorter = TopologicalSort(dependencies)

# Perform sort
try:
    order = sorter.sort()  # ["task_a", "task_b", "task_c", "task_d"]
except ValueError:
    print("Cycle exists")

# Check for cycles
if sorter.has_cycles():
    print("Cycle detected")

GraphTraversalMixin

Mixin for graph operations in validators:

from truthound.validators.optimization import GraphTraversalMixin

class HierarchyValidator(BaseValidator, GraphTraversalMixin):
    def validate(self, df):
        # Build adjacency list
        adj = self.build_adjacency_list(
            df,
            id_column="id",
            parent_column="parent_id",
            cache_key="my_hierarchy",  # Caching key
        )

        # Find all cycles
        cycles = self.find_all_cycles(adj)

        # Find hierarchy cycles (parent-child relationships)
        child_to_parent = self.build_child_to_parent(df, "id", "parent_id")
        hierarchy_cycles = self.find_hierarchy_cycles(
            child_to_parent,
            max_depth=1000,
        )

        # Compute node depths
        depths = self.compute_node_depths(adj, roots=["root"])

        # Topological sort
        sorted_nodes = self.topological_sort(adj)

        # Clear cache
        self.clear_adjacency_cache()

Covariance Computation Optimization

Efficient covariance computation for multivariate methods like Mahalanobis distance on large datasets.

IncrementalCovariance

Incremental covariance using Welford's online algorithm:

from truthound.validators.optimization import IncrementalCovariance

# Incremental covariance for 10 features
cov = IncrementalCovariance(n_features=10)

# Update with streaming data
for batch in data_stream:
    cov.update_batch(batch)  # Batch update

# Single sample update
cov.update(sample)

# Get results
result = cov.get_result(regularization=1e-6)
print(result.mean)       # Mean vector
print(result.covariance) # Covariance matrix
print(result.n_samples)  # Sample count

WoodburyCovariance

Woodbury matrix identity for efficient rank-k updates:

from truthound.validators.optimization import WoodburyCovariance

# Create from data
cov = WoodburyCovariance.from_data(
    training_data,
    regularization=1e-6,
)

# Efficient sample add/remove
cov.add_sample(new_sample)
cov.remove_sample(old_sample)

# Compute Mahalanobis distance
distance = cov.mahalanobis(query_point)
distances = cov.mahalanobis_batch(query_points)

RobustCovarianceEstimator

MCD (Minimum Covariance Determinant) estimation robust to outliers:

from truthound.validators.optimization import RobustCovarianceEstimator

estimator = RobustCovarianceEstimator(
    contamination=0.1,    # Expected outlier ratio
    n_subsamples=10,      # Number of subsamples
    subsample_size=500,   # Each subsample size
    random_state=42,
)

result = estimator.fit(large_data)
# result.is_robust = True

BatchCovarianceMixin

Mixin for batch covariance computation in validators:

from truthound.validators.optimization import BatchCovarianceMixin

class MahalanobisValidator(BaseValidator, BatchCovarianceMixin):
    def validate(self, data):
        # Automatically select optimal method
        result = self.compute_covariance_auto(
            data,
            use_robust=True,
        )

        # Compute Mahalanobis distances
        distances = self.compute_mahalanobis_distances(data, result)

        # Create Woodbury covariance (for efficient updates)
        woodbury = self.create_woodbury_covariance(data)

Configuration Attributes:

Attribute Default Description
_batch_size 10000 Batch size for incremental processing
_robust_threshold 5000 Use subsampled robust estimation above this
_regularization 1e-6 Regularization parameter

Geographic Operations Optimization

Vectorized geospatial computations for improved distance and polygon validation performance.

VectorizedGeoMixin

from truthound.validators.optimization import (
    VectorizedGeoMixin,
    DistanceUnit,
    BoundingBox,
)

class GeoValidator(BaseValidator, VectorizedGeoMixin):
    def validate(self, coords):
        lats = coords[:, 0]
        lons = coords[:, 1]

        # Vectorized Haversine distance
        distances = self.haversine_vectorized(
            lat1=lats,
            lon1=lons,
            lat2=target_lat,
            lon2=target_lon,
            unit=DistanceUnit.KILOMETERS,
        )

        # Pairwise distance matrix
        dist_matrix = self.pairwise_distances(
            lats1, lons1, lats2, lons2,
            unit=DistanceUnit.METERS,
        )

        # Memory-efficient chunked computation
        for start, end, chunk in self.pairwise_distances_chunked(
            lats1, lons1, lats2, lons2,
            chunk_size=50000,
        ):
            process_chunk(chunk)

        # Find nearest point
        idx, dist = self.nearest_point(
            query_lat, query_lon, lats, lons,
        )

        # k-nearest neighbors
        indices, distances = self.k_nearest_points(
            query_lat, query_lon, lats, lons, k=5,
        )

        # Find points within radius
        indices, distances = self.points_within_radius(
            center_lat, center_lon, lats, lons,
            radius=100,
            unit=DistanceUnit.KILOMETERS,
        )

        # Bearing calculation
        bearings = self.bearing_vectorized(lat1, lon1, lat2, lon2)

        # Destination point calculation
        dest_lat, dest_lon = self.destination_point(
            lat, lon,
            bearing=45,
            distance=100,
            unit=DistanceUnit.KILOMETERS,
        )

        # Create bounding box
        bbox = self.create_bounding_box(
            center_lat, center_lon,
            radius=50,
            unit=DistanceUnit.KILOMETERS,
        )

        # Filter by bounding box
        mask, filtered_lats, filtered_lons = self.filter_by_bounding_box(
            lats, lons, bbox,
        )

        # Coordinate validation
        valid_mask = self.validate_coordinates(lats, lons)

DistanceUnit

Supported distance units:

Unit Earth Radius
METERS 6,371,000 m
KILOMETERS 6,371 km
MILES 3,958.8 mi
NAUTICAL_MILES 3,440.1 nm

SpatialIndexMixin

Efficient spatial queries using BallTree:

from truthound.validators.optimization import SpatialIndexMixin

class IndexedGeoValidator(BaseValidator, SpatialIndexMixin):
    def setup(self, reference_coords):
        # Build spatial index
        self.build_spatial_index(
            lats=reference_coords[:, 0],
            lons=reference_coords[:, 1],
            leaf_size=40,
        )

    def validate(self, query_coords):
        # Query within radius
        results = self.query_radius(
            query_lats=query_coords[:, 0],
            query_lons=query_coords[:, 1],
            radius_km=10,
        )

        # k-nearest neighbor query
        distances_km, indices = self.query_nearest(
            query_lats=query_coords[:, 0],
            query_lons=query_coords[:, 1],
            k=5,
        )

        # Clear index
        self.clear_spatial_index()

Aggregation Optimization

Leverage Polars' lazy evaluation for memory-efficient aggregation operations.

LazyAggregationMixin

from truthound.validators.optimization import LazyAggregationMixin
import polars as pl

class CrossTableValidator(BaseValidator, LazyAggregationMixin):
    def validate(self, orders, order_items):
        # Lazy aggregation
        result = self.aggregate_lazy(
            lf=order_items.lazy(),
            group_by=["order_id"],
            agg_exprs=[
                pl.col("quantity").sum().alias("total_qty"),
                pl.col("price").sum().alias("total_price"),
            ],
            cache_key="order_totals",  # Caching
        )

        # Join and aggregate in one operation
        result = self.aggregate_with_join(
            left=orders.lazy(),
            right=order_items.lazy(),
            left_on="order_id",
            right_on="order_id",
            agg_exprs=[pl.col("quantity").sum()],
            how="left",
        )

        # Streaming join for large tables
        result = self.streaming_aggregate_join(
            left=orders.lazy(),
            right=order_items.lazy(),
            join_key="order_id",
            agg_exprs=[pl.col("quantity").sum()],
            slice_size=100000,
        )

        # Compare aggregate values
        mismatches = self.compare_aggregates(
            source=orders,
            aggregated=result,
            key_column="order_id",
            source_column="expected_total",
            agg_column="total_qty",
            tolerance=0.01,
        )

        # Incremental aggregation
        updated = self.incremental_aggregate(
            existing=previous_result,
            new_data=new_items.lazy(),
            group_by="order_id",
            sum_columns=["quantity", "price"],
            count_column="item_count",
        )

        # Window aggregation
        with_windows = self.window_aggregate(
            lf=order_items.lazy(),
            partition_by=["order_id"],
            agg_exprs=[
                pl.col("quantity").sum().alias("order_total"),
                pl.col("price").mean().alias("order_avg_price"),
            ],
        )

        # Semi-join filter
        filtered = self.semi_join_filter(
            main=orders.lazy(),
            filter_by=active_orders.lazy(),
            on="order_id",
            anti=False,  # True for anti-join
        )

        # Multi-table aggregation
        result = self.multi_table_aggregate(
            tables={
                "orders": orders.lazy(),
                "items": order_items.lazy(),
                "products": products.lazy(),
            },
            joins=[
                ("orders", "items", ["order_id"]),
                ("items", "products", ["product_id"]),
            ],
            final_agg=[
                pl.col("quantity").sum(),
                pl.col("unit_price").mean(),
            ],
            final_group_by="category",
        )

        # Clear cache
        self.clear_aggregation_cache()

AggregationExpressionBuilder

Fluent interface for building aggregation expressions:

from truthound.validators.optimization import AggregationExpressionBuilder

builder = AggregationExpressionBuilder()

exprs = (
    builder
    .sum("quantity", alias="total_qty")
    .mean("price", alias="avg_price")
    .min("created_at", alias="first_order")
    .max("created_at", alias="last_order")
    .std("price", alias="price_std")
    .count(alias="order_count")
    .n_unique("customer_id", alias="unique_customers")
    .first("status")
    .last("updated_at")
    .custom(pl.col("discount").filter(pl.col("discount") > 0).mean())
    .build()
)

# Usage
result = lf.group_by("category").agg(exprs)

Validator Profiling

Measure and analyze validator execution performance.

ProfilerConfig

Profiler configuration:

from truthound.validators.optimization import (
    ProfilerConfig,
    ProfilerMode,
)

# Basic configuration (timing only)
config = ProfilerConfig.basic()

# Standard configuration (timing + memory)
config = ProfilerConfig()  # Default

# Detailed configuration (with snapshots)
config = ProfilerConfig.detailed()

# Diagnostic configuration (maximum detail)
config = ProfilerConfig.diagnostic()

# Custom configuration
config = ProfilerConfig(
    mode=ProfilerMode.STANDARD,
    track_memory=True,
    track_gc=True,
    track_throughput=True,
    record_snapshots=False,
    max_snapshots=1000,
    memory_warning_mb=100,
    memory_critical_mb=500,
)

ValidatorProfiler

from truthound.validators.optimization import (
    ValidatorProfiler,
    ProfilerConfig,
)

# Create profiler
profiler = ValidatorProfiler(config=ProfilerConfig.detailed())

# Start session
profiler.start_session("validation_run_1", attributes={"env": "prod"})

# Profile validators
for validator in validators:
    with profiler.profile(validator, rows_processed=100000) as ctx:
        issues = validator.validate(lf)
        ctx.set_issue_count(len(issues))
        ctx.add_attribute("columns", ["a", "b", "c"])

# End session
session = profiler.end_session()

# Analyze results
print(session.summary())
print(session.to_json())

# Slowest validators
slowest = profiler.get_slowest_validators(n=10)
# [("SlowValidator", 150.5), ("AnotherValidator", 100.2), ...]

# Memory-intensive validators
memory_heavy = profiler.get_memory_intensive_validators(n=10)

# Overall summary
summary = profiler.summary()
# {
#     "total_validators": 15,
#     "total_executions": 150,
#     "total_issues": 2500,
#     "total_time_ms": 5000.5,
#     "completed_sessions": 3,
#     "current_session_active": False,
#     "memory_tracking_available": True,
# }

Convenience Functions

from truthound.validators.optimization import (
    profile_validator,
    profiled,
    get_default_profiler,
)

# Context manager
with profile_validator(my_validator, rows_processed=10000) as ctx:
    issues = my_validator.validate(lf)
    ctx.set_issue_count(len(issues))

print(ctx.metrics.timing.mean_ms)

# Decorator
class MyValidator(Validator):
    @profiled(track_issues=True)
    def validate(self, lf):
        return [issue1, issue2]

# Global profiler
profiler = get_default_profiler()

ValidatorMetrics

Collected metrics:

metrics = profiler.get_metrics("MyValidator")

# Timing metrics
print(metrics.timing.mean_ms)    # Mean execution time
print(metrics.timing.median_ms)  # Median
print(metrics.timing.p95_ms)     # 95th percentile
print(metrics.timing.p99_ms)     # 99th percentile
print(metrics.timing.std_ms)     # Standard deviation

# Memory metrics
print(metrics.memory.mean_peak_mb)       # Mean peak memory
print(metrics.memory.max_peak_mb)        # Maximum peak memory
print(metrics.memory.total_gc_collections)  # GC collection count

# Throughput metrics
print(metrics.throughput.mean_rows_per_sec)  # Rows per second
print(metrics.throughput.total_rows)         # Total rows processed

# Issue metrics
print(metrics.total_issues)  # Total issues found
print(metrics.mean_issues)   # Mean issues
print(metrics.error_counts)  # Error counts

ProfilingReport

Report generation:

from truthound.validators.optimization import ProfilingReport

report = ProfilingReport(profiler)

# Text summary
print(report.text_summary())
# ============================================================
# VALIDATOR PROFILING REPORT
# ============================================================
# Total Validators: 15
# Total Executions: 150
# Total Issues Found: 2500
# Total Time: 5000.50ms
#
# ------------------------------------------------------------
# TOP 10 SLOWEST VALIDATORS (by mean execution time)
# ------------------------------------------------------------
#  1. SlowValidator: 150.50ms
#  2. AnotherValidator: 100.20ms
# ...

# HTML report
html = report.html_report()
with open("profile_report.html", "w") as f:
    f.write(html)

# Prometheus format export
prometheus_metrics = profiler.to_prometheus()
# # HELP validator_execution_duration_ms Validator execution duration
# # TYPE validator_execution_duration_ms gauge
# validator_execution_duration_ms_mean{validator="MyValidator",category="completeness"} 150.500
# ...

Integration Example

Example using all optimization features together:

import polars as pl
from truthound.validators.optimization import (
    # DAG orchestration
    ValidatorDAG,
    ParallelExecutionStrategy,
    # Profiling
    ValidatorProfiler,
    ProfilerConfig,
    # Mixins
    GraphTraversalMixin,
    BatchCovarianceMixin,
    VectorizedGeoMixin,
    LazyAggregationMixin,
)
from truthound.validators import NullValidator, RangeValidator


# Define custom validator
class OptimizedHierarchyValidator(
    BaseValidator,
    GraphTraversalMixin,
    BatchCovarianceMixin,
):
    def validate(self, df):
        # Graph cycle detection
        adj = self.build_adjacency_list(df, "id", "parent_id")
        cycles = self.find_all_cycles(adj)

        # Covariance for outlier detection
        numeric_data = df.select(pl.col(pl.Float64)).to_numpy()
        cov_result = self.compute_covariance_auto(numeric_data, use_robust=True)
        distances = self.compute_mahalanobis_distances(numeric_data, cov_result)

        return []


# Configure profiler
profiler = ValidatorProfiler(config=ProfilerConfig.detailed())
profiler.start_session("optimized_validation")

# Build DAG
dag = ValidatorDAG()
dag.add_validators([
    NullValidator(),
    RangeValidator(min_value=0),
    OptimizedHierarchyValidator(),
])

# Create and execute plan
plan = dag.build_execution_plan()
print(plan.get_summary())

# Parallel execution
strategy = ParallelExecutionStrategy(max_workers=4)

df = pl.DataFrame({"id": [1, 2, 3], "parent_id": [None, 1, 2], "value": [10, 20, 30]})

with profiler.profile(plan, rows_processed=len(df)) as ctx:
    result = plan.execute(df.lazy(), strategy)
    ctx.set_issue_count(len(result.all_issues))

# End session and report
session = profiler.end_session()
print(session.to_json())

API Reference

orchestrator Module

Class/Function Description
ValidatorDAG Validator dependency DAG
ValidatorNode Validator node wrapper
ValidatorPhase Execution phase enum
ExecutionPlan Execution plan
ExecutionLevel Group of validators that can run in parallel
ExecutionResult Execution result
ExecutionStrategy Execution strategy abstract class
SequentialExecutionStrategy Sequential execution strategy
ParallelExecutionStrategy Parallel execution strategy
AdaptiveExecutionStrategy Adaptive execution strategy
create_execution_plan() Convenience function for creating execution plan
execute_validators() Convenience function for executing validators

graph Module

Class/Function Description
IterativeDFS Iterative depth-first search
TarjanSCC Tarjan SCC algorithm
TopologicalSort Topological sort
GraphTraversalMixin Graph traversal mixin
CycleInfo Cycle info dataclass
NodeState Node state enum

covariance Module

Class/Function Description
IncrementalCovariance Welford's incremental covariance
WoodburyCovariance Woodbury update covariance
RobustCovarianceEstimator MCD-based robust estimation
BatchCovarianceMixin Batch covariance mixin
CovarianceResult Covariance result dataclass

geo Module

Class/Function Description
VectorizedGeoMixin Vectorized geographic operations mixin
SpatialIndexMixin Spatial indexing mixin
DistanceUnit Distance unit enum
BoundingBox Bounding box dataclass

aggregation Module

Class/Function Description
LazyAggregationMixin Lazy aggregation mixin
AggregationExpressionBuilder Aggregation expression builder
AggregationResult Aggregation result dataclass
JoinStrategy Join strategy configuration

profiling Module

Class/Function Description
ValidatorProfiler Main profiler class
ProfilerConfig Profiler configuration
ProfilerMode Profiler mode enum
ValidatorMetrics Validator metrics
TimingMetrics Timing metrics
MemoryMetrics Memory metrics
ThroughputMetrics Throughput metrics
ProfilingSession Profiling session
ExecutionSnapshot Execution snapshot
ProfilingReport Report generator
profile_validator() Profiling context manager
profiled() Profiling decorator