Throttling (Rate Limiting)¶
A rate limiting system that controls the frequency of notification delivery. It supports multi-level limits (per minute/hour/day) based on the Token Bucket algorithm.
Overview¶
Notification Request
│
▼
┌───────────────────────────┐
│ NotificationThrottler │
│ │
│ 1. Check Priority Bypass │
│ 2. Get Applicable Limits │
│ 3. Acquire Permits │
│ 4. Return ThrottleResult │
└───────────────────────────┘
│
┌────┴────┐
│ │
▼ ▼
[Allowed] [Throttled]
retry_after: N seconds
Core Types¶
TimeUnit¶
Time unit enumeration.
from truthound.checkpoint.throttling import TimeUnit
class TimeUnit(str, Enum):
SECOND = "second"
MINUTE = "minute"
HOUR = "hour"
DAY = "day"
RateLimitScope¶
Scope of rate limit application.
from truthound.checkpoint.throttling import RateLimitScope
class RateLimitScope(str, Enum):
GLOBAL = "global" # All notifications combined
PER_ACTION = "per_action" # Per action type (slack, email, etc.)
PER_CHECKPOINT = "per_checkpoint" # Per checkpoint
PER_ACTION_CHECKPOINT = "per_action_checkpoint" # Per action + checkpoint
PER_SEVERITY = "per_severity" # Per severity level
PER_DATA_ASSET = "per_data_asset" # Per data asset
CUSTOM = "custom" # Custom key
ThrottleStatus¶
Throttle result status.
from truthound.checkpoint.throttling import ThrottleStatus
class ThrottleStatus(str, Enum):
ALLOWED = "allowed" # Request permitted
THROTTLED = "throttled" # Rejected due to rate limit exceeded
QUEUED = "queued" # Queued for later processing
BURST_ALLOWED = "burst_allowed" # Allowed via burst capacity
ERROR = "error" # Error occurred
RateLimit¶
Rate limit configuration.
from truthound.checkpoint.throttling import RateLimit, TimeUnit
@dataclass(frozen=True)
class RateLimit:
limit: int # Maximum request count
time_unit: TimeUnit # Time unit
burst_multiplier: float = 1.0 # Burst capacity multiplier (1.0 = no burst)
# Factory methods
limit = RateLimit.per_minute(10, burst_multiplier=1.5) # 10/min, burst 15
limit = RateLimit.per_hour(100) # 100/hour
limit = RateLimit.per_day(500) # 500/day
# Properties
limit.window_seconds # Window size in seconds
limit.burst_limit # Burst limit (limit * multiplier)
limit.tokens_per_second # Token replenishment rate per second
ThrottleResult¶
Throttle check result.
@dataclass
class ThrottleResult:
status: ThrottleStatus # Result status
key: ThrottlingKey # Throttling key
allowed: bool # Whether allowed
retry_after: float = 0.0 # Retry wait time in seconds
remaining: int = 0 # Remaining tokens/requests
limit: RateLimit | None = None # Applied rate limit
message: str = "" # Message
metadata: dict = field(default_factory=dict) # Additional metadata
Throttler Types (5 Types)¶
TokenBucketThrottler¶
Token Bucket algorithm. Allows bursts then replenishes tokens at a steady rate.
from truthound.checkpoint.throttling import (
TokenBucketThrottler,
RateLimit,
ThrottlingKey,
TimeUnit,
)
throttler = TokenBucketThrottler("api")
# Define rate limit
limit = RateLimit.per_minute(10, burst_multiplier=1.5) # 10/min, burst 15
# Create throttling key
key = ThrottlingKey.for_global(TimeUnit.MINUTE)
# Check only (no token consumption)
result = throttler.check(key, limit)
if result.allowed:
print(f"Remaining: {result.remaining}")
# Acquire token (check + consume)
result = throttler.acquire(key, limit)
if result.allowed:
send_notification()
else:
print(f"Retry after {result.retry_after:.1f}s")
Characteristics: - Allows instantaneous burst traffic - Replenishes tokens at a constant rate - Smooth rate limiting
SlidingWindowThrottler¶
Sliding window algorithm. Provides more accurate rate limiting.
from truthound.checkpoint.throttling import SlidingWindowThrottler
throttler = SlidingWindowThrottler("api")
limit = RateLimit.per_hour(100)
key = ThrottlingKey.for_action("slack", TimeUnit.HOUR)
result = throttler.acquire(key, limit)
Characteristics: - No abrupt changes at window boundaries - More consistent rate limiting - Slightly higher memory usage
FixedWindowThrottler¶
Fixed window algorithm. Simple but allows up to 2x traffic at window boundaries.
from truthound.checkpoint.throttling import FixedWindowThrottler
throttler = FixedWindowThrottler("api")
limit = RateLimit.per_minute(10)
key = ThrottlingKey.for_global(TimeUnit.MINUTE)
result = throttler.acquire(key, limit)
Characteristics: - Simple implementation - Memory efficient - Can allow up to 2x traffic at window boundaries (edge effect)
CompositeThrottler¶
Combines multiple rate limits. Request must pass all limits to be allowed.
from truthound.checkpoint.throttling import CompositeThrottler, RateLimit
throttler = CompositeThrottler("multi-level", algorithm="token_bucket")
# Add multi-level rate limits
throttler.add_limit(RateLimit.per_minute(10))
throttler.add_limit(RateLimit.per_hour(100))
throttler.add_limit(RateLimit.per_day(500))
# Or configure at once
throttler.with_limits([
RateLimit.per_minute(10),
RateLimit.per_hour(100),
RateLimit.per_day(500),
])
key = ThrottlingKey.for_global(TimeUnit.MINUTE)
result = throttler.acquire(key) # Checks all limits
Characteristics: - Multi-level limits (minute/hour/day) - Must pass all limits - Most restrictive limit determines final decision
NoOpThrottler¶
Pass-through throttler that always allows. For testing or disabling.
from truthound.checkpoint.throttling import NoOpThrottler
throttler = NoOpThrottler()
result = throttler.acquire(key, limit)
assert result.allowed # Always True
NotificationThrottler¶
High-level throttling service.
from truthound.checkpoint.throttling import (
NotificationThrottler,
ThrottlingConfig,
RateLimitScope,
)
# Create with config
config = ThrottlingConfig(
per_minute_limit=10,
per_hour_limit=100,
per_day_limit=500,
burst_multiplier=1.5,
scope=RateLimitScope.PER_ACTION,
algorithm="token_bucket",
enabled=True,
priority_bypass=True, # Bypass for critical notifications
priority_threshold="critical", # Critical bypasses limit
)
throttler = NotificationThrottler(config=config)
# Check
result = throttler.check(
action_type="slack",
checkpoint_name="data_quality",
severity="high",
)
# Acquire
result = throttler.acquire(
action_type="slack",
checkpoint_name="data_quality",
severity="high",
)
if result.allowed:
send_notification()
# Check with CheckpointResult
result = throttler.acquire_result(
checkpoint_result=checkpoint_result,
action_type="slack",
)
# Convenience method
if throttler.is_throttled("slack", "my_checkpoint"):
print("Rate limit exceeded")
ThrottlingConfig¶
@dataclass
class ThrottlingConfig:
per_minute_limit: int | None = 10 # Per-minute limit
per_hour_limit: int | None = 100 # Per-hour limit
per_day_limit: int | None = 500 # Per-day limit
burst_multiplier: float = 1.0 # Burst multiplier
scope: RateLimitScope = RateLimitScope.GLOBAL # Scope
algorithm: str = "token_bucket" # Algorithm
enabled: bool = True # Enabled
custom_limits: dict[str, list[RateLimit]] = {} # Per-action custom limits
severity_limits: dict[str, list[RateLimit]] = {} # Per-severity limits
priority_bypass: bool = False # Priority bypass
priority_threshold: str = "critical" # Bypass threshold
queue_on_throttle: bool = False # Queue when throttled
max_queue_size: int = 1000 # Maximum queue size
ThrottlerBuilder¶
Configure throttler with fluent API.
from truthound.checkpoint.throttling import ThrottlerBuilder, RateLimitScope
throttler = (
ThrottlerBuilder()
# Default limits
.with_per_minute_limit(10)
.with_per_hour_limit(100)
.with_per_day_limit(500)
# Burst configuration
.with_burst_allowance(1.5)
# Algorithm and scope
.with_algorithm("token_bucket") # sliding_window, fixed_window
.with_scope(RateLimitScope.PER_ACTION)
# Priority bypass (critical notifications bypass)
.with_priority_bypass("critical")
# Per-action custom limits
.with_action_limit("pagerduty", per_minute=5, per_hour=20)
.with_action_limit("slack", per_minute=20, per_hour=200)
# Per-severity limits
.with_severity_limit("info", per_minute=5, per_hour=50)
.with_severity_limit("critical", per_minute=None, per_hour=None) # No limit
# Enable queueing
.with_queueing(max_size=1000)
.build()
)
Middleware¶
Automatically wraps actions to apply throttling.
ThrottlingMiddleware¶
from truthound.checkpoint.throttling import (
ThrottlingMiddleware,
ThrottlerBuilder,
)
from truthound.checkpoint.actions import SlackNotification
# Create throttler
throttler = (
ThrottlerBuilder()
.with_per_minute_limit(10)
.with_per_hour_limit(100)
.build()
)
# Create middleware
middleware = ThrottlingMiddleware(throttler=throttler)
# Wrap action
slack_action = SlackNotification(webhook_url="...")
throttled_action = middleware.wrap(slack_action)
# Use - throttling automatically applied
await throttled_action.execute(checkpoint_result)
@throttled Decorator¶
from truthound.checkpoint.throttling import throttled, RateLimit
@throttled(
limits=[
RateLimit.per_minute(10),
RateLimit.per_hour(100),
]
)
async def send_notification(result):
# Notification logic
pass
Global Throttling¶
from truthound.checkpoint.throttling import (
configure_global_throttling,
get_global_middleware,
)
# Global configuration
configure_global_throttling(
per_minute_limit=10,
per_hour_limit=100,
per_day_limit=500,
)
# Use global middleware
middleware = get_global_middleware()
throttled_action = middleware.wrap(slack_action)
Storage Backend¶
InMemoryThrottlingStore¶
In-memory store for single-process use.
from truthound.checkpoint.throttling import InMemoryThrottlingStore
store = InMemoryThrottlingStore()
# Used internally by NotificationThrottler
Statistics Retrieval¶
stats = throttler.get_stats()
print(f"Total checked: {stats.total_checked}")
print(f"Total allowed: {stats.total_allowed}")
print(f"Total throttled: {stats.total_throttled}")
print(f"Burst allowed: {stats.total_burst_allowed}")
print(f"Throttle rate: {stats.throttle_rate:.2%}")
print(f"Allow rate: {stats.allow_rate:.2%}")
print(f"Active buckets: {stats.buckets_active}")
Complete Example¶
from truthound.checkpoint import Checkpoint
from truthound.checkpoint.throttling import (
ThrottlerBuilder,
ThrottlingMiddleware,
RateLimitScope,
)
from truthound.checkpoint.actions import SlackNotification, PagerDutyAction
# Multi-level throttler
throttler = (
ThrottlerBuilder()
.with_per_minute_limit(10)
.with_per_hour_limit(100)
.with_per_day_limit(500)
.with_burst_allowance(1.5)
.with_scope(RateLimitScope.PER_ACTION)
.with_priority_bypass("critical")
.with_action_limit("pagerduty", per_minute=5, per_hour=20)
.with_action_limit("slack", per_minute=20, per_hour=200)
.build()
)
middleware = ThrottlingMiddleware(throttler=throttler)
# Wrap actions
slack_action = middleware.wrap(
SlackNotification(webhook_url="${SLACK_WEBHOOK}")
)
pagerduty_action = middleware.wrap(
PagerDutyAction(routing_key="${PAGERDUTY_KEY}")
)
# Apply to checkpoint
checkpoint = Checkpoint(
name="production_check",
data_source="data.csv",
validators=["null"],
actions=[slack_action, pagerduty_action],
)
YAML Configuration¶
throttling:
enabled: true
per_minute_limit: 10
per_hour_limit: 100
per_day_limit: 500
burst_multiplier: 1.5
scope: per_action
algorithm: token_bucket
priority_bypass: true
priority_threshold: critical
action_limits:
pagerduty:
per_minute: 5
per_hour: 20
slack:
per_minute: 20
per_hour: 200
severity_limits:
info:
per_minute: 5
critical:
per_minute: null # No limit
Throttler Comparison¶
| Throttler | Use Case | Burst | Accuracy | Memory |
|---|---|---|---|---|
| TokenBucket | General | Allowed | Medium | Low |
| SlidingWindow | Precise limiting | None | High | Medium |
| FixedWindow | Simple limiting | 2x possible | Low | Low |
| Composite | Multi-level limits | Per config | High | Medium |
| NoOp | Testing | N/A | N/A | None |