Skip to content

Escalation Policies

A multi-level notification escalation policy system. Manages notification lifecycle with APScheduler-based scheduling and a state machine.

Overview

┌─────────────────────────────────────────────────────────────┐
│                    EscalationEngine                         │
│                                                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │   Policy    │───▶│  Scheduler  │───▶│   Store     │     │
│  │  Manager    │    │ (APScheduler)│    │(InMemory/   │     │
│  │             │    │             │    │ Redis/SQLite)│     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
│         │                  │                   │            │
│         ▼                  ▼                   ▼            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              EscalationStateMachine                  │   │
│  │  PENDING → ACTIVE → ESCALATING → (loop to ACTIVE)   │   │
│  │              ↓           ↓                           │   │
│  │         ACKNOWLEDGED  ACKNOWLEDGED                   │   │
│  │              ↓           ↓                           │   │
│  │           RESOLVED    RESOLVED                       │   │
│  │    Any state → CANCELLED, TIMED_OUT, FAILED         │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Core Types

TargetType

Escalation target types.

from truthound.checkpoint.escalation import TargetType

class TargetType(str, Enum):
    USER = "user"         # Individual user
    TEAM = "team"         # Team
    CHANNEL = "channel"   # Channel (Slack, etc.)
    SCHEDULE = "schedule" # On-call schedule
    WEBHOOK = "webhook"   # Webhook URL
    EMAIL = "email"       # Email
    PHONE = "phone"       # Phone
    CUSTOM = "custom"     # Custom

EscalationTrigger

Escalation trigger conditions.

from truthound.checkpoint.escalation import EscalationTrigger

class EscalationTrigger(str, Enum):
    UNACKNOWLEDGED = "unacknowledged"    # Not acknowledged
    UNRESOLVED = "unresolved"            # Not resolved
    SEVERITY_UPGRADE = "severity_upgrade" # Severity upgrade
    REPEATED_FAILURE = "repeated_failure" # Repeated failure
    THRESHOLD_BREACH = "threshold_breach" # Threshold exceeded
    MANUAL = "manual"                     # Manual
    SCHEDULED = "scheduled"               # Scheduled
    CUSTOM = "custom"                     # Custom

EscalationState

Escalation states.

from truthound.checkpoint.escalation import EscalationState

class EscalationState(str, Enum):
    PENDING = "pending"         # Initial state, waiting to start
    ACTIVE = "active"           # Currently notifying at level
    ESCALATING = "escalating"   # Escalating to next level
    ACKNOWLEDGED = "acknowledged"  # Responder acknowledged
    RESOLVED = "resolved"       # Issue resolved
    CANCELLED = "cancelled"     # Manually cancelled
    TIMED_OUT = "timed_out"     # Max escalation reached or timed out
    FAILED = "failed"           # System error during escalation

EscalationTarget

Notification target definition.

from truthound.checkpoint.escalation import EscalationTarget, TargetType

# Constructor
target = EscalationTarget(
    type=TargetType.USER,
    identifier="user-123",
    name="Team Lead",
    priority=1,
    metadata={"slack_id": "U12345"},
)

# Factory methods
user_target = EscalationTarget.user("user-123", "Team Lead")
team_target = EscalationTarget.team("team-abc", "Platform Team")
channel_target = EscalationTarget.channel("#alerts", "Alert Channel")
schedule_target = EscalationTarget.schedule("oncall-123", "Primary On-call")
webhook_target = EscalationTarget.webhook("https://api.example.com/alert")
email_target = EscalationTarget.email("team@company.com", "Team Email")

EscalationLevel

Escalation level definition.

from truthound.checkpoint.escalation import EscalationLevel, EscalationTarget

level1 = EscalationLevel(
    level=1,                      # Level number (1 = first)
    delay_minutes=0,              # Delay time (0 = immediate)
    targets=[                     # Notification targets
        EscalationTarget.user("lead-123", "Team Lead"),
    ],
    repeat_count=2,               # Repeat count (0 = once only)
    repeat_interval_minutes=5,    # Repeat interval (minutes)
    require_ack=True,             # Require acknowledgement
    auto_resolve_minutes=0,       # Auto resolve time (0 = none)
    conditions={},                # Additional conditions
)

level2 = EscalationLevel(
    level=2,
    delay_minutes=15,             # Escalate after 15 minutes
    targets=[
        EscalationTarget.user("manager-456", "Manager"),
        EscalationTarget.team("team-platform", "Platform Team"),
    ],
)

level3 = EscalationLevel(
    level=3,
    delay_minutes=30,             # Escalate after 30 minutes
    targets=[
        EscalationTarget.user("director-789", "Director"),
        EscalationTarget.schedule("oncall-exec", "Executive On-call"),
    ],
)

EscalationPolicy

Complete escalation policy definition.

from truthound.checkpoint.escalation import (
    EscalationPolicy,
    EscalationLevel,
    EscalationTarget,
    EscalationTrigger,
)

policy = EscalationPolicy(
    name="critical_alerts",
    description="Critical production alerts",
    levels=[
        EscalationLevel(
            level=1,
            delay_minutes=0,
            targets=[EscalationTarget.user("lead", "Team Lead")],
        ),
        EscalationLevel(
            level=2,
            delay_minutes=15,
            targets=[EscalationTarget.user("manager", "Manager")],
        ),
        EscalationLevel(
            level=3,
            delay_minutes=30,
            targets=[EscalationTarget.user("director", "Director")],
        ),
    ],
    enabled=True,
    triggers=[
        EscalationTrigger.UNACKNOWLEDGED,
        EscalationTrigger.REPEATED_FAILURE,
    ],
    severity_filter=["critical", "high"],  # Apply only to critical, high
    max_escalations=5,                      # Maximum escalation count
    cooldown_minutes=60,                    # Cooldown for same incident
    business_hours_only=False,              # Apply 24/7
    timezone="Asia/Seoul",
)

# Properties
policy.max_level                 # Maximum level number
policy.get_level(1)              # Get level 1
policy.get_next_level(1)         # Get next level (level 2)

Business Hours Configuration

policy = EscalationPolicy(
    name="business_hours_alerts",
    levels=[...],
    business_hours_only=True,
    business_hours_start=9,      # 9 AM
    business_hours_end=18,       # 6 PM
    business_days=[0, 1, 2, 3, 4],  # Mon-Fri (0=Monday)
    timezone="Asia/Seoul",
)

EscalationEngine

The escalation engine. Manages the entire lifecycle.

from truthound.checkpoint.escalation import (
    EscalationEngine,
    EscalationEngineConfig,
    EscalationPolicy,
)

# Configuration
config = EscalationEngineConfig(
    store_type="memory",           # memory, redis, sqlite
    store_config={},
    check_business_hours=True,
    metrics_enabled=True,
)

# Create engine
engine = EscalationEngine(config)

# Register policy
engine.register_policy(policy)

# Set notification handler
async def notification_handler(record, level, targets):
    """Handler that sends actual notifications."""
    for target in targets:
        if target.type == TargetType.USER:
            await send_slack_dm(target.identifier, record)
        elif target.type == TargetType.CHANNEL:
            await send_slack_channel(target.identifier, record)
    return True

engine.set_notification_handler(notification_handler)

# Start engine
await engine.start()

# Trigger escalation
result = await engine.trigger(
    incident_id="incident-123",
    context={
        "severity": "critical",
        "checkpoint_name": "production_validation",
        "message": "Critical validation failure",
    },
    policy_name="critical_alerts",
)

if result.success:
    print(f"Escalation started: {result.record.id}")
    print(f"Current level: {result.record.current_level}")

# Acknowledge
result = await engine.acknowledge(
    record_id=result.record.id,
    acknowledged_by="user-123",
)

# Resolve
result = await engine.resolve(
    record_id=result.record.id,
    resolved_by="user-123",
)

# Cancel
result = await engine.cancel(
    record_id=result.record.id,
    cancelled_by="user-123",
    reason="False alarm",
)

# Manual escalation
result = await engine.escalate(
    record_id=result.record.id,
    force=True,  # Force regardless of state
)

# Query
record = engine.get_record(record_id)
active_records = engine.get_active_escalations(policy_name="critical_alerts")

# Stop engine
await engine.stop()

EscalationRecord

An escalation record. Tracks progress state.

@dataclass
class EscalationRecord:
    id: str                          # Record ID
    incident_id: str                 # External incident ID
    policy_name: str                 # Policy name
    current_level: int = 1           # Current level
    state: str = "pending"           # Current state
    created_at: datetime             # Creation time
    updated_at: datetime             # Update time
    acknowledged_at: datetime | None # Acknowledgement time
    acknowledged_by: str | None      # Acknowledger
    resolved_at: datetime | None     # Resolution time
    resolved_by: str | None          # Resolver
    next_escalation_at: datetime | None  # Next escalation time
    escalation_count: int = 0        # Escalation count
    notification_count: int = 0      # Notification count
    history: list[dict]              # Event history
    context: dict                    # Trigger context
    metadata: dict                   # Metadata

# Properties
record.is_active         # Is active
record.is_acknowledged   # Is acknowledged
record.is_resolved       # Is resolved
record.duration          # Duration (timedelta)

Storage Backends (3 Types)

InMemoryEscalationStore

In-memory store for single-process use.

from truthound.checkpoint.escalation import InMemoryEscalationStore

store = InMemoryEscalationStore()

RedisEscalationStore

Redis store for distributed environments.

from truthound.checkpoint.escalation import RedisEscalationStore

store = RedisEscalationStore(
    redis_url="redis://localhost:6379",
    key_prefix="truthound:escalation:",
)

SQLiteEscalationStore

SQLite store for persistent storage.

from truthound.checkpoint.escalation import SQLiteEscalationStore

store = SQLiteEscalationStore(
    db_path="./escalations.db",
)

create_store Factory

from truthound.checkpoint.escalation import create_store

# Create by type
store = create_store("memory")
store = create_store("redis", redis_url="redis://localhost:6379")
store = create_store("sqlite", db_path="./escalations.db")

Scheduler

Scheduler Types

from truthound.checkpoint.escalation import (
    InMemoryScheduler,
    AsyncioScheduler,
    create_scheduler,
    SchedulerConfig,
)

# InMemory scheduler (for testing)
scheduler = InMemoryScheduler(...)

# Asyncio scheduler
scheduler = AsyncioScheduler(...)

# Create via factory
config = SchedulerConfig(
    scheduler_type="asyncio",
    max_concurrent_jobs=100,
)
scheduler = create_scheduler(config, callback)

Routing Integration

Integrates with the routing system.

from truthound.checkpoint.escalation import (
    EscalationRule,
    EscalationRuleConfig,
    EscalationAction,
    create_escalation_route,
)
from truthound.checkpoint.routing import ActionRouter

# EscalationRule: Conditional escalation
config = EscalationRuleConfig(
    policy_name="critical_alerts",
    severity_filter=["critical", "high"],
    trigger_type="unacknowledged",
)
rule = EscalationRule(config=config)

# EscalationAction: Escalation trigger action
action = EscalationAction(
    engine=engine,
    policy_name="critical_alerts",
)

# Route creation helper
route = create_escalation_route(
    engine=engine,
    policy_name="critical_alerts",
    rule=SeverityRule(min_severity="high"),
)

# Add to router
router.add_route(route)

Integration with Existing Actions

from truthound.checkpoint.escalation import setup_escalation_with_existing_actions
from truthound.checkpoint.actions import SlackNotification

# Connect existing actions to escalation
slack_action = SlackNotification(webhook_url="...")

setup_escalation_with_existing_actions(
    engine=engine,
    policy_name="critical_alerts",
    level_actions={
        1: [slack_action],
        2: [slack_action, pagerduty_action],
        3: [slack_action, pagerduty_action, email_action],
    },
)

EscalationPolicyManager

A high-level manager for managing multiple policies.

from truthound.checkpoint.escalation import (
    EscalationPolicyManager,
    EscalationPolicyConfig,
)

# Create from config
config = EscalationPolicyConfig(
    default_policy="default",
    global_enabled=True,
    store_type="redis",
    store_config={"redis_url": "redis://localhost:6379"},
    max_concurrent_escalations=1000,
    cleanup_interval_minutes=60,
)

manager = EscalationPolicyManager(config)

# Add policies
manager.add_policy(critical_policy)
manager.add_policy(warning_policy)

# Query policies
policy = manager.get_policy("critical_alerts")
names = manager.list_policies()

# Start
await manager.start()

# Trigger
result = await manager.trigger(
    incident_id="incident-123",
    context={"severity": "critical"},
    policy_name="critical_alerts",
)

# Statistics
stats = manager.get_stats()

# Stop
await manager.stop()

Create from Dictionary

config = {
    "default_policy": "critical_alerts",
    "store_type": "redis",
    "policies": [
        {
            "name": "critical_alerts",
            "levels": [
                {"level": 1, "delay_minutes": 0, "targets": [...]},
                {"level": 2, "delay_minutes": 15, "targets": [...]},
            ],
        }
    ],
}

manager = EscalationPolicyManager.from_dict(config)

Statistics Retrieval

stats = engine.get_stats()

print(f"Total escalations: {stats.total_escalations}")
print(f"Active escalations: {stats.active_escalations}")
print(f"Acknowledged: {stats.acknowledged_count}")
print(f"Resolved: {stats.resolved_count}")
print(f"Timed out: {stats.timed_out_count}")
print(f"Acknowledgment rate: {stats.acknowledgment_rate:.2%}")
print(f"Resolution rate: {stats.resolution_rate:.2%}")
print(f"Avg time to acknowledge: {stats.avg_time_to_acknowledge_seconds}s")
print(f"Avg time to resolve: {stats.avg_time_to_resolve_seconds}s")
print(f"Notifications sent: {stats.notifications_sent}")

Complete Example

import asyncio
from truthound.checkpoint import Checkpoint
from truthound.checkpoint.escalation import (
    EscalationEngine,
    EscalationEngineConfig,
    EscalationPolicy,
    EscalationLevel,
    EscalationTarget,
    EscalationTrigger,
)
from truthound.checkpoint.actions import SlackNotification

# Notification handler
async def notification_handler(record, level, targets):
    slack = SlackNotification(webhook_url="${SLACK_WEBHOOK}")
    for target in targets:
        message = f"[Level {level.level}] Escalation for {record.incident_id}"
        # Actual notification logic
        print(f"Notifying {target.name}: {message}")
    return True

# Escalation policy
policy = EscalationPolicy(
    name="production_critical",
    description="Production critical alerts",
    levels=[
        EscalationLevel(
            level=1,
            delay_minutes=0,
            targets=[
                EscalationTarget.user("team-lead", "Team Lead"),
                EscalationTarget.channel("#alerts", "Alerts Channel"),
            ],
            repeat_count=2,
            repeat_interval_minutes=5,
        ),
        EscalationLevel(
            level=2,
            delay_minutes=15,
            targets=[
                EscalationTarget.user("eng-manager", "Engineering Manager"),
                EscalationTarget.schedule("oncall-primary", "Primary On-call"),
            ],
        ),
        EscalationLevel(
            level=3,
            delay_minutes=30,
            targets=[
                EscalationTarget.user("director", "Director of Engineering"),
                EscalationTarget.email("leadership@company.com", "Leadership"),
            ],
        ),
    ],
    triggers=[EscalationTrigger.UNACKNOWLEDGED],
    severity_filter=["critical", "high"],
    cooldown_minutes=60,
)

# Engine configuration and startup
config = EscalationEngineConfig(
    store_type="memory",
    metrics_enabled=True,
)

engine = EscalationEngine(config)
engine.register_policy(policy)
engine.set_notification_handler(notification_handler)


async def main():
    await engine.start()

    # Trigger escalation after checkpoint execution
    checkpoint = Checkpoint(
        name="production_validation",
        data_source="data.csv",
        validators=["null"],
    )
    result = checkpoint.run()

    if result.status.value == "failure":
        escalation_result = await engine.trigger(
            incident_id=f"cp-{result.run_id}",
            context={
                "severity": "critical",
                "checkpoint_name": result.checkpoint_name,
                "issues": result.validation_result.statistics.total_issues,
            },
            policy_name="production_critical",
        )

        print(f"Escalation triggered: {escalation_result.record.id}")

    await engine.stop()


asyncio.run(main())

YAML Configuration

escalation:
  enabled: true
  default_policy: production_critical
  store:
    type: redis
    redis_url: redis://localhost:6379

  policies:
    - name: production_critical
      description: Production critical alerts
      severity_filter:
        - critical
        - high
      triggers:
        - unacknowledged
      cooldown_minutes: 60
      levels:
        - level: 1
          delay_minutes: 0
          targets:
            - type: user
              identifier: team-lead
              name: Team Lead
            - type: channel
              identifier: "#alerts"
              name: Alerts Channel
          repeat_count: 2
          repeat_interval_minutes: 5

        - level: 2
          delay_minutes: 15
          targets:
            - type: user
              identifier: eng-manager
              name: Engineering Manager
            - type: schedule
              identifier: oncall-primary

        - level: 3
          delay_minutes: 30
          targets:
            - type: user
              identifier: director
            - type: email
              identifier: leadership@company.com

State Transition Diagram

                    ┌─────────────────────────┐
                    │        PENDING          │
                    └────────────┬────────────┘
                                 │ start()
        ┌───────────────────────────────────────────────┐
        │                    ACTIVE                     │
        └────┬─────────────┬────────────────┬───────────┘
             │             │                │
             │ ack()       │ timeout        │ escalate()
             ▼             ▼                ▼
    ┌────────────┐  ┌────────────┐  ┌────────────────┐
    │ACKNOWLEDGED│  │ TIMED_OUT  │  │   ESCALATING   │
    └──────┬─────┘  └────────────┘  └───────┬────────┘
           │                                │
           │ resolve()                      │ escalate_success
           ▼                                ▼
    ┌────────────┐                  ┌────────────────┐
    │  RESOLVED  │                  │     ACTIVE     │
    └────────────┘                  │ (next level)   │
                                    └────────────────┘

        cancel() from any state → CANCELLED
        error from any state → FAILED