Data Profiling Guide

This guide covers automatic data profiling with Truthound's Python API. It includes practical workflows for schema learning, rule generation, and integration with validation pipelines.


Quick Start

import truthound as th

# Profile a file
profile = th.profile("data.csv")
print(f"Rows: {profile.row_count}, Columns: {profile.column_count}")

# Generate validation rules from profile
from truthound.profiler import generate_suite
from truthound.profiler.generators import save_suite

suite = generate_suite(profile)
save_suite(suite, "rules.yaml", format="yaml")

Common Workflows

Workflow 1: Profile and Validate Pipeline

import truthound as th
from truthound.profiler import generate_suite

# Step 1: Profile baseline data
profile = th.profile("baseline.csv")

# Step 2: Generate validation rules
suite = generate_suite(profile, strictness="medium")

# Step 3: Validate new data using the generated rules
# (new_data: a Polars DataFrame/LazyFrame prepared by your pipeline)
report = suite.execute(new_data)

# Step 4: Check for issues
if report.issues:
    for issue in report.issues:
        print(f"{issue.column}: {issue.issue_type} ({issue.severity})")

Workflow 2: Schema Evolution Detection

from truthound.profiler import DataProfiler, ProfilerConfig
from truthound.profiler.evolution import SchemaEvolutionDetector

# Profile old and new data
config = ProfilerConfig(sample_size=10000)
profiler = DataProfiler(config)

old_profile = profiler.profile_file("data_v1.csv")
new_profile = profiler.profile_file("data_v2.csv")

# Detect schema changes
detector = SchemaEvolutionDetector()
changes = detector.detect(old_profile, new_profile)

for change in changes:
    print(f"{change.change_type}: {change.description}")
    if change.is_breaking:
        print(f"  WARNING: Breaking change detected!")

Workflow 3: Incremental Profiling with Cache

import truthound as th
from truthound.profiler import ProfileCache, IncrementalProfiler

# Setup cache
cache = ProfileCache(cache_dir=".truthound/cache")

# Check if profile is cached
fingerprint = cache.compute_fingerprint("data.csv")
if cache.exists(fingerprint):
    profile = cache.get(fingerprint)
    print("Using cached profile")
else:
    profile = th.profile("data.csv")
    cache.set(fingerprint, profile)
    print("Profile cached for future use")

Workflow 4: Large File Streaming Profile

import polars as pl
from truthound.profiler import StreamingPatternMatcher, IncrementalAggregation

# Setup streaming matcher for pattern detection
matcher = StreamingPatternMatcher(
    aggregation_strategy=IncrementalAggregation(),
    chunk_size=100000,
)

# Process the large file in chunks. pl.scan_csv() returns a LazyFrame,
# which has no iter_slices(); use Polars' batched CSV reader instead.
reader = pl.read_csv_batched("large_file.csv", batch_size=100_000)
while batches := reader.next_batches(1):
    for chunk in batches:
        for col in chunk.columns:
            matcher.process_chunk(chunk, col)

# Get aggregated results
results = matcher.finalize()
for col, patterns in results.items():
    print(f"{col}: {patterns.pattern_name} ({patterns.match_ratio:.1%})")

Full Documentation

This document describes Truthound's Auto-Profiling system, a streaming profiler with schema versioning and distributed processing capabilities. The system automatically analyzes datasets and generates validation rules without manual configuration.

Table of Contents

  1. Overview
  2. Quick Start
  3. Core Components
  4. CLI Commands
  5. Sampling Strategies
  6. Streaming Pattern Matching
  7. Rule Generation
  8. Caching & Incremental Profiling
  9. Resilience & Timeout
  10. Observability
  11. Distributed Processing
  12. Profile Comparison & Drift Detection
  13. Quality Scoring
  14. ML-based Type Inference
  15. Automatic Threshold Tuning
  16. Profile Visualization
  17. Internationalization (i18n)
  18. Incremental Validation
  19. Configuration Reference
  20. API Reference

Overview

The Auto-Profiling system provides enterprise-grade capabilities for data quality analysis:

  • Automatic Data Profiling: Column statistics, patterns, and data types with 100M+ rows/sec throughput
  • Rule Generation: Generate validation rules from profile results with configurable strictness
  • Validation Suite Export: Export rules to YAML, JSON, Python, or TOML formats
  • Streaming Support: Process large files in chunks with pattern aggregation strategies
  • Memory Safety: Sampling strategies and OOM prevention through LazyFrame architecture
  • Process Isolation: Timeout handling with multiprocessing and circuit breaker patterns
  • Schema Versioning: Forward/backward compatible profile serialization with automatic migration
  • Caching: Incremental profiling with fingerprint-based cache invalidation
  • Distributed Processing: Multi-threaded local processing with framework support for Spark, Dask, and Ray

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                         Data Input                                       │
│  CSV / Parquet / JSON / DataFrame / LazyFrame                           │
└──────────────────────────────┬──────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                      Sampling Layer                                      │
│  Random / Stratified / Reservoir / Adaptive / Head / Hash               │
└──────────────────────────────┬──────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                     Data Profiler                                        │
├─────────────────────────────────────────────────────────────────────────┤
│  ColumnProfiler          │  TableProfiler          │  PatternMatcher    │
│  • Basic stats           │  • Row count            │  • Email           │
│  • Null analysis         │  • Column correlations  │  • Phone           │
│  • Distribution          │  • Duplicate detection  │  • UUID            │
│  • Type inference        │  • Schema extraction    │  • Date formats    │
└──────────────────────────────┬──────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                     Rule Generators                                      │
├─────────────────────────────────────────────────────────────────────────┤
│  SchemaRuleGenerator     │  StatsRuleGenerator     │  PatternRuleGenerator│
│  • Column existence      │  • Range constraints    │  • Regex patterns   │
│  • Type constraints      │  • Null thresholds      │  • Format rules     │
│  • Nullable rules        │  • Uniqueness rules     │  • PII detection    │
└──────────────────────────────┬──────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│                     Suite Export                                         │
│  YAML / JSON / Python / TOML / Checkpoint                               │
└─────────────────────────────────────────────────────────────────────────┘

Quick Start

Python API

from truthound.profiler import (
    DataProfiler,
    ProfilerConfig,
    profile_file,
    generate_suite,
)

# Simple profiling
profile = profile_file("data.csv")
print(f"Rows: {profile.row_count}, Columns: {profile.column_count}")

# Generate validation suite
from truthound.profiler.generators import save_suite
suite = generate_suite(profile)
save_suite(suite, "rules.yaml", format="yaml")

# Or use to_yaml() method
with open("rules.yaml", "w") as f:
    f.write(suite.to_yaml())

# Advanced configuration
config = ProfilerConfig(
    sample_size=10000,
    include_patterns=True,
    include_correlations=False,
)
profiler = DataProfiler(config)
profile = profiler.profile_file("large_data.parquet")

CLI

# Profile a file
truthound auto-profile data.csv -o profile.json

# Generate validation suite from profile
truthound generate-suite profile.json -o rules.yaml

# One-step: profile + generate suite
truthound quick-suite data.csv -o rules.yaml

# List available options
truthound list-formats      # yaml, json, python, toml, checkpoint
truthound list-presets      # default, strict, loose, minimal, comprehensive
truthound list-categories   # schema, stats, pattern, completeness, etc.

Core Components

DataProfiler

The main profiling class that orchestrates column and table analysis.

from truthound.profiler import DataProfiler, ProfilerConfig

config = ProfilerConfig(
    sample_size=50000,            # Max rows to analyze (None = all)
    include_patterns=True,        # Enable pattern detection
    include_correlations=False,   # Compute correlations
    pattern_sample_size=1000,     # Samples for pattern matching
    top_n_values=10,              # Top/bottom values count
)

profiler = DataProfiler(config)

# Profile from various sources
profile = profiler.profile(df.lazy())           # LazyFrame
profile = profiler.profile_file("data.csv")     # File path
profile = profiler.profile_dataframe(df)        # DataFrame

ColumnProfiler

Analyzes individual columns for statistics and patterns.

from truthound.profiler import ColumnProfiler

column_profiler = ColumnProfiler()
col_profile = column_profiler.profile(lf, "email")

print(f"Type: {col_profile.inferred_type}")       # DataType.EMAIL
print(f"Null ratio: {col_profile.null_ratio}")    # 0.02
print(f"Unique ratio: {col_profile.unique_ratio}") # 0.95
print(f"Patterns: {col_profile.patterns}")        # [PatternMatch(...)]

TableProfile

Contains the complete analysis results for a dataset.

@dataclass(frozen=True)
class TableProfile:
    # Table info
    name: str = ""
    row_count: int = 0
    column_count: int = 0

    # Memory estimation
    estimated_memory_bytes: int = 0

    # Column profiles (immutable tuple)
    columns: tuple[ColumnProfile, ...] = field(default_factory=tuple)

    # Table-level metrics
    duplicate_row_count: int = 0
    duplicate_row_ratio: float = 0.0

    # Correlation matrix (column pairs with high correlation)
    correlations: tuple[tuple[str, str, float], ...] = field(default_factory=tuple)

    # Metadata
    source: str = ""
    profiled_at: datetime = field(default_factory=datetime.now)
    profile_duration_ms: float = 0.0

    # Methods
    def to_dict(self) -> dict[str, Any]
    def get(self, column_name: str) -> ColumnProfile | None
    @property
    def column_names(self) -> list[str]

Note: TableProfile is immutable (frozen=True). Use to_dict() and json.dumps() for serialization.
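
The get() accessor and column_names property make it easy to inspect individual columns:

# Look up a single column's profile (returns None if the column is absent)
email_profile = profile.get("email")
if email_profile is not None:
    print(f"email null ratio: {email_profile.null_ratio:.2%}")

print(profile.column_names)  # names of all profiled columns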


CLI Commands

auto-profile

Profile a data file and output analysis results.

truthound auto-profile <file> [OPTIONS]

Options:
  -o, --output PATH          Output file path (default: stdout)
  -f, --format [json|yaml]   Output format (default: json)
  --sample-size INTEGER      Maximum rows to sample
  --timeout INTEGER          Timeout in seconds per column
  --no-patterns              Disable pattern detection
  --streaming                Enable streaming mode for large files
  --chunk-size INTEGER       Chunk size for streaming (default: 100000)

generate-suite

Generate validation rules from a profile.

truthound generate-suite <profile> [OPTIONS]

Options:
  -o, --output PATH          Output file path (required)
  -f, --format FORMAT        Output format: yaml, json, python, toml, checkpoint
  -s, --strictness LEVEL     Strictness: loose, medium, strict
  -p, --preset PRESET        Configuration preset
  --min-confidence LEVEL     Minimum confidence: low, medium, high
  -i, --include CATEGORY     Include only these categories (repeatable)
  -e, --exclude CATEGORY     Exclude these categories (repeatable)

quick-suite

One-step profile and rule generation.

truthound quick-suite <file> [OPTIONS]

Options:
  -o, --output PATH          Output file path (required)
  -f, --format FORMAT        Output format (default: yaml)
  -s, --strictness LEVEL     Strictness level
  -p, --preset PRESET        Configuration preset
  --sample-size INTEGER      Maximum rows to sample

list-* Commands

# List available export formats
truthound list-formats
# Output: yaml, json, python, toml, checkpoint

# List available presets
truthound list-presets
# Output: default, strict, loose, minimal, comprehensive, ci_cd, ...

# List rule categories
truthound list-categories
# Output: schema, stats, pattern, completeness, uniqueness, ...

Sampling Strategies

The profiler supports multiple sampling strategies for handling large datasets efficiently.

Available Strategies

Strategy   | Description               | Best For
-----------|---------------------------|----------------------------
NONE       | No sampling, use all data | Small datasets (<100K rows)
HEAD       | Take first N rows         | Quick previews
RANDOM     | Random row selection      | General use
SYSTEMATIC | Every Nth row             | Ordered data
STRATIFIED | Maintain distribution     | Categorical columns
RESERVOIR  | Reservoir sampling        | Streaming data
ADAPTIVE   | Auto-select based on size | Default choice
HASH       | Hash-based deterministic  | Reproducibility

Usage

from truthound.profiler.sampling import Sampler, SamplingConfig, SamplingMethod

# Configure sampling
config = SamplingConfig(
    strategy=SamplingMethod.ADAPTIVE,
    max_rows=50000,
    confidence_level=0.95,
    margin_of_error=0.01,
)

sampler = Sampler(config)
result = sampler.sample(lf)
sampled_lf = result.data
print(f"Sampled {result.metrics.sample_size} rows")

Memory-Safe Sampling

The sampler prevents out-of-memory failures by applying .head(limit) to the LazyFrame, so at most max_rows rows are ever collected:

# Internal implementation ensures memory safety
def sample(self, lf: pl.LazyFrame) -> pl.LazyFrame:
    # Never calls .collect() on full dataset
    # Uses .head(limit) before collection
    return lf.head(self.config.max_rows)

Enterprise-Scale Sampling (100M+ rows)

For datasets exceeding 100 million rows, Truthound provides specialized sampling strategies with O(1) memory footprint.

Full Documentation: See Enterprise Sampling Guide for complete details on parallel processing, probabilistic data structures, and advanced configurations.

Quick Start

from truthound.profiler.enterprise_sampling import (
    EnterpriseScaleSampler,
    sample_large_dataset,
)

# Quick sampling with quality preset
result = sample_large_dataset(lf, target_rows=100_000, quality="high")
print(f"Sampled {result.metrics.sample_size:,} rows")

# Auto-select best strategy based on data size
sampler = EnterpriseScaleSampler()
result = sampler.sample(lf)

Scale Categories

Category | Row Count  | Strategy         | Memory
---------|------------|------------------|-------
SMALL    | < 1M       | No sampling      | Full
MEDIUM   | 1M - 10M   | Column-aware     | ~500MB
LARGE    | 10M - 100M | Block (parallel) | ~1GB
XLARGE   | 100M - 1B  | Multi-stage      | ~2GB
XXLARGE  | > 1B       | Sketches         | O(1)
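
The boundaries above translate directly into a selection rule. A minimal sketch (hypothetical helper, not the library's internal logic):

def scale_category(row_count: int) -> str:
    # Thresholds taken from the table above
    if row_count < 1_000_000:
        return "SMALL"
    if row_count < 10_000_000:
        return "MEDIUM"
    if row_count < 100_000_000:
        return "LARGE"
    if row_count < 1_000_000_000:
        return "XLARGE"
    return "XXLARGE"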

Parallel Block Processing

For maximum throughput with multi-core parallelism:

from truthound.profiler.parallel_sampling import (
    ParallelBlockSampler,
    ParallelSamplingConfig,
    sample_parallel,
)

# Quick parallel sampling
result = sample_parallel(lf, target_rows=100_000, max_workers=4)

# Advanced configuration
config = ParallelSamplingConfig(
    target_rows=100_000,
    max_workers=8,
    enable_work_stealing=True,  # Dynamic load balancing
    backpressure_threshold=0.75,  # Memory-aware scheduling
)
sampler = ParallelBlockSampler(config)
result = sampler.sample(lf)

# Access parallel metrics
print(f"Workers: {result.metrics.workers_used}")
print(f"Speedup: {result.metrics.parallel_speedup:.2f}x")

Probabilistic Data Structures

For 10B+ row datasets, use O(1) memory sketches:

from truthound.profiler.sketches import (
    HyperLogLog,      # Cardinality estimation
    CountMinSketch,   # Frequency estimation
    BloomFilter,      # Membership testing
    create_sketch,
)

# Cardinality estimation (distinct count)
hll = create_sketch("hyperloglog", precision=14)  # ~16KB, ±0.41% error
for chunk in data_stream:
    hll.add_batch(chunk["user_id"])
print(f"Distinct users: ~{hll.estimate():,}")

# Frequency estimation (heavy hitters)
cms = create_sketch("countmin", epsilon=0.001, delta=0.01)
for item in stream:
    cms.add(item)
heavy_hitters = cms.get_heavy_hitters(threshold=0.01)

# Membership testing
bf = create_sketch("bloom", capacity=10_000_000, error_rate=0.001)
bf.add_batch(known_items)
if bf.contains(query_item):
    print("Possibly in set")

Enterprise Sampler Strategies

Strategy       | Best For        | Features
---------------|-----------------|------------------------
block          | 10M-100M rows   | Parallel, even coverage
multi_stage    | 100M-1B rows    | Hierarchical reduction
column_aware   | Mixed types     | Type-weighted sampling
progressive    | Exploratory     | Early stopping
parallel_block | High throughput | Work stealing

from truthound.profiler.enterprise_sampling import EnterpriseScaleSampler

sampler = EnterpriseScaleSampler(config)

# Auto-select best strategy
result = sampler.sample(lf)

# Or specify explicitly
result = sampler.sample(lf, strategy="block")
result = sampler.sample(lf, strategy="multi_stage")

Validators with Large Data Support

from truthound.validators.base import Validator, EnterpriseScaleSamplingMixin

class MyLargeDataValidator(Validator, EnterpriseScaleSamplingMixin):
    sampling_threshold = 10_000_000
    sampling_target_rows = 100_000

    def validate(self, lf):
        sampled_lf, metrics = self._sample_for_validation(lf)
        issues = self._do_validation(sampled_lf)
        if metrics.is_sampled:
            issues = self._extrapolate_issues(issues, metrics)
        return issues

Streaming Pattern Matching

For large files that don't fit in memory, use streaming mode with chunk-aware pattern aggregation.

Aggregation Strategies

Strategy       | Description
---------------|-------------------------------
INCREMENTAL    | Running totals across chunks
WEIGHTED       | Size-weighted averages
SLIDING_WINDOW | Recent chunks only
EXPONENTIAL    | Exponential moving average
CONSENSUS      | Agreement across chunks
ADAPTIVE       | Auto-select based on variance

Usage

import polars as pl

from truthound.profiler import StreamingPatternMatcher, IncrementalAggregation

matcher = StreamingPatternMatcher(
    aggregation_strategy=IncrementalAggregation(),
    chunk_size=100000,
)

# Process the file in chunks; a LazyFrame from pl.scan_csv() has no
# iter_slices(), so use Polars' batched CSV reader instead
reader = pl.read_csv_batched("large.csv", batch_size=100_000)
while batches := reader.next_batches(1):
    for chunk in batches:
        matcher.process_chunk(chunk, "email")

# Get aggregated results
results = matcher.finalize()
print(f"Email pattern match: {results['email'].match_ratio:.2%}")

CLI Streaming Mode

truthound auto-profile large_file.csv --streaming --chunk-size 100000

Rule Generation

Rule Categories

Category     | Description             | Example Rules
-------------|-------------------------|-----------------------------
schema       | Column structure        | Column exists, type check
stats        | Statistical constraints | Range, mean bounds
pattern      | Format validation       | Email, phone, UUID
completeness | Null handling           | Max null ratio
uniqueness   | Duplicate detection     | Primary key, unique
distribution | Value distribution      | Allowed values, cardinality

Strictness Levels

Level  | Description
-------|---------------------------------------
loose  | Permissive thresholds, fewer rules
medium | Balanced defaults
strict | Tight thresholds, comprehensive rules
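
Both knobs feed generate_suite() directly. For example, a strict suite limited to structural and null-handling rules:

from truthound.profiler import generate_suite

# Tight thresholds, restricted to the schema and completeness categories
suite = generate_suite(
    profile,
    strictness="strict",
    include=["schema", "completeness"],
)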

Presets

Preset        | Use Case
--------------|-------------------------------
default       | General purpose
strict        | Production data
loose         | Development/testing
minimal       | Essential rules only
comprehensive | All available rules
ci_cd         | Optimized for CI/CD pipelines
schema_only   | Structure validation only
format_only   | Format/pattern rules only
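
Presets are passed to generate_suite() by name (see truthound list-presets for the full set):

from truthound.profiler import generate_suite

# Use the CI/CD-oriented preset from the table above
suite = generate_suite(profile, preset="ci_cd")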

Export Formats

from truthound.profiler import generate_suite
from truthound.profiler.generators import save_suite

suite = generate_suite(profile)

# YAML (human-readable)
save_suite(suite, "rules.yaml", format="yaml")

# JSON (machine-readable)
save_suite(suite, "rules.json", format="json")

# Python (executable)
save_suite(suite, "rules.py", format="python")

# Or use built-in methods
with open("rules.yaml", "w") as f:
    f.write(suite.to_yaml())

import json
with open("rules.json", "w") as f:
    json.dump(suite.to_dict(), f, indent=2)

Caching & Incremental Profiling

Fingerprint-Based Caching

The profiler caches results based on file fingerprints:

from truthound.profiler import ProfileCache

cache = ProfileCache(cache_dir=".truthound/cache")

# Check if profile is cached
fingerprint = cache.compute_fingerprint("data.csv")
if cache.exists(fingerprint):
    profile = cache.get(fingerprint)
else:
    profile = profiler.profile_file("data.csv")
    cache.set(fingerprint, profile)

Incremental Profiling

Only re-profile columns that have changed:

from truthound.profiler import IncrementalProfiler

inc_profiler = IncrementalProfiler()

# Initial profile
profile_v1 = inc_profiler.profile("data_v1.csv")

# Incremental update (only changed columns)
profile_v2 = inc_profiler.update("data_v2.csv", previous=profile_v1)

print(f"Columns re-profiled: {profile_v2.columns_updated}")
print(f"Columns reused: {profile_v2.columns_cached}")

Cache Backends

Backend | Description
--------|--------------------------
memory  | In-memory LRU cache
file    | Disk-based JSON cache
redis   | Redis backend (optional)

from truthound.profiler import CacheBackend, FileCacheBackend, RedisCacheBackend

# Redis cache with fallback
cache = RedisCacheBackend(
    host="redis.example.com",
    fallback=FileCacheBackend(".truthound/cache"),
)

Resilience & Timeout

Process Isolation

Polars operations can't be interrupted by Python signals. The profiler uses process isolation:

from truthound.profiler import ProcessTimeoutExecutor

executor = ProcessTimeoutExecutor(
    timeout_seconds=60,
    backend="process",  # Use multiprocessing
)

# Safely profile with timeout
try:
    result = executor.execute(profiler.profile, lf)
except TimeoutError:
    print("Profiling timed out")

Circuit Breaker

Prevent cascade failures with circuit breaker pattern:

from truthound.profiler import CircuitBreaker, CircuitBreakerConfig

breaker = CircuitBreaker(CircuitBreakerConfig(
    failure_threshold=5,      # Open after 5 failures
    recovery_timeout=30,      # Try again after 30s
    success_threshold=2,      # Close after 2 successes
))

with breaker:
    result = risky_operation()

Resilient Cache

Automatic fallback between cache backends:

from truthound.profiler import (
    CircuitBreakerConfig,
    FallbackChain,
    FileCacheBackend,
    RedisCacheBackend,
    ResilientCacheBackend,
)

cache = ResilientCacheBackend(
    primary=RedisCacheBackend(host="redis"),
    fallback=FileCacheBackend(".cache"),
    circuit_breaker=CircuitBreakerConfig(failure_threshold=3),
)

Observability

OpenTelemetry Integration

import polars as pl

from truthound.profiler import ProfilerTelemetry, traced

# Initialize telemetry
telemetry = ProfilerTelemetry(service_name="truthound-profiler")

# Automatic tracing with decorator
@traced("profile_column")
def profile_column(lf: pl.LazyFrame, column: str):
    # Profiling logic
    pass

# Manual span creation
with telemetry.span("custom_operation") as span:
    span.set_attribute("column_count", 10)
    # Operation

Metrics Collection

from truthound.profiler import MetricsCollector

collector = MetricsCollector()

# Record profile duration
with collector.profile_duration.time(operation="column"):
    profile_column(lf, "email")

# Increment counters
collector.profiles_total.inc(status="success")
collector.columns_profiled.inc(data_type="string")

Progress Callbacks

from truthound.profiler import CallbackRegistry, ConsoleAdapter

# Console progress output
callback = ConsoleAdapter(show_progress=True)

profiler = DataProfiler(config, progress_callback=callback)
profile = profiler.profile_file("data.csv")

# Custom callback
def my_callback(event):
    print(f"Progress: {event.progress:.0%} - {event.message}")

profiler = DataProfiler(config, progress_callback=my_callback)

Distributed Processing

The distributed processing module enables parallel profiling across multiple cores or nodes, achieving linear scalability for large datasets.

Available Backends

Backend | Description          | Execution Strategy                      | Status
--------|----------------------|-----------------------------------------|-------------
local   | Multi-threaded local | ThreadPoolExecutor with adaptive sizing  | ✅ Ready
process | Multi-process local  | ProcessPoolExecutor with isolation       | ✅ Ready
spark   | Apache Spark         | SparkContext-based distribution          | ⚠️ Framework
dask    | Dask distributed     | Delayed computation graph                | ⚠️ Framework
ray     | Ray framework        | Actor-based parallelism                  | ⚠️ Framework

Usage

from truthound.profiler import DistributedProfiler, BackendType

# Auto-detect available backend
profiler = DistributedProfiler.create(backend="auto")

# Explicit Spark backend
profiler = DistributedProfiler.create(
    backend="spark",
    config={"spark.executor.memory": "4g"}
)

# Profile large dataset
profile = profiler.profile("hdfs://data/large.parquet")

Partition Strategies

Strategy     | Description                 | Optimal Use Case
-------------|-----------------------------|-----------------------------------
row_based    | Split by row ranges         | Large row counts, uniform columns
column_based | Profile columns in parallel | Many columns, limited rows
hybrid       | Combine both strategies     | Balanced datasets
hash         | Hash-based partitioning     | Reproducible results
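
As an illustration of row-based partitioning, here is a minimal sketch using plain Polars slicing (a hypothetical helper, not the module's internal partitioner):

import polars as pl

def row_partitions(lf: pl.LazyFrame, total_rows: int, n_parts: int) -> list[pl.LazyFrame]:
    # Split the LazyFrame into contiguous row ranges for parallel profiling
    step = -(-total_rows // n_parts)  # ceiling division
    return [lf.slice(i * step, step) for i in range(n_parts)]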

Execution Backend Selection

The AdaptiveStrategy automatically selects the optimal execution backend based on operation characteristics:

from truthound.profiler import AdaptiveStrategy, ExecutionBackend

strategy = AdaptiveStrategy()

# Automatic selection based on data size and operation type
backend = strategy.select(
    estimated_rows=10_000_000,
    operation_type="profile_column",
    cpu_bound=True,
)
# Returns: ExecutionBackend.THREAD for I/O-bound
# Returns: ExecutionBackend.PROCESS for CPU-bound with large data

Profile Comparison & Drift Detection

Compare profiles over time to detect data drift and schema changes.

Drift Types

Drift Type   | Description
-------------|----------------------------------
COMPLETENESS | Changes in null ratios
UNIQUENESS   | Changes in unique value ratios
DISTRIBUTION | Changes in value distribution
RANGE        | Changes in min/max values
CARDINALITY  | Changes in distinct value counts

Drift Severity

Severity | Description
---------|---------------------------------------------
LOW      | Minor changes, informational
MEDIUM   | Notable changes, review recommended
HIGH     | Significant changes, investigation required
CRITICAL | Breaking changes, immediate action needed

Usage

from truthound.profiler import (
    ProfileComparator,
    compare_profiles,
    detect_drift,
    DriftThresholds,
)

# Compare two profiles
comparator = ProfileComparator()
comparison = comparator.compare(profile_old, profile_new)

print(f"Overall drift score: {comparison.drift_score:.2%}")
print(f"Schema changes: {len(comparison.schema_changes)}")

for col_comparison in comparison.column_comparisons:
    if col_comparison.has_drift:
        print(f"Column {col_comparison.column}: {col_comparison.drift_type}")
        for drift in col_comparison.drifts:
            print(f"  - {drift.drift_type}: {drift.severity}")

# Quick drift detection
drifts = detect_drift(profile_old, profile_new)
for drift in drifts:
    print(f"{drift.column}: {drift.drift_type} ({drift.severity})")

Custom Thresholds

from truthound.profiler import DriftThresholds

thresholds = DriftThresholds(
    completeness_change=0.05,    # 5% change in null ratio
    uniqueness_change=0.1,       # 10% change in uniqueness
    distribution_divergence=0.2,  # Distribution KL-divergence
    range_change=0.15,           # 15% change in range
    cardinality_change=0.2,      # 20% change in cardinality
)

comparison = compare_profiles(profile_old, profile_new, thresholds=thresholds)

Individual Drift Detectors

from truthound.profiler import (
    CompletenessDriftDetector,
    UniquenessDriftDetector,
    DistributionDriftDetector,
    RangeDriftDetector,
    CardinalityDriftDetector,
)

# Detect specific drift types
completeness_detector = CompletenessDriftDetector(threshold=0.05)
drift = completeness_detector.detect(col_profile_old, col_profile_new)

if drift.detected:
    print(f"Null ratio changed: {drift.old_value:.2%} -> {drift.new_value:.2%}")

Quality Scoring

Evaluate and score generated validation rules for quality.

Quality Levels

Level     | Score Range | Description
----------|-------------|---------------------------
EXCELLENT | 0.9 - 1.0   | High precision and recall
GOOD      | 0.7 - 0.9   | Balanced quality
FAIR      | 0.5 - 0.7   | Acceptable with caveats
POOR      | 0.0 - 0.5   | Needs improvement
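
The score bands map to levels with a simple lookup; a hypothetical helper mirroring the table:

def quality_level(score: float) -> str:
    # Band boundaries taken from the table above
    if score >= 0.9:
        return "EXCELLENT"
    if score >= 0.7:
        return "GOOD"
    if score >= 0.5:
        return "FAIR"
    return "POOR"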

Usage

from truthound.profiler import (
    RuleQualityScorer,
    ScoringConfig,
    estimate_quality,
    score_rule,
    compare_rules,
)

# Score a validation rule
scorer = RuleQualityScorer()
score = scorer.score(rule, sample_data)

print(f"Quality level: {score.quality_level}")
print(f"Precision: {score.metrics.precision:.2%}")
print(f"Recall: {score.metrics.recall:.2%}")
print(f"F1 Score: {score.metrics.f1_score:.2%}")

# Quick quality estimation
quality = estimate_quality(rule, lf)
print(f"Estimated quality: {quality.level}")

Quality Estimators

from truthound.profiler import (
    SamplingQualityEstimator,
    HeuristicQualityEstimator,
    CrossValidationEstimator,
)

# Sampling-based estimation (fast)
estimator = SamplingQualityEstimator(sample_size=1000)
result = estimator.estimate(rule, lf)

# Cross-validation (accurate)
estimator = CrossValidationEstimator(n_folds=5)
result = estimator.estimate(rule, lf)

# Heuristic-based (fastest)
estimator = HeuristicQualityEstimator()
result = estimator.estimate(rule, lf)

Trend Analysis

from truthound.profiler import QualityTrendAnalyzer

analyzer = QualityTrendAnalyzer()

# Track quality over time
for score in historical_scores:
    analyzer.add_point(score)

trend = analyzer.analyze()
print(f"Trend direction: {trend.direction}")  # IMPROVING, STABLE, DECLINING
print(f"Forecast: {trend.forecast_quality}")

ML-based Type Inference

Use machine learning models for improved semantic type inference.

Inference Models

Model           | Description               | Use Case
----------------|---------------------------|----------------------
RuleBasedModel  | Pattern matching rules    | Fast, deterministic
NaiveBayesModel | Probabilistic classifier  | Good for mixed types
EnsembleModel   | Combines multiple models  | Best accuracy

Usage

from truthound.profiler import (
    MLTypeInferrer,
    infer_column_type_ml,
    infer_table_types_ml,
    MLInferenceConfig,
)

# Configure ML inferrer
config = MLInferenceConfig(
    model="ensemble",
    confidence_threshold=0.7,
    use_name_features=True,
    use_value_features=True,
    use_statistical_features=True,
)

inferrer = MLTypeInferrer(config)

# Infer single column type
result = inferrer.infer(lf, "email_address")
print(f"Type: {result.inferred_type}")
print(f"Confidence: {result.confidence:.2%}")
print(f"Alternatives: {result.alternatives}")

# Infer all column types
results = infer_table_types_ml(lf)
for col, result in results.items():
    print(f"{col}: {result.inferred_type} ({result.confidence:.0%})")

Feature Extractors

from truthound.profiler import (
    NameFeatureExtractor,
    ValueFeatureExtractor,
    StatisticalFeatureExtractor,
    ContextFeatureExtractor,
)

# Extract features for custom models
name_extractor = NameFeatureExtractor()
features = name_extractor.extract("customer_email")
# Features: ['has_email_keyword', 'has_address_keyword', ...]

value_extractor = ValueFeatureExtractor()
features = value_extractor.extract(column_values)
# Features: ['has_at_symbol', 'avg_length', 'digit_ratio', ...]

Custom Models

from truthound.profiler import (  # FeatureVector/DataType import paths assumed
    DataType,
    FeatureVector,
    InferenceModel,
    InferenceResult,
)

class MyCustomModel(InferenceModel):
    def infer(self, features: FeatureVector) -> InferenceResult:
        # Custom inference logic
        return InferenceResult(
            inferred_type=DataType.EMAIL,
            confidence=0.95,
        )

# Register custom model
from truthound.profiler import InferenceModelRegistry
InferenceModelRegistry.register("my_model", MyCustomModel)

Automatic Threshold Tuning

Automatically tune validation thresholds based on data characteristics.

Tuning Strategies

Strategy     | Description
-------------|--------------------------------------------
CONSERVATIVE | Strict thresholds, fewer false positives
BALANCED     | Balance between precision and recall
PERMISSIVE   | Relaxed thresholds, fewer false negatives
ADAPTIVE     | Learns optimal thresholds from data
STATISTICAL  | Uses statistical confidence intervals
DOMAIN_AWARE | Incorporates domain knowledge

Usage

from truthound.profiler import (
    ThresholdTuner,
    tune_thresholds,
    TuningStrategy,
    TuningConfig,
)

# Quick threshold tuning
thresholds = tune_thresholds(profile, strategy="adaptive")

print(f"Null threshold: {thresholds.null_threshold:.2%}")
print(f"Uniqueness threshold: {thresholds.uniqueness_threshold:.2%}")
print(f"Range tolerance: {thresholds.range_tolerance:.2%}")

# Advanced configuration
config = TuningConfig(
    strategy=TuningStrategy.STATISTICAL,
    confidence_level=0.95,
    sample_size=10000,
)

tuner = ThresholdTuner(config)
thresholds = tuner.tune(profile)

# Per-column thresholds
for col, col_thresholds in thresholds.column_thresholds.items():
    print(f"{col}:")
    print(f"  null_threshold: {col_thresholds.null_threshold}")
    print(f"  min_value: {col_thresholds.min_value}")
    print(f"  max_value: {col_thresholds.max_value}")

A/B Testing Thresholds

from truthound.profiler import ThresholdTester, ThresholdTestResult

tester = ThresholdTester()

# Test different threshold configurations
result = tester.test(
    profile=profile,
    test_data=lf,
    threshold_configs=[config_a, config_b, config_c],
)

print(f"Best config: {result.best_config}")
print(f"Precision: {result.best_precision:.2%}")
print(f"Recall: {result.best_recall:.2%}")

Profile Visualization

Generate HTML reports from profile results.

Chart Types

Type      | Description
----------|----------------------------------
BAR       | Bar charts for categorical data
HISTOGRAM | Distribution histograms
PIE       | Pie charts for proportions
LINE      | Trend lines
HEATMAP   | Correlation heatmaps

Themes

Theme     | Description
----------|------------------------------
LIGHT     | Light background theme
DARK      | Dark background theme
CORPORATE | Professional business theme
MINIMAL   | Clean minimal design
COLORFUL  | Vibrant color scheme

Usage

from truthound.profiler import (
    HTMLReportGenerator,
    ReportConfig,
    ReportTheme,
    generate_report,
)

# Quick report generation
html = generate_report(profile)
with open("report.html", "w") as f:
    f.write(html)

# Custom configuration
config = ReportConfig(
    theme=ReportTheme.DARK,
    include_charts=True,
    include_samples=True,
    include_recommendations=True,
    max_sample_rows=100,
)

generator = HTMLReportGenerator(config)
html = generator.generate(profile)

# Export to multiple formats
from truthound.profiler import ReportExporter

exporter = ReportExporter()
exporter.export(profile, "report.html", format="html")
exporter.export(profile, "report.pdf", format="pdf")

Section Renderers

from truthound.profiler import (
    OverviewSectionRenderer,
    DataQualitySectionRenderer,
    ColumnDetailsSectionRenderer,
    PatternsSectionRenderer,
    RecommendationsSectionRenderer,
    CustomSectionRenderer,
    ProfileData,  # import path assumed
)

# Add custom section
class MySectionRenderer(CustomSectionRenderer):
    section_type = "my_section"

    def render(self, data: ProfileData) -> str:
        return f"<div class='my-section'>Custom content</div>"

from truthound.profiler import section_registry
section_registry.register(MySectionRenderer())

Compare Profiles Report

from truthound.profiler import compare_profile_reports

# Generate comparison report
html = compare_profile_reports(profile_old, profile_new)
with open("comparison_report.html", "w") as f:
    f.write(html)

Internationalization (i18n)

Multi-language support for profiler messages and reports.

Supported Locales

Locale | Language
-------|-------------------
en     | English (default)
ko     | Korean
ja     | Japanese
zh     | Chinese
de     | German
fr     | French
es     | Spanish

Usage

from truthound.profiler import (
    set_locale,
    get_locale,
    get_message,
    translate as t,
    locale_context,
)

# Set global locale
set_locale("ko")

# Get translated message
msg = get_message("profiling_started")
print(msg)  # "프로파일링이 시작되었습니다"

# Short alias
msg = t("column_null_ratio", column="email", ratio=0.05)

# Context manager for temporary locale
with locale_context("ja"):
    msg = t("profiling_complete")
    print(msg)  # Japanese message

Custom Messages

from truthound.profiler import register_messages, MessageCatalog

# Register custom messages
custom_messages = {
    "my_custom_key": "Custom message with {placeholder}",
}
register_messages("en", custom_messages)

# Load from file
from truthound.profiler import load_messages_from_file
load_messages_from_file("my_messages.yaml", locale="ko")

I18n Exceptions

from truthound.profiler import (
    I18nError,
    I18nAnalysisError,
    I18nPatternError,
    I18nTypeError,
    I18nIOError,
    I18nTimeoutError,
    I18nValidationError,
)

try:
    profile = profiler.profile_file("data.csv")
except I18nAnalysisError as e:
    # Error message is automatically localized
    print(e.localized_message)

Incremental Validation

Validate incremental profiling results for correctness.

Validators

Validator                        | Purpose
---------------------------------|--------------------------------------
ChangeDetectionAccuracyValidator | Validates change detection accuracy
SchemaChangeValidator            | Validates schema change detection
StalenessValidator               | Validates profile freshness
FingerprintConsistencyValidator  | Validates fingerprint consistency
FingerprintSensitivityValidator  | Validates fingerprint sensitivity
ProfileMergeValidator            | Validates profile merging
DataIntegrityValidator           | Validates data integrity
PerformanceValidator             | Validates performance metrics

Usage

from truthound.profiler import (
    IncrementalValidator,
    validate_incremental,
    validate_merge,
    validate_fingerprints,
    ValidationConfig,
)

# Validate incremental profiling
result = validate_incremental(
    old_profile=profile_v1,
    new_profile=profile_v2,
    new_data=lf_new,
)

if result.passed:
    print("Incremental profiling validated successfully")
else:
    for issue in result.issues:
        print(f"[{issue.severity}] {issue.message}")

# Validate merge operation
result = validate_merge(
    profile_a=part1_profile,
    profile_b=part2_profile,
    merged_profile=merged,
)

# Validate fingerprints
result = validate_fingerprints(
    fingerprints=[fp1, fp2, fp3],
    expected_columns=["email", "phone", "name"],
)

Custom Validators

from truthound.profiler import (  # ValidationCategory/Context/Issue paths assumed
    BaseValidator,
    ValidationCategory,
    ValidationContext,
    ValidationIssue,
    ValidatorProtocol,
)

class MyCustomValidator(BaseValidator):
    name = "my_validator"
    category = ValidationCategory.DATA_INTEGRITY

    def validate(self, context: ValidationContext) -> list[ValidationIssue]:
        issues = []
        # Custom validation logic
        return issues

# Register validator
from truthound.profiler import validator_registry, register_validator
register_validator(MyCustomValidator())

Validation Runner

from truthound.profiler import ValidationRunner, ValidationConfig

config = ValidationConfig(
    validators=["change_detection", "schema_change", "staleness"],
    fail_fast=False,
    severity_threshold="medium",
)

runner = ValidationRunner(config)
result = runner.run(context)

print(f"Total issues: {result.total_issues}")
print(f"Critical: {result.critical_count}")
print(f"High: {result.high_count}")
print(f"Passed: {result.passed}")

Configuration Reference

ProfilerConfig

@dataclass
class ProfilerConfig:
    """Configuration for profiling operations."""

    # Sampling
    sample_size: int | None = None    # None = use all data
    random_seed: int = 42

    # Analysis options
    include_patterns: bool = True     # Detect patterns in string columns
    include_correlations: bool = False  # Compute column correlations
    include_distributions: bool = True  # Include distribution stats

    # Performance tuning
    top_n_values: int = 10            # Top/bottom values to include
    pattern_sample_size: int = 1000   # Sample size for pattern matching
    correlation_threshold: float = 0.7

    # Pattern detection
    min_pattern_match_ratio: float = 0.8

    # Parallel processing
    n_jobs: int = 1

Note: Timeout and caching are handled separately through ProcessTimeoutExecutor and ProfileCache classes.

Environment Variables

Variable              | Description               | Default
----------------------|---------------------------|-----------------
TRUTHOUND_CACHE_DIR   | Cache directory           | .truthound/cache
TRUTHOUND_SAMPLE_SIZE | Default sample size       | 50000
TRUTHOUND_TIMEOUT     | Default timeout (seconds) | 120
TRUTHOUND_LOG_LEVEL   | Logging level             | INFO
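
Assuming these variables are read at process startup (an assumption; the table above is the source of truth), they can be overridden before Truthound is imported:

import os

# Override defaults for this run (values are plain strings)
os.environ["TRUTHOUND_SAMPLE_SIZE"] = "100000"
os.environ["TRUTHOUND_LOG_LEVEL"] = "DEBUG"

import truthound as th  # picks up the overrides if read at startup

profile = th.profile("data.csv")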

API Reference

Main Functions

from truthound.profiler import (
    profile_file,
    generate_suite,
    DataProfiler,
    ProfilerConfig,
)
from truthound.profiler.generators import save_suite

# Profile a file
profile = profile_file(
    path: str,
    config: ProfilerConfig | None = None,
) -> TableProfile

# Profile a DataFrame/LazyFrame using DataProfiler
profiler = DataProfiler(config)
profile = profiler.profile(lf: pl.LazyFrame) -> TableProfile
profile = profiler.profile_dataframe(df: pl.DataFrame) -> TableProfile

# Generate validation suite
suite = generate_suite(
    profile: TableProfile | ProfileReport | dict,
    strictness: Strictness = Strictness.MEDIUM,
    preset: str = "default",
    include: list[str] | None = None,
    exclude: list[str] | None = None,
) -> ValidationSuite

# Save suite to file
save_suite(
    suite: ValidationSuite,
    path: str | Path,
    format: str = "json",  # "json", "yaml", or "python"
) -> None

# Profile serialization
import json
with open("profile.json", "w") as f:
    json.dump(profile.to_dict(), f, indent=2)

Note: There is no quick_suite() function in the Python API; the one-step flow is CLI-only (truthound quick-suite). In Python, chain profile_file() + generate_suite() + save_suite().

Data Types

class DataType(str, Enum):
    """Inferred logical data types for profiling."""

    # Basic types
    INTEGER = "integer"
    FLOAT = "float"
    STRING = "string"
    BOOLEAN = "boolean"
    DATETIME = "datetime"
    DATE = "date"
    TIME = "time"
    DURATION = "duration"

    # Semantic types (detected from patterns)
    EMAIL = "email"
    URL = "url"
    PHONE = "phone"
    UUID = "uuid"
    IP_ADDRESS = "ip_address"
    JSON = "json"

    # Identifiers
    CATEGORICAL = "categorical"
    IDENTIFIER = "identifier"

    # Numeric subtypes
    CURRENCY = "currency"
    PERCENTAGE = "percentage"

    # Korean specific
    KOREAN_RRN = "korean_rrn"
    KOREAN_PHONE = "korean_phone"
    KOREAN_BUSINESS_NUMBER = "korean_business_number"

    # Unknown
    UNKNOWN = "unknown"
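
Because DataType subclasses str, members compare equal to their string values, which is convenient when branching on inferred types:

col = profile.get("email")
if col is not None and col.inferred_type == DataType.EMAIL:
    # str-valued enum: the same comparison also works against the raw string
    assert col.inferred_type == "email"
    print("semantic email column detected")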

Strictness Levels

class Strictness(Enum):
    LOOSE = "loose"      # Permissive thresholds
    MEDIUM = "medium"    # Balanced defaults
    STRICT = "strict"    # Tight constraints

Examples

Basic Profiling Pipeline

from truthound.profiler import (
    DataProfiler,
    ProfilerConfig,
    generate_suite,
)

# Configure profiler
config = ProfilerConfig(
    sample_size=100000,
    include_patterns=True,
)

# Profile data
profiler = DataProfiler(config)
profile = profiler.profile_file("sales_data.csv")

# Print summary
print(f"Dataset: {profile.name}")
print(f"Rows: {profile.row_count:,}")
print(f"Columns: {profile.column_count}")

for col in profile.columns:
    print(f"  {col.name}: {col.inferred_type.value}")
    print(f"    Null ratio: {col.null_ratio:.2%}")
    if col.patterns:
        print(f"    Pattern: {col.patterns[0].pattern_name}")

# Generate and save rules
from truthound.profiler.generators import save_suite
suite = generate_suite(profile)
save_suite(suite, "sales_rules.yaml", format="yaml")

Streaming Large Files

from truthound.profiler import (
    StreamingPatternMatcher,
    AdaptiveAggregation,
)
import polars as pl

# Setup streaming matcher
matcher = StreamingPatternMatcher(
    aggregation_strategy=AdaptiveAggregation(),
)

# Process a 10GB file in chunks via Polars' batched CSV reader
# (a LazyFrame from pl.scan_csv() has no iter_slices())
reader = pl.read_csv_batched("huge_file.csv", batch_size=100_000)
while batches := reader.next_batches(1):
    for chunk in batches:
        for col in chunk.columns:
            matcher.process_chunk(chunk, col)

# Get final aggregated patterns
results = matcher.finalize()

CI/CD Integration

# .github/workflows/data-quality.yml
name: Data Quality Check

on: [push]

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Generate validation rules
        run: |
          truthound quick-suite data/sample.csv \
            -o rules.yaml \
            --preset ci_cd \
            --strictness strict

      - name: Validate data
        run: |
          truthound check data/production.csv \
            --schema rules.yaml \
            --strict

Schema Versioning

The profiler implements semantic versioning for profile serialization, ensuring forward and backward compatibility across versions.

Version Format

major.minor.patch
  • major: Breaking changes requiring migration
  • minor: New features, backward compatible
  • patch: Bug fixes, fully compatible
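
A minimal sketch of how these rules map to a compatibility check (hypothetical helper; the library's actual migration logic lives in ProfileSerializer):

def is_compatible(stored_version: str, current_version: str) -> bool:
    # Per the rules above, only a major-version mismatch requires migration
    stored_major = int(stored_version.split(".")[0])
    current_major = int(current_version.split(".")[0])
    return stored_major == current_major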

Automatic Migration

from truthound.profiler import ProfileSerializer, SchemaVersion

serializer = ProfileSerializer()

# Load profile from any compatible version
profile = serializer.deserialize(old_profile_data)
# Automatically migrates from source version to current

# Save with current schema version
data = serializer.serialize(profile)
# Includes schema_version field for future compatibility

Validation

from truthound.profiler import SchemaValidator, ValidationResult

validator = SchemaValidator()
result = validator.validate(profile_data, fix_issues=True)

if result.result == ValidationResult.VALID:
    print("Profile data is valid")
elif result.result == ValidationResult.RECOVERABLE:
    print(f"Fixed issues: {result.warnings}")
    data = result.fixed_data
else:
    print(f"Invalid: {result.errors}")

See Also