Checkpoint & CI/CD Integration (Phase 6)

Truthound's Checkpoint system provides a comprehensive framework for orchestrating automated data quality validation pipelines with enterprise-grade CI/CD integration.

Table of Contents

  1. Overview
  2. Quick Start
  3. Core Components
  4. Actions
  5. Triggers
  6. Async Execution
  7. Transaction Management
  8. CI/CD Integration
  9. CI Reporters
  10. CheckpointRunner
  11. Registry
  12. Advanced Notifications
  13. Best Practices
  14. API Reference
  15. Enterprise Assessment

Overview

Checkpoints combine data sources, validators, actions, and triggers into reusable validation pipelines that can be run manually or automatically. The system provides:

  • Automated Validation Pipelines: Define once, run anywhere
  • Support for 12 CI/CD Platforms: Native integration with major CI systems
  • Async Execution: Non-blocking, high-throughput validation
  • Transaction Management: Saga pattern with compensation and idempotency
  • Flexible Triggers: Schedule, Cron, Event, and File-based triggers

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                         Checkpoint                                   │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌────────────┐ │
│  │ DataSource  │  │ Validators  │  │   Actions   │  │  Triggers  │ │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └─────┬──────┘ │
│         │                │                │                │        │
│         └────────────────┼────────────────┼────────────────┘        │
│                          ▼                ▼                          │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                    CheckpointRunner                              ││
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ ││
│  │  │   Sync      │  │   Async     │  │  Transaction Coordinator │ ││
│  │  │  Execution  │  │  Execution  │  │  (Saga + Idempotency)    │ ││
│  │  └─────────────┘  └─────────────┘  └─────────────────────────┘ ││
│  └─────────────────────────────────────────────────────────────────┘│
│                          │                                           │
│                          ▼                                           │
│  ┌─────────────────────────────────────────────────────────────────┐│
│  │                     CI/CD Reporters                              ││
│  │  GitHub │ GitLab │ Jenkins │ CircleCI │ Azure │ Bitbucket │ ... ││
│  └─────────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────────┘

Quick Start

Basic Usage

from truthound.checkpoint import Checkpoint
from truthound.checkpoint.actions import (
    StoreValidationResult,
    SlackNotification,
)

# Create a checkpoint
checkpoint = Checkpoint(
    name="daily_user_validation",
    data_source="users.csv",
    validators=["null", "duplicate", "range"],
    actions=[
        StoreValidationResult(store_path="./results"),
        SlackNotification(
            webhook_url="https://hooks.slack.com/...",
            notify_on="failure",
            channel="#data-quality"
        ),
    ],
)

# Run the checkpoint
result = checkpoint.run()
print(result.summary())

CLI Usage

# Initialize a sample configuration
truthound checkpoint init -o truthound.yaml

# Run a checkpoint from config
truthound checkpoint run daily_data_validation --config truthound.yaml

# Run ad-hoc checkpoint
truthound checkpoint run quick_check --data data.csv --validators null,duplicate

# List checkpoints
truthound checkpoint list --config truthound.yaml

# Validate configuration
truthound checkpoint validate truthound.yaml

Core Components

CheckpointConfig

from truthound.checkpoint import Checkpoint, CheckpointConfig

config = CheckpointConfig(
    name="production_validation",
    data_source="s3://bucket/data.parquet",
    validators=["null", "duplicate", "range"],
    min_severity="medium",
    schema="schema.yaml",
    auto_schema=False,
    run_name_template="%Y%m%d_%H%M%S",
    tags={"env": "production", "team": "data-platform"},
    metadata={"owner": "data-team@company.com"},
    fail_on_critical=True,
    fail_on_high=False,
    timeout_seconds=3600,
    sample_size=100000,
)

checkpoint = Checkpoint(config=config)

YAML Configuration

# truthound.yaml
checkpoints:
- name: daily_data_validation
  data_source: data/production.csv
  validators:
  - 'null'
  - duplicate
  - range
  - regex
  validator_config:
    regex:
      patterns:
        email: ^[\w.+-]+@[\w-]+\.[\w.-]+$
        product_code: ^[A-Z]{2,4}[-_][0-9]{3,6}$
        phone: ^(\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$
    range:
      columns:
        age:
          min_value: 0
          max_value: 150
        price:
          min_value: 0
  min_severity: medium
  auto_schema: true
  tags:
    environment: production
    team: data-platform
  actions:
  - type: store_result
    store_path: ./truthound_results
    partition_by: date
  - type: update_docs
    site_path: ./truthound_docs
    include_history: true
  - type: slack
    webhook_url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
    notify_on: failure
    channel: '#data-quality'
  triggers:
  - type: schedule
    interval_hours: 24
    run_on_weekdays: [0, 1, 2, 3, 4]  # Mon-Fri

Actions

Actions are executed after validation completes. They can store results, send notifications, or integrate with external systems.

Available Actions

| Action | Description | Key Features |
| --- | --- | --- |
| StoreValidationResult | Save results to filesystem, S3, or GCS | Partitioning, compression, retention |
| UpdateDataDocs | Generate HTML/Markdown documentation | History tracking, templates |
| SlackNotification | Send Slack messages via webhook | Mentions, custom formatting |
| EmailNotification | Send email notifications | SMTP, SendGrid, SES support |
| WebhookAction | Call any HTTP endpoint | Auth types, custom headers |
| PagerDutyAction | Create/resolve PagerDuty incidents | Auto-resolve on success |
| GitHubAction | GitHub Actions integration | Summaries, outputs, annotations |
| TeamsNotification | Microsoft Teams notifications | Adaptive Cards, themes, templates |
| OpsGenieAction | OpsGenie alert management | Priorities, responders, escalation |
| DiscordNotification | Discord webhook notifications | Embeds, mentions |
| TelegramNotification | Telegram bot notifications | Markdown, photos |
| CustomAction | Execute Python callbacks or shell commands | Full flexibility |

StoreValidationResult

from truthound.checkpoint.actions import StoreValidationResult

action = StoreValidationResult(
    store_path="./results",      # Local path, s3://, or gs://
    store_type="file",           # file, s3, gcs
    format="json",               # json, yaml
    partition_by="date",         # date, checkpoint, status
    retention_days=30,
    compress=True,
)

UpdateDataDocs

from truthound.checkpoint.actions import UpdateDataDocs

action = UpdateDataDocs(
    site_path="./docs",
    format="html",               # html, markdown
    include_history=True,
    max_history_items=100,
    template="default",          # default, minimal, detailed
)

SlackNotification

from truthound.checkpoint.actions import SlackNotification

action = SlackNotification(
    webhook_url="https://hooks.slack.com/...",
    channel="#data-quality",
    notify_on="failure",         # always, success, failure, error
    mention_on_failure=["U12345", "@here"],
    include_details=True,
    custom_message="Data quality check completed",
)

WebhookAction

from truthound.checkpoint.actions import WebhookAction

action = WebhookAction(
    url="https://api.example.com/webhook",
    method="POST",
    auth_type="bearer",          # none, basic, bearer, api_key
    auth_credentials={"token": "${API_TOKEN}"},
    headers={"X-Custom-Header": "value"},
    include_full_result=True,
    timeout_seconds=30,
    retry_count=3,
)

TeamsNotification

Microsoft Teams notifications with Adaptive Cards:

from truthound.checkpoint.actions import (
    TeamsNotification,
    TeamsConfig,
    AdaptiveCardBuilder,
    MessageTheme,
    create_teams_notification,
    create_failure_alert,
)

# Basic usage
action = TeamsNotification(
    webhook_url="https://outlook.office.com/webhook/...",
    notify_on="failure",
    channel="Data Quality",
    include_details=True,
)

# With custom Adaptive Card
builder = AdaptiveCardBuilder()
builder.add_header("Data Quality Alert")
builder.add_fact("Dataset", "users.csv")
builder.add_fact("Issues", "150")
builder.add_action_button("View Report", "https://...")

action = TeamsNotification(
    webhook_url="...",
    card_builder=builder,
    theme=MessageTheme.CRITICAL,
)

# Factory functions
action = create_failure_alert(
    webhook_url="...",
    mention_users=["user@company.com"],
)

OpsGenieAction

OpsGenie alert management with priorities and responders:

from truthound.checkpoint.actions import (
    OpsGenieAction,
    OpsGenieConfig,
    AlertPriority,
    ResponderType,
    Responder,
    create_opsgenie_action,
    create_critical_alert,
    create_team_alert,
)

# Basic usage
action = OpsGenieAction(
    api_key="${OPSGENIE_API_KEY}",
    notify_on="failure",
    priority=AlertPriority.P1,
    tags=["data-quality", "production"],
)

# With responders
action = OpsGenieAction(
    api_key="...",
    responders=[
        Responder(type=ResponderType.TEAM, name="data-platform"),
        Responder(type=ResponderType.USER, username="oncall@company.com"),
    ],
    visible_to=[
        Responder(type=ResponderType.TEAM, name="engineering"),
    ],
    auto_resolve_on_success=True,
)

# Factory functions
action = create_critical_alert(
    api_key="...",
    team="data-platform",
    escalation_policy="data-quality-escalation",
)

DiscordNotification

Discord webhook notifications:

from truthound.checkpoint.actions import DiscordNotification, DiscordConfig

action = DiscordNotification(
    webhook_url="https://discord.com/api/webhooks/...",
    notify_on="failure",
    username="Truthound Bot",
    avatar_url="https://example.com/logo.png",
    embed_color=0xFF0000,  # Red for errors
    include_mentions=["@here"],
)

# With custom embed
action = DiscordNotification(
    webhook_url="...",
    embed_title="Data Quality Alert",
    embed_description="Validation failed for users.csv",
    embed_fields=[
        {"name": "Issues", "value": "150", "inline": True},
        {"name": "Severity", "value": "High", "inline": True},
    ],
)

TelegramNotification

Telegram bot notifications:

from truthound.checkpoint.actions import (
    TelegramNotification,
    TelegramConfig,
    TelegramNotificationWithPhoto,
)

# Basic text notification
action = TelegramNotification(
    bot_token="${TELEGRAM_BOT_TOKEN}",
    chat_id="-1001234567890",  # Channel/group ID
    notify_on="failure",
    parse_mode="Markdown",  # or "HTML"
)

# With photo (e.g., chart screenshot)
action = TelegramNotificationWithPhoto(
    bot_token="...",
    chat_id="...",
    photo_url="https://example.com/chart.png",
    caption="Data quality trend chart",
)

# Custom message template
action = TelegramNotification(
    bot_token="...",
    chat_id="...",
    message_template="""
🚨 *Data Quality Alert*

Dataset: `{checkpoint_name}`
Status: {status}
Issues: {issue_count}

[View Report]({report_url})
""",
)

CustomAction

from truthound.checkpoint.actions import CustomAction

# Python callback
def my_callback(result):
    print(f"Checkpoint completed: {result.status}")
    if result.status == "failure":
        # Custom alerting logic (send_custom_alert is a user-defined helper)
        send_custom_alert(result)
    return {"processed": True}

action = CustomAction(callback=my_callback)

# Shell command
action = CustomAction(
    shell_command="./scripts/notify.sh",
    environment={"API_KEY": "${SECRET_KEY}"},
    pass_result_as_json=True,
)

Notify Conditions

All actions support the notify_on parameter:

| Condition | Triggers On |
| --- | --- |
| always | Every run |
| success | Validation passed |
| failure | Validation failed |
| error | System error occurred |
| failure_or_error | Failure or error |
| not_success | Any non-success status |
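
For example, a checkpoint typically stores every result for auditing while alerting only on problems. A minimal sketch (the webhook URL is a placeholder):

from truthound.checkpoint.actions import SlackNotification, StoreValidationResult

actions = [
    # Keep an audit trail for every run
    StoreValidationResult(store_path="./results", notify_on="always"),
    # Alert only when the run is anything other than a success
    SlackNotification(webhook_url="https://hooks.slack.com/...", notify_on="not_success"),
]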

Triggers

Triggers determine when checkpoints should run automatically.

ScheduleTrigger

Time-interval based execution:

from datetime import datetime

from truthound.checkpoint.triggers import ScheduleTrigger

# Run every hour
trigger = ScheduleTrigger(interval_hours=1)

# Run every 30 minutes on weekdays
trigger = ScheduleTrigger(
    interval_minutes=30,
    run_on_weekdays=[0, 1, 2, 3, 4],  # Mon=0, Sun=6
    start_time=datetime(2024, 1, 1, 9, 0),  # Start at 9 AM
    end_time=datetime(2024, 12, 31, 18, 0),  # End at 6 PM
    timezone="America/New_York",
)

CronTrigger

Standard cron expression support (5 or 6 fields):

from truthound.checkpoint.triggers import CronTrigger

# Daily at midnight
trigger = CronTrigger(expression="0 0 * * *")

# Every 15 minutes
trigger = CronTrigger(expression="*/15 * * * *")

# Monday at 9am
trigger = CronTrigger(expression="0 9 * * 1")

# With seconds (6 fields)
trigger = CronTrigger(expression="30 0 9 * * 1")  # Monday 9:00:30

EventTrigger

Event-driven execution with filtering and debouncing:

from truthound.checkpoint.triggers import EventTrigger

trigger = EventTrigger(
    event_type="data_updated",
    event_filter={"source": "production", "priority": "high"},
    debounce_seconds=60,       # Minimum time between triggers
    batch_events=True,         # Batch multiple events
    batch_window_seconds=30,   # Batch window
)

# Fire event programmatically
trigger.fire_event({
    "source": "production",
    "priority": "high",
    "table": "users",
    "rows_affected": 1500,
})

FileWatchTrigger

File system change detection with hash-based verification:

from truthound.checkpoint.triggers import FileWatchTrigger

trigger = FileWatchTrigger(
    paths=["./data", "/shared/datasets"],
    patterns=["*.csv", "*.parquet"],
    recursive=True,
    events=["modified", "created"],  # modified, created, deleted
    ignore_patterns=[".*", "__pycache__", "*.tmp"],
    hash_check=True,           # Only trigger on content change
    poll_interval_seconds=5,
)

Async Execution

For high-throughput scenarios, use AsyncCheckpoint for non-blocking execution:

import asyncio
from truthound.checkpoint import AsyncCheckpoint
from truthound.checkpoint.async_actions import AsyncSlackNotification

# Create async checkpoint
checkpoint = AsyncCheckpoint(
    name="async_validation",
    data_source="large_dataset.parquet",
    validators=["null", "duplicate"],
    actions=[
        AsyncSlackNotification(webhook_url="..."),
    ],
    max_concurrent_actions=5,
    execution_strategy="concurrent",  # sequential, concurrent, pipeline
)

# Run asynchronously
async def main():
    result = await checkpoint.run_async()
    print(result.summary())

asyncio.run(main())

Execution Strategies

from truthound.checkpoint.async_base import (
    SequentialStrategy,
    ConcurrentStrategy,
    PipelineStrategy,
)

# Sequential: One action at a time
checkpoint = AsyncCheckpoint(
    execution_strategy=SequentialStrategy()
)

# Concurrent: All actions in parallel with limit
checkpoint = AsyncCheckpoint(
    execution_strategy=ConcurrentStrategy(max_concurrency=10)
)

# Pipeline: Staged execution
# Stage 1: Store result and update docs (parallel)
# Stage 2: Notify (after stage 1)
checkpoint = AsyncCheckpoint(
    execution_strategy=PipelineStrategy(
        stages=[[0, 1], [2]]  # Action indices
    )
)

Running Multiple Checkpoints Concurrently

from truthound.checkpoint import run_checkpoints_async

checkpoints = [checkpoint1, checkpoint2, checkpoint3]

results = await run_checkpoints_async(
    checkpoints,
    max_concurrent=5,
    context={"batch_id": "2024-01-15"},
)

for result in results:
    print(f"{result.checkpoint_name}: {result.status}")

Async Callbacks

async def on_complete(result):
    await send_metrics_async(result.to_dict())  # user-defined async metrics hook

async def on_error(result):
    await alert_team_async(result.error)  # user-defined async alerting hook

checkpoint = AsyncCheckpoint(
    name="monitored_check",
    on_complete=on_complete,
    on_error=on_error,
)

Transaction Management

Truthound provides enterprise-grade transaction support with the Saga pattern:

Compensatable Actions

Actions that can be rolled back on failure:

from truthound.checkpoint.actions.base import ActionResult
from truthound.checkpoint.transaction import CompensatableAction

class DatabaseUpdateAction(CompensatableAction):
    def execute(self, result):
        # Forward action (create_backup and update_database are user-defined helpers)
        self.backup_id = create_backup()
        update_database(result)
        return ActionResult(status="success")

    def compensate(self, result, execute_result):
        # Rollback action (restore_from_backup is a user-defined helper)
        restore_from_backup(self.backup_id)
        return ActionResult(status="compensated")

Transaction Coordinator

from truthound.checkpoint.transaction import TransactionCoordinator

coordinator = TransactionCoordinator(
    actions=[action1, action2, action3],
    compensation_strategy="reverse",  # reverse, parallel
    max_compensation_retries=3,
)

result = coordinator.execute(checkpoint_result)

if result.needs_rollback:
    coordinator.compensate(result)

Idempotency

Prevent duplicate executions:

from truthound.checkpoint.idempotency import IdempotencyService

service = IdempotencyService(
    store="redis://localhost:6379",  # Or filesystem, memory
    ttl_seconds=3600,
)

# Check before execution (name and run_id identify this checkpoint run)
idempotency_key = f"checkpoint:{name}:{run_id}"

if not service.is_duplicate(idempotency_key):
    result = checkpoint.run()
    service.mark_completed(idempotency_key, result)

CI/CD Integration

GitHub Actions

# .github/workflows/data-quality.yml
name: Data Quality Check

on:
  schedule:
    - cron: '0 0 * * *'
  push:
    paths:
      - 'data/**'
  pull_request:
    paths:
      - 'data/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install Truthound
        run: pip install truthound[all]

      - name: Run Validation
        run: |
          truthound checkpoint run daily_data_validation \
            --config truthound.yaml \
            --github-summary \
            --strict
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}

      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: data-quality-report
          path: truthound_results/

GitLab CI

# .gitlab-ci.yml
stages:
  - validate

data-quality:
  stage: validate
  image: python:3.11-slim
  variables:
    PIP_CACHE_DIR: "$CI_PROJECT_DIR/.pip-cache"
  cache:
    paths:
      - .pip-cache/
  script:
    - pip install truthound[all]
    - truthound checkpoint run $CHECKPOINT_NAME --config truthound.yaml
  artifacts:
    when: always
    paths:
      - truthound_results/
      - truthound_docs/
    reports:
      dotenv: truthound.env
  rules:
    - if: $CI_PIPELINE_SOURCE == "schedule"
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
      changes:
        - data/**/*

Jenkins

// Jenkinsfile
pipeline {
    agent any

    environment {
        SLACK_WEBHOOK = credentials('slack-webhook')
    }

    stages {
        stage('Setup') {
            steps {
                sh 'pip install truthound[all]'
            }
        }

        stage('Data Quality') {
            steps {
                sh '''
                    truthound checkpoint run daily_data_validation \
                        --config truthound.yaml \
                        --format json \
                        --output truthound-result.json
                '''
            }
            post {
                always {
                    archiveArtifacts artifacts: 'truthound-result.json'
                    junit 'truthound-junit.xml'
                }
                failure {
                    slackSend channel: '#data-quality',
                              message: "Data Quality Check Failed: ${env.BUILD_URL}"
                }
            }
        }
    }
}

CircleCI

# .circleci/config.yml
version: 2.1

jobs:
  data-quality:
    docker:
      - image: cimg/python:3.11
    steps:
      - checkout
      - run:
          name: Install Dependencies
          command: pip install truthound[all]
      - run:
          name: Run Validation
          command: |
            truthound checkpoint run daily_data_validation \
              --config truthound.yaml \
              --format json
      - store_test_results:
          path: test-results/truthound
      - store_artifacts:
          path: artifacts

workflows:
  nightly:
    triggers:
      - schedule:
          cron: "0 0 * * *"
          filters:
            branches:
              only: main
    jobs:
      - data-quality

Azure DevOps

# azure-pipelines.yml
trigger:
  paths:
    include:
      - data/*

schedules:
  - cron: "0 0 * * *"
    displayName: Daily midnight run
    branches:
      include:
        - main

pool:
  vmImage: 'ubuntu-latest'

steps:
  - task: UsePythonVersion@0
    inputs:
      versionSpec: '3.11'

  - script: pip install truthound[all]
    displayName: 'Install Truthound'

  - script: |
      truthound checkpoint run $(CHECKPOINT_NAME) \
        --config truthound.yaml \
        --format json
    displayName: 'Run Data Quality Check'
    env:
      SLACK_WEBHOOK: $(SLACK_WEBHOOK)

  - publish: truthound_results
    artifact: DataQualityReport
    condition: always()

Generate CI Configs

from truthound.checkpoint.ci import (
    generate_github_workflow,
    generate_gitlab_ci,
    generate_jenkinsfile,
    generate_circleci_config,
    write_ci_config,
)

# Generate GitHub Actions workflow
workflow = generate_github_workflow(
    checkpoint_name="daily_data_validation",
    schedule="0 0 * * *",
    notify_slack=True,
    python_version="3.11",
)

# Generate all configs
write_ci_config("github", checkpoint_name="daily_data_validation")
write_ci_config("gitlab", checkpoint_name="daily_data_validation")
write_ci_config("jenkins", checkpoint_name="daily_data_validation")

CI Reporters

Truthound automatically detects CI environments and provides platform-specific reporters:

Supported Platforms

| Platform | Detection | Features |
| --- | --- | --- |
| GitHub Actions | GITHUB_ACTIONS=true | Step Summary, Annotations, Outputs |
| GitLab CI | GITLAB_CI=true | dotenv artifacts, ANSI colors |
| Jenkins | JENKINS_URL | JUnit XML, Properties file |
| CircleCI | CIRCLECI=true | test-results, Artifacts |
| Azure DevOps | TF_BUILD=True | Build Variables |
| Bitbucket Pipelines | BITBUCKET_BUILD_NUMBER | Pipes compatible |
| Travis CI | TRAVIS=true | Environment mapping |
| TeamCity | TEAMCITY_VERSION | Service messages |
| Buildkite | BUILDKITE=true | Annotations API |
| Drone | DRONE=true | Environment variables |
| AWS CodeBuild | CODEBUILD_BUILD_ID | BuildSpec compatible |
| GCP Cloud Build | BUILDER_OUTPUT | Environment detection |

Using CI Reporters

from truthound.checkpoint.ci import (
    detect_ci_platform,
    get_ci_environment,
    get_ci_reporter,
    is_ci_environment,
)

# Check if in CI
if is_ci_environment():
    env = get_ci_environment()
    print(f"Platform: {env.platform}")
    print(f"Repository: {env.repository}")
    print(f"Branch: {env.branch}")
    print(f"Commit: {env.commit_sha}")
    print(f"PR Number: {env.pr_number}")
    print(f"Run URL: {env.run_url}")

# Get platform-specific reporter
reporter = get_ci_reporter()
reporter.report_status(result)
reporter.set_output("total_issues", stats.total_issues)
reporter.set_output("status", result.status.value)

GitHub Actions Reporter

from truthound.reporters.ci import GitHubActionsReporter

reporter = GitHubActionsReporter(
    step_summary=True,      # Write to GITHUB_STEP_SUMMARY
    use_groups=True,        # Use ::group:: for collapsible sections
    emoji_enabled=True,     # Include emojis in output
    set_output=True,        # Set workflow outputs
)

# Report to GitHub
exit_code = reporter.report_to_ci(result)

# The reporter automatically:
# - Writes job summary in Markdown
# - Emits annotations (::error::, ::warning::, ::notice::)
# - Sets output variables via GITHUB_OUTPUT

Custom Annotations

from truthound.reporters.ci.base import CIAnnotation, AnnotationLevel

annotation = CIAnnotation(
    message="Null values exceed threshold (15% > 5%)",
    level=AnnotationLevel.ERROR,
    file="data/users.csv",
    line=42,
    title="Data Quality Issue",
    validator_name="NullValidator",
)

reporter.format_annotation(annotation)
# Output: ::error file=data/users.csv,line=42,title=Data Quality Issue::Null values exceed threshold (15% > 5%)

CheckpointRunner

The runner manages automated execution of checkpoints with triggers:

from truthound.checkpoint import Checkpoint, CheckpointRunner
from truthound.checkpoint.triggers import ScheduleTrigger, CronTrigger

# Create checkpoints with triggers
hourly_metrics_check = Checkpoint(
    name="hourly_metrics_check",
    data_source="data.csv",
    validators=["null", "duplicate"],
).add_trigger(ScheduleTrigger(interval_hours=1))

daily_data_validation = Checkpoint(
    name="daily_data_validation",
    data_source="data.parquet",
    validators=["range", "distribution"],
).add_trigger(CronTrigger(expression="0 0 * * *"))

# Create runner
runner = CheckpointRunner(
    max_workers=4,
    result_callback=lambda r: print(f"Completed: {r.checkpoint_name}"),
    error_callback=lambda e: print(f"Error: {e}"),
)

# Add checkpoints
runner.add_checkpoint(hourly_metrics_check)
runner.add_checkpoint(daily_data_validation)

# Start background execution
runner.start()

# Run specific checkpoint once
result = runner.run_once("hourly_metrics_check")

# Run all checkpoints
results = runner.run_all()

# Iterate over results
for result in runner.iter_results(timeout=60):
    print(result.summary())

# Stop runner
runner.stop()

Registry

Register checkpoints for global access:

from truthound.checkpoint import (
    Checkpoint,
    CheckpointRegistry,
    register_checkpoint,
    get_checkpoint,
    list_checkpoints,
    load_checkpoints,
)

# Create registry
registry = CheckpointRegistry()

# Register checkpoints
checkpoint = Checkpoint(name="my_check", data_source="data.csv")
registry.register(checkpoint)

# Or use global registry
register_checkpoint(checkpoint)

# Retrieve
cp = get_checkpoint("my_check")
result = cp.run()

# List all
names = list_checkpoints()
print(names)  # ['my_check', ...]

# Load from file
checkpoints = load_checkpoints("truthound.yaml")
for cp in checkpoints:
    registry.register(cp)

# Check existence
if "my_check" in registry:
    print("Checkpoint exists")

Advanced Notifications

Truthound provides enterprise-grade notification management including routing, deduplication, throttling, and escalation.

Rule-based Routing

Route notifications to different actions based on conditions:

from truthound.checkpoint.routing import ActionRouter, SeverityRule, Route
from truthound.checkpoint.actions import SlackNotification, PagerDutyAction

router = ActionRouter()

# Critical alerts go to PagerDuty
router.add_route(Route(
    name="critical",
    rule=SeverityRule(min_severity="critical"),
    actions=[PagerDutyAction(service_key="...")],
    priority=1,
))

# High severity goes to Slack
router.add_route(Route(
    name="high",
    rule=SeverityRule(min_severity="high"),
    actions=[SlackNotification(webhook_url="...")],
    priority=2,
))

# Use with checkpoint
checkpoint = Checkpoint(
    name="daily_data_validation",
    data_source="data.csv",
    router=router,
)

Available Rules: SeverityRule, IssueCountRule, StatusRule, TagRule, PassRateRule, TimeWindowRule, DataAssetRule, MetadataRule, ErrorRule, AlwaysRule, NeverRule

Combinators: AllOf, AnyOf, NotRule for complex conditions.
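
A minimal sketch of combining rules with these combinators (only the class names come from this section; the import location, the StatusRule argument name, and the positional combinator signature are assumptions):

from truthound.checkpoint.routing import (
    ActionRouter,
    Route,
    AllOf,
    SeverityRule,
    StatusRule,
)
from truthound.checkpoint.actions import SlackNotification

# Route only when the run failed AND severity is at least "high"
router = ActionRouter()
router.add_route(Route(
    name="failed_and_high",
    rule=AllOf(
        StatusRule(statuses=["failure"]),   # argument name assumed
        SeverityRule(min_severity="high"),
    ),
    actions=[SlackNotification(webhook_url="https://hooks.slack.com/...")],
    priority=1,
))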

Notification Deduplication

Prevent duplicate notifications within a time window:

from truthound.checkpoint.deduplication import (
    NotificationDeduplicator,
    InMemoryDeduplicationStore,
    TimeWindow,
)

deduplicator = NotificationDeduplicator(
    store=InMemoryDeduplicationStore(),
    default_window=TimeWindow(seconds=300),  # 5 minutes
)

fingerprint = deduplicator.generate_fingerprint(
    checkpoint_name="daily_data_validation",
    action_type="slack",
    severity="high",
)

if not deduplicator.is_duplicate(fingerprint):
    await action.execute(result)
    deduplicator.mark_sent(fingerprint)

Window Strategies: Sliding, Tumbling, Session, Adaptive

Storage Backends: InMemory, Redis Streams

Rate Limiting / Throttling

Control notification frequency:

from truthound.checkpoint.throttling import ThrottlerBuilder, ThrottlingMiddleware

throttler = (
    ThrottlerBuilder()
    .with_per_minute_limit(10)
    .with_per_hour_limit(100)
    .with_per_day_limit(500)
    .build()
)

middleware = ThrottlingMiddleware(throttler=throttler)
throttled_action = middleware.wrap(slack_action)

Algorithms: Token Bucket, Fixed Window, Sliding Window, Composite

Escalation Policies

Multi-level alert escalation:

from truthound.checkpoint.escalation import (
    EscalationPolicy,
    EscalationLevel,
    EscalationEngine,
)

policy = EscalationPolicy(
    name="critical_alerts",
    levels=[
        EscalationLevel(level=1, delay_minutes=0, targets=["team-lead"]),
        EscalationLevel(level=2, delay_minutes=15, targets=["manager"]),
        EscalationLevel(level=3, delay_minutes=30, targets=["director"]),
    ],
)

engine = EscalationEngine(policy=policy)
await engine.trigger("incident-123", context={"severity": "critical"})

# Later: acknowledge or resolve
await engine.acknowledge("incident-123", acknowledged_by="john@company.com")
await engine.resolve("incident-123", resolved_by="jane@company.com")

States: PENDING → TRIGGERED → ACKNOWLEDGED → ESCALATED → RESOLVED

Storage Backends: InMemory, Redis, SQLite


Best Practices

1. Use Configuration Files

Store checkpoint definitions in version-controlled YAML files:

# truthound.yaml
checkpoints:
  - name: production_daily
    data_source: ${DATA_PATH}  # Use environment variables
    validators:
      - "null"
      - duplicate
    actions:
      - type: store_result
        store_path: ${RESULTS_PATH}

2. Set Up Appropriate Notifications

actions = [
    # Always store results for audit
    StoreValidationResult(notify_on="always"),

    # Update docs on success
    UpdateDataDocs(notify_on="success"),

    # Alert only on failures
    SlackNotification(notify_on="failure"),
    PagerDutyAction(notify_on="failure_or_error"),
]

3. Use Strict Mode in CI

truthound checkpoint run my_check --strict

In strict mode (--strict), exit code 1 is returned if:

  • Any validation issues are found (regardless of severity)
  • The checkpoint status is "failure" or "error"

This ensures the CI pipeline fails appropriately on any data quality issue.

4. Leverage Async for Large Datasets

# For large datasets, use async execution
checkpoint = AsyncCheckpoint(
    name="large_data_check",
    data_source="large_dataset.parquet",
    sample_size=100000,  # Sample for faster validation
    max_concurrent_actions=10,
)

result = await checkpoint.run_async()

5. Implement Idempotency for Production

from truthound.checkpoint.idempotency import IdempotencyService

service = IdempotencyService(store="redis://localhost:6379")

# Prevent duplicate runs
if not service.is_duplicate(run_key):
    result = checkpoint.run()
    service.mark_completed(run_key, result)

6. Partition and Retain Stored Results

StoreValidationResult(
    store_path="s3://bucket/dq-results",
    partition_by="date",
    retention_days=90,
)

API Reference

CheckpointResult

result = checkpoint.run()

result.run_id              # Unique run identifier
result.checkpoint_name     # Checkpoint name
result.run_time           # When the checkpoint ran
result.status             # CheckpointStatus (success/failure/error/warning)
result.validation_result  # ValidationResult from check()
result.action_results     # List of ActionResult
result.duration_ms        # Execution duration in milliseconds
result.error              # Error message if failed
result.metadata           # Custom metadata dict

# Methods
result.to_dict()          # Serialize to dictionary
result.from_dict(d)       # Deserialize from dictionary
result.summary()          # Human-readable summary string

CheckpointStatus

from truthound.checkpoint.checkpoint import CheckpointStatus

CheckpointStatus.SUCCESS    # All validations passed
CheckpointStatus.FAILURE    # Validation failures detected
CheckpointStatus.WARNING    # Non-critical issues found
CheckpointStatus.ERROR      # System error occurred
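
A minimal sketch of branching on the status, e.g. to fail a custom driver script (assumes checkpoint is an existing Checkpoint instance):

from truthound.checkpoint.checkpoint import CheckpointStatus

result = checkpoint.run()

# Treat validation failures and system errors as a non-zero exit
if result.status in (CheckpointStatus.FAILURE, CheckpointStatus.ERROR):
    raise SystemExit(1)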

ActionResult

from datetime import datetime

from truthound.checkpoint.actions.base import ActionResult, ActionStatus

result = ActionResult(
    action_name="slack_notification",
    action_type="notification",
    status=ActionStatus.SUCCESS,
    message="Notification sent successfully",
    started_at=datetime.now(),
    completed_at=datetime.now(),
    duration_ms=150.5,
    details={"message_id": "abc123"},
)

CIEnvironment

from truthound.checkpoint.ci import get_ci_environment

env = get_ci_environment()

env.platform        # CIPlatform enum
env.is_ci           # bool
env.is_pr           # bool (is pull request)
env.branch          # str
env.commit_sha      # str
env.commit_message  # str
env.pr_number       # int | None
env.pr_target_branch # str
env.repository      # str (owner/repo)
env.run_id          # str
env.run_url         # str
env.actor           # str (user who triggered)
env.job_name        # str
env.workflow_name   # str

Enterprise Assessment

Feature Completeness

| Feature | Status | Notes |
| --- | --- | --- |
| Core Checkpoint | ✅ | Full implementation |
| Multiple Actions | ✅ | 12 built-in actions |
| 4 Trigger Types | ✅ | Schedule, Cron, Event, FileWatch |
| Async Execution | ✅ | Native async/await |
| Transaction Management | ✅ | Saga pattern |
| Idempotency | ✅ | Duplicate prevention |
| 12 CI Platforms | ✅ | Industry-leading coverage |
| JUnit XML Output | ✅ | For Jenkins/CI |
| Rule-based Routing | ✅ | 11 rules, combinators, Python/Jinja2 engine |
| Notification Deduplication | ✅ | InMemory/Redis, 4 window strategies |
| Rate Limiting | ✅ | Token Bucket, 5 throttler types |
| Escalation Policies | ✅ | State machine, 3 storage backends |

Code Metrics

| Metric | Value |
| --- | --- |
| Total LOC | ~25,000 |
| Test LOC | ~4,600 |
| Test Files | 5 |
| CI Platforms | 12 |
| Action Types | 12 |
| Trigger Types | 4 |

Comparison with Great Expectations

| Feature | Great Expectations | Truthound |
| --- | --- | --- |
| Checkpoint Definition | ✅ | ✅ |
| Multiple Actions | ✅ | ✅ |
| Schedule Triggers | | ✅ (+ Cron) |
| Event Triggers | ⚠️ Limited | ✅ Full |
| File Watch Triggers | | ✅ |
| Async Execution | | ✅ Native |
| Transaction/Saga | | ✅ |
| Idempotency | | ✅ |
| CI Platforms | 3-4 | 12 |
| JUnit Output | Plugin | ✅ Built-in |
| Rule-based Routing | | ✅ 11 rules |
| Deduplication | | ✅ InMemory/Redis |
| Rate Limiting | | ✅ Token Bucket |
| Escalation Policies | | ✅ APScheduler |

See Also