Skip to content

Notifications

Multi-channel notification system.

NotificationRegistry

Notification registry:

from packages.enterprise.notifications import NotificationRegistry

registry = NotificationRegistry()

# Send notification
registry.notify(
    channel="slack",
    message="Data quality check failed",
    severity="warning",
)

Supported Channels

Slack

from packages.enterprise.notifications.handlers import SlackHandler

handler = SlackHandler(
    webhook_url="https://hooks.slack.com/services/...",
    channel="#data-quality",
)

handler.send(
    message="Data quality check failed",
    severity="warning",
    details={"table": "orders", "failed_count": 10},
)

Email

from packages.enterprise.notifications.handlers import EmailHandler

handler = EmailHandler(
    smtp_host="smtp.example.com",
    smtp_port=587,
    username="user@example.com",
    password="password",
    from_address="alerts@example.com",
    to_addresses=["team@example.com"],
)

handler.send(
    subject="Data Quality Alert",
    message="Data quality check failed for orders table",
)

Webhook

from packages.enterprise.notifications.handlers import WebhookHandler

handler = WebhookHandler(
    url="https://api.example.com/webhooks/alerts",
    headers={"Authorization": "Bearer token"},
)

handler.send(
    message="Data quality check failed",
    payload={"table": "orders", "status": "failed"},
)

PagerDuty

from packages.enterprise.notifications.handlers import PagerDutyHandler

handler = PagerDutyHandler(
    routing_key="your-routing-key",
    severity="critical",
)

handler.send(
    summary="Data quality check failed",
    source="truthound-orchestration",
    details={"table": "orders"},
)

Opsgenie

from packages.enterprise.notifications.handlers import OpsgenieHandler

handler = OpsgenieHandler(
    api_key="your-api-key",
    priority="P2",
)

handler.send(
    message="Data quality check failed",
    alias="data-quality-orders",
    description="10 validation failures in orders table",
)

Formatters

Text

from packages.enterprise.notifications.formatters import TextFormatter

formatter = TextFormatter()
message = formatter.format(check_result)

Markdown

from packages.enterprise.notifications.formatters import MarkdownFormatter

formatter = MarkdownFormatter()
message = formatter.format(check_result)

SlackBlock

from packages.enterprise.notifications.formatters import SlackBlockFormatter

formatter = SlackBlockFormatter()
blocks = formatter.format(check_result)

JSON

from packages.enterprise.notifications.formatters import JSONFormatter

formatter = JSONFormatter()
json_message = formatter.format(check_result)

Notification Hooks

Notification lifecycle events:

from packages.enterprise.notifications import NotificationHook

class MyNotificationHook(NotificationHook):
    def before_send(self, channel, message, severity):
        # Pre-send processing
        pass

    def after_send(self, channel, message, severity, success):
        # Post-send processing
        pass

    def on_error(self, channel, message, severity, error):
        # Error handling
        pass

Multi-Channel Notifications

Send to multiple channels simultaneously:

from packages.enterprise.notifications import NotificationRegistry

registry = NotificationRegistry()

# Register handlers
registry.register("slack", slack_handler)
registry.register("email", email_handler)
registry.register("pagerduty", pagerduty_handler)

# Send to multiple channels
registry.notify_all(
    channels=["slack", "email"],
    message="Data quality check failed",
    severity="warning",
)

Severity-Based Routing

from packages.enterprise.notifications import SeverityRouter

router = SeverityRouter()
router.add_route("info", ["slack"])
router.add_route("warning", ["slack", "email"])
router.add_route("critical", ["slack", "email", "pagerduty"])

# Route to appropriate channels based on severity
router.route(
    message="Critical data quality failure",
    severity="critical",
)

Retry

Retry on notification send failure:

from packages.enterprise.notifications import RetryingHandler

handler = RetryingHandler(
    base_handler=slack_handler,
    max_retries=3,
    retry_delay_seconds=5,
)

handler.send(message="Alert")

Batch Notifications

Batch multiple notifications together:

from packages.enterprise.notifications import BatchingHandler

handler = BatchingHandler(
    base_handler=email_handler,
    batch_size=10,
    batch_interval_seconds=60,
)

# Notifications are batched and sent together
handler.send(message="Alert 1")
handler.send(message="Alert 2")
# Batch sent after 60 seconds or when 10 messages accumulate