# Enterprise Sampling Guide
This guide covers Truthound's enterprise-scale sampling capabilities for datasets ranging from 100 million to billions of rows.
## Overview
Truthound provides specialized sampling strategies designed for enterprise-scale data processing:
- O(1) Memory Footprint: Process any dataset size with constant memory
- Parallel Processing: Multi-threaded block processing with work stealing
- Probabilistic Data Structures: HyperLogLog, Count-Min Sketch, Bloom Filter
- Statistical Guarantees: Configurable confidence levels and error margins
- Time/Memory Budget Awareness: Automatic adaptation to resource constraints
## Quick Start

```python
from truthound.profiler.enterprise_sampling import (
    EnterpriseScaleConfig,
    EnterpriseScaleSampler,
    SamplingQuality,
    sample_large_dataset,
)

# Quick sampling with defaults
result = sample_large_dataset(lf, target_rows=100_000)
print(f"Sampled {result.metrics.sample_size:,} rows")
print(f"Strategy: {result.metrics.strategy_used}")

# Advanced configuration
config = EnterpriseScaleConfig(
    target_rows=100_000,
    time_budget_seconds=60.0,
    quality=SamplingQuality.HIGH,
)
sampler = EnterpriseScaleSampler(config)
result = sampler.sample(lf)
```
## Scale Categories
| Category | Row Count | Default Strategy | Typical Use Case |
|---|---|---|---|
| SMALL | < 1M | No sampling | Quick analysis |
| MEDIUM | 1M - 10M | Column-aware | Daily reports |
| LARGE | 10M - 100M | Block sampling | Data warehouse |
| XLARGE | 100M - 1B | Multi-stage | Enterprise analytics |
| XXLARGE | > 1B | Sketches + Multi-stage | Big data platforms |
```python
from truthound.profiler.enterprise_sampling import classify_dataset_scale

scale = classify_dataset_scale(500_000_000)  # Returns ScaleCategory.XLARGE
```
## Sampling Strategies

### 1. Block Sampling

Divides the data into fixed-size blocks and samples from each block proportionally, ensuring even coverage across the dataset.
```python
from truthound.profiler.enterprise_sampling import BlockSamplingStrategy

config = EnterpriseScaleConfig(
    target_rows=100_000,
    block_size=1_000_000,  # 1M rows per block (0 = auto)
)
strategy = BlockSamplingStrategy(config)
result = strategy.sample(lf, base_config)

# Access block-specific metrics
print(f"Blocks processed: {result.metrics.blocks_processed}")
print(f"Time per block: {result.metrics.time_per_block_ms:.1f}ms")
```
**Best for:** datasets of 10M-100M rows where even coverage across the data matters.

### 2. Multi-Stage Sampling

Hierarchical sampling that progressively reduces the data over multiple stages. Ideal for billion-row datasets.
```python
from truthound.profiler.enterprise_sampling import MultiStageSamplingStrategy

strategy = MultiStageSamplingStrategy(
    config,
    num_stages=3,  # 3 progressive reduction stages
)
result = strategy.sample(lf, base_config)
print(f"Strategy: {result.metrics.strategy_used}")  # "multi_stage(3)"
```
**Reduction formula:** each stage reduces the row count by a factor of `(total_rows / target_rows) ** (1 / num_stages)`.
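For example, reducing 1B rows to a 100K-row sample in 3 stages gives a per-stage factor of roughly 21.5 (a worked example with illustrative values):

```python
# Worked example of the reduction formula above
total_rows, target_rows, num_stages = 1_000_000_000, 100_000, 3
factor = (total_rows / target_rows) ** (1 / num_stages)  # ~21.54x per stage
# Stage sizes: 1B -> ~46.4M -> ~2.15M -> ~100K
```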
**Best for:** datasets over 100M rows when quick estimates are acceptable.

### 3. Column-Aware Sampling
Analyzes column types and adjusts the sample size based on complexity (a sketch of how the multipliers might combine follows the list):

- Strings: 2.0x sample multiplier (high cardinality)
- Categoricals: 0.5x multiplier (low cardinality)
- Complex types (List/Struct): 3.0x multiplier
- Numeric: 1.0x baseline
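A minimal sketch of the multiplier logic, assuming the most demanding column type drives the target; the helper names here are illustrative, not the library API:

```python
import polars as pl

def column_multiplier(dtype: pl.DataType) -> float:
    # Hypothetical mapping of the multipliers listed above
    if dtype == pl.String:
        return 2.0  # strings: often high cardinality
    if dtype == pl.Categorical:
        return 0.5  # categoricals: low cardinality
    if isinstance(dtype, (pl.List, pl.Struct)):
        return 3.0  # complex nested types
    return 1.0      # numeric baseline

def effective_target(lf: pl.LazyFrame, base_target: int) -> int:
    # Hypothetical helper: scale the target by the costliest column type
    dtypes = lf.collect_schema().dtypes()
    return int(base_target * max(column_multiplier(dt) for dt in dtypes))
```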
```python
from truthound.profiler.enterprise_sampling import ColumnAwareSamplingStrategy

strategy = ColumnAwareSamplingStrategy(config)
result = strategy.sample(lf, base_config)
```
**Best for:** datasets with mixed column types, where accuracy requirements vary by column.

### 4. Progressive Sampling

Iteratively increases the sample size until estimates converge, with support for early stopping.
```python
from truthound.profiler.enterprise_sampling import ProgressiveSamplingStrategy

strategy = ProgressiveSamplingStrategy(
    config,
    convergence_threshold=0.01,  # Stop when estimates stabilize within 1%
    max_stages=5,
)
result = strategy.sample(lf, base_config)
```
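Conceptually, the strategy stops early once successive estimates stabilize; an illustrative convergence check (not the library's internals):

```python
def converged(prev_estimate: float, new_estimate: float, threshold: float = 0.01) -> bool:
    # Stop when the relative change between stages falls below the threshold
    if prev_estimate == 0:
        return False
    return abs(new_estimate - prev_estimate) / abs(prev_estimate) < threshold
```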
**Best for:** exploratory analysis where quick initial estimates are needed.

## Parallel Block Processing
For maximum throughput, use the parallel block sampler with configurable concurrency.
```python
from truthound.profiler.parallel_sampling import (
    ParallelBlockSampler,
    ParallelSamplingConfig,
    sample_parallel,
)

# Quick parallel sampling
result = sample_parallel(lf, target_rows=100_000, max_workers=4)

# Advanced configuration
config = ParallelSamplingConfig(
    target_rows=100_000,
    max_workers=4,                # 0 = auto (CPU count)
    enable_work_stealing=True,    # Dynamic load balancing
    backpressure_threshold=0.75,  # GC trigger threshold
    chunk_timeout_seconds=30.0,   # Per-block timeout
)
sampler = ParallelBlockSampler(config)
result = sampler.sample(lf)

# Access parallel metrics
print(f"Workers used: {result.metrics.workers_used}")
print(f"Parallel speedup: {result.metrics.parallel_speedup:.2f}x")
print(f"Worker utilization: {result.metrics.worker_utilization:.1%}")
```
### Work Stealing
Enable work stealing for better load balancing when block processing times vary:
```python
from truthound.profiler.parallel_sampling import SchedulingPolicy  # assumed import location

config = ParallelSamplingConfig(
    target_rows=100_000,
    max_workers=8,
    enable_work_stealing=True,
    scheduling_policy=SchedulingPolicy.WORK_STEALING,
)
```
### Memory-Aware Scheduling
The parallel sampler automatically monitors memory and applies backpressure:
```python
config = ParallelSamplingConfig(
    target_rows=100_000,
    memory_budget=MemoryBudgetConfig(
        max_memory_mb=2048,
        gc_threshold_mb=1536,
        backpressure_enabled=True,
    ),
    backpressure_threshold=0.75,
)
```
## Probabilistic Data Structures
For 10B+ row datasets, use probabilistic sketches for O(1) memory aggregations.
### HyperLogLog (Cardinality Estimation)
Estimates distinct counts with configurable precision.
```python
from truthound.profiler.sketches import HyperLogLog, HyperLogLogConfig, create_sketch

# Create with a specific precision
hll = HyperLogLog(HyperLogLogConfig(precision=14))  # ~16KB, ±0.41% error

# Or create with a target error rate (auto-selects precision)
hll = create_sketch("hyperloglog", target_error=0.01)

# Add values
for user_id in user_ids:
    hll.add(user_id)

# Or batch add (more efficient)
hll.add_batch(user_ids)

# Get the estimate
distinct_count = hll.estimate()
error = hll.standard_error()
print(f"Distinct users: ~{distinct_count:,} (±{error:.2%})")
```
**Precision vs. memory vs. error:**
| Precision | Memory | Error |
|---|---|---|
| 10 | ~1KB | ±1.04% |
| 12 | ~4KB | ±0.65% |
| 14 | ~16KB | ±0.41% |
| 16 | ~64KB | ±0.26% |
| 18 | ~256KB | ±0.16% |
### Count-Min Sketch (Frequency Estimation)
Estimates element frequencies and finds heavy hitters.
```python
from truthound.profiler.sketches import CountMinSketch, CountMinSketchConfig, create_sketch

# Create with specific dimensions
cms = CountMinSketch(CountMinSketchConfig(width=2000, depth=5))

# Or create with target accuracy
cms = create_sketch(
    "countmin",
    epsilon=0.001,  # Error bound (0.1% of total count)
    delta=0.01,     # 99% confidence
)

# Add values
for item in stream:
    cms.add(item)

# Estimate a frequency
freq = cms.estimate_frequency("popular_item")
print(f"Frequency: ~{freq:,}")

# Find heavy hitters (items appearing in >1% of the stream)
heavy_hitters = cms.get_heavy_hitters(threshold=0.01)
for item, count in heavy_hitters:
    print(f"  {item}: ~{count:,}")
```
### Bloom Filter (Membership Testing)
Space-efficient set membership with no false negatives.
```python
from truthound.profiler.sketches import BloomFilter, BloomFilterConfig

# Create a filter for the expected capacity
bf = BloomFilter(BloomFilterConfig(
    capacity=10_000_000,  # Expected items
    error_rate=0.01,      # 1% false positive rate
))

# Add items
for item in items:
    bf.add(item)

# Test membership
if bf.contains(query_item):
    print("Item possibly in set")
else:
    print("Item definitely not in set")  # Guaranteed!

# Check the current false positive rate
print(f"FP rate: {bf.false_positive_rate():.2%}")
```
### Distributed Processing with Sketches
All sketches support merging for distributed processing:
```python
from concurrent.futures import ThreadPoolExecutor

# Build one sketch per partition
def process_partition(partition_data):
    hll = HyperLogLog(HyperLogLogConfig(precision=14))
    hll.add_batch(partition_data)
    return hll

# Process partitions in parallel
with ThreadPoolExecutor(max_workers=8) as executor:
    partition_hlls = list(executor.map(process_partition, partitions))

# Merge the per-partition sketches into one
merged_hll = partition_hlls[0]
for hll in partition_hlls[1:]:
    merged_hll = merged_hll.merge(hll)

print(f"Total distinct: ~{merged_hll.estimate():,}")
```
## Configuration Reference

### EnterpriseScaleConfig
```python
from dataclasses import dataclass, field

@dataclass
class EnterpriseScaleConfig:
    # Sampling targets
    target_rows: int = 100_000                           # Target sample size
    quality: SamplingQuality = SamplingQuality.STANDARD  # Quality level

    # Resource budgets
    memory_budget: MemoryBudgetConfig = field(default_factory=MemoryBudgetConfig)  # Memory limits
    time_budget_seconds: float = 0.0                     # 0 = unlimited

    # Block processing
    block_size: int = 0                                  # 0 = auto
    max_parallel_blocks: int = 4

    # Statistical parameters
    confidence_level: float = 0.95
    margin_of_error: float = 0.05

    # Adaptive parameters
    min_sample_ratio: float = 0.001                      # At least 0.1%
    max_sample_ratio: float = 0.10                       # At most 10%

    # Reproducibility
    seed: int | None = None
    enable_progressive: bool = True
```
### Quality Presets
```python
from truthound.profiler.enterprise_sampling import SamplingQuality

# Available quality levels
SamplingQuality.SKETCH    # Fast approximation, 10K samples
SamplingQuality.QUICK     # 90% confidence, 50K samples
SamplingQuality.STANDARD  # 95% confidence, 100K samples (default)
SamplingQuality.HIGH      # 99% confidence, 500K samples
SamplingQuality.EXACT     # Full scan, 100% accuracy

# Use a preset
config = EnterpriseScaleConfig.for_quality("high")
```
### Memory Budget Configuration
```python
from truthound.profiler.enterprise_sampling import MemoryBudgetConfig, ScaleCategory

# Default configuration
config = MemoryBudgetConfig()  # 1GB max, 256MB reserved

# Auto-detect from the system
config = MemoryBudgetConfig.auto_detect()  # Uses 25% of available RAM

# For a specific scale
config = MemoryBudgetConfig.for_scale(ScaleCategory.XLARGE)  # 2GB

# Custom configuration
config = MemoryBudgetConfig(
    max_memory_mb=4096,
    reserved_memory_mb=512,
    gc_threshold_mb=3072,
    backpressure_enabled=True,
)
```
## Validators with Enterprise Sampling

Use `EnterpriseScaleSamplingMixin` in custom validators:
```python
import polars as pl

from truthound.validators.base import Validator, EnterpriseScaleSamplingMixin

class MyLargeDataValidator(Validator, EnterpriseScaleSamplingMixin):
    """Validator optimized for large datasets."""

    sampling_threshold = 10_000_000  # Enable sampling above 10M rows
    sampling_target_rows = 100_000   # Target sample size
    sampling_quality = "standard"

    def validate(self, lf: pl.LazyFrame) -> list[ValidationIssue]:
        # Automatically samples if the dataset is large
        sampled_lf, metrics = self._sample_for_validation(lf)

        # Validate on the sampled data
        issues = self._do_validation(sampled_lf)

        # Extrapolate counts if sampled
        if metrics.is_sampled:
            issues = self._extrapolate_issues(issues, metrics)
        return issues
```
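Extrapolation scales sample-level counts back up to the full dataset. A minimal sketch of that step (the `total_rows` field name is an assumption for illustration; `sample_size` appears in the metrics shown earlier):

```python
def extrapolate_count(sample_count: int, metrics) -> int:
    # Assumed field: metrics.total_rows (rows in the full dataset)
    scale = metrics.total_rows / metrics.sample_size
    return int(sample_count * scale)
```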
## Performance Guidelines

### Choosing the Right Strategy
| Dataset Size | Strategy | Workers | Expected Throughput |
|---|---|---|---|
| < 1M | Column-aware | 1 | 10M+ rows/sec |
| 1M - 10M | Column-aware | 2-4 | 5M+ rows/sec |
| 10M - 100M | Block | 4-8 | 2M+ rows/sec |
| 100M - 1B | Multi-stage | 4-8 | 1M+ rows/sec |
| > 1B | Sketches | 8+ | 500K+ rows/sec |
### Memory Optimization

- **Set an appropriate memory budget** via `MemoryBudgetConfig` (or let `MemoryBudgetConfig.auto_detect()` size it from available RAM).
- **Enable backpressure** with `backpressure_enabled=True` so the sampler throttles before hitting the limit.
- **Use streaming for sketches**: feed data in bounded chunks with `add_batch()` instead of materializing all rows at once.
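A sketch combining all three tips, reusing the chunked `add_batch` pattern shown earlier (`data_source` is a placeholder for your chunked reader):

```python
config = EnterpriseScaleConfig(
    target_rows=100_000,
    memory_budget=MemoryBudgetConfig(
        max_memory_mb=2048,
        backpressure_enabled=True,  # throttle before the limit is hit
    ),
)

# Stream sketch updates in bounded chunks instead of materializing all rows
hll = HyperLogLog(HyperLogLogConfig(precision=14))
for chunk in data_source.iter_chunks(1_000_000):
    hll.add_batch(chunk["user_id"])
```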
### Time Budget Management
```python
config = EnterpriseScaleConfig(
    target_rows=100_000,
    time_budget_seconds=30.0,  # Stop after 30 seconds
)
sampler = EnterpriseScaleSampler(config)
result = sampler.sample(lf)

if result.metrics.sampling_time_ms > 25_000:
    print("Warning: Approaching time budget")
```
## Best Practices

### 1. Start with Auto-Selection
Let the sampler choose the best strategy:
```python
sampler = EnterpriseScaleSampler(config)
result = sampler.sample(lf)  # Auto-selects based on data size
```
### 2. Use Reproducible Seeds for Production
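A fixed seed makes sampling deterministic, so repeated runs over the same data yield the same sample (the `seed` field is part of `EnterpriseScaleConfig`):

```python
config = EnterpriseScaleConfig(
    target_rows=100_000,
    seed=42,  # Same seed + same data -> same sample
)
result = EnterpriseScaleSampler(config).sample(lf)
```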
### 3. Monitor Memory in Parallel Processing
```python
result = sampler.sample(lf)

if result.metrics.backpressure_events > 0:
    print(f"Memory pressure detected: {result.metrics.backpressure_events} events")
```
### 4. Validate Sample Representativeness
```python
# Compare sample statistics with known population values
# (known_mean and margin come from prior knowledge of the data)
sample_df = result.data.collect()
if abs(sample_df["amount"].mean() - known_mean) > margin:
    print("Warning: Sample may not be representative")
```
### 5. Use Sketches Instead of Exact Aggregations

```python
# Approximate distinct count on 10B rows, without an exact full aggregation:
hll = HyperLogLog(HyperLogLogConfig(precision=16))  # ±0.26% error

# Process in chunks
for chunk in data_source.iter_chunks(1_000_000):
    hll.add_batch(chunk["user_id"])

print(f"Distinct users: ~{hll.estimate():,}")
```
## API Reference

### Core Classes

- `EnterpriseScaleSampler`: Main sampling interface with auto-strategy selection
- `BlockSamplingStrategy`: Block-based parallel sampling
- `MultiStageSamplingStrategy`: Hierarchical sampling for billion-row datasets
- `ColumnAwareSamplingStrategy`: Type-aware adaptive sampling
- `ProgressiveSamplingStrategy`: Iterative sampling with early stopping
- `ParallelBlockSampler`: Multi-threaded parallel processing

### Probabilistic Structures

- `HyperLogLog`: Cardinality estimation
- `CountMinSketch`: Frequency estimation
- `BloomFilter`: Membership testing
- `SketchFactory`: Factory for creating sketches

### Convenience Functions

- `sample_large_dataset()`: Quick sampling with quality presets
- `sample_parallel()`: Parallel sampling with configurable workers
- `estimate_optimal_sample_size()`: Calculate a statistically optimal sample size
- `classify_dataset_scale()`: Classify a dataset by scale category
- `create_sketch()`: Factory function for creating sketches
## See Also
- Data Profiling Guide - Core profiling documentation
- Distributed Processing - Multi-node processing
- Statistical Methods - Sampling theory background