Advanced Features¶
Advanced Python APIs for ML, lineage, real-time streaming, profiling, and the Validation Engine Enhancement (VE) system.
Validation Engine Enhancement (VE)¶
The VE system introduces five cross-cutting enhancements to the validation pipeline, inspired by Great Expectations (GX) patterns and adapted for Polars LazyFrame architecture.
Result Format Control (VE-1)¶
Control the detail level of validation results with a 4-level hierarchy.
import truthound as th
from truthound.types import ResultFormat, ResultFormatConfig

# Quick pass/fail check (fastest)
report = th.check("data.csv", result_format="boolean_only")

# Default: summary with value counts
report = th.check("data.csv", result_format="summary")

# Full detail with unexpected rows and debug queries
report = th.check("data.csv", result_format="complete")

# Fine-grained control
config = ResultFormatConfig(
    format=ResultFormat.COMPLETE,
    partial_unexpected_count=50,   # Collect up to 50 samples
    include_unexpected_rows=True,  # Include failing rows as DataFrame
    max_unexpected_rows=500,       # Cap at 500 rows
    return_debug_query=True,       # Include Polars query string
)
report = th.check("data.csv", result_format=config)

# Access structured results
for issue in report.issues:
    if issue.result:
        print(f"Elements: {issue.result.element_count}")
        print(f"Missing: {issue.result.missing_count}")
        print(f"Unexpected: {issue.result.unexpected_count} ({issue.result.unexpected_percent:.1%})")
        if issue.result.partial_unexpected_list:
            print(f"Samples: {issue.result.partial_unexpected_list[:5]}")
        if issue.result.unexpected_rows is not None:
            print(f"Failing rows: {issue.result.unexpected_rows.shape}")
        if issue.result.debug_query:
            print(f"Query: {issue.result.debug_query}")
Shared Metric Store (VE-3)¶
Deduplicate metric computations across validators in a session.
from truthound.validators.metrics import MetricKey, SharedMetricStore, CommonMetrics
# The store is automatically created and managed by th.check()
# For manual use:
store = SharedMetricStore()
# Common metrics provide standard expressions
key, expr = CommonMetrics.null_count("email")
key, expr = CommonMetrics.row_count()
key, expr = CommonMetrics.n_unique("user_id")
# Pre-compute and cache
store.put(key, 42)
value = store.get(key) # 42
# Or compute-on-demand
value = store.get_or_compute(key, lambda: lf.select(expr).collect().item())
# Statistics
print(store.stats) # hits, misses, evictions
Validators with built-in metric declarations: NullValidator, NotNullValidator, CompletenessRatioValidator, UniqueValidator, UniqueRatioValidator, DistinctCountValidator, BetweenValidator.
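The deduplication idea behind the store can be sketched in a few lines of plain Python. This is an illustrative mock, not the library's implementation; `MiniMetricStore` and `expensive_null_count` are hypothetical names:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Hashable

@dataclass
class MiniMetricStore:
    """Illustrative memoizing store: each metric is computed at most once per session."""
    _cache: dict[Hashable, Any] = field(default_factory=dict)
    hits: int = 0
    misses: int = 0

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        if key in self._cache:
            self.hits += 1
            return self._cache[key]
        self.misses += 1
        self._cache[key] = compute()
        return self._cache[key]

# Two validators both need the null count of "email"; it is computed only once.
calls = 0
def expensive_null_count() -> int:
    global calls
    calls += 1
    return 42  # stand-in for lf.select(expr).collect().item()

store = MiniMetricStore()
a = store.get_or_compute(("null_count", "email"), expensive_null_count)
b = store.get_or_compute(("null_count", "email"), expensive_null_count)
```

The second lookup is a cache hit, so `calls` stays at 1 — the same effect `SharedMetricStore` has when several validators declare the same metric key.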
Exception Isolation & Auto Retry (VE-5)¶
Gracefully handle validation failures without aborting the entire pipeline.
import truthound as th
# Enable exception isolation with retries
report = th.check(
    "data.csv",
    catch_exceptions=True,  # Default: True
    max_retries=3,          # Retry transient errors up to 3 times
)

# Inspect exception summary
if report.exception_summary:
    summary = report.exception_summary
    print(f"Total exceptions: {summary.total_count}")
    print(f"By category: {summary.by_category}")
    print(f"Retried: {summary.retried_count}")

# Per-issue exception detail
for issue in report.issues:
    if issue.exception_info:
        info = issue.exception_info
        print(f"Validator: {info.validator_name}")
        print(f"Category: {info.failure_category}")  # transient/permanent/configuration/data
        print(f"Retries: {info.retry_count}/{info.max_retries}")
        print(f"Retryable: {info.is_retryable}")
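The retry behavior described above — retry only transient failures, surface permanent ones immediately — can be sketched generically. A minimal sketch, assuming only standard-library Python; `run_with_retries` and the exception grouping are illustrative, not the library's API:

```python
import time

# Illustrative split: only these exception types are considered transient/retryable.
TRANSIENT = (TimeoutError, ConnectionError)

def run_with_retries(fn, max_retries: int = 3, base_delay: float = 0.0):
    """Retry transient failures up to max_retries times; return (result, retries_used)."""
    attempt = 0
    while True:
        try:
            return fn(), attempt
        except TRANSIENT:
            if attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(base_delay * (2 ** (attempt - 1)))  # exponential backoff
        # Any other exception is treated as permanent and propagates unchanged.

flaky_calls = 0
def flaky():
    """Fails twice with a transient error, then succeeds."""
    global flaky_calls
    flaky_calls += 1
    if flaky_calls < 3:
        raise TimeoutError("transient")
    return "ok"

result, retries = run_with_retries(flaky, max_retries=3)
```

Here the third attempt succeeds, so the pipeline continues instead of aborting — the same isolation `catch_exceptions=True` provides per validator.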
Resilience Bridge (VE-5)¶
Combine circuit breaker and retry patterns for validator execution.
from truthound.validators.resilience_bridge import (
    ValidationResiliencePolicy,
    create_default_policy,
    create_strict_policy,
)
from truthound.common.resilience import CircuitBreakerConfig

# Default policy: lenient circuit breaker
policy = create_default_policy(max_retries=2)

# Strict policy: no retries, aggressive circuit breaker
policy = create_strict_policy()

# Custom policy
policy = ValidationResiliencePolicy(
    circuit_breaker_config=CircuitBreakerConfig.for_database(),
    max_retries=3,
    on_retry=lambda attempt, exc, delay: print(f"Retry {attempt}: {exc}"),
)
# Execute with resilience
result = policy.execute(validator, lf)
# Check circuit state
state = policy.get_circuit_state("my_validator") # CLOSED | OPEN | HALF_OPEN
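To make the CLOSED/OPEN state transitions concrete, here is a minimal circuit-breaker sketch. It is a simplified illustration (the HALF_OPEN probing state after a cooldown is omitted), and `MiniCircuitBreaker` is a hypothetical name, not the library's class:

```python
class MiniCircuitBreaker:
    """CLOSED -> OPEN after `failure_threshold` consecutive failures.
    While OPEN, calls are rejected immediately instead of hitting the failing
    dependency. (A real breaker also moves to HALF_OPEN after a cooldown to probe.)"""

    def __init__(self, failure_threshold: int = 3):
        self.failure_threshold = failure_threshold
        self.failures = 0
        self.state = "CLOSED"

    def call(self, fn):
        if self.state == "OPEN":
            raise RuntimeError("circuit open: call rejected")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "OPEN"
            raise
        self.failures = 0  # any success resets the streak
        return result

breaker = MiniCircuitBreaker(failure_threshold=2)

def bad():
    raise ValueError("boom")

# Two consecutive failures trip the breaker; further calls fail fast.
for _ in range(2):
    try:
        breaker.call(bad)
    except ValueError:
        pass
```

Failing fast while the circuit is open is what keeps one misbehaving validator (or its data source) from stalling the whole run.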
Parallel Execution¶
For large datasets with many validators, use parallel execution for better performance.
Basic Parallel Execution¶
import truthound as th
# Enable parallel execution (uses all available cores)
report = th.check("large_data.csv", parallel=True)
# Control thread count
report = th.check("large_data.csv", parallel=True, max_workers=4)
DAG-Based Execution¶
Validators are organized into a Directed Acyclic Graph (DAG) based on dependencies:
from truthound.validators.optimization.orchestrator import (
    ValidatorDAG,
    ParallelExecutionStrategy,
    AdaptiveExecutionStrategy,
)
# Build DAG from validators
dag = ValidatorDAG()
dag.add_validators(validator_instances)
plan = dag.build_execution_plan()
# Execute with adaptive strategy (auto-selects parallelism)
strategy = AdaptiveExecutionStrategy()
result = plan.execute(lf, strategy)
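The scheduling idea — group validators into dependency levels so everything within a level can run in parallel — can be shown with the standard library's `graphlib`. The validator names and dependencies below are hypothetical, chosen only to illustrate the layering:

```python
from graphlib import TopologicalSorter

# validator -> validators it must run after (illustrative dependency map)
deps = {
    "row_count": set(),
    "null_check": set(),
    "completeness_ratio": {"row_count", "null_check"},
    "unique_ratio": {"row_count"},
}

# Peel off "ready" nodes level by level; each level is an independent batch
# that a thread pool could execute concurrently.
ts = TopologicalSorter(deps)
ts.prepare()
levels = []
while ts.is_active():
    ready = list(ts.get_ready())
    levels.append(sorted(ready))
    ts.done(*ready)
```

Here `row_count` and `null_check` form the first parallel batch, and the two ratio validators run in the second, once their inputs exist.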
Query Pushdown¶
For SQL data sources, push validation logic to the database server:
import truthound as th
from truthound.datasources.sql import PostgreSQLDataSource
source = PostgreSQLDataSource(
    table="large_table",
    host="localhost",
    database="mydb",
    user="postgres",
)
# Enable pushdown - validations execute server-side
report = th.check(source=source, pushdown=True)
# Example: null_check becomes:
# SELECT COUNT(*) FROM table WHERE column IS NULL
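Rendering a validation as a server-side aggregate is simple string generation at its core. A minimal sketch of the idea, not the library's actual SQL generator; real pushdown would quote identifiers and bind parameters per dialect:

```python
def null_check_sql(table: str, column: str) -> str:
    """Render a null-count validation as a server-side aggregate query."""
    return f"SELECT COUNT(*) AS null_count FROM {table} WHERE {column} IS NULL"

def range_check_sql(table: str, column: str, lo, hi) -> str:
    """Count rows outside [lo, hi] without transferring them to the client."""
    return (
        f"SELECT COUNT(*) AS out_of_range FROM {table} "
        f"WHERE {column} < {lo} OR {column} > {hi}"
    )

sql = null_check_sql("large_table", "email")
```

Only the single aggregated count crosses the wire, which is why pushdown scales to tables far larger than client memory.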
Pushdown Benefits¶
| Benefit | Description |
|---|---|
| Reduced data transfer | Only aggregated results are returned |
| Database optimization | Leverages database query optimizer |
| Scalability | Handles billions of rows efficiently |
ML Module¶
Machine learning-based validation and anomaly detection.
Anomaly Detection¶
from truthound import ml
from truthound.ml.anomaly_models.statistical import StatisticalConfig
from truthound.ml.anomaly_models.isolation_forest import IsolationForestConfig
from truthound.ml.anomaly_models.ensemble import EnsembleConfig, EnsembleStrategy
import polars as pl
# Statistical anomaly detectors (using Config objects)
detector = ml.ZScoreAnomalyDetector(
    config=StatisticalConfig(
        z_threshold=3.0,
        columns=["amount", "count"],  # specify columns in config
    )
)
detector = ml.IQRAnomalyDetector(
    config=StatisticalConfig(
        iqr_multiplier=1.5,
    )
)
detector = ml.MADAnomalyDetector(
    config=StatisticalConfig(
        z_threshold=3.5,  # MAD uses z_threshold for scaling
    )
)

# Isolation Forest
detector = ml.IsolationForestDetector(
    config=IsolationForestConfig(
        n_estimators=100,
        contamination=0.01,
        max_samples=256,
    )
)

# Ensemble detector (combines multiple methods)
detector = ml.EnsembleAnomalyDetector(
    detectors=[
        ml.ZScoreAnomalyDetector(),
        ml.IQRAnomalyDetector(),
    ],
    config=EnsembleConfig(
        strategy=EnsembleStrategy.AVERAGE,  # AVERAGE, MAX, MIN, VOTE, UNANIMOUS
    ),
)

# Fit and predict (use LazyFrame)
df = pl.read_csv("data.csv")
detector.fit(df.lazy())  # pass LazyFrame
result = detector.predict(df.lazy())
print(f"Anomalies: {result.anomaly_count}")
for score in result.get_anomalies():  # use get_anomalies() method
    print(f"  Row {score.index}: {score.anomaly_type.value}")
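For intuition, the z-score criterion used by `ZScoreAnomalyDetector` reduces to flagging values whose distance from the mean exceeds `z_threshold` standard deviations. A self-contained sketch using only the standard library (the function name is illustrative, not the library's API):

```python
import statistics

def zscore_anomalies(values: list[float], z_threshold: float = 3.0) -> list[int]:
    """Return indices whose |z| = |(v - mean) / std| exceeds the threshold.
    Uses population mean/std; a constant column has no outliers by this rule."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    if std == 0:
        return []
    return [i for i, v in enumerate(values) if abs((v - mean) / std) > z_threshold]

# 99 typical values plus one gross outlier at the end
data = [10.0] * 99 + [1000.0]
outliers = zscore_anomalies(data, z_threshold=3.0)
```

With this data only the final row is flagged; in the real detector the same test runs as a Polars expression over the configured columns.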
ML Drift Detection¶
from truthound import ml
from truthound.ml.drift_detection.distribution import DistributionDriftConfig
from truthound.ml.drift_detection.feature import FeatureDriftConfig
# Distribution drift detector (using Config object)
detector = ml.DistributionDriftDetector(
    config=DistributionDriftConfig(
        method="psi",  # "ks", "psi", "jensen_shannon", "wasserstein"
        threshold=0.05,
        n_bins=10,
    )
)

# Feature drift detector (multi-column)
detector = ml.FeatureDriftDetector(
    config=FeatureDriftConfig(
        threshold=0.05,
        relative_threshold=True,
        alert_on_new_values=True,
    )
)

# Fit baseline (use LazyFrame)
detector.fit(baseline_df.lazy())

# Detect drift with predict()
result = detector.predict(current_df.lazy())
if result.is_drifted:
    drifted_cols = result.get_drifted_columns(threshold=0.05)
    print(f"Drift detected: {drifted_cols}")
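The `"psi"` method above is the Population Stability Index, which compares binned proportions between baseline and current data. A minimal sketch over pre-binned proportions (the epsilon guard against empty bins is a common convention, not necessarily what the library uses):

```python
import math

def psi(expected: list[float], actual: list[float]) -> float:
    """Population Stability Index over pre-binned proportions.
    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate drift, > 0.25 major drift."""
    eps = 1e-6  # guard against log(0) / division by zero on empty bins
    return sum(
        (a - e) * math.log((a + eps) / (e + eps))
        for e, a in zip(expected, actual)
    )

baseline = [0.25, 0.25, 0.25, 0.25]       # uniform over 4 bins
identical = psi(baseline, baseline)        # no drift -> 0
shifted = psi(baseline, [0.10, 0.20, 0.30, 0.40])  # mass shifted to upper bins
```

The shifted distribution scores around 0.23, i.e. moderate drift under the usual thresholds, while the identical one scores exactly zero.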
Rule Learning¶
from truthound import ml
from truthound.ml.rule_learning.profile_learner import ProfileLearnerConfig
from truthound.ml.rule_learning.pattern_learner import PatternLearnerConfig
from truthound.ml.rule_learning.constraint_miner import ConstraintMinerConfig
# Learn rules from data (using Config object)
learner = ml.DataProfileRuleLearner(
    config=ProfileLearnerConfig(
        strictness="medium",  # "loose", "medium", "strict"
        min_support=0.1,
        min_confidence=0.8,
        include_range_rules=True,
        include_null_rules=True,
    )
)
learner.fit(df.lazy())
result = learner.predict(df.lazy())
for rule in result.rules:
    print(f"{rule.column}: {rule.rule_type} - {rule.condition}")

# Pattern-based rule learning
learner = ml.PatternRuleLearner(
    config=PatternLearnerConfig(
        min_pattern_ratio=0.9,
        learn_custom_patterns=True,
    )
)
learner.fit(df.lazy())
result = learner.predict(df.lazy())

# Constraint mining
miner = ml.ConstraintMiner(
    config=ConstraintMinerConfig(
        discover_functional_deps=True,
        discover_value_constraints=True,
    )
)
miner.fit(df.lazy())
result = miner.predict(df.lazy())
Model Registry¶
from truthound import ml
# Global registry
registry = ml.model_registry
# Register custom model
@ml.register_model("my_detector")
class MyDetector(ml.AnomalyDetector):
    def fit(self, data, columns):
        ...

    def predict(self, data):
        ...
# Use registered model
detector = registry.create("my_detector")
ML Module Classes¶
| Category | Classes |
|---|---|
| Anomaly Detectors | IsolationForestDetector, StatisticalAnomalyDetector, ZScoreAnomalyDetector, IQRAnomalyDetector, MADAnomalyDetector, EnsembleAnomalyDetector |
| Drift Detectors | DistributionDriftDetector, FeatureDriftDetector, ConceptDriftDetector, MultivariateDriftDetector |
| Rule Learners | DataProfileRuleLearner, ConstraintMiner, PatternRuleLearner |
| Base Classes | MLModel, AnomalyDetector, MLDriftDetector, RuleLearner |
| Results | AnomalyScore, AnomalyResult, DriftResult, LearnedRule, RuleLearningResult, ModelInfo |
| Configurations | AnomalyConfig, StatisticalConfig, IsolationForestConfig, EnsembleConfig, DriftConfig, DistributionDriftConfig, FeatureDriftConfig, ConceptDriftConfig, MultivariateDriftConfig, RuleLearningConfig, ProfileLearnerConfig, PatternLearnerConfig, ConstraintMinerConfig |
| Enums | ModelType, ModelState, AnomalyType, SeverityLevel |
| Registry | ModelRegistry, model_registry, register_model |
| Exceptions | MLError, ModelNotTrainedError, ModelTrainingError, ModelLoadError, InsufficientDataError |
Lineage Module¶
Track data flow and analyze impact of changes.
Basic Lineage Tracking¶
from truthound import lineage
# Create tracker (with optional config)
tracker = lineage.LineageTracker()
# Track data sources
tracker.track_source(
    name="raw_customers",
    source_type="file",
    location="/data/customers.csv",
    schema={"id": "Int64", "name": "Utf8", "email": "Utf8"},
    description="Raw customer data",
    owner="data_team",
    tags=["raw", "pii"],
)
tracker.track_source(
    name="raw_orders",
    source_type="table",
    location="postgresql://localhost/db/orders",
)

# Track transformations
tracker.track_transformation(
    name="cleaned_customers",
    sources=["raw_customers"],
    operation="clean",
    location="memory://cleaned_customers",
    description="Removed nulls and normalized emails",
)
tracker.track_transformation(
    name="customer_orders",
    sources=["cleaned_customers", "raw_orders"],
    operation="join",
)

# Track validation
tracker.track_validation(
    name="validated_data",
    sources=["customer_orders"],
    validators=["null", "range", "format"],
)
Impact Analysis¶
from truthound import lineage
# Use .graph property (not get_graph() method)
analyzer = lineage.ImpactAnalyzer(tracker.graph)
# Forward impact: what depends on this node?
impact = analyzer.analyze_impact("raw_customers")
print(f"Affected nodes: {[n.node.id for n in impact.affected_nodes]}")
print(f"Total affected: {impact.total_affected}")
print(f"Max depth: {impact.max_depth}")
# Get nodes by impact level
critical_nodes = impact.get_by_level(lineage.ImpactLevel.CRITICAL)
high_nodes = impact.get_by_level(lineage.ImpactLevel.HIGH)
# Get summary
print(impact.summary())
# Backward lineage: use get_upstream() on graph
upstream = tracker.graph.get_upstream("validated_data")
print(f"Source nodes: {[n.id for n in upstream]}")
# Find path between nodes
path = tracker.get_path("raw_customers", "validated_data")
if path:
    print(f"Path: {[n.id for n in path]}")
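Forward impact analysis is a reachability query over the lineage graph. A self-contained sketch using the node names from the tracking example above; the adjacency dict and `downstream` function are illustrative, not the library's internals:

```python
from collections import deque

# Edges point from producer to consumer (mirrors the tracker example above).
edges = {
    "raw_customers": ["cleaned_customers"],
    "raw_orders": ["customer_orders"],
    "cleaned_customers": ["customer_orders"],
    "customer_orders": ["validated_data"],
    "validated_data": [],
}

def downstream(node: str) -> set[str]:
    """All nodes transitively affected by a change to `node` (BFS forward walk)."""
    seen, queue = set(), deque([node])
    while queue:
        for nxt in edges.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen

affected = downstream("raw_customers")
```

Changing `raw_customers` touches everything downstream of it, while a leaf like `validated_data` affects nothing — the same answer `ImpactAnalyzer.analyze_impact` gives, enriched with impact levels and depths.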
Visualization¶
from truthound.lineage.visualization import (
    D3Renderer,
    CytoscapeRenderer,
    GraphvizRenderer,
    MermaidRenderer,
    RenderConfig,
    get_renderer,  # factory function
)

# Use .graph property
graph = tracker.graph

# D3.js visualization (interactive HTML)
renderer = D3Renderer(theme="light")  # or "dark"
html = renderer.render_html(graph, RenderConfig(
    layout="hierarchical",  # force, hierarchical, circular, grid
    width=1200,
    height=800,
    orientation="TB",  # TB, BT, LR, RL
    include_metadata=True,
))
with open("lineage.html", "w") as f:
    f.write(html)
# Mermaid diagram (for documentation)
renderer = MermaidRenderer()
mermaid_code = renderer.render(graph, RenderConfig())
print(mermaid_code)
# Graphviz (for static images)
renderer = GraphvizRenderer()
dot = renderer.render(graph, RenderConfig())
# Cytoscape.js (interactive graph)
renderer = CytoscapeRenderer(theme="dark")
html = renderer.render_html(graph, RenderConfig(layout="cose"))
# Factory function for creating renderers
renderer = get_renderer("d3", theme="light")
renderer = get_renderer("mermaid")
OpenLineage Integration¶
from truthound.lineage.integrations.openlineage import (
    OpenLineageEmitter,
    OpenLineageConfig,
)

# Create emitter with config
emitter = OpenLineageEmitter(
    config=OpenLineageConfig(
        endpoint="http://localhost:5000/api/v1/lineage",
        namespace="my-pipeline",
        producer="truthound",
    )
)

# Start a run
run = emitter.start_run(
    job_name="data-validation",
    inputs=[
        emitter.build_input_dataset(
            name="raw_data",
            namespace="file",
            schema=[{"name": "id", "type": "int"}, {"name": "value", "type": "string"}],
        )
    ],
)

# Emit running status (optional)
emitter.emit_running(run)

# Emit completion with outputs
emitter.emit_complete(
    run,
    outputs=[
        emitter.build_output_dataset(
            name="/data/validated.parquet",
            namespace="file",
            row_count=10000,
        )
    ],
)
# Or emit from existing lineage graph
events = emitter.emit_from_graph(tracker.graph, job_name="etl-pipeline")
Lineage Module Classes¶
| Category | Classes |
|---|---|
| Core | LineageTracker, TrackingContext, LineageGraph, LineageNode, LineageEdge |
| Analysis | ImpactAnalyzer, ImpactResult, AffectedNode, ImpactLevel |
| Visualization | D3Renderer, CytoscapeRenderer, GraphvizRenderer, MermaidRenderer, RenderConfig |
| Enums | NodeType, EdgeType, OperationType |
| Data Classes | LineageMetadata, LineageConfig |
| Exceptions | LineageError, NodeNotFoundError, CyclicDependencyError |
| Integration | OpenLineageEmitter, OpenLineageConfig, RunEvent, EventType, DatasetFacets |
Real-time Module¶
Streaming and incremental validation.
Streaming Validator¶
from truthound import realtime
from truthound.realtime import StreamingConfig, StreamingMode
# Create streaming validator with config
validator = realtime.StreamingValidator(
    validators=["null", "range", "format"],
    config=StreamingConfig(
        mode=StreamingMode.MICRO_BATCH,
        batch_size=1000,
        batch_timeout_ms=1000,
        error_handling="skip",  # "skip", "fail", "retry"
    ),
)

# Process batches (pass DataFrame, not LazyFrame)
for batch in data_batches:
    result = validator.validate_batch(batch, batch_id="batch_001")
    if result.has_issues:
        for issue in result.issues:
            print(f"Issue: {issue.column} - {issue.issue_type}")
    print(f"Processed: {result.record_count}, Issues: {result.issue_count}")
Incremental Validator¶
from truthound import realtime
from truthound.realtime import StreamingConfig, WindowConfig, WindowType
# Create incremental validator with state
validator = realtime.IncrementalValidator(
    validators=["unique", "aggregate"],
    config=StreamingConfig(checkpoint_interval_ms=5000),
    window_config=WindowConfig(
        window_type=WindowType.TUMBLING,
        window_size=60,  # seconds
    ),
    state_store=realtime.MemoryStateStore(),
)

# Process increments (use validate_batch method)
for chunk in data_chunks:
    result = validator.validate_batch(chunk)

print(f"Total rows processed: {validator.total_records}")
print(f"Total issues: {validator.total_issues}")

# Get aggregate statistics
stats = validator.get_aggregate_stats()
print(f"Batch count: {stats['batch_count']}")
print(f"Issue rate: {stats['issue_rate']:.2%}")
Kafka Integration¶
from truthound.realtime.adapters.kafka import KafkaAdapter, KafkaAdapterConfig
from truthound.realtime.factory import StreamAdapterFactory
from truthound.realtime import OffsetReset, DeserializationFormat
# Create via factory (pass dict config)
adapter = StreamAdapterFactory.create("kafka", {
    "bootstrap_servers": "localhost:9092",
    "topic": "events",
    "consumer_group": "truthound-validators",
})

# Or direct instantiation with typed config
adapter = KafkaAdapter(
    config=KafkaAdapterConfig(
        bootstrap_servers="localhost:9092",
        topic="events",
        consumer_group="truthound-validators",
        auto_offset_reset=OffsetReset.EARLIEST,
        value_deserializer=DeserializationFormat.JSON,
    )
)

async with adapter:
    async for message in adapter.consume():
        # Validate the message contents
        df = message.to_polars()
        report = th.check(df, validators=["null", "range"])
        if not report.has_issues:
            await adapter.commit(message)
Kinesis Integration¶
from truthound.realtime.adapters.kinesis import KinesisAdapter, KinesisAdapterConfig
from truthound.realtime import DeserializationFormat
adapter = KinesisAdapter(
    config=KinesisAdapterConfig(
        stream_name="my-stream",
        region_name="us-east-1",
        shard_iterator_type="LATEST",  # or "TRIM_HORIZON", "AT_SEQUENCE_NUMBER"
        value_deserializer=DeserializationFormat.JSON,
        # AWS credentials (optional, uses default credential chain)
        # aws_access_key_id="...",
        # aws_secret_access_key="...",
    )
)

async with adapter:
    async for message in adapter.consume():
        process_message(message)
Window-Based Validation¶
from truthound import realtime
from truthound.realtime import WindowConfig, WindowType, WindowResult
from datetime import datetime, timedelta
# Configure window
window_config = WindowConfig(
    window_type=WindowType.TUMBLING,  # TUMBLING, SLIDING, SESSION, GLOBAL
    window_size=60,  # seconds
    # For SLIDING windows:
    # slide_interval=10,  # slide every 10 seconds
    # For SESSION windows:
    # allowed_lateness=30,  # allow 30s late data
)

# IncrementalValidator supports windowing
validator = realtime.IncrementalValidator(
    validators=["null", "range"],
    window_config=window_config,
    state_store=realtime.MemoryStateStore(),
)

# Process batches and get window results
for batch in data_batches:
    result = validator.validate_batch(batch)

# Get current window result
window_result = validator.get_current_window()
if window_result:
    print(f"Window {window_result.window_id}:")
    print(f"  Total records: {window_result.total_records}")
    print(f"  Total issues: {window_result.total_issues}")
    print(f"  Batch count: {window_result.batch_count}")
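The tumbling-window assignment itself is just integer bucketing of the event timestamp. A minimal sketch of the mechanics, assuming standard-library Python only; the function names are illustrative:

```python
def window_id(ts: float, window_size: float = 60.0) -> int:
    """Tumbling windows partition time into fixed, non-overlapping buckets,
    so every event lands in exactly one window."""
    return int(ts // window_size)

def window_bounds(wid: int, window_size: float = 60.0) -> tuple[float, float]:
    """Half-open [start, end) interval covered by a window id."""
    return wid * window_size, (wid + 1) * window_size

# Events at t=5s and t=59s share window 0; t=60s opens window 1.
ids = [window_id(t) for t in (5.0, 59.0, 60.0, 125.0)]
```

Sliding windows differ in that one event can belong to several overlapping windows, and session windows close only after a gap of inactivity — which is why those modes carry extra parameters like `slide_interval` and `allowed_lateness`.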
Realtime Module Classes¶
| Category | Classes |
|---|---|
| Validators | StreamingValidator, IncrementalValidator |
| Sources (Legacy) | StreamingSource, KafkaSource, KinesisSource, PubSubSource, MockStreamingSource |
| State | StateStore, MemoryStateStore, CheckpointManager |
| Protocols | IStreamSource, IStreamSink, IStreamProcessor, IStateStoreProtocol, IMetricsCollector |
| Data Classes | StreamMessage, MessageBatch, MessageHeader, StreamMetrics |
| Results | BatchResult, WindowResult |
| Configs | StreamingConfig, WindowConfig, KafkaAdapterConfig, KinesisAdapterConfig |
| Enums | StreamingMode, WindowType, TriggerType, DeserializationFormat, OffsetReset, AckMode |
| Factory | StreamAdapterFactory |
| Exceptions | StreamingError, ConnectionError, TimeoutError |
Profiler Module¶
Advanced data profiling and rule generation.
Data Profiler¶
from truthound import profiler
# Profile a file
profile = profiler.profile_file("data.parquet")
print(f"Rows: {profile.row_count}")
print(f"Columns: {profile.column_count}")
print(f"Memory: {profile.estimated_memory_bytes / 1024 / 1024:.1f} MB")
# Profile columns
for col in profile:
    print(f"\n{col.name} ({col.inferred_type}):")
    print(f"  Null ratio: {col.null_ratio:.2%}")
    print(f"  Unique ratio: {col.unique_ratio:.2%}")
    if col.numeric_stats:
        print(f"  Mean: {col.numeric_stats.mean:.2f}")
        print(f"  Std: {col.numeric_stats.std:.2f}")
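The two headline ratios have simple definitions: null ratio is nulls over all rows, and unique ratio is distinct non-null values over non-null rows. A plain-Python sketch of those definitions (`column_profile` is an illustrative helper, not the profiler's API):

```python
def column_profile(values: list) -> dict:
    """Compute the basic per-column ratios the profiler reports.
    None is treated as the null marker."""
    n = len(values)
    non_null = [v for v in values if v is not None]
    return {
        "null_ratio": (n - len(non_null)) / n if n else 0.0,
        "unique_ratio": len(set(non_null)) / len(non_null) if non_null else 0.0,
    }

# One null out of four rows; two distinct values among three non-null rows.
stats = column_profile(["a", "b", "b", None])
```

In the real profiler these are computed lazily with Polars aggregations rather than Python loops, but the definitions are the same.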
Profile DataFrame¶
from truthound import profiler
import polars as pl
df = pl.read_csv("data.csv")
profile = profiler.profile_dataframe(df, name="my_data")
# Save profile
profiler.save_profile(profile, "profile.json")
# Load profile
loaded = profiler.load_profile("profile.json")
Validation Suite Generation¶
from truthound import profiler
# Generate validation suite from profile
suite = profiler.generate_suite(
    profile,
    strictness="medium",  # "low", "medium", "high"
    include_categories=["schema", "completeness", "format"],
)

# Export as YAML
yaml_rules = suite.to_yaml()
with open("validation_rules.yaml", "w") as f:
    f.write(yaml_rules)
# Export as Python code
python_code = suite.to_python_code()
print(python_code)
# Export as JSON
json_rules = suite.to_json()
Custom Profiler¶
from truthound import profiler
# Create profiler with custom config
prof = profiler.DataProfiler(
    config=profiler.ProfilerConfig(
        sample_size=10000,
        pattern_detection=True,
        correlation_analysis=True,
        histogram_bins=50,
    )
)

# Profile with custom analyzers
profile = prof.profile(
    df,
    analyzers=[
        profiler.BasicStatsAnalyzer(),
        profiler.PatternAnalyzer(),
        profiler.CorrelationAnalyzer(),
    ],
)
Data Docs Module¶
Generate HTML reports and documentation.
Basic HTML Report¶
from truthound import datadocs
# Generate HTML report from validation result
html = datadocs.generate_html_report(report)
with open("report.html", "w") as f:
    f.write(html)

# From file
datadocs.generate_report_from_file(
    "validation_result.json",
    output_path="report.html",
)
Report Builder¶
from truthound import datadocs
builder = datadocs.HTMLReportBuilder(
    config=datadocs.ReportConfig(
        title="Data Quality Report",
        include_charts=True,
        include_samples=True,
        max_issues=100,
    ),
    theme=datadocs.ReportTheme.DARK,
)
html = builder.build(report)
Custom Themes¶
from truthound import datadocs
from truthound.datadocs.themes import ThemeConfig
# Use built-in theme
builder = datadocs.HTMLReportBuilder(
    theme=datadocs.ReportTheme.PROFESSIONAL,
)

# Custom theme
custom_theme = ThemeConfig(
    primary_color="#1a73e8",
    secondary_color="#34a853",
    background_color="#ffffff",
    font_family="Inter, sans-serif",
    logo_url="https://example.com/logo.png",
)
builder = datadocs.HTMLReportBuilder(theme=custom_theme)
Checkpoint Module¶
CI/CD integration and validation pipelines.
Basic Checkpoint¶
from truthound import checkpoint
# Create checkpoint runner
runner = checkpoint.CheckpointRunner(
    config=checkpoint.CheckpointConfig(
        fail_on_critical=True,
        fail_on_high=False,
        notify_on_failure=True,
    )
)

# Run validation
result = runner.run(
    data="data.csv",
    validators=["null", "duplicate", "range"],
)

# Check result
if result.passed:
    print("Validation passed!")
else:
    print(f"Failed: {len(result.issues)} issues")
    runner.notify(result)  # Send notifications
Notification Providers¶
from truthound.checkpoint.actions import (
    SlackNotifier,
    EmailNotifier,
    PagerDutyNotifier,
    WebhookNotifier,
)

# Slack
notifier = SlackNotifier(
    webhook_url="https://hooks.slack.com/...",
    channel="#data-quality",
)

# Email
notifier = EmailNotifier(
    smtp_host="smtp.gmail.com",
    smtp_port=587,
    username="alerts@example.com",
    recipients=["team@example.com"],
)

# PagerDuty
notifier = PagerDutyNotifier(
    routing_key="your-routing-key",
    severity_mapping={"critical": "critical", "high": "error"},
)
Type Definitions¶
Import Metrics¶
from truthound import get_truthound_import_metrics
# Get lazy loading metrics
metrics = get_truthound_import_metrics()
print(f"Total lazy loads: {metrics['total_lazy_loads']}")
print(f"Total load time: {metrics['total_load_time_ms']:.2f}ms")
print(f"Slowest loads: {metrics['slowest_loads'][:5]}")
Module Structure¶
| Module | Description |
|---|---|
| truthound.types | ResultFormat, ResultFormatConfig, ValidationDetail (VE-1/2) |
| truthound.validators.metrics | MetricKey, SharedMetricStore, CommonMetrics (VE-3) |
| truthound.validators.base | SkipCondition, ExceptionInfo, _validate_safe (VE-4/5) |
| truthound.validators.resilience_bridge | ValidationResiliencePolicy (VE-5) |
| truthound.ml | ML anomaly/drift detection, rule learning |
| truthound.lineage | Data lineage tracking and visualization |
| truthound.realtime | Streaming and incremental validation |
| truthound.profiler | Data profiling and rule generation |
| truthound.datadocs | HTML report generation |
| truthound.checkpoint | CI/CD integration |
See Also¶
- Core Functions - Basic API functions
- Validators - Validator interface
- Data Sources - Database connections
- Reporters - Output formatters