Realtime Commands
Streaming validation commands for real-time data quality monitoring.
Overview
| Command | Description | Primary Use Case |
|---------|-------------|------------------|
| `validate` | Validate streaming data | Real-time quality checks |
| `monitor` | Monitor validation metrics | Continuous monitoring |
| `checkpoint` | Manage validation checkpoints | State management |
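Checkpoint Subcommands
| Subcommand | Description |
|------------|-------------|
| `list` | List saved checkpoints |
| `show <id>` | Show the details of one checkpoint |
| `delete <id>` | Delete a checkpoint |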
What is Realtime Validation?
Realtime validation enables continuous data quality monitoring on streaming sources:
- Streaming validation - Check data as it flows through pipelines
- Metric monitoring - Track quality metrics over time
- Checkpoint recovery - Resume validation from saved state
- Multiple sources - Kafka, Kinesis, and mock sources
Supported Streaming Sources
| Source | Format | Description | Dependency |
|--------|--------|-------------|------------|
| Mock | `mock` | Mock data source for testing | Built-in |
| Kafka | `kafka:topic_name` | Apache Kafka topic | `aiokafka` |
| Kinesis | `kinesis:stream_name` | AWS Kinesis stream | `aiobotocore` |
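The Format column suggests a `kind:name` convention for source arguments. The sketch below shows one way such an argument could be split and checked. `parse_source` and `SourceSpec` are hypothetical names used only for illustration, and the parsing rules are an assumption, not Truthound's own parser.

```python
from dataclasses import dataclass
from typing import Optional

# Source kinds taken from the table above; the parsing rules are an assumption.
_NAMED_KINDS = {"kafka", "kinesis"}  # these need a topic/stream name after ":"


@dataclass(frozen=True)
class SourceSpec:
    kind: str            # "mock", "kafka" or "kinesis"
    name: Optional[str]  # topic or stream name; None for the mock source


def parse_source(spec: str) -> SourceSpec:
    """Split a hypothetical 'kind:name' source argument into its parts."""
    if spec == "mock":
        return SourceSpec("mock", None)
    kind, sep, name = spec.partition(":")
    if kind not in _NAMED_KINDS or not sep or not name:
        raise ValueError(f"unsupported source spec: {spec!r}")
    return SourceSpec(kind, name)


print(parse_source("kafka:my_topic"))  # SourceSpec(kind='kafka', name='my_topic')
print(parse_source("kinesis:my_stream"))
```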
Installation
```bash
# For Kafka support
pip install "truthound[kafka]"

# For Kinesis support
pip install "truthound[kinesis]"

# For all streaming sources
pip install "truthound[streaming]"
```
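To see which streaming backends the current environment can use, you can check whether each dependency in the Dependency column is importable. This sketch uses only the standard library. `available_sources` and the mapping are hypothetical helpers that mirror the table, not part of Truthound.

```python
import importlib.util
from typing import Dict, Optional

# Mirrors the "Dependency" column of the sources table; mock needs nothing extra.
SOURCE_DEPENDENCIES: Dict[str, Optional[str]] = {
    "mock": None,
    "kafka": "aiokafka",
    "kinesis": "aiobotocore",
}


def available_sources() -> Dict[str, bool]:
    """Report, per source kind, whether its optional dependency is importable."""
    result: Dict[str, bool] = {}
    for kind, module in SOURCE_DEPENDENCIES.items():
        # find_spec returns None when a top-level module is not installed.
        result[kind] = module is None or importlib.util.find_spec(module) is not None
    return result


print(available_sources())
```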
Workflow
```mermaid
graph LR
    A[Streaming Source] --> B[realtime validate]
    B --> C[Validation Results]
    B --> D[Checkpoints]
    A --> E[realtime monitor]
    E --> F[Metrics Dashboard]
    D --> G[realtime checkpoint]
```
Quick Examples
Validate Streaming Data
```bash
# Mock source for testing
truthound realtime validate mock

# Kafka topic
truthound realtime validate kafka:my_topic --batch-size 500

# Kinesis stream
truthound realtime validate kinesis:my_stream --max-batches 100
```
Monitor Metrics
```bash
# Basic monitoring
truthound realtime monitor mock

# Custom interval and duration
truthound realtime monitor kafka:my_topic --interval 10 --duration 300
```
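One way to read `--interval` and `--duration` is as a polling loop that samples metrics every `interval` seconds until `duration` seconds have elapsed. The sketch below is an assumption about those semantics, not Truthound's implementation. In particular, treating `duration <= 0` as "run until stopped" is inferred from the `--duration 0` example under Use Cases. `collect`, `max_samples`, and the injectable clock are hypothetical.

```python
import time
from typing import Callable, Dict, List, Optional


def monitor(
    collect: Callable[[], Dict[str, float]],
    interval: float,
    duration: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
    max_samples: Optional[int] = None,
) -> List[Dict[str, float]]:
    """Poll `collect` every `interval` seconds for roughly `duration` seconds.

    Assumption: duration <= 0 means "run until stopped"; `max_samples` is a
    hypothetical cap so that mode can still terminate.
    """
    samples: List[Dict[str, float]] = []
    start = clock()
    while True:
        samples.append(collect())
        if max_samples is not None and len(samples) >= max_samples:
            break
        # Stop if the next sample would fall outside the requested window.
        if duration > 0 and clock() - start + interval > duration:
            break
        sleep(interval)
    return samples


class FakeClock:
    """Deterministic clock so the example finishes instantly."""

    def __init__(self) -> None:
        self.t = 0.0

    def now(self) -> float:
        return self.t

    def sleep(self, seconds: float) -> None:
        self.t += seconds


fake = FakeClock()
samples = monitor(lambda: {"pass_rate": 1.0}, interval=10, duration=30,
                  clock=fake.now, sleep=fake.sleep)
print(len(samples))  # 4: samples taken at t = 0, 10, 20, 30
```

The injectable `clock` and `sleep` are a design choice that keeps the loop testable without real waiting.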
Manage Checkpoints
```bash
# List checkpoints
truthound realtime checkpoint list

# Show checkpoint details
truthound realtime checkpoint show abc12345

# Delete checkpoint
truthound realtime checkpoint delete abc12345 --force
```
Architecture
Validation Flow
- Connect to the streaming source (Kafka, Kinesis, or mock)
- Group incoming records into batches (size set by `--batch-size`)
- Validate each batch with the validators given in `--validators`
- Save a checkpoint after each batch
- Report results and metrics
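The flow above can be sketched as a loop over fixed-size batches. This is a minimal illustration that rests on several assumptions: records are dicts, each validator is a plain function that returns a list of issue messages, and the checkpoint is a dict counting processed batches. `run_validation`, `batched`, and these shapes are hypothetical, not Truthound's API. The `failed_batches` key simply mirrors the field read in the CI example below.

```python
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List

Record = Dict[str, object]
# Hypothetical validator shape: takes a batch, returns a list of issue messages.
Validator = Callable[[List[Record]], List[str]]


def batched(records: Iterable[Record], size: int) -> Iterator[List[Record]]:
    """Group a (possibly endless) record stream into lists of up to `size`."""
    it = iter(records)
    while batch := list(islice(it, size)):
        yield batch


def run_validation(
    records: Iterable[Record],
    validators: List[Validator],
    batch_size: int,
    max_batches: int,
    checkpoint: Dict[str, int],
) -> Dict[str, int]:
    """Validate batch by batch, updating the checkpoint after each batch."""
    results = {"batches": 0, "failed_batches": 0}
    # islice stops after max_batches, so an endless stream still terminates.
    for batch in islice(batched(records, batch_size), max_batches):
        issues = [msg for validate in validators for msg in validate(batch)]
        results["batches"] += 1
        if issues:
            results["failed_batches"] += 1
        # Record progress so a restart could skip already-validated batches.
        checkpoint["batches_done"] = checkpoint.get("batches_done", 0) + 1
    return results


def no_nulls(batch: List[Record]) -> List[str]:
    return [f"{key} is null" for row in batch for key, value in row.items() if value is None]


stream = ({"id": i, "amount": None if i == 7 else i} for i in range(25))
progress: Dict[str, int] = {}
print(run_validation(stream, [no_nulls], batch_size=10, max_batches=100, checkpoint=progress))
# {'batches': 3, 'failed_batches': 1}
print(progress)
# {'batches_done': 3}
```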
Checkpoint System
Checkpoints enable:
- Recovery - Resume from last successful batch
- Progress tracking - Know exactly where validation stopped
- State persistence - Survive process restarts
Checkpoints are saved automatically as JSON files under `./checkpoints/`:
```text
./checkpoints/
├── abc12345.json
├── def67890.json
└── ...
```
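Checkpoints are stored as one JSON file per ID, so saving and resuming could look like the sketch below. The field names (`source`, `batches_done`) and the write-then-rename approach are assumptions made for illustration. This page does not document the actual checkpoint schema, and `save_checkpoint` and `load_checkpoint` are hypothetical helpers.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def save_checkpoint(directory: Path, checkpoint_id: str, state: dict) -> Path:
    """Write `state` to <directory>/<checkpoint_id>.json."""
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / f"{checkpoint_id}.json"
    # Write to a temporary file in the same directory, then rename it over the
    # target; os.replace is atomic on POSIX, so readers never see a half-written file.
    fd, tmp_path = tempfile.mkstemp(dir=str(directory), suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(state, f)
    os.replace(tmp_path, target)
    return target


def load_checkpoint(directory: Path, checkpoint_id: str) -> Optional[dict]:
    """Return the saved state, or None if no checkpoint exists yet."""
    path = directory / f"{checkpoint_id}.json"
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))


with tempfile.TemporaryDirectory() as tmp:
    cp_dir = Path(tmp) / "checkpoints"
    save_checkpoint(cp_dir, "abc12345", {"source": "kafka:orders", "batches_done": 42})
    state = load_checkpoint(cp_dir, "abc12345")
    print(state["batches_done"])  # 42: resume after the last completed batch
    print(load_checkpoint(cp_dir, "missing"))  # None
```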
Use Cases
1. Kafka Pipeline Monitoring
```bash
# Validate Kafka messages
truthound realtime validate kafka:orders --validators null,range --batch-size 1000

# Monitor continuously
truthound realtime monitor kafka:orders --interval 5 --duration 0
```
2. Kinesis Stream Validation
```bash
# Validate Kinesis records
truthound realtime validate kinesis:events --max-batches 50

# Check validation status
truthound realtime checkpoint list --format json
```
3. Development Testing
```bash
# Use mock source for testing
truthound realtime validate mock --validators null,unique --batch-size 100

# Monitor mock metrics
truthound realtime monitor mock --interval 2 --duration 30
```
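The `--validators` option takes a comma-separated list such as `null,range` or `null,unique`. To illustrate the kind of per-batch checks those names suggest, here is a sketch of three simple validators and a lookup from the option string. This page does not describe the exact semantics of Truthound's built-in `null`, `range`, and `unique` validators. Everything below is therefore an assumption, including the `id` and `amount` columns and the range bounds, and `CHECKS` and `parse_validators` are hypothetical names.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]
Check = Callable[[List[Record]], List[str]]


def null_check(batch: List[Record]) -> List[str]:
    """Flag every field whose value is None."""
    return [f"row {i}: {key} is null"
            for i, row in enumerate(batch) for key, value in row.items() if value is None]


def unique_check(batch: List[Record], column: str = "id") -> List[str]:
    """Flag repeated values of `column` within the batch."""
    seen: set = set()
    issues: List[str] = []
    for i, row in enumerate(batch):
        key = row.get(column)
        if key in seen:
            issues.append(f"row {i}: duplicate {column}={key!r}")
        seen.add(key)
    return issues


def range_check(batch: List[Record], column: str = "amount",
                lo: float = 0, hi: float = 10_000) -> List[str]:
    """Flag non-null values of `column` outside [lo, hi] (illustrative bounds)."""
    return [f"row {i}: {column}={row[column]!r} outside [{lo}, {hi}]"
            for i, row in enumerate(batch)
            if row.get(column) is not None and not lo <= row[column] <= hi]


# Hypothetical mapping from option names to check functions.
CHECKS: Dict[str, Check] = {"null": null_check, "unique": unique_check, "range": range_check}


def parse_validators(option: str) -> List[Check]:
    """Turn a value like 'null,unique' into a list of check functions."""
    names = [n.strip() for n in option.split(",") if n.strip()]
    unknown = [n for n in names if n not in CHECKS]
    if unknown:
        raise ValueError(f"unknown validators: {unknown}")
    return [CHECKS[n] for n in names]


batch = [{"id": 1, "amount": 5}, {"id": 1, "amount": None}, {"id": 2, "amount": -3}]
for check in parse_validators("null,unique,range"):
    print(check.__name__, check(batch))
```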
4. CI/CD Integration
```yaml
# GitHub Actions
- name: Validate Streaming Pipeline
  run: |
    truthound realtime validate kafka:test_topic \
      --validators null,range \
      --max-batches 10 \
      -o results.json

    # Check results
    python -c "
    import json, sys
    with open('results.json') as f:
        data = json.load(f)
    if data['failed_batches'] > 0:
        sys.exit(1)
    "
```
Performance Tuning
| Parameter | Impact | Recommendation |
|-----------|--------|----------------|
| `--batch-size` | Memory usage, latency | 500-2000 for most cases |
| `--max-batches` | Total processing time | Set based on test scope |
| `--interval` | Monitoring overhead | 5-30 seconds |