Realtime Commands
Streaming validation commands for real-time data quality monitoring.
Overview
| Command | Description | Primary Use Case |
|---|---|---|
| `validate` | Validate streaming data | Real-time quality checks |
| `monitor` | Monitor validation metrics | Continuous monitoring |
| `checkpoint` | Manage validation checkpoints | State management |
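Checkpoint Subcommands

| Subcommand | Description |
|---|---|
| `list` | List checkpoints |
| `show` | Show checkpoint details |
| `delete` | Delete a checkpoint |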
What is Realtime Validation?
Realtime validation enables continuous data quality monitoring on streaming sources:
- Streaming validation - Check data as it flows through pipelines
- Metric monitoring - Track quality metrics over time
- Checkpoint recovery - Resume validation from saved state
- Multiple sources - Kafka, Kinesis, and a built-in mock source
Supported Streaming Sources
| Source | Format | Description | Dependency |
|---|---|---|---|
| Mock | `mock` | Mock data source for testing | Built-in |
| Kafka | `kafka:topic_name` | Apache Kafka topic | `aiokafka` |
| Kinesis | `kinesis:stream_name` | AWS Kinesis stream | `aiobotocore` |
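Each source argument combines a source type and, for Kafka and Kinesis, a topic or stream name separated by a colon. The CLI's actual parser is not documented here. The following is a minimal, hypothetical sketch of how such a spec could be split and checked against the table above; `parse_source` and `SourceSpec` are illustrative names, not truthound APIs.

```python
from dataclasses import dataclass
from typing import Optional

# Optional dependency per source type, copied from the table above.
SOURCE_DEPENDENCIES = {"mock": None, "kafka": "aiokafka", "kinesis": "aiobotocore"}


@dataclass(frozen=True)
class SourceSpec:
    kind: str                  # "mock", "kafka", or "kinesis"
    name: Optional[str]        # topic/stream name; None for the mock source
    dependency: Optional[str]  # extra package needed; None if built in


def parse_source(spec: str) -> SourceSpec:
    """Hypothetical helper: split 'kafka:orders' into type and name."""
    kind, sep, name = spec.partition(":")
    kind = kind.strip().lower()
    if kind not in SOURCE_DEPENDENCIES:
        raise ValueError(f"unsupported source type: {kind!r}")
    if kind == "mock":
        if sep:
            raise ValueError("the mock source does not take a name")
        return SourceSpec(kind, None, None)
    if not name:
        raise ValueError(f"{kind} source needs a name, e.g. {kind}:my_name")
    return SourceSpec(kind, name, SOURCE_DEPENDENCIES[kind])
```

For example, `parse_source("kafka:orders")` yields a spec whose `dependency` is `aiokafka`. That is the package the `kafka` extra in the installation step is meant to provide.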
Installation
```bash
# Quotes keep shells such as zsh from expanding the brackets
# For Kafka support
pip install "truthound[kafka]"
# For Kinesis support
pip install "truthound[kinesis]"
# For all streaming sources
pip install "truthound[streaming]"
```
Workflow
```mermaid
graph LR
    A[Streaming Source] --> B[realtime validate]
    B --> C[Validation Results]
    B --> D[Checkpoints]
    A --> E[realtime monitor]
    E --> F[Metrics Dashboard]
    D --> G[realtime checkpoint]
```
Quick Examples
Validate Streaming Data
```bash
# Mock source for testing
truthound realtime validate mock
# Kafka topic
truthound realtime validate kafka:my_topic --batch-size 500
# Kinesis stream
truthound realtime validate kinesis:my_stream --max-batches 100
```
Monitor Metrics
```bash
# Basic monitoring
truthound realtime monitor mock
# Custom interval and duration
truthound realtime monitor kafka:my_topic --interval 10 --duration 300
```
Manage Checkpoints
```bash
# List checkpoints
truthound realtime checkpoint list
# Show checkpoint details
truthound realtime checkpoint show abc12345
# Delete checkpoint
truthound realtime checkpoint delete abc12345 --force
```
Architecture
Validation Flow
1. Connect to the streaming source (Kafka, Kinesis, or mock).
2. Batch incoming records (batch size is configurable with `--batch-size`).
3. Validate each batch with the specified validators.
4. Save a checkpoint after each batch.
5. Report results and metrics.
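The five steps above form a simple loop. The sketch below is one possible way to structure it; it is not truthound's implementation. Every name here (`batched`, `null_check`, `run_validation`, `RunSummary`, `on_checkpoint`) is hypothetical. Any Python iterable stands in for the connected source, and a callback stands in for checkpoint persistence.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Iterator, List, Optional

Record = Dict[str, object]
# A validator takes a batch and returns issue messages (empty list = pass).
Validator = Callable[[List[Record]], List[str]]


def batched(records: Iterable[Record], batch_size: int) -> Iterator[List[Record]]:
    """Step 2: group incoming records into lists of at most batch_size."""
    batch: List[Record] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch


def null_check(column: str) -> Validator:
    """Hypothetical stand-in for a 'null' validator on one column."""
    def check(batch: List[Record]) -> List[str]:
        nulls = sum(1 for r in batch if r.get(column) is None)
        return [f"{nulls} null value(s) in {column!r}"] if nulls else []
    return check


@dataclass
class RunSummary:
    total_batches: int = 0
    failed_batches: int = 0
    records_seen: int = 0  # doubles as the offset a checkpoint would store


def run_validation(
    records: Iterable[Record],  # Step 1: any iterable stands in for the source
    validators: List[Validator],
    batch_size: int,
    max_batches: Optional[int] = None,
    on_checkpoint: Optional[Callable[[RunSummary], None]] = None,
) -> RunSummary:
    summary = RunSummary()
    for batch in batched(records, batch_size):
        # Step 3: run every validator on the batch
        issues = [msg for validate in validators for msg in validate(batch)]
        summary.total_batches += 1
        summary.records_seen += len(batch)
        if issues:
            summary.failed_batches += 1
        # Step 4: persist progress after each batch
        if on_checkpoint is not None:
            on_checkpoint(summary)
        # Stop after the limit without pulling an extra batch from the stream
        if max_batches is not None and summary.total_batches >= max_batches:
            break
    return summary  # Step 5: the caller reports this summary
```

With 25 records, a batch size of 10, and nulls at ids 5 and 17, this loop processes 3 batches, marks 2 of them as failed, and checkpoints at offsets 10, 20, and 25. The `max_batches` check happens after a batch is processed, so a live stream is never asked for a batch that will be discarded.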
Checkpoint System
Checkpoints enable:
- Recovery - Resume from last successful batch
- Progress tracking - Know exactly where validation stopped
- State persistence - Survive process restarts
Checkpoints are saved automatically as JSON files:

```text
./checkpoints/
├── abc12345.json
├── def67890.json
└── ...
```
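The layout above stores one JSON file per checkpoint ID. The file schema is not documented here, so the fields below (`source`, `offset`, `batches_processed`, `saved_at`) are assumptions, and `save_checkpoint`, `load_checkpoint`, and `list_checkpoints` are hypothetical helpers. The sketch shows one common way to make such files survive restarts. It writes to a temporary file in the same folder, fsyncs it, and then swaps it in with `os.replace`, so a reader never sees a half-written checkpoint.

```python
import json
import os
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional


def save_checkpoint(directory: str, source: str, offset: int,
                    batches_processed: int, checkpoint_id: Optional[str] = None) -> str:
    """Write checkpoint state to <id>.json (hypothetical schema)."""
    folder = Path(directory)
    folder.mkdir(parents=True, exist_ok=True)
    checkpoint_id = checkpoint_id or uuid.uuid4().hex[:8]  # 8 hex chars, like abc12345
    state = {
        "checkpoint_id": checkpoint_id,
        "source": source,
        "offset": offset,
        "batches_processed": batches_processed,
        "saved_at": datetime.now(timezone.utc).isoformat(),
    }
    # Write to a temp file in the same folder, flush it to disk,
    # then replace the target in a single rename.
    fd, tmp_path = tempfile.mkstemp(dir=folder, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(state, f, indent=2)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, folder / f"{checkpoint_id}.json")
    return checkpoint_id


def load_checkpoint(directory: str, checkpoint_id: str) -> dict:
    with open(Path(directory) / f"{checkpoint_id}.json") as f:
        return json.load(f)


def list_checkpoints(directory: str) -> List[str]:
    # Checkpoint IDs are the file stems; leftover .tmp files are ignored.
    return sorted(p.stem for p in Path(directory).glob("*.json"))
```

To resume, a consumer would load the latest checkpoint and restart reading from its `offset`. Saving again with the same ID overwrites the file in place.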
Use Cases
1. Kafka Pipeline Monitoring
```bash
# Validate Kafka messages
truthound realtime validate kafka:orders --validators null,range --batch-size 1000
# Monitor continuously
truthound realtime monitor kafka:orders --interval 5 --duration 0
```
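`--interval` and `--duration` describe a polling loop. Because this example pairs `--duration 0` with continuous monitoring, the sketch below assumes that a non-positive duration means "no time limit". That is an inference, not documented behavior. `monitor` and its `collect` callback are hypothetical. The clock and sleep functions are injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable, Dict, Iterator


def monitor(
    collect: Callable[[], Dict[str, float]],
    interval: float,
    duration: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[Dict[str, float]]:
    """Yield one metrics sample every `interval` seconds.

    Assumption: duration <= 0 means run until the consumer stops iterating.
    """
    deadline = None if duration <= 0 else clock() + duration
    while deadline is None or clock() < deadline:
        yield collect()  # e.g. a dict of pass rate, records/sec, ...
        sleep(interval)
```

Making `monitor` a generator lets an unbounded run be consumed lazily, for example with `for sample in monitor(get_metrics, 5, 0): ...`, where `get_metrics` is a hypothetical metrics callback. The caller can stop the run at any time by breaking out of the loop.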
2. Kinesis Stream Validation
```bash
# Validate Kinesis records
truthound realtime validate kinesis:events --max-batches 50
# Check validation status
truthound realtime checkpoint list --format json
```
3. Development Testing
```bash
# Use mock source for testing
truthound realtime validate mock --validators null,unique --batch-size 100
# Monitor mock metrics
truthound realtime monitor mock --interval 2 --duration 30
```
4. CI/CD Integration
```yaml
# GitHub Actions
- name: Validate Streaming Pipeline
  run: |
    truthound realtime validate kafka:test_topic \
      --validators null,range \
      --max-batches 10 \
      -o results.json
    # Check results: fail the step if any batch failed
    python -c "
    import json, sys
    with open('results.json') as f:
        data = json.load(f)
    if data['failed_batches'] > 0:
        sys.exit(1)
    "
```
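The inline `python -c` check can also live in a small script that fails clearly when `results.json` is missing or malformed, instead of raising a traceback. The only field it relies on is `failed_batches`, which the CI example already reads. The rest of the results schema is not assumed. `check_results` is a hypothetical helper.

```python
import json
import sys
from pathlib import Path


def check_results(path: str) -> int:
    """Return a process exit code: 0 if no batch failed, 1 otherwise."""
    try:
        data = json.loads(Path(path).read_text())
    except (OSError, json.JSONDecodeError) as exc:
        print(f"could not read {path}: {exc}", file=sys.stderr)
        return 1
    failed = data.get("failed_batches") if isinstance(data, dict) else None
    if not isinstance(failed, int):
        print(f"{path} has no integer 'failed_batches' field", file=sys.stderr)
        return 1
    if failed > 0:
        print(f"{failed} batch(es) failed validation", file=sys.stderr)
        return 1
    return 0
```

A CI step would then end with `sys.exit(check_results("results.json"))`, so a missing or malformed results file fails the job just like a failed batch does.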
Performance Tuning

| Parameter | Impact | Recommendation |
|---|---|---|
| `--batch-size` | Memory usage, latency | 500-2000 for most cases |
| `--max-batches` | Total processing time | Set based on test scope |
| `--interval` | Monitoring overhead | 5-30 seconds |
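The `--batch-size` and `--max-batches` trade-offs come down to simple arithmetic. The number of batches is the ceiling of records divided by batch size, `--max-batches` caps that count, and one batch's footprint grows roughly linearly with batch size. The helper below is a hypothetical back-of-the-envelope calculator. The per-record byte size is an input you would have to measure yourself, and the estimate ignores Python object and client-library overhead.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RunPlan:
    batches: int                # batches that will actually be processed
    records: int                # records covered by those batches
    batch_bytes: Optional[int]  # rough size of one batch, if estimable


def plan_run(total_records: int, batch_size: int,
             max_batches: Optional[int] = None,
             avg_record_bytes: Optional[int] = None) -> RunPlan:
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batches = -(-total_records // batch_size)  # integer ceiling division
    if max_batches is not None:
        batches = min(batches, max_batches)
    records = min(total_records, batches * batch_size)
    # Rough estimate only: payload bytes, no per-object overhead.
    batch_bytes = None if avg_record_bytes is None else batch_size * avg_record_bytes
    return RunPlan(batches, records, batch_bytes)
```

For example, 1,000,000 records at `--batch-size 1000` means 1,000 batches. Adding `--max-batches 10` limits a CI run to the first 10,000 records. At about 512 bytes per record, each batch holds roughly 512 KB of payload.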
Command Reference
See Also