Drift Detection
This document describes the schema change and data drift detection system.
Overview
The drift detection system implemented in src/truthound/profiler/evolution/detector.py tracks schema and data changes over time.
SchemaChangeType
```python
from enum import Enum


class SchemaChangeType(str, Enum):
    """Schema change types."""

    COLUMN_ADDED = "column_added"  # New column added
    COLUMN_REMOVED = "column_removed"  # Column removed
    COLUMN_RENAMED = "column_renamed"  # Column renamed
    TYPE_CHANGED = "type_changed"  # Data type changed
```
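Because `SchemaChangeType` mixes in `str`, its members behave like their string values. That makes them easy to compare, store, and serialize to JSON. The standalone snippet below re-declares the enum (an assumption for self-containment, not an import from Truthound) to show this behavior.

```python
import json
from enum import Enum


class SchemaChangeType(str, Enum):
    """Re-declared so this example runs without Truthound installed."""

    COLUMN_ADDED = "column_added"
    COLUMN_REMOVED = "column_removed"
    COLUMN_RENAMED = "column_renamed"
    TYPE_CHANGED = "type_changed"


# Members compare equal to their plain string values.
matches = SchemaChangeType.TYPE_CHANGED == "type_changed"
print(matches)  # True

# A stored string value maps back to the member.
restored = SchemaChangeType("column_renamed")
print(restored is SchemaChangeType.COLUMN_RENAMED)  # True

# json.dumps writes the value because each member is a str instance.
payload = json.dumps({"change_type": SchemaChangeType.COLUMN_ADDED})
print(payload)  # {"change_type": "column_added"}
```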
SchemaChange
```python
from dataclasses import dataclass
from typing import Any


@dataclass
class SchemaChange:
    """Schema change information."""

    change_type: SchemaChangeType
    column_name: str
    old_value: Any = None  # Previous value (type, name, etc.)
    new_value: Any = None  # New value
    severity: str = "medium"  # One of: info, low, medium, high, critical
    description: str = ""
```
SchemaChangeDetector Protocol
```python
# Postponed annotations: TableProfile and SchemaChange are not evaluated here
from __future__ import annotations

from typing import Protocol


class SchemaChangeDetector(Protocol):
    """Schema change detector protocol."""

    def detect_changes(
        self,
        old_profile: TableProfile,
        new_profile: TableProfile,
    ) -> list[SchemaChange]:
        """Detect schema changes between two profiles."""
        ...
```
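`SchemaChangeDetector` is a `typing.Protocol`, so any class with a matching `detect_changes` method satisfies it structurally, without inheriting from it. The toy sketch below shows this under loudly stated assumptions. A "profile" is a plain dict mapping column name to dtype, standing in for `TableProfile`. The enum and dataclass are re-declared in trimmed form. The severities are illustrative picks from the Drift Severity Levels table, not Truthound's actual rules. `DictSchemaDetector` is a hypothetical name.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Protocol, runtime_checkable

# Hypothetical stand-in for TableProfile: column name -> dtype name.
Profile = dict[str, str]


class SchemaChangeType(str, Enum):
    COLUMN_ADDED = "column_added"
    COLUMN_REMOVED = "column_removed"
    TYPE_CHANGED = "type_changed"


@dataclass
class SchemaChange:
    change_type: SchemaChangeType
    column_name: str
    old_value: Any = None
    new_value: Any = None
    severity: str = "medium"


@runtime_checkable
class SchemaChangeDetector(Protocol):
    def detect_changes(
        self, old_profile: Profile, new_profile: Profile
    ) -> list[SchemaChange]: ...


class DictSchemaDetector:
    """Toy detector: it never inherits from SchemaChangeDetector."""

    def detect_changes(
        self, old_profile: Profile, new_profile: Profile
    ) -> list[SchemaChange]:
        old_cols, new_cols = old_profile.keys(), new_profile.keys()
        changes = [
            SchemaChange(SchemaChangeType.COLUMN_ADDED, c,
                         new_value=new_profile[c], severity="info")
            for c in new_cols - old_cols
        ]
        # Illustrative severity; the table reserves "critical" for
        # removing a *required* column, which this toy cannot know.
        changes += [
            SchemaChange(SchemaChangeType.COLUMN_REMOVED, c,
                         old_value=old_profile[c], severity="high")
            for c in old_cols - new_cols
        ]
        changes += [
            SchemaChange(SchemaChangeType.TYPE_CHANGED, c,
                         old_value=old_profile[c], new_value=new_profile[c])
            for c in old_cols & new_cols
            if old_profile[c] != new_profile[c]
        ]
        # Set operations are unordered, so sort for stable output.
        return sorted(changes, key=lambda ch: ch.column_name)


detector = DictSchemaDetector()
print(isinstance(detector, SchemaChangeDetector))  # True
changes = detector.detect_changes(
    {"id": "Int32", "email": "Utf8"},
    {"id": "Int64", "signup_date": "Date"},
)
for ch in changes:
    print(ch.change_type.value, ch.column_name, ch.old_value, ch.new_value, ch.severity)
```

The `isinstance` check only verifies that a `detect_changes` attribute exists. It does not check the signature, so static type checking remains the stronger guarantee.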
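Type Compatibility Mapping
`TYPE_COMPATIBILITY` defines the safe type upgrades: for each source type, it lists the target types a column may be widened to. `is_compatible_change` only consults this table. Any pair that is not listed counts as incompatible, including one whose source type has no entry.
```python
# Compatible type conversions (safe upgrades)
TYPE_COMPATIBILITY: dict[str, list[str]] = {
    "Int8": ["Int16", "Int32", "Int64", "Float32", "Float64"],
    "Int16": ["Int32", "Int64", "Float32", "Float64"],
    "Int32": ["Int64", "Float64"],
    "Int64": ["Float64"],
    "Float32": ["Float64"],
    "Utf8": ["LargeUtf8"],
}


def is_compatible_change(old_type: str, new_type: str) -> bool:
    """Return True if changing old_type to new_type is a listed safe upgrade."""
    return new_type in TYPE_COMPATIBILITY.get(old_type, [])
```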
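The checks below show how `is_compatible_change` behaves at the edges of the mapping. They also show one way its result could feed the severity levels described later ("low" for a compatible type expansion, "high" for an incompatible type change). `type_change_severity` is a hypothetical helper, not part of Truthound's documented API.

```python
TYPE_COMPATIBILITY: dict[str, list[str]] = {
    "Int8": ["Int16", "Int32", "Int64", "Float32", "Float64"],
    "Int16": ["Int32", "Int64", "Float32", "Float64"],
    "Int32": ["Int64", "Float64"],
    "Int64": ["Float64"],
    "Float32": ["Float64"],
    "Utf8": ["LargeUtf8"],
}


def is_compatible_change(old_type: str, new_type: str) -> bool:
    return new_type in TYPE_COMPATIBILITY.get(old_type, [])


def type_change_severity(old_type: str, new_type: str) -> str:
    """Hypothetical helper mapping a type change onto the severity table."""
    # "low": compatible type expansion; "high": incompatible type change.
    return "low" if is_compatible_change(old_type, new_type) else "high"


print(is_compatible_change("Int32", "Int64"))      # True  (listed widening)
print(is_compatible_change("Int64", "Int32"))      # False (narrowing is never listed)
print(is_compatible_change("Int32", "Float32"))    # False (not listed for Int32)
print(is_compatible_change("Date", "Datetime"))    # False (no entry for Date)
print(type_change_severity("Float32", "Float64"))  # low
print(type_change_severity("Utf8", "Int64"))       # high
```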
Basic Usage
```python
from truthound.profiler.evolution import SchemaChangeType, SchemaEvolutionDetector

detector = SchemaEvolutionDetector()

# Detect schema changes between two profiles
# (for example, results of TableProfiler().profile_file(...))
changes = detector.detect_changes(old_profile, new_profile)

for change in changes:
    print(f"Type: {change.change_type.value}")
    print(f"Column: {change.column_name}")
    print(f"Severity: {change.severity}")
    if change.change_type == SchemaChangeType.TYPE_CHANGED:
        print(f"  {change.old_value} -> {change.new_value}")
```
Column Rename Detection
Infers renames by matching columns whose statistics are similar across the two profiles. `similarity_threshold` sets the minimum similarity a pair must reach to be reported.
```python
from truthound.profiler.evolution import ColumnRenameDetector

rename_detector = ColumnRenameDetector(
    similarity_threshold=0.9,  # require 90% or higher similarity
)

renames = rename_detector.detect_renames(old_profile, new_profile)
for rename in renames:
    print(f"Rename detected: {rename.old_name} -> {rename.new_name}")
    print(f"Confidence: {rename.confidence:.2%}")
```
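This document does not say which statistics `ColumnRenameDetector` compares or how it scores them. The sketch below shows one plausible approach under stated assumptions. Each column has a small hypothetical stats dict. Similarity is 1 minus the mean relative difference of shared stats. Each removed column is greedily paired with its most similar added column when the score clears `similarity_threshold`. All names here are illustrative.

```python
from dataclasses import dataclass

# Hypothetical per-column statistics (a real profile carries more).
Stats = dict[str, float]


@dataclass
class RenameCandidate:
    old_name: str
    new_name: str
    confidence: float


def stat_similarity(a: Stats, b: Stats) -> float:
    """1.0 for identical stats, lower as the mean relative difference grows."""
    keys = a.keys() & b.keys()
    if not keys:
        return 0.0
    total = 0.0
    for k in keys:
        scale = max(abs(a[k]), abs(b[k]), 1e-9)  # avoid division by zero
        total += min(abs(a[k] - b[k]) / scale, 1.0)
    return 1.0 - total / len(keys)


def detect_renames(
    old: dict[str, Stats], new: dict[str, Stats], similarity_threshold: float = 0.9
) -> list[RenameCandidate]:
    removed = [c for c in old if c not in new]
    added = [c for c in new if c not in old]
    renames = []
    for old_name in removed:
        if not added:
            break
        # Greedily pick the most similar added column for this removed one.
        score, new_name = max((stat_similarity(old[old_name], new[n]), n) for n in added)
        if score >= similarity_threshold:
            renames.append(RenameCandidate(old_name, new_name, score))
            added.remove(new_name)  # each added column matches at most once
    return renames


old = {"id": {"null_ratio": 0.0, "unique_ratio": 1.0},
       "user_email": {"null_ratio": 0.02, "unique_ratio": 0.98, "mean_length": 21.4}}
new = {"id": {"null_ratio": 0.0, "unique_ratio": 1.0},
       "email": {"null_ratio": 0.02, "unique_ratio": 0.97, "mean_length": 21.5},
       "country": {"null_ratio": 0.0, "unique_ratio": 0.01, "mean_length": 2.0}}

renames = detect_renames(old, new)
for r in renames:
    print(f"Rename detected: {r.old_name} -> {r.new_name} ({r.confidence:.2%})")
```

Greedy pairing is the simplest choice. When several columns change at once, an optimal one-to-one assignment would avoid a poor early match that blocks a better later one.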
Compatibility Analysis
`CompatibilityAnalyzer` compares two profiles and reports whether the new one is compatible, listing breaking changes and warnings separately.
```python
from truthound.profiler.evolution import CompatibilityAnalyzer

analyzer = CompatibilityAnalyzer()
report = analyzer.analyze(old_profile, new_profile)

print(f"Compatible: {report.is_compatible}")
print(f"Breaking changes: {len(report.breaking_changes)}")
print(f"Warnings: {len(report.warnings)}")

for breaking in report.breaking_changes:
    print(f"  BREAKING: {breaking.description}")
```
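How `CompatibilityAnalyzer` decides what counts as breaking is not specified here. The sketch below assumes a simple severity-based policy: "high" and "critical" are breaking, "medium" is a warning, and "info" and "low" are ignored. A report is compatible when it has no breaking changes. `Change`, `CompatibilityReport`, and `analyze` are hypothetical names that mirror the attributes used above.

```python
from dataclasses import dataclass, field


@dataclass
class Change:
    column_name: str
    severity: str
    description: str


@dataclass
class CompatibilityReport:
    breaking_changes: list[Change] = field(default_factory=list)
    warnings: list[Change] = field(default_factory=list)

    @property
    def is_compatible(self) -> bool:
        return not self.breaking_changes


# Assumed policy for illustration, not Truthound's documented rule.
BREAKING = {"high", "critical"}
WARNING = {"medium"}


def analyze(changes: list[Change]) -> CompatibilityReport:
    report = CompatibilityReport()
    for change in changes:
        if change.severity in BREAKING:
            report.breaking_changes.append(change)
        elif change.severity in WARNING:
            report.warnings.append(change)
        # "info" and "low" changes are neither breaking nor warnings here
    return report


report = analyze([
    Change("signup_date", "info", "Column added"),
    Change("email", "medium", "Renamed from user_email"),
    Change("id", "critical", "Required column removed"),
])
print(f"Compatible: {report.is_compatible}")                # Compatible: False
print(f"Breaking changes: {len(report.breaking_changes)}")  # Breaking changes: 1
print(f"Warnings: {len(report.warnings)}")                  # Warnings: 1
```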
Drift Severity Levels
| Severity | Description | Example |
|---|---|---|
| `info` | Informational change | New column added |
| `low` | Minor change | Compatible type expansion |
| `medium` | Attention required | Column renamed |
| `high` | Investigation needed | Incompatible type change |
| `critical` | Immediate action required | Required column removed |
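Severity values are plain strings (`SchemaChange.severity: str`). Sorting or comparing them directly therefore uses alphabetical order, which does not match the order in the table. The sketch below shows one way to rank them so changes can be filtered by a minimum severity. `Severity` and `at_least` are hypothetical helpers, not part of Truthound's documented API.

```python
from enum import IntEnum


class Severity(IntEnum):
    """Hypothetical ordered view of the severity strings in the table."""

    INFO = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

    @classmethod
    def parse(cls, value: str) -> "Severity":
        return cls[value.upper()]  # look up the member by name


def at_least(severity: str, threshold: str) -> bool:
    return Severity.parse(severity) >= Severity.parse(threshold)


levels = ["critical", "high", "info"]
print(sorted(levels))                      # ['critical', 'high', 'info'] (alphabetical)
print(sorted(levels, key=Severity.parse))  # ['info', 'high', 'critical']
print(at_least("critical", "high"))        # True
print(at_least("medium", "high"))          # False
```

With a helper like this, a pipeline that currently checks `severity == "critical"` could also catch "high" changes by testing `at_least(change.severity, "high")`.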
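Breaking Change Alerts
```python
from truthound.profiler.evolution import BreakingChangeAlert, SchemaEvolutionDetector

# Alerts are produced by the schema evolution detector from detected changes
detector = SchemaEvolutionDetector()
changes = detector.detect_changes(old_profile, new_profile)
alerts: list[BreakingChangeAlert] = detector.get_breaking_alerts(changes)

for alert in alerts:
    print(f"ALERT: {alert.message}")
    print(f"Impact: {alert.impact}")
    print(f"Recommendation: {alert.recommendation}")
```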
History Tracking
```python
from truthound.profiler.evolution import SchemaHistory

history = SchemaHistory(storage_dir=".truthound/schema_history")

# Save profiles under version labels
history.save(profile, version="v1.0")
history.save(new_profile, version="v1.1")

# List saved versions
versions = history.list_versions()

# Compare two saved versions
changes = history.compare("v1.0", "v1.1")

# Load a specific version
old_profile = history.load("v1.0")
```
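The storage format behind `SchemaHistory` is not described in this document. Here is a minimal sketch of a versioned history with the same operations. It assumes one JSON file per version under `storage_dir` and represents a profile as a hypothetical column-to-dtype dict. `JsonSchemaHistory` is an illustrative name, not Truthound's class.

```python
import json
import tempfile
from pathlib import Path


class JsonSchemaHistory:
    """Hypothetical file-per-version store; not Truthound's SchemaHistory."""

    def __init__(self, storage_dir: str) -> None:
        self.root = Path(storage_dir)
        self.root.mkdir(parents=True, exist_ok=True)

    def _path(self, version: str) -> Path:
        return self.root / f"{version}.json"

    def save(self, profile: dict[str, str], version: str) -> None:
        self._path(version).write_text(json.dumps(profile, indent=2, sort_keys=True))

    def load(self, version: str) -> dict[str, str]:
        return json.loads(self._path(version).read_text())

    def list_versions(self) -> list[str]:
        # Lexicographic order: "v1.10" would sort before "v1.2".
        return sorted(p.stem for p in self.root.glob("*.json"))

    def compare(self, old_version: str, new_version: str) -> dict[str, list[str]]:
        old, new = self.load(old_version), self.load(new_version)
        return {
            "added": sorted(new.keys() - old.keys()),
            "removed": sorted(old.keys() - new.keys()),
            "type_changed": sorted(c for c in old.keys() & new.keys() if old[c] != new[c]),
        }


with tempfile.TemporaryDirectory() as tmp:
    history = JsonSchemaHistory(tmp)
    history.save({"id": "Int32", "email": "Utf8"}, version="v1.0")
    history.save({"id": "Int64", "email": "Utf8", "country": "Utf8"}, version="v1.1")
    versions = history.list_versions()
    diff = history.compare("v1.0", "v1.1")

print(versions)  # ['v1.0', 'v1.1']
print(diff)      # {'added': ['country'], 'removed': [], 'type_changed': ['id']}
```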
Automatic Alerting
```python
from truthound.profiler.evolution import SchemaWatcher

# send_slack_notification is your own function; it receives each alert
watcher = SchemaWatcher(
    alert_callback=send_slack_notification,
    check_interval_minutes=60,
)

# Start monitoring data.csv against a baseline profile;
# alert_callback is called whenever changes are detected
watcher.watch("data.csv", baseline_profile)
```
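The document does not say how `SchemaWatcher` schedules its checks. Below is a sketch of a simple polling design under several assumptions. `profile_fn` is a hypothetical user-supplied callable that re-profiles the source and returns a column-to-dtype dict. Each check compares that result with the baseline and calls `alert_callback` with a message. `check_once` is split out so a single check can be run (and tested) without waiting. `PollingSchemaWatcher` is an illustrative name, not Truthound's class.

```python
import threading
from typing import Callable

Profile = dict[str, str]  # hypothetical stand-in: column name -> dtype


class PollingSchemaWatcher:
    """Hypothetical polling watcher; not Truthound's SchemaWatcher."""

    def __init__(self, profile_fn: Callable[[], Profile], baseline: Profile,
                 alert_callback: Callable[[str], None],
                 check_interval_minutes: float = 60) -> None:
        self.profile_fn = profile_fn
        self.baseline = baseline
        self.alert_callback = alert_callback
        self.interval_s = check_interval_minutes * 60
        self._stop = threading.Event()

    def check_once(self) -> bool:
        """Profile once; alert and return True if the schema differs."""
        current = self.profile_fn()
        if current == self.baseline:
            return False
        added = sorted(current.keys() - self.baseline.keys())
        removed = sorted(self.baseline.keys() - current.keys())
        retyped = sorted(c for c in current.keys() & self.baseline.keys()
                         if current[c] != self.baseline[c])
        self.alert_callback(
            f"schema changed: added={added} removed={removed} retyped={retyped}")
        return True

    def run(self) -> None:
        # Event.wait doubles as an interruptible sleep between checks.
        while not self._stop.is_set():
            self.check_once()
            self._stop.wait(self.interval_s)

    def stop(self) -> None:
        self._stop.set()


alerts: list[str] = []
watcher = PollingSchemaWatcher(
    profile_fn=lambda: {"id": "Int64", "email": "Utf8"},  # pretend re-profile
    baseline={"id": "Int64", "user_email": "Utf8"},
    alert_callback=alerts.append,
)
changed = watcher.check_once()
print(changed, alerts)
```

As written, the sketch alerts on every check until the baseline is replaced. A production watcher would likely deduplicate alerts or advance the baseline after notifying.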
CLI Usage
```bash
# Compare two profiles
th compare profile_v1.json profile_v2.json

# Detect schema changes
th schema-diff old_profile.json new_profile.json

# Analyze compatibility
th check-compatibility old_profile.json new_profile.json

# Check for breaking changes
th check-breaking old_profile.json new_profile.json
```
Integration Example
```python
from truthound.profiler import TableProfiler
from truthound.profiler.caching import ProfileCache
from truthound.profiler.evolution import SchemaEvolutionDetector

# Set up profiler, cache, and detector
profiler = TableProfiler()
cache = ProfileCache()
detector = SchemaEvolutionDetector()

# Baseline profile (loaded from the cache, or computed and cached on a miss)
baseline_key = cache.compute_fingerprint("data_baseline.csv")
baseline = cache.get_or_compute(
    baseline_key,
    lambda: profiler.profile_file("data_baseline.csv"),
)

# Current profile
current = profiler.profile_file("data_current.csv")

# Detect changes
changes = detector.detect_changes(baseline, current)
if changes:
    print(f"Found {len(changes)} schema changes:")
    for change in changes:
        print(f"  - {change.change_type.value}: {change.column_name}")

# Fail the run on critical changes
critical = [c for c in changes if c.severity == "critical"]
if critical:
    names = ", ".join(c.column_name for c in critical)
    raise ValueError(f"Critical schema changes detected in: {names}")
```
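The integration example relies on `ProfileCache.compute_fingerprint` and `get_or_compute`, but this document does not say what the fingerprint covers. The sketch below shows the general fingerprint-then-memoize pattern under stated assumptions. The key is a SHA-256 hash of the file's bytes, and results live in an in-memory dict. The profile is recomputed only when the file content changes. `FingerprintCache` and `fake_profile` are hypothetical names.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Any, Callable


class FingerprintCache:
    """Hypothetical in-memory cache keyed by a hash of the file's bytes."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    @staticmethod
    def compute_fingerprint(path: str) -> str:
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            # Read in 64 KiB chunks so large files are not loaded at once.
            for chunk in iter(lambda: f.read(1 << 16), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        if key not in self._store:
            self._store[key] = compute()  # miss: compute once and keep it
        return self._store[key]


calls: list[str] = []


def fake_profile() -> dict[str, str]:
    calls.append("profiled")  # count how often "profiling" actually runs
    return {"id": "Int64"}


with tempfile.TemporaryDirectory() as tmp:
    path = str(Path(tmp) / "data_baseline.csv")
    Path(path).write_text("id\n1\n2\n")
    cache = FingerprintCache()
    cache.get_or_compute(cache.compute_fingerprint(path), fake_profile)  # miss
    cache.get_or_compute(cache.compute_fingerprint(path), fake_profile)  # hit
    Path(path).write_text("id\n1\n2\n3\n")  # content changes -> new key
    cache.get_or_compute(cache.compute_fingerprint(path), fake_profile)  # miss

print(len(calls))  # 2
```

Hashing content rather than using the file name or modification time means a rewritten but identical file still hits the cache. An in-memory store only helps within one process. Keeping a baseline across runs would require persisting the cached results.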
Next Steps
- Quality Scoring: impact of drift on quality
- Visualization: generate drift reports