Async Execution
Asynchronous checkpoint execution. Checkpoints and their actions run without blocking, which makes this module suitable for high-throughput workloads.
Overview
```text
┌─────────────────────────────────────────────────────────────────┐
│                      AsyncCheckpointRunner                      │
│                                                                 │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐         │
│  │ Checkpoint 1 │   │ Checkpoint 2 │   │ Checkpoint 3 │         │
│  │  (Triggers)  │   │  (Triggers)  │   │  (Triggers)  │         │
│  └──────┬───────┘   └──────┬───────┘   └──────┬───────┘         │
│         │                  │                  │                 │
│         ▼                  ▼                  ▼                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │             Async Execution Pool (Semaphore)             │   │
│  │                                                          │   │
│  │   Task 1 ──┐                                             │   │
│  │   Task 2 ──┼── await gather(*) ──▶ Results Queue         │   │
│  │   Task 3 ──┘                                             │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
```
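Plain asyncio can approximate the flow in the diagram. The sketch below is not truthound code. check() is a hypothetical stand-in for a checkpoint run. A semaphore plays the role of the execution pool, asyncio.gather fans the tasks out, and an asyncio.Queue carries each result to a consumer as soon as it is produced.

```python
import asyncio

_DONE = object()  # Sentinel marking the end of the result stream


async def check(name: str, delay: float) -> str:
    """Hypothetical stand-in for one checkpoint run."""
    await asyncio.sleep(delay)
    return name


async def run_and_stream(jobs: dict[str, float], limit: int) -> list[str]:
    semaphore = asyncio.Semaphore(limit)  # The "execution pool": at most `limit` runs at once
    queue: asyncio.Queue = asyncio.Queue()  # The "results queue"

    async def guarded(name: str, delay: float) -> None:
        async with semaphore:
            await queue.put(await check(name, delay))

    async def produce() -> None:
        await asyncio.gather(*(guarded(n, d) for n, d in jobs.items()))
        await queue.put(_DONE)  # Every task has finished

    producer = asyncio.create_task(produce())
    received = []
    while (item := await queue.get()) is not _DONE:
        received.append(item)  # Consumed in completion order, while other tasks still run
    await producer
    return received


if __name__ == "__main__":
    print(asyncio.run(run_and_stream({"cp1": 0.03, "cp2": 0.01, "cp3": 0.02}, limit=2)))
```

Because results are read while the other tasks are still running, a slow checkpoint does not delay the results of faster ones.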
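Core Classes
AsyncBaseAction
Base class for asynchronous actions. A subclass sets action_type, returns its default configuration from _default_config, and implements the coroutine _execute_async. That coroutine receives the checkpoint result and returns an ActionResult.
```python
from dataclasses import dataclass

import aiohttp
from truthound.checkpoint.async_base import AsyncBaseAction, ActionConfig
from truthound.checkpoint.actions import ActionResult, ActionStatus  # assumed location

@dataclass
class MyConfig(ActionConfig):
    url: str = ""  # Hypothetical field: the endpoint to notify

class MyAsyncAction(AsyncBaseAction[MyConfig]):
    action_type = "my_async_action"

    @classmethod
    def _default_config(cls) -> MyConfig:
        return MyConfig()

    async def _execute_async(self, checkpoint_result) -> ActionResult:
        payload = checkpoint_result.to_dict()  # Serialized checkpoint result
        async with aiohttp.ClientSession() as session:
            async with session.post(self._config.url, json=payload) as resp:
                return ActionResult(
                    status=ActionStatus.SUCCESS if resp.ok else ActionStatus.ERROR,
                    action_name=self.name,
                    action_type=self.action_type,
                )
```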
Key features:
- Automatic timeout (asyncio.wait_for)
- Exponential backoff retry
- Cancellation support (CancelledError)
- notify_on condition checking
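The following minimal sketch uses only asyncio to show how a per-attempt timeout, exponential backoff and cancellation handling can fit together. call_with_policy and its parameters are hypothetical and do not reflect AsyncBaseAction's actual internals. CancelledError is re-raised at once, so cancelling the caller is never mistaken for a retryable failure.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_policy(
    make_attempt: Callable[[], Awaitable[T]],
    timeout: float = 1.0,
    max_retries: int = 3,
    base_delay: float = 0.01,
    backoff: float = 2.0,
) -> T:
    """Run make_attempt() with a per-attempt timeout and exponential-backoff retries."""
    delay = base_delay
    for attempt in range(max_retries + 1):
        try:
            # wait_for cancels the attempt and raises TimeoutError if it overruns.
            return await asyncio.wait_for(make_attempt(), timeout)
        except asyncio.CancelledError:
            raise  # The caller was cancelled: propagate immediately, never retry
        except Exception:
            if attempt == max_retries:
                raise  # Retries exhausted: surface the last error
            await asyncio.sleep(delay)
            delay *= backoff  # 0.01 s, 0.02 s, 0.04 s, ...
    raise AssertionError("unreachable")  # The loop always returns or raises


async def demo() -> str:
    calls = 0

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient failure")
        return f"succeeded after {calls} attempts"

    return await call_with_policy(flaky)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # succeeded after 3 attempts
```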
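AsyncBaseTrigger
Base class for asynchronous triggers. A subclass implements should_trigger_async, a coroutine that decides whether the checkpoint should run.
```python
from dataclasses import dataclass

from truthound.checkpoint.async_base import AsyncBaseTrigger, TriggerConfig
from truthound.checkpoint.triggers import TriggerResult  # assumed location

@dataclass
class MyTriggerConfig(TriggerConfig):
    topic: str = ""  # Hypothetical field

class MyAsyncTrigger(AsyncBaseTrigger[MyTriggerConfig]):
    trigger_type = "my_async_trigger"

    @classmethod
    def _default_config(cls) -> MyTriggerConfig:
        return MyTriggerConfig()

    async def should_trigger_async(self) -> TriggerResult:
        # Asynchronous condition check; self._consumer is a hypothetical message consumer set up elsewhere
        message = await self._consumer.poll()
        return TriggerResult(should_run=message is not None)
```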
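One way to drive such a trigger is a loop that awaits should_trigger_async at a fixed interval, presumably similar in spirit to the runner's trigger_poll_interval. In this self-contained sketch, TriggerResult is a local stand-in dataclass. QueueTrigger, poll_loop and the asyncio.Queue acting as the message consumer are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class TriggerResult:
    """Local stand-in for the library's TriggerResult."""
    should_run: bool


class QueueTrigger:
    """Hypothetical trigger that fires once per message waiting in a queue."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue

    async def should_trigger_async(self) -> TriggerResult:
        try:
            self._queue.get_nowait()  # Non-blocking check for a message
        except asyncio.QueueEmpty:
            return TriggerResult(should_run=False)
        return TriggerResult(should_run=True)


async def poll_loop(
    trigger: QueueTrigger,
    run_checkpoint: Callable[[], Awaitable[None]],
    interval: float,
    rounds: int,
) -> int:
    """Check the trigger `rounds` times, `interval` seconds apart; return the number of runs."""
    runs = 0
    for _ in range(rounds):
        if (await trigger.should_trigger_async()).should_run:
            await run_checkpoint()
            runs += 1
        await asyncio.sleep(interval)
    return runs


async def demo() -> int:
    queue: asyncio.Queue = asyncio.Queue()
    for message in ("batch-1", "batch-2"):
        queue.put_nowait(message)

    async def run_checkpoint() -> None:
        await asyncio.sleep(0)  # Placeholder for the real checkpoint run

    return await poll_loop(QueueTrigger(queue), run_checkpoint, interval=0.001, rounds=5)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 2: one run per queued message
```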
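Protocols
AsyncExecutable
Describes objects that expose an async execute_async(checkpoint_result) method.
```python
from truthound.checkpoint.async_base import AsyncExecutable
```
Its definition, for reference:
```python
@runtime_checkable
class AsyncExecutable(Protocol):
    async def execute_async(self, checkpoint_result) -> ActionResult:
        ...
```
SyncExecutable
Describes objects that expose a regular, blocking execute(checkpoint_result) method.
```python
from truthound.checkpoint.async_base import SyncExecutable
```
Its definition, for reference:
```python
@runtime_checkable
class SyncExecutable(Protocol):
    def execute(self, checkpoint_result) -> ActionResult:
        ...
```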
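Both protocols are runtime_checkable, so isinstance can tell the two kinds of action apart. Code could use this check to decide whether an action needs wrapping. This sketch redeclares the protocols locally so it runs without truthound. classify and the two sample action classes are hypothetical.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AsyncExecutable(Protocol):
    async def execute_async(self, checkpoint_result: Any) -> Any: ...


@runtime_checkable
class SyncExecutable(Protocol):
    def execute(self, checkpoint_result: Any) -> Any: ...


class PrintAction:  # Hypothetical sync-only action
    def execute(self, checkpoint_result: Any) -> str:
        return f"sync: {checkpoint_result}"


class WebhookAction:  # Hypothetical async-only action
    async def execute_async(self, checkpoint_result: Any) -> str:
        return f"async: {checkpoint_result}"


def classify(action: object) -> str:
    """Report which protocol an object satisfies; async wins if both match."""
    if isinstance(action, AsyncExecutable):
        return "async"  # Can be awaited directly
    if isinstance(action, SyncExecutable):
        return "sync"  # Needs wrapping, e.g. with adapt_to_async
    return "neither"


if __name__ == "__main__":
    for obj in (PrintAction(), WebhookAction(), object()):
        print(type(obj).__name__, classify(obj))
```

A runtime protocol check only confirms that a method with the right name exists. It does not verify the signature or that execute_async is actually a coroutine function.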
SyncActionAdapter
Runs a synchronous action from asynchronous code by executing its blocking execute call in a ThreadPoolExecutor.
```python
from truthound.checkpoint.async_base import SyncActionAdapter, adapt_to_async
from truthound.checkpoint.actions import SlackNotification

# Wrap a synchronous action
sync_action = SlackNotification(webhook_url="...")
async_action = SyncActionAdapter(sync_action)

# Or use adapt_to_async
async_action = adapt_to_async(sync_action)

# Execute asynchronously (inside a coroutine); the blocking call runs in a ThreadPoolExecutor
await async_action.execute_async(checkpoint_result)
```
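An adapter like this only has to hand the blocking execute call to a thread pool via loop.run_in_executor, so the event loop can keep serving other tasks. SyncToAsync and SlowAction below are simplified, hypothetical stand-ins, not the library's SyncActionAdapter. Passing executor=None uses asyncio's default thread pool.

```python
import asyncio
import time
from concurrent.futures import Executor
from typing import Any, Optional


class SlowAction:
    """Hypothetical synchronous action that blocks its thread."""

    def execute(self, checkpoint_result: Any) -> str:
        time.sleep(0.05)  # Stand-in for blocking I/O such as an HTTP call
        return f"handled {checkpoint_result}"


class SyncToAsync:
    """Simplified adapter: exposes execute_async for an object with a blocking execute."""

    def __init__(self, action: Any, executor: Optional[Executor] = None) -> None:
        self._action = action
        self._executor = executor  # None means asyncio's default ThreadPoolExecutor

    async def execute_async(self, checkpoint_result: Any) -> Any:
        loop = asyncio.get_running_loop()
        # execute() runs in a worker thread; the event loop only awaits the future.
        return await loop.run_in_executor(
            self._executor, self._action.execute, checkpoint_result
        )


async def demo() -> tuple[list[str], float]:
    adapters = [SyncToAsync(SlowAction()) for _ in range(4)]
    start = time.perf_counter()
    results = await asyncio.gather(*(a.execute_async(i) for i, a in enumerate(adapters)))
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(demo())
    print(results, f"{elapsed:.2f}s")  # The four 0.05 s calls overlap instead of taking 0.2 s
```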
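adapt_to_async Function
adapt_to_async also accepts an executor argument, so you can supply your own thread pool:
```python
from concurrent.futures import ThreadPoolExecutor

from truthound.checkpoint.async_base import adapt_to_async

# Use a custom executor
executor = ThreadPoolExecutor(max_workers=8)
async_action = adapt_to_async(sync_action, executor=executor)

# Later, once no more actions will run on it:
executor.shutdown(wait=True)
```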
Execution Strategies
Three strategies control how a list of actions is scheduled.
SequentialStrategy
Executes actions one at a time, in list order.
```python
from truthound.checkpoint.async_base import (
    SequentialStrategy,
    AsyncExecutionContext,
)

strategy = SequentialStrategy()
context = AsyncExecutionContext(cancel_on_first_error=True)

# Inside a coroutine:
results = await strategy.execute(
    actions=[action1, action2, action3],
    checkpoint_result=result,
    context=context,
)
```
Characteristics:
- Order guaranteed: each action starts after the previous one finishes
- Can stop at the first error (cancel_on_first_error=True)
- Suitable for actions that depend on earlier ones
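A few lines of asyncio are enough to express these sequential semantics. In this illustrative sketch, run_sequential and make are hypothetical, and actions are modeled as zero-argument coroutine functions rather than truthound actions. Results keep list order. With cancel_on_first_error set, the first exception prevents the remaining actions from starting.

```python
import asyncio
from typing import Any, Awaitable, Callable

Action = Callable[[], Awaitable[Any]]


async def run_sequential(
    actions: list[Action], cancel_on_first_error: bool
) -> list[tuple[str, Any]]:
    """Run actions one after another; record ("ok", value) or ("error", repr) per action."""
    outcomes: list[tuple[str, Any]] = []
    for action in actions:
        try:
            # The next action starts only after this await completes.
            outcomes.append(("ok", await action()))
        except Exception as exc:
            outcomes.append(("error", repr(exc)))
            if cancel_on_first_error:
                break  # Remaining actions are never started
    return outcomes


def make(label: str, fail: bool = False) -> Action:
    """Build a hypothetical action that returns its label or raises."""
    async def action() -> str:
        await asyncio.sleep(0)
        if fail:
            raise RuntimeError(label)
        return label
    return action


steps = [make("a"), make("b", fail=True), make("c")]

if __name__ == "__main__":
    print(asyncio.run(run_sequential(steps, cancel_on_first_error=True)))
    # [('ok', 'a'), ('error', "RuntimeError('b')")]
    print(asyncio.run(run_sequential(steps, cancel_on_first_error=False)))
    # [('ok', 'a'), ('error', "RuntimeError('b')"), ('ok', 'c')]
```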
ConcurrentStrategy
Executes all actions concurrently, up to a configurable limit.
```python
from truthound.checkpoint.async_base import ConcurrentStrategy

strategy = ConcurrentStrategy(max_concurrency=5)

# Inside a coroutine: at most 5 actions run at the same time
results = await strategy.execute(
    actions=actions,
    checkpoint_result=result,
)
```
Characteristics:
- Concurrent execution
- Configurable concurrency limit (max_concurrency)
- Suitable for independent actions
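Bounded concurrency of this kind is commonly built from asyncio.Semaphore plus asyncio.gather. The sketch below is hypothetical: run_bounded is not ConcurrentStrategy's code. It tracks how many actions are in flight so the limit can be checked. gather returns results in input order regardless of completion order.

```python
import asyncio
from typing import Any, Awaitable, Callable

Action = Callable[[], Awaitable[Any]]


async def run_bounded(actions: list[Action], max_concurrency: int) -> tuple[list[Any], int]:
    """Run all actions with at most max_concurrency in flight; return (results, peak)."""
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def guarded(action: Action) -> Any:
        nonlocal in_flight, peak
        async with semaphore:  # Extra actions wait here until a slot frees up
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await action()
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(guarded(a) for a in actions))
    return list(results), peak


def make(i: int) -> Action:
    """Build a hypothetical I/O-bound action."""
    async def action() -> int:
        await asyncio.sleep(0.01)  # Simulated I/O wait
        return i * i
    return action


if __name__ == "__main__":
    results, peak = asyncio.run(run_bounded([make(i) for i in range(12)], max_concurrency=5))
    print(results, peak)  # Squares of 0..11 in input order; peak is 5
```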
PipelineStrategy
Executes actions in stages. Each stage is a list of indices into the actions list. Actions within a stage run concurrently, and a stage starts only after the previous one has finished.
```python
from truthound.checkpoint.async_base import PipelineStrategy

# 3 stages: [0, 1] → [2] → [3, 4]
strategy = PipelineStrategy(stages=[[0, 1], [2], [3, 4]])

# Inside a coroutine:
results = await strategy.execute(
    actions=[a0, a1, a2, a3, a4],
    checkpoint_result=result,
)
```
Characteristics:
- Concurrent execution within a stage
- Sequential execution between stages
- Suitable for dependencies that can be arranged in layers
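Stage semantics reduce to a loop over stages with a gather inside each one. In this hypothetical sketch, run_pipeline and make are illustrative, and stages use the same index-list format as the stages argument above. Each action logs when it starts and ends, so the barrier between stages shows up in the log.

```python
import asyncio
from typing import Any, Awaitable, Callable

Action = Callable[[], Awaitable[Any]]
log: list[str] = []  # Records start/end events to make the stage barrier visible


async def run_pipeline(actions: list[Action], stages: list[list[int]]) -> list[Any]:
    results: list[Any] = [None] * len(actions)
    for stage in stages:
        # Actions within one stage run concurrently...
        stage_results = await asyncio.gather(*(actions[i]() for i in stage))
        for i, value in zip(stage, stage_results):
            results[i] = value
        # ...and the loop reaches the next stage only after the whole stage has finished.
    return results


def make(i: int, delay: float) -> Action:
    """Build a hypothetical action that logs its start and end."""
    async def action() -> int:
        log.append(f"start {i}")
        await asyncio.sleep(delay)
        log.append(f"end {i}")
        return i
    return action


actions = [make(0, 0.02), make(1, 0.01), make(2, 0.0), make(3, 0.01), make(4, 0.0)]

if __name__ == "__main__":
    print(asyncio.run(run_pipeline(actions, [[0, 1], [2], [3, 4]])))  # [0, 1, 2, 3, 4]
    print(log)
```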
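AsyncExecutionContext
Shared settings for a strategy run: the executor used for sync actions, a concurrency semaphore, a default timeout and the error policy. Use it as an async context manager.
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from truthound.checkpoint.async_base import AsyncExecutionContext

# run_actions is an illustrative wrapper; creating the semaphore inside a coroutine binds it to the running loop
async def run_actions(strategy, actions, result):
    context = AsyncExecutionContext(
        executor=ThreadPoolExecutor(max_workers=4),  # For sync actions
        semaphore=asyncio.Semaphore(10),              # Concurrency limit
        timeout=30.0,                                 # Default timeout (seconds)
        cancel_on_first_error=False,                  # True: abort the remaining actions after the first error
    )
    async with context:
        return await strategy.execute(actions, result, context)
```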
AsyncCheckpointRunner
Runs registered checkpoints in the background when their triggers fire, and streams the results.
```python
from truthound.checkpoint.async_runner import (
    AsyncCheckpointRunner,
    AsyncRunnerConfig,
)

# Configuration
config = AsyncRunnerConfig(
    max_concurrent_checkpoints=10,   # Checkpoints allowed to run at once
    trigger_poll_interval=1.0,       # Trigger polling interval (seconds)
    result_queue_size=1000,          # Capacity of the result queue
    stop_on_error=False,             # Stop the runner when a checkpoint errors
    max_consecutive_failures=10,     # Maximum consecutive failures
    graceful_shutdown_timeout=30.0,  # Shutdown timeout (seconds)
)

# Create the runner
runner = AsyncCheckpointRunner(
    config=config,
    result_callback=lambda r: print(f"Completed: {r.checkpoint_name}"),
    error_callback=lambda e: print(f"Error: {e}"),
)

# Register checkpoints
runner.add_checkpoint(checkpoint1)
runner.add_checkpoint(checkpoint2)

# The awaits below must run inside a coroutine.
# Start background execution
await runner.start_async()
try:
    # Stream results; break explicitly so that the shutdown below is reached
    received = 0
    async for result in runner.iter_results_async():
        print(f"Result: {result.status}")
        received += 1
        if received >= 10:
            break
finally:
    # Shut down
    await runner.stop_async()
```
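A graceful shutdown typically gives in-flight work a bounded grace period and then cancels whatever is still running. The plain-asyncio sketch below shows that policy. shutdown and demo are hypothetical. The timeout plays a role comparable to graceful_shutdown_timeout, but this page does not specify the runner's exact shutdown behavior.

```python
import asyncio


async def shutdown(tasks: set[asyncio.Task], timeout: float) -> tuple[int, int]:
    """Give tasks up to `timeout` seconds to finish, then cancel the rest.

    Returns (finished, cancelled).
    """
    if not tasks:
        return 0, 0
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Wait for the cancelled tasks to actually unwind before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def demo() -> tuple[int, int]:
    tasks = {
        asyncio.create_task(asyncio.sleep(0.01)),  # Finishes within the grace period
        asyncio.create_task(asyncio.sleep(10)),    # Still running at the deadline: cancelled
    }
    return await shutdown(tasks, timeout=0.2)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (1, 1)
```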
Single Execution
```python
# Execute by name
result = await runner.run_once_async("my_checkpoint")

# Execute by object
result = await runner.run_once_async(checkpoint, context={"key": "value"})
```
Execute All
Query Statistics
```python
stats = runner.get_stats()
print(f"Running: {stats['running']}")
print(f"Checkpoints: {stats['checkpoints']}")
print(f"Pending tasks: {stats['pending_tasks']}")
print(f"Queued results: {stats['queued_results']}")
```
Convenience Functions
run_checkpoint_async
Executes a single checkpoint asynchronously.
```python
from truthound.checkpoint.async_runner import run_checkpoint_async

# Execute by name (looked up in the registry)
result = await run_checkpoint_async("my_checkpoint")

# Execute by object
result = await run_checkpoint_async(checkpoint, context={"key": "value"})
```
run_checkpoints_parallel
Executes multiple checkpoints in parallel.
```python
from truthound.checkpoint.async_runner import run_checkpoints_parallel

results = await run_checkpoints_parallel(
    checkpoints=[cp1, cp2, cp3],
    max_concurrent=5,
    context={"shared": "data"},
    on_complete=lambda r: print(f"Done: {r.checkpoint_name}"),
)
```
CheckpointPool
Worker pool for high-throughput scenarios.
```python
from truthound.checkpoint.async_runner import CheckpointPool

async with CheckpointPool(workers=10) as pool:
    # Single submission
    result = await pool.submit(checkpoint)

    # Multiple submissions
    results = await pool.submit_many([cp1, cp2, cp3])
```
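Manual Management
Without async with, start and stop the pool explicitly:
```python
pool = CheckpointPool(
    workers=10,
    result_callback=lambda r: print(f"Completed: {r.checkpoint_name}"),
)

await pool.start()
try:
    # Submit work...
    result = await pool.submit(checkpoint)
finally:
    await pool.stop()  # Always stop the workers, even if a submission fails
```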
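A pool like this rests on a fixed number of worker tasks that pull jobs from a shared queue. Each submission gets its own future, so each caller awaits only its own result. MiniPool below is a hypothetical, much-simplified sketch of that pattern, not CheckpointPool's implementation. Jobs are plain coroutine functions rather than checkpoints.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Job = Callable[[], Awaitable[Any]]


class MiniPool:
    def __init__(self, workers: int) -> None:
        self._n = workers
        self._queue: Optional[asyncio.Queue] = None
        self._workers: list[asyncio.Task] = []

    async def start(self) -> None:
        self._queue = asyncio.Queue()  # Created inside the running event loop
        self._workers = [asyncio.create_task(self._worker()) for _ in range(self._n)]

    async def _worker(self) -> None:
        assert self._queue is not None
        while True:
            job, future = await self._queue.get()
            try:
                result = await job()
            except Exception as exc:
                if not future.cancelled():
                    future.set_exception(exc)  # The submitter sees the job's error
            else:
                if not future.cancelled():
                    future.set_result(result)
            finally:
                self._queue.task_done()

    async def submit(self, job: Job) -> Any:
        assert self._queue is not None, "call start() first"
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((job, future))
        return await future  # Resolved by whichever worker picks the job up

    async def submit_many(self, jobs: list[Job]) -> list[Any]:
        return list(await asyncio.gather(*(self.submit(j) for j in jobs)))

    async def stop(self) -> None:
        assert self._queue is not None
        await self._queue.join()  # Let queued jobs finish first
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)

    async def __aenter__(self) -> "MiniPool":
        await self.start()
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.stop()


async def demo() -> list[int]:
    def make(i: int) -> Job:
        async def job() -> int:
            await asyncio.sleep(0.001)
            return i * 10
        return job

    async with MiniPool(workers=3) as pool:
        first = await pool.submit(make(1))
        rest = await pool.submit_many([make(i) for i in range(2, 6)])
    return [first, *rest]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [10, 20, 30, 40, 50]
```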
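Decorators
@with_retry
Retries a failing coroutine with exponential backoff.
```python
import aiohttp

from truthound.checkpoint.async_base import with_retry

# Up to 3 retries, starting with a 1.0 s delay that grows by a factor of 2.0
@with_retry(max_retries=3, delay=1.0, backoff=2.0)
async def fetch_data(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()  # Turn HTTP errors into exceptions so they can be retried
            return await resp.json()
```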
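@with_timeout
Fails the call with a timeout error if it runs longer than the given number of seconds.
```python
import asyncio

from truthound.checkpoint.async_base import with_timeout

@with_timeout(seconds=30.0)
async def slow_operation():
    await asyncio.sleep(60)  # Exceeds the 30 s limit, so the call raises TimeoutError
```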
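@with_semaphore
Limits how many calls of the decorated coroutine run at the same time.
```python
import asyncio

from truthound.checkpoint.async_base import with_semaphore

# On Python < 3.10, create the semaphore inside the running event loop instead
semaphore = asyncio.Semaphore(5)

@with_semaphore(semaphore)
async def limited_operation():
    # At most 5 concurrent executions; do_work stands for your own coroutine
    await do_work()
```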
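Decorators of this kind are usually thin wrappers around asyncio primitives. The sketch below shows one way to write comparable decorators with functools.wraps. timeout_after and limit_concurrency are hypothetical names, and their bodies are assumptions rather than truthound's source.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable

AsyncFn = Callable[..., Awaitable[Any]]


def timeout_after(seconds: float) -> Callable[[AsyncFn], AsyncFn]:
    """Hypothetical timeout decorator built on asyncio.wait_for."""
    def decorator(fn: AsyncFn) -> AsyncFn:
        @functools.wraps(fn)  # Keep the wrapped function's name and docstring
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # wait_for cancels the call and raises TimeoutError after `seconds`.
            return await asyncio.wait_for(fn(*args, **kwargs), seconds)
        return wrapper
    return decorator


def limit_concurrency(semaphore: asyncio.Semaphore) -> Callable[[AsyncFn], AsyncFn]:
    """Hypothetical concurrency-limiting decorator built on a shared semaphore."""
    def decorator(fn: AsyncFn) -> AsyncFn:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            async with semaphore:  # Waits while all permits are taken
                return await fn(*args, **kwargs)
        return wrapper
    return decorator


@timeout_after(0.05)
async def slow() -> None:
    await asyncio.sleep(1)


if __name__ == "__main__":
    try:
        asyncio.run(slow())
    except asyncio.TimeoutError:
        print("slow() timed out")
```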
Complete Example
```python
import asyncio
from dataclasses import dataclass

import aiohttp

from truthound.checkpoint import Checkpoint
from truthound.checkpoint.async_runner import (
    AsyncCheckpointRunner,
    AsyncRunnerConfig,
    run_checkpoints_parallel,
)
from truthound.checkpoint.async_base import (
    ActionConfig,
    AsyncBaseAction,
    adapt_to_async,
)
from truthound.checkpoint.actions import SlackNotification
from truthound.checkpoint.actions import ActionResult, ActionStatus  # assumed location
from truthound.checkpoint.triggers import ScheduleTrigger


# Custom async action
@dataclass
class WebhookConfig(ActionConfig):
    url: str = ""  # Hypothetical field: endpoint that receives the result


class AsyncWebhookAction(AsyncBaseAction[WebhookConfig]):
    action_type = "async_webhook"

    @classmethod
    def _default_config(cls) -> WebhookConfig:
        return WebhookConfig()

    async def _execute_async(self, result):
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self._config.url,
                json=result.to_dict(),
            ) as resp:
                return ActionResult(
                    status=ActionStatus.SUCCESS if resp.ok else ActionStatus.ERROR,
                    action_name=self.name,
                    action_type=self.action_type,
                )


# Convert a synchronous action to async
slack = SlackNotification(webhook_url="${SLACK_WEBHOOK}")
async_slack = adapt_to_async(slack)

# Create checkpoints
checkpoint1 = Checkpoint(
    name="hourly_validation",
    data_source="data.csv",
    validators=["null"],
    actions=[async_slack],
)
checkpoint1.add_trigger(ScheduleTrigger(interval_hours=1))

checkpoint2 = Checkpoint(
    name="daily_data_validation",
    data_source="data.parquet",
    validators=["range", "distribution"],
)


async def main():
    # 1. One-off parallel execution
    results = await run_checkpoints_parallel(
        checkpoints=[checkpoint1, checkpoint2],
        max_concurrent=5,
        on_complete=lambda r: print(f"Done: {r.checkpoint_name}"),
    )
    print(f"Parallel run finished with {len(results)} results")

    # 2. Trigger-based execution with the runner
    runner = AsyncCheckpointRunner(
        config=AsyncRunnerConfig(max_concurrent_checkpoints=10),
        result_callback=lambda r: print(f"Completed: {r.status}"),
    )
    runner.add_checkpoint(checkpoint1)
    runner.add_checkpoint(checkpoint2)

    await runner.start_async()
    try:
        # Stream results; stop after 10 have been received
        received = 0
        async for result in runner.iter_results_async(timeout=5.0):
            print(f"{result.checkpoint_name}: {result.status}")
            received += 1
            if received >= 10:
                break
    finally:
        await runner.stop_async()


if __name__ == "__main__":
    asyncio.run(main())
```
Mixed Sync/Async Usage
Synchronous and asynchronous actions can be combined in one action list: wrap the synchronous ones with adapt_to_async, then run the whole list with a strategy.
```python
from truthound.checkpoint.actions import SlackNotification
from truthound.checkpoint.async_base import adapt_to_async, ConcurrentStrategy

# Synchronous action
sync_action = SlackNotification(webhook_url="...")

# Asynchronous action (MyAsyncWebhook stands for any AsyncBaseAction subclass you define)
async_action = MyAsyncWebhook(url="...")

# Convert all to async
actions = [
    adapt_to_async(sync_action),  # Runs in a ThreadPoolExecutor
    async_action,                 # Native async execution
]

# Execute concurrently (inside a coroutine)
strategy = ConcurrentStrategy(max_concurrency=5)
results = await strategy.execute(actions, checkpoint_result)
```
Performance Considerations
| Scenario | Recommended Configuration |
|---|---|
| I/O bound | ConcurrentStrategy + high max_concurrency |
| CPU bound | SequentialStrategy or low max_concurrency |
| Has dependencies | PipelineStrategy or SequentialStrategy |
| High throughput | CheckpointPool + workers=CPU_COUNT * 2 |
| Memory limited | Limit result_queue_size + handle backpressure |
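The memory-limited row relies on backpressure. With a bounded queue, a producer that outpaces its consumer is suspended at put() instead of buffering without limit. In this minimal plain-asyncio sketch, maxsize=3 is an arbitrary stand-in for result_queue_size, and the producer and consumer are hypothetical.

```python
import asyncio


async def bounded_demo(maxsize: int = 3, items: int = 10) -> tuple[int, int]:
    """Return (highest queue size seen, items consumed) for a fast producer and a slow consumer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)  # Bounded: caps buffered results
    high_water = 0
    consumed = 0

    async def producer() -> None:
        nonlocal high_water
        for i in range(items):
            await queue.put(i)  # Suspends here while the queue is full (backpressure)
            high_water = max(high_water, queue.qsize())
        await queue.put(None)  # End-of-stream marker

    async def consumer() -> None:
        nonlocal consumed
        while True:
            item = await queue.get()
            if item is None:
                break
            consumed += 1
            await asyncio.sleep(0.001)  # Slow consumer

    await asyncio.gather(producer(), consumer())
    return high_water, consumed


if __name__ == "__main__":
    print(asyncio.run(bounded_demo()))  # (3, 10): never more than 3 items buffered
```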