# Batch Operations

A batch operation system for efficient processing of large-scale data.
## Basic Usage

```python
from common.engines import BatchExecutor, BatchConfig, TruthoundEngine

engine = TruthoundEngine()
config = BatchConfig(batch_size=10000, max_workers=4)
executor = BatchExecutor(engine, config)

result = executor.check_batch(large_dataframe, auto_schema=True)
print(f"Status: {result.status.name}")
print(f"Total chunks: {result.total_chunks}")
print(f"Passed: {result.passed_chunks}")
print(f"Duration: {result.duration_seconds:.2f}s")
```
## Preset Configurations

```python
from common.engines import (
    DEFAULT_BATCH_CONFIG,     # Default: 1000 rows, 4 workers
    PARALLEL_BATCH_CONFIG,    # Parallel: 8 workers
    SEQUENTIAL_BATCH_CONFIG,  # Sequential: single thread
    FAIL_FAST_BATCH_CONFIG,   # Stop on first failure
    LARGE_DATA_BATCH_CONFIG,  # Large data: 50000 rows
)

executor = BatchExecutor(engine, config=LARGE_DATA_BATCH_CONFIG)
```
## Execution Strategies

| Strategy     | Description                             | Use Case                        |
|--------------|-----------------------------------------|---------------------------------|
| `SEQUENTIAL` | Process chunks sequentially             | Memory-constrained environments |
| `PARALLEL`   | Process chunks in parallel              | Multi-core systems              |
| `ADAPTIVE`   | Automatic selection based on data size  | General usage                   |
## Aggregation Strategies

| Strategy        | Description                        |
|-----------------|------------------------------------|
| `MERGE`         | Merge all results into one         |
| `WORST`         | Return worst status                |
| `BEST`          | Return best status                 |
| `MAJORITY`      | Return majority status             |
| `FIRST_FAILURE` | Stop and return on first failure   |
| `ALL`           | Return all individual results      |
## Builder Pattern Configuration

```python
from common.engines import BatchConfig, ExecutionStrategy, AggregationStrategy

config = BatchConfig()
config = config.with_batch_size(5000)
config = config.with_max_workers(8)
config = config.with_execution_strategy(ExecutionStrategy.PARALLEL)
config = config.with_aggregation_strategy(AggregationStrategy.MERGE)
config = config.with_fail_fast(True)
config = config.with_timeout(300.0)
```
## Batch Operation Types

```python
# Validation batch
check_result = executor.check_batch(data, auto_schema=True)

# Profiling batch
profile_result = executor.profile_batch(data)

# Learning batch
learn_result = executor.learn_batch(data)
```
## Async Batch Operations

```python
from common.engines import AsyncBatchExecutor, SyncEngineAsyncAdapter

async_engine = SyncEngineAsyncAdapter(TruthoundEngine())
executor = AsyncBatchExecutor(async_engine, config)

async def validate_large_data(data):
    result = await executor.check_batch(data, auto_schema=True)
    return result
```
## Data Chunkers

```python
from common.engines import RowCountChunker, PolarsChunker, DatasetListChunker

# Row count-based chunking
chunker = RowCountChunker(chunk_size=5000)

# Polars-optimized chunking
chunker = PolarsChunker(chunk_size=10000)

# Dataset list chunking
chunker = DatasetListChunker()
```
## Hooks

```python
from common.engines import (
    LoggingBatchHook,
    MetricsBatchHook,
    CompositeBatchHook,
)

logging_hook = LoggingBatchHook()
metrics_hook = MetricsBatchHook()
composite = CompositeBatchHook([logging_hook, metrics_hook])

executor = BatchExecutor(engine, config, hooks=[composite])
result = executor.check_batch(data)

# Query metrics
print(metrics_hook.chunks_processed)
print(metrics_hook.chunks_failed)
print(metrics_hook.average_chunk_duration_ms)
```
## Exception Handling

```python
from common.engines import (
    BatchExecutionError,
    ChunkingError,
    AggregationError,
)

try:
    result = executor.check_batch(data)
except ChunkingError as e:
    print(f"Chunking failed: {e}")
except BatchExecutionError as e:
    print(f"Batch execution failed: {e}")
except AggregationError as e:
    print(f"Result aggregation failed: {e}")
```