Schema Evolution¶
This guide covers the comprehensive schema evolution system for detecting, tracking, and managing schema changes over time.
Overview¶
The schema evolution module provides:
- Change Detection: Identify schema differences (columns added/removed, type changes, renames)
- History Management: Version schemas with semantic versioning, diff, and rollback
- Continuous Monitoring: Watch schemas for changes with real-time alerts
- Impact Analysis: Assess the impact of breaking changes on downstream systems
- Advanced Rename Detection: Multiple similarity algorithms for detecting column renames
Installation¶
The schema evolution module is included in the core Truthound package:
Quick Start¶
Detect Schema Changes¶
from truthound.profiler.evolution import SchemaEvolutionDetector
# Create detector
detector = SchemaEvolutionDetector()
# Define schemas
old_schema = {"id": "Int64", "name": "Utf8", "email": "Utf8"}
new_schema = {"id": "Int64", "name": "Utf8", "user_email": "Utf8", "age": "Int32"}
# Detect changes
changes = detector.detect_changes(new_schema, old_schema)
for change in changes:
print(f"[{change.severity.value}] {change.description}")
if change.breaking:
print(f" ⚠️ BREAKING CHANGE")
if change.migration_hint:
print(f" 💡 Hint: {change.migration_hint}")
Output:
[critical] Column 'email' removed (was type Utf8)
⚠️ BREAKING CHANGE
💡 Hint: Column removed - update consumers to not depend on this column
[info] Column 'user_email' added with type Utf8
💡 Hint: New column - ensure consumers handle missing data for older records
[info] Column 'age' added with type Int32
💡 Hint: New column - ensure consumers handle missing data for older records
Manage Schema History¶
from truthound.profiler.evolution import SchemaHistory
# Create history with semantic versioning
history = SchemaHistory.create(
storage_type="file",
path="./schema_history",
version_strategy="semantic",
)
# Save initial schema
v1 = history.save(
{"id": "Int64", "name": "Utf8"},
metadata={"author": "data-team"},
)
print(f"Version: {v1.version}") # 1.0.0
# Save updated schema (non-breaking addition)
v2 = history.save({"id": "Int64", "name": "Utf8", "email": "Utf8"})
print(f"Version: {v2.version}") # 1.1.0 (minor bump)
# Get diff between versions
diff = history.diff(v1, v2)
print(diff.format_text())
# Rollback if needed
v3 = history.rollback(v1, reason="Incompatible with legacy systems")
Watch for Changes¶
from truthound.profiler.evolution import SchemaWatcher, FileSchemaSource
# Create watcher
watcher = SchemaWatcher()
# Add schema files to watch
watcher.add_source(FileSchemaSource("./schemas/users.json"))
watcher.add_source(FileSchemaSource("./schemas/orders.json"))
# Add event handler
def on_change(event):
if event.has_breaking_changes():
print(f"🚨 Breaking changes in {event.source}!")
for change in event.changes:
if change.breaking:
print(f" - {change.description}")
from truthound.profiler.evolution import CallbackEventHandler
watcher.add_handler(CallbackEventHandler(on_change))
# Start watching (polls every 60 seconds)
watcher.start(poll_interval=60)
# ... later ...
watcher.stop()
CLI Commands¶
Schema Check¶
Compare schemas and detect changes:
# Compare current schema to baseline
th schema-check current.json --baseline baseline.json
# Output as JSON
th schema-check current.json -b baseline.json --format json
# Output as Markdown (for reports)
th schema-check current.json -b baseline.json --format markdown
# Fail CI/CD on breaking changes
th schema-check current.json -b baseline.json --fail-on-breaking
# Disable rename detection
th schema-check current.json -b baseline.json --no-detect-renames
# Adjust similarity threshold for rename detection
th schema-check current.json -b baseline.json --similarity 0.7
Schema History¶
Manage schema version history:
# Initialize history storage
th schema-history init ./my_history
th schema-history init ./my_history --strategy timestamp
# Save a schema version
th schema-history save schema.json
th schema-history save schema.json --version 2.0.0
th schema-history save schema.json -m "Added email column"
# List versions
th schema-history list
th schema-history list --limit 20
th schema-history list --format json
# Show version details
th schema-history show 1.0.0
th schema-history show abc12345 --format json
# Rollback to a version
th schema-history rollback 1.0.0
th schema-history rollback 1.0.0 --reason "Incompatible change" --yes
Schema Diff¶
Show diff between schema versions:
# Diff between two versions
th schema-diff 1.0.0 2.0.0
# Diff to latest
th schema-diff 1.0.0
# Output formats
th schema-diff 1.0.0 2.0.0 --format json
th schema-diff 1.0.0 2.0.0 --format markdown
Schema Watch¶
Monitor schemas for changes:
# Watch a single file
th schema-watch schema.json
# Watch multiple files
th schema-watch schema1.json schema2.json
# Custom poll interval
th schema-watch schema.json --interval 30
# Track history
th schema-watch schema.json --history ./schema_history
# Write alerts to file
th schema-watch schema.json --alert-file alerts.jsonl
# Only alert on breaking changes
th schema-watch schema.json --only-breaking
# Single check (for CI/CD)
th schema-watch schema.json --once --only-breaking
Python API¶
SchemaEvolutionDetector¶
The core class for detecting schema changes:
from truthound.profiler.evolution import SchemaEvolutionDetector
detector = SchemaEvolutionDetector(
detect_renames=True, # Enable rename detection
rename_similarity_threshold=0.8, # Threshold for considering a rename
)
# Can accept various schema formats
# - dict: {"col": "Type"}
# - polars.Schema
# - polars.DataFrame
# - polars.LazyFrame
changes = detector.detect_changes(current_schema, baseline_schema)
summary = detector.get_change_summary(changes)
print(f"Total changes: {summary.total_changes}")
print(f"Breaking changes: {summary.breaking_changes}")
print(f"Compatibility: {summary.compatibility_level.value}")
Change Types¶
The system detects these change types:
| Change Type | Breaking? | Severity |
|---|---|---|
COLUMN_ADDED |
No | INFO |
COLUMN_REMOVED |
Yes | CRITICAL |
COLUMN_RENAMED |
Yes | CRITICAL |
TYPE_CHANGED |
Depends* | WARNING/CRITICAL |
NULLABLE_CHANGED |
Depends* | INFO/CRITICAL |
*Type changes are non-breaking for compatible upgrades (e.g., Int32 → Int64). Nullable→NonNullable is breaking.
SchemaHistory¶
Manage schema versions with history:
from truthound.profiler.evolution import SchemaHistory
# Create with different strategies
history = SchemaHistory.create(
storage_type="file", # "file" or "memory"
path="./schema_history", # Required for "file"
max_versions=100, # Max versions to keep
version_strategy="semantic", # "semantic", "incremental", "timestamp", "git"
compress=True, # Compress stored files
)
# Save versions
v1 = history.save(schema, version="1.0.0", metadata={"author": "team"})
# Get versions
latest = history.latest
baseline = history.baseline
specific = history.get("version-id")
by_string = history.get_by_version("1.0.0")
# List and filter
versions = history.list(limit=10)
recent = history.list(since=datetime.now() - timedelta(days=7))
# Diff
diff = history.diff("1.0.0", "2.0.0")
print(diff.format_text())
# Check breaking changes
if history.has_breaking_changes_since("1.0.0"):
print("Breaking changes detected!")
# Rollback
new_version = history.rollback("1.0.0", reason="Revert breaking change")
Version Strategies¶
| Strategy | Example | Use Case |
|---|---|---|
semantic |
1.2.3 | Standard versioning, auto-bumps based on change type |
incremental |
1, 2, 3 | Simple incrementing numbers |
timestamp |
20260128.143052 | Time-based versions |
git |
a1b2c3d4 | Git-like short hashes |
ColumnRenameDetector¶
Advanced rename detection with multiple algorithms:
from truthound.profiler.evolution import (
ColumnRenameDetector,
CompositeSimilarity,
LevenshteinSimilarity,
JaroWinklerSimilarity,
TokenSimilarity,
)
# Use composite similarity (default)
detector = ColumnRenameDetector(
similarity_threshold=0.8,
require_type_match=True,
allow_compatible_types=True,
)
result = detector.detect(
added_columns={"user_email": "Utf8", "customer_name": "Utf8"},
removed_columns={"email": "Utf8", "cust_name": "Utf8"},
)
# Confirmed renames (high confidence)
for rename in result.confirmed_renames:
print(f"✓ {rename.old_name} → {rename.new_name}")
print(f" Similarity: {rename.similarity:.0%}")
print(f" Confidence: {rename.confidence.value}")
# Possible renames (need review)
for rename in result.possible_renames:
print(f"? {rename.old_name} → {rename.new_name}")
print(f" Reasons: {', '.join(rename.reasons)}")
# Unmatched columns
print(f"Unmatched added: {result.unmatched_added}")
print(f"Unmatched removed: {result.unmatched_removed}")
Similarity algorithms:
| Algorithm | Best For |
|---|---|
LevenshteinSimilarity |
General edit distance |
JaroWinklerSimilarity |
Short strings, common prefixes |
NgramSimilarity |
Partial matches, abbreviations |
TokenSimilarity |
snake_case/camelCase names |
CompositeSimilarity |
Weighted combination (default) |
SchemaWatcher¶
Continuous monitoring:
from truthound.profiler.evolution import (
SchemaWatcher,
FileSchemaSource,
PolarsSchemaSource,
DictSchemaSource,
LoggingEventHandler,
AlertingEventHandler,
HistoryEventHandler,
)
# Create watcher
watcher = SchemaWatcher()
# Add sources
watcher.add_source(FileSchemaSource("schema.json"))
watcher.add_source(PolarsSchemaSource(lambda: pl.read_csv("data.csv"), "data"))
watcher.add_source(DictSchemaSource({"id": "Int64"}, "manual"))
# Add handlers
watcher.add_handler(LoggingEventHandler())
watcher.add_handler(HistoryEventHandler(history))
watcher.add_handler(AlertingEventHandler(alert_manager))
# Lifecycle control
watcher.start(poll_interval=60, daemon=True)
watcher.pause()
watcher.resume()
watcher.stop()
# Check immediately
events = watcher.check_now()
Breaking Change Alerts¶
Enhanced alerting with impact analysis:
from truthound.profiler.evolution import (
BreakingChangeAlertManager,
ImpactAnalyzer,
ColumnRemovedChange,
)
# Setup impact analyzer with consumer mappings
analyzer = ImpactAnalyzer()
analyzer.register_consumer("dashboard", ["users", "orders"])
analyzer.register_consumer("reports", ["users", "products"])
analyzer.register_query("users", "SELECT email FROM users")
# Create alert manager
manager = BreakingChangeAlertManager(
impact_analyzer=analyzer,
alert_storage_path="./alerts.json",
)
# Create alert
changes = [ColumnRemovedChange("email", "Utf8")]
alert = manager.create_alert(changes, source="users")
print(f"Alert: {alert.title}")
print(f"Impact Scope: {alert.impact.scope.value}")
print(f"Affected Consumers: {alert.impact.affected_consumers}")
print(f"Risk Level: {alert.impact.data_risk_level}/5")
print(f"Recommendations:")
for rec in alert.impact.recommendations:
print(f" - {rec}")
# Track alerts
history = manager.get_alert_history(status="open")
manager.acknowledge_alert(alert.alert_id)
manager.resolve_alert(alert.alert_id)
# Get statistics
stats = manager.get_stats()
print(f"Total: {stats['total']}, Open: {stats['open']}")
Format alerts for notifications:
# Slack
slack_payload = alert.format_slack_message()
# Email
subject, html_body = alert.format_email()
Factory Functions¶
Create pre-configured components:
from truthound.profiler.evolution import (
create_watcher,
create_rename_detector,
)
# Create watcher with common settings
watcher = create_watcher(
sources=[FileSchemaSource("schema.json")],
poll_interval=60,
enable_logging=True,
enable_history=True,
history_path="./history",
on_change=lambda e: print(f"Change: {e.source}"),
auto_start=True,
)
# Create rename detector with specific algorithm
detector = create_rename_detector(
algorithm="jaro_winkler",
threshold=0.85,
require_type_match=True,
)
Integration Examples¶
CI/CD Pipeline¶
# GitHub Actions
- name: Check Schema Changes
run: |
th schema-check current.json -b baseline.json --fail-on-breaking
# Python script for CI
from truthound.profiler.evolution import SchemaEvolutionDetector
import sys
detector = SchemaEvolutionDetector()
changes = detector.detect_changes(current, baseline)
breaking = [c for c in changes if c.breaking]
if breaking:
print("❌ Breaking changes detected:")
for c in breaking:
print(f" - {c.description}")
sys.exit(1)
print("✅ No breaking changes")
Airflow DAG¶
from airflow import DAG
from airflow.operators.python import PythonOperator
def check_schema():
from truthound.profiler.evolution import SchemaHistory
history = SchemaHistory.create(storage_type="file", path="/data/history")
# Get current schema from database
current = get_database_schema()
# Save and check for breaking changes
version = history.save(current)
if version.has_breaking_changes():
raise ValueError(f"Breaking changes: {version.changes_from_parent}")
with DAG("schema_monitor", schedule_interval="@daily") as dag:
check_schema_task = PythonOperator(
task_id="check_schema",
python_callable=check_schema,
)
Slack Notifications¶
import requests
from truthound.profiler.evolution import (
SchemaWatcher,
BreakingChangeAlertManager,
AlertingEventHandler,
)
class SlackNotifier:
def __init__(self, webhook_url):
self.webhook_url = webhook_url
def send(self, alert):
payload = alert.format_slack_message()
requests.post(self.webhook_url, json=payload)
return True
# Setup
manager = BreakingChangeAlertManager(notifiers=[
SlackNotifier("https://hooks.slack.com/services/...")
])
watcher = SchemaWatcher()
watcher.add_handler(AlertingEventHandler(manager))
watcher.start()
Best Practices¶
1. Version Strategy Selection¶
- Semantic: Best for APIs and shared schemas
- Timestamp: Best for audit trails
- Git-like: Best for development environments
2. Breaking Change Policy¶
# Define what constitutes breaking
BREAKING_TYPES = {
ChangeType.COLUMN_REMOVED,
ChangeType.TYPE_CHANGED, # Only incompatible
ChangeType.NULLABLE_CHANGED, # Only nullable→non-nullable
}
# Allow grace period for breaking changes
MIN_DEPRECATION_DAYS = 30
3. Rename Detection Tuning¶
# Strict (production)
detector = ColumnRenameDetector(
similarity_threshold=0.9,
require_type_match=True,
)
# Lenient (development)
detector = ColumnRenameDetector(
similarity_threshold=0.7,
require_type_match=False,
)
4. Alert Configuration¶
# Critical alerts only
watcher.add_handler(AlertingEventHandler(
alert_manager,
only_breaking=True,
))
# All changes with logging
watcher.add_handler(LoggingEventHandler(
min_severity=ChangeSeverity.INFO,
))
Related Guides¶
- Drift Detection - Data distribution drift
- Quality Scoring - Impact on data quality
- Checkpoint Basics - CI/CD integration
- Notifications - Alert routing
API Reference¶
Full API documentation available at:
truthound.profiler.evolution.SchemaEvolutionDetectortruthound.profiler.evolution.SchemaHistorytruthound.profiler.evolution.SchemaWatchertruthound.profiler.evolution.ColumnRenameDetectortruthound.profiler.evolution.BreakingChangeAlertManagertruthound.profiler.evolution.ImpactAnalyzer