Skip to content

Distributed Processing

This document describes the distributed profiling system for large-scale datasets.

Overview

The distributed processing system implemented in src/truthound/profiler/distributed.py supports Spark, Dask, Ray, and Local backends with a unified interface.

DistributedBackend

class DistributedBackend(str, Enum):
    """Distributed processing backends.

    Members are ``str``-valued (the class mixes in ``str``), so each member
    compares equal to its plain string value, e.g.
    ``DistributedBackend.SPARK == "spark"``.
    """

    LOCAL = "local"    # Local multithread/multiprocess
    SPARK = "spark"    # Apache Spark
    DASK = "dask"      # Dask Distributed
    RAY = "ray"        # Ray
    AUTO = "auto"      # Automatic detection

PartitionStrategy

class PartitionStrategy(str, Enum):
    """Partition strategies.

    Members are ``str``-valued. A member can be passed as
    ``partition_strategy=`` to ``DistributedProfiler.create``.
    """

    ROW_BASED = "row_based"       # Row-based partitioning
    COLUMN_BASED = "column_based" # Column-based partitioning
    HYBRID = "hybrid"             # Hybrid (row + column)
    HASH = "hash"                 # Hash-based partitioning

Backend-Specific Configuration

SparkConfig

@dataclass
class SparkConfig:
    """Spark configuration.

    Passed as ``config`` to ``SparkBackend`` or to
    ``DistributedProfiler.create(backend=DistributedBackend.SPARK, ...)``.
    """

    app_name: str = "truthound-profiler"
    # Spark master; the cluster examples in this document use
    # "spark://master:7077".
    master: str = "local[*]"
    executor_memory: str = "4g"
    executor_cores: int = 2
    num_executors: int = 4
    # Additional Spark properties as string key/value pairs, e.g.
    # {"spark.sql.shuffle.partitions": "200"}. default_factory gives every
    # instance its own dict.
    spark_config: dict[str, str] = field(default_factory=dict)

DaskConfig

@dataclass
class DaskConfig:
    """Dask configuration.

    Passed as ``config`` to ``DaskBackend`` or to
    ``DistributedProfiler.create(backend=DistributedBackend.DASK, ...)``.
    """

    # Scheduler mode: "threads", "processes" or "synchronous". The usage
    # examples in this document also pass "distributed".
    scheduler: str = "threads"
    num_workers: int | None = None  # None = number of CPU cores
    memory_limit: str = "4GB"
    dashboard_address: str = ":8787"

RayConfig

@dataclass
class RayConfig:
    """Ray configuration.

    Passed as ``config`` to ``RayBackend``.
    """

    # None = local cluster; the remote example in this document uses
    # "ray://cluster:10001".
    address: str | None = None
    # NOTE(review): None for the resource fields below presumably defers to
    # Ray's own defaults — confirm against the implementation.
    num_cpus: int | None = None
    num_gpus: int | None = None
    object_store_memory: int | None = None

DistributedProfiler

A unified distributed profiler interface.

# SparkConfig and DaskConfig are used below, so they must be imported together
# with the profiler and the backend enum.
from truthound.profiler.distributed import (
    DaskConfig,
    DistributedBackend,
    DistributedProfiler,
    SparkConfig,
)

# Automatic backend selection
profiler = DistributedProfiler.create(backend=DistributedBackend.AUTO)

# Spark backend
profiler = DistributedProfiler.create(
    backend=DistributedBackend.SPARK,
    config=SparkConfig(master="spark://master:7077"),
)

# Dask backend
profiler = DistributedProfiler.create(
    backend=DistributedBackend.DASK,
    config=DaskConfig(scheduler="distributed"),
)

# Execute profiling
profile = profiler.profile("hdfs://data/large.parquet")

LocalBackend

Local multithread/multiprocess backend.

from truthound.profiler.distributed import LocalBackend

backend = LocalBackend(
    num_workers=4,
    use_processes=True,  # True = processes, False = threads
)

# Parallel column profiling
results = backend.profile_columns_parallel(lf, columns)

SparkBackend

Apache Spark backend.

from truthound.profiler.distributed import SparkBackend, SparkConfig

config = SparkConfig(
    master="spark://master:7077",
    executor_memory="8g",
    num_executors=10,
)

backend = SparkBackend(config)

# Profile Spark DataFrame
profile = backend.profile(spark_df)

# Profile HDFS file
profile = backend.profile_file("hdfs://data/large.parquet")

DaskBackend

Dask Distributed backend.

from truthound.profiler.distributed import DaskBackend, DaskConfig

config = DaskConfig(
    scheduler="distributed",
    num_workers=8,
    memory_limit="8GB",
)

backend = DaskBackend(config)

# Profile Dask DataFrame
profile = backend.profile(dask_df)

# Profile partitioned files
profile = backend.profile_file("s3://bucket/data/*.parquet")

RayBackend

Ray backend.

from truthound.profiler.distributed import RayBackend, RayConfig

config = RayConfig(
    address="ray://cluster:10001",
    num_cpus=16,
)

backend = RayBackend(config)

# Profile Ray Dataset
profile = backend.profile(ray_dataset)

BackendRegistry

A pluggable backend registry.

from truthound.profiler.distributed import BackendRegistry

# List registered backends
backends = BackendRegistry.list()  # ["local", "spark", "dask", "ray"]

# Create backend instance
backend = BackendRegistry.create("spark", config=spark_config)

# Register custom backend
@BackendRegistry.register("my_backend")
class MyCustomBackend:
    """Example custom backend, registered under the name "my_backend"."""

    def profile(self, data):
        """Profile ``data``; the body is a placeholder for real logic."""
        # Custom profiling logic
        pass

Partition Strategies

ROW_BASED - Row-Based

from truthound.profiler.distributed import RowBasedPartitioner

partitioner = RowBasedPartitioner(num_partitions=10)
partitions = partitioner.partition(lf)

# Each partition contains 1/10 of total rows

COLUMN_BASED - Column-Based

from truthound.profiler.distributed import ColumnBasedPartitioner

partitioner = ColumnBasedPartitioner()
column_groups = partitioner.partition(lf, num_workers=4)

# Each worker processes 1/4 of columns

HYBRID - Hybrid

from truthound.profiler.distributed import HybridPartitioner

partitioner = HybridPartitioner(
    row_partitions=4,
    column_partitions=2,
)
# 4 x 2 = 8 partitions

Automatic Backend Selection

from truthound.profiler.distributed import auto_select_backend

# Automatic selection based on environment
backend = auto_select_backend()

# Selection logic:
# 1. Spark session exists → Spark
# 2. Dask client exists → Dask
# 3. Ray initialized → Ray
# 4. Default → Local

Result Merging

Merge distributed processing results.

from truthound.profiler.distributed import ProfileMerger

merger = ProfileMerger()

# Merge partition profiles
partial_profiles = [profile1, profile2, profile3, profile4]
merged = merger.merge(partial_profiles)

# Statistics merge strategy:
# - count: sum
# - mean: weighted average
# - min/max: global min/max
# - quantiles: approximate merge

Monitoring

The distributed monitoring system provides comprehensive observability for distributed profiling operations.

Overview

# DistributedProfiler is used below, so import it alongside the monitoring API.
from truthound.profiler.distributed import DistributedProfiler
from truthound.profiler.distributed.monitoring import (
    DistributedMonitor,
    MonitorConfig,
    MonitorFactory,
)

# Quick start with factory presets
monitor = MonitorFactory.with_console()

# Or create with full configuration
config = MonitorConfig.production()
monitor = DistributedMonitor(config)

# Use with profiler ("spark" is the string value of DistributedBackend.SPARK)
profiler = DistributedProfiler.create(
    backend="spark",
    monitor=monitor,
)

# `data` stands for the dataset to profile
profile = profiler.profile(data)

MonitorConfig

Configuration presets for different use cases:

from truthound.profiler.distributed.monitoring import MonitorConfig

# Minimal overhead - basic tracking only
config = MonitorConfig.minimal()

# Standard features (default)
config = MonitorConfig.standard()

# All features enabled
config = MonitorConfig.full()

# Production-optimized
config = MonitorConfig.production()

Configuration Options

@dataclass
class MonitorConfig:
    """Configuration for ``DistributedMonitor``.

    Instances are passed to ``DistributedMonitor(config)``. The examples in
    this document obtain ready-made instances from ``MonitorConfig.minimal()``,
    ``.standard()``, ``.full()`` and ``.production()``.
    """

    # Feature switches; every one defaults to enabled.
    enable_monitoring: bool = True
    enable_task_tracking: bool = True
    enable_progress_aggregation: bool = True
    enable_health_monitoring: bool = True
    enable_metrics_collection: bool = True
    # Per-subsystem settings. ``...`` is a placeholder: this listing does not
    # show the actual default values.
    task_tracker: TaskTrackerConfig = ...
    health_check: HealthCheckConfig = ...
    metrics: MetricsConfig = ...
    callbacks: CallbackConfig = ...
    log_level: str = "INFO"

Task Tracking

Track task lifecycle across distributed workers:

from truthound.profiler.distributed.monitoring import TaskTracker, TaskTrackerConfig

config = TaskTrackerConfig(
    max_history_size=1000,
    timeout_seconds=3600,
    max_retries=3,
    enable_progress_tracking=True,
)

tracker = TaskTracker(config=config)

# Submit and track tasks
task_id = tracker.submit_task(
    task_type="profile_partition",
    partition_id=0,
    metadata={"rows": 1000000},
)

tracker.start_task(task_id, worker_id="worker-1")
tracker.update_progress(task_id, progress=0.5, rows_processed=500000)
tracker.complete_task(task_id, result={"stats": {...}})

# Query task state
task = tracker.get_task(task_id)
print(f"Status: {task.state}, Duration: {task.duration_seconds}s")

Progress Aggregation

Aggregate progress from multiple partitions:

from truthound.profiler.distributed.monitoring import (
    DistributedProgressAggregator,
    StreamingProgressAggregator,
)

# Basic aggregator
aggregator = DistributedProgressAggregator(
    on_progress=lambda p: print(f"Overall: {p.percent:.1f}%"),
    milestone_interval=10,  # Emit event every 10%
)

aggregator.set_total_partitions(10)
aggregator.start_partition(0, total_rows=1000000)
aggregator.update_partition(0, progress=0.5, rows_processed=500000)
aggregator.complete_partition(0, rows_processed=1000000)

# Get aggregated progress
progress = aggregator.get_progress()
print(f"Completed: {progress.completed_partitions}/{progress.total_partitions}")
print(f"ETA: {progress.estimated_remaining_seconds:.0f}s")
print(f"Throughput: {progress.rows_per_second:.0f} rows/s")

# Streaming aggregator for real-time UI
streaming = StreamingProgressAggregator(
    on_progress=update_ui,
    interpolation_interval_ms=100,
)

Health Monitoring

Monitor worker health and detect issues:

from truthound.profiler.distributed.monitoring import (
    WorkerHealthMonitor,
    HealthCheckConfig,
    HealthStatus,
)

config = HealthCheckConfig(
    heartbeat_timeout_seconds=30,
    stall_threshold_seconds=60,
    memory_warning_percent=80,
    memory_critical_percent=95,
    cpu_warning_percent=90,
    error_rate_warning=0.1,
    error_rate_critical=0.25,
)

monitor = WorkerHealthMonitor(
    config=config,
    on_event=lambda e: print(f"Health: {e.message}"),
)

# Register and track workers
monitor.register_worker("worker-1")
monitor.record_heartbeat(
    "worker-1",
    cpu_percent=65,
    memory_percent=70,
    active_tasks=3,
)
monitor.record_task_complete("worker-1", duration_seconds=2.5, success=True)

# Check health
health = monitor.get_worker_health("worker-1")
print(f"Status: {health.status}")  # HEALTHY, DEGRADED, UNHEALTHY, CRITICAL

# Get overall system health
overall = monitor.get_overall_health()
unhealthy = monitor.get_unhealthy_workers()
stalled = monitor.check_stalled_workers()

Metrics Collection

Collect and export metrics:

from truthound.profiler.distributed.monitoring import (
    DistributedMetricsCollector,
    MetricsConfig,
)

config = MetricsConfig(
    enable_metrics=True,
    collection_interval_seconds=5.0,
    enable_percentiles=True,
    percentiles=[0.5, 0.75, 0.9, 0.95, 0.99],
    enable_prometheus=True,
    prometheus_port=9090,
)

collector = DistributedMetricsCollector(config=config)

# Record metrics
collector.record_task_duration("profile_partition", 2.5)
collector.record_partition_progress(0, 0.5)
collector.record_worker_metric("worker-1", "cpu_percent", 65)
collector.increment_counter("tasks_completed")

# Get metrics
metrics = collector.get_metrics()
print(f"Tasks: {metrics.tasks_completed}")
print(f"Avg duration: {metrics.avg_task_duration_seconds:.2f}s")
print(f"P99 duration: {metrics.task_duration_p99:.2f}s")

# Export to Prometheus format
prometheus_output = collector.export_prometheus()

Callback System

Flexible callback adapters for various outputs:

import logging  # needed for logging.INFO below

from truthound.profiler.distributed.monitoring import (
    ConsoleMonitorCallback,
    ProgressBarCallback,
    LoggingMonitorCallback,
    FileMonitorCallback,
    WebhookMonitorCallback,
    CallbackChain,
    AsyncMonitorCallback,
)

# Console output with colors
console = ConsoleMonitorCallback(
    show_progress=True,
    show_events=True,
    use_colors=True,
)

# Progress bar (tqdm)
progress_bar = ProgressBarCallback(
    desc="Profiling",
    unit="partitions",
)

# Structured logging
logging_cb = LoggingMonitorCallback(
    logger_name="truthound.monitor",
    log_level=logging.INFO,
)

# File output (JSONL)
file_cb = FileMonitorCallback(
    output_path="monitoring.jsonl",
    format="jsonl",  # or "json"
)

# Webhook notifications
# NOTE(review): EventSeverity is not imported in this snippet; it is presumably
# exported by truthound.profiler.distributed.monitoring — confirm and import it.
webhook = WebhookMonitorCallback(
    url="https://hooks.slack.com/...",
    headers={"Authorization": "Bearer token"},
    event_filter=lambda e: e.severity >= EventSeverity.WARNING,
)

# Combine callbacks
chain = CallbackChain([console, logging_cb, file_cb])

# Async callback for non-blocking operations
async_cb = AsyncMonitorCallback(
    callback=webhook,
    buffer_size=100,
)

Monitor Factory

Create monitors with common configurations:

import logging  # needed for logging.INFO below

from truthound.profiler.distributed.monitoring import MonitorFactory

# Minimal - basic tracking only
monitor = MonitorFactory.minimal()

# Console output
monitor = MonitorFactory.with_console(
    show_progress=True,
    use_colors=True,
)

# Logging output
monitor = MonitorFactory.with_logging(
    logger_name="my_app.profiler",
    log_level=logging.INFO,
)

# Production setup
monitor = MonitorFactory.production(
    log_path="/var/log/truthound/monitor.jsonl",
    webhook_url="https://alerts.example.com/webhook",
)

Monitor Registry

Register custom callbacks and presets:

# DistributedMonitor and MonitorConfig are used by the preset below, so they
# are imported together with the registry.
from truthound.profiler.distributed.monitoring import (
    DistributedMonitor,
    MonitorConfig,
    MonitorRegistry,
)

# NOTE(review): MonitorCallbackAdapter and MonitorEvent are not imported in
# this snippet; presumably they are exported by the same monitoring package —
# confirm and import them before running.

# Register custom callback
@MonitorRegistry.register_callback("my_callback")
class MyCallback(MonitorCallbackAdapter):
    """Example callback registered under the name "my_callback"."""

    def on_event(self, event: MonitorEvent) -> None:
        """Handle one monitoring event; the body is a placeholder."""
        # Custom handling
        pass

# Register custom preset
@MonitorRegistry.register_preset("my_preset")
def my_preset(**kwargs) -> DistributedMonitor:
    """Build the monitor that is registered under the name "my_preset"."""
    # `...` is a placeholder for the configuration arguments of the preset.
    config = MonitorConfig(...)
    return DistributedMonitor(config)

# Use registered components
monitor = MonitorRegistry.create_monitor("my_preset")
callback = MonitorRegistry.create_callback("my_callback")

Backend Adapters

Backend-specific monitoring adapters:

from truthound.profiler.distributed.monitoring.adapters import (
    LocalMonitorAdapter,
    DaskMonitorAdapter,
    SparkMonitorAdapter,
    RayMonitorAdapter,
)

# Local backend adapter
local_adapter = LocalMonitorAdapter(
    executor=thread_pool_executor,
    poll_interval_seconds=1.0,
)

# Dask adapter
dask_adapter = DaskMonitorAdapter(
    client=dask_client,
    poll_interval_seconds=2.0,
)
dask_adapter.connect()
workers = dask_adapter.get_workers()
metrics = dask_adapter.get_metrics()

# Spark adapter
spark_adapter = SparkMonitorAdapter(
    spark_session=spark,
    poll_interval_seconds=5.0,
)

# Ray adapter
ray_adapter = RayMonitorAdapter(
    address="ray://cluster:10001",
    poll_interval_seconds=5.0,
)

Event Types

Available monitoring event types:

from truthound.profiler.distributed.monitoring import MonitorEventType

# Task lifecycle
MonitorEventType.TASK_SUBMITTED
MonitorEventType.TASK_STARTED
MonitorEventType.TASK_PROGRESS
MonitorEventType.TASK_COMPLETED
MonitorEventType.TASK_FAILED
MonitorEventType.TASK_RETRYING

# Partition events
MonitorEventType.PARTITION_START
MonitorEventType.PARTITION_COMPLETE
MonitorEventType.PARTITION_ERROR

# Progress events
MonitorEventType.PROGRESS_UPDATE
MonitorEventType.PROGRESS_MILESTONE
MonitorEventType.AGGREGATION_COMPLETE

# Worker events
MonitorEventType.WORKER_REGISTERED
MonitorEventType.WORKER_UNREGISTERED
MonitorEventType.WORKER_HEALTHY
MonitorEventType.WORKER_UNHEALTHY
MonitorEventType.WORKER_STALLED
MonitorEventType.WORKER_RECOVERED

# Monitor lifecycle
MonitorEventType.MONITOR_START
MonitorEventType.MONITOR_STOP

Complete Example

from truthound.profiler.distributed import (
    DistributedProfiler,
    DistributedBackend,
    SparkConfig,
)
from truthound.profiler.distributed.monitoring import (
    DistributedMonitor,
    MonitorConfig,
    ConsoleMonitorCallback,
    LoggingMonitorCallback,
    CallbackChain,
)

# Configure monitoring
config = MonitorConfig.production()

# Create callback chain
callbacks = CallbackChain([
    ConsoleMonitorCallback(show_progress=True, use_colors=True),
    LoggingMonitorCallback(logger_name="profiler.monitor"),
])

# Create monitor
monitor = DistributedMonitor(config, callbacks=[callbacks])

# Create profiler with monitoring
profiler = DistributedProfiler.create(
    backend=DistributedBackend.SPARK,
    config=SparkConfig(master="spark://master:7077"),
    monitor=monitor,
)

# Profile with full observability
profile = profiler.profile("hdfs://data/large.parquet")

# Get monitoring statistics
stats = monitor.get_statistics()
print(f"Tasks completed: {stats['tasks_completed']}")
print(f"Average duration: {stats['avg_task_duration']:.2f}s")
print(f"Success rate: {stats['success_rate']:.1%}")

CLI Usage

# Local parallel processing
th profile data.csv --distributed --backend local --workers 4

# Spark processing
th profile hdfs://data/large.parquet --distributed --backend spark \
  --master spark://master:7077

# Dask processing
# The glob is quoted so the shell passes it to `th` unchanged instead of
# attempting local filename expansion (zsh aborts on an unmatched pattern).
th profile "s3://bucket/data/*.parquet" --distributed --backend dask \
  --scheduler distributed

# Automatic backend selection
th profile data.parquet --distributed --backend auto

Environment Variables

| Variable | Description | Default |
|----------|-------------|---------|
| TRUTHOUND_DISTRIBUTED_BACKEND | Default backend | local |
| TRUTHOUND_NUM_WORKERS | Number of workers | CPU core count |
| SPARK_HOME | Spark home | - |
| DASK_SCHEDULER_ADDRESS | Dask scheduler | - |
| RAY_ADDRESS | Ray cluster | - |

Integration Example

from truthound.profiler.distributed import (
    DistributedProfiler,
    DistributedBackend,
    SparkConfig,
    PartitionStrategy,
)

# Profile large data on Spark cluster
config = SparkConfig(
    master="spark://master:7077",
    executor_memory="16g",
    num_executors=20,
    spark_config={
        "spark.sql.shuffle.partitions": "200",
    },
)

profiler = DistributedProfiler.create(
    backend=DistributedBackend.SPARK,
    config=config,
    partition_strategy=PartitionStrategy.HYBRID,
)

# Profile 100GB data
profile = profiler.profile("hdfs://data/100gb_dataset.parquet")

print(f"Rows: {profile.row_count:,}")
print(f"Columns: {profile.column_count}")
print(f"Duration: {profile.profile_duration_ms / 1000:.2f}s")

Performance Tips

| Data Size | Recommended Backend | Recommended Settings |
|-----------|---------------------|----------------------|
| < 1GB | Local | 4-8 threads |
| 1-10GB | Local/Dask | Process mode |
| 10-100GB | Dask/Spark | Cluster |
| > 100GB | Spark | Large cluster |

Next Steps

  • Sampling - Sampling in distributed environments
  • Caching - Distributed cache sharing