Upstream Source
This page is part of Truthound Orchestration 3.x.
Source repository: seadonggyun4/truthound-orchestration
Upstream docs path: docs/airflow/sensors.md
Airflow Sensors and Triggers¶
Sensors wait on a quality condition before a downstream task is allowed to continue. In this integration they remain Airflow-native while delegating validation semantics to the shared runtime.
DataQualitySensor¶
DataQualitySensor re-checks a source until the quality threshold is met or the timeout expires.
```python
from truthound_airflow import DataQualitySensor

sensor = DataQualitySensor(
    task_id="wait_for_quality",
    data_path="/opt/airflow/data/upstream_users.parquet",
    quality_threshold=0.95,
    poke_interval=60,
    timeout=3600,
    mode="poke",
)
```
Common Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `task_id` | str | - | Task ID |
| `data_path` / source input | str | - | Path, URI, or other supported source |
| `quality_threshold` | float | 0.95 | Pass-rate threshold |
| `poke_interval` | int | 60 | Polling interval in seconds |
| `timeout` | int | 3600 | Total wait time in seconds |
| `mode` | str | "poke" | `poke` or `reschedule` |
| `engine_name` | str | "truthound" | Explicit engine override |
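The threshold semantics are straightforward: the sensor compares an observed pass rate against `quality_threshold`. A minimal plain-Python sketch of that comparison (`ValidationResult` and `threshold_met` are illustrative names, not part of `truthound_airflow`):

```python
from dataclasses import dataclass


@dataclass
class ValidationResult:
    """Illustrative stand-in for a validation result, not the real API."""
    passed_rows: int
    total_rows: int

    @property
    def pass_rate(self) -> float:
        # Guard against empty sources rather than dividing by zero.
        return self.passed_rows / self.total_rows if self.total_rows else 0.0


def threshold_met(result: ValidationResult, quality_threshold: float = 0.95) -> bool:
    # The sensor succeeds only when the observed pass rate reaches the threshold.
    return result.pass_rate >= quality_threshold
```

With the default threshold of 0.95, a source where 95 of 100 rows pass is accepted, while 94 of 100 is not.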
DeferrableDataQualitySensor¶
Use DeferrableDataQualitySensor when long waits would otherwise waste worker slots.
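Assuming its constructor mirrors `DataQualitySensor` (an assumption; check the package reference for the exact signature), a deferrable version of the earlier example might look like:

```python
from truthound_airflow import DeferrableDataQualitySensor

# Parameter names below are assumed to mirror DataQualitySensor.
wait_for_quality = DeferrableDataQualitySensor(
    task_id="wait_for_quality",
    data_path="/opt/airflow/data/upstream_users.parquet",
    quality_threshold=0.95,
    poke_interval=60,
    timeout=3600,
)
```

While deferred, the task releases its worker slot; a trigger running in the triggerer process resumes it when the condition fires.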
TruthoundSensor¶
Use TruthoundSensor when you want the Truthound-first engine choice to be explicit.
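As with the deferrable variant, the exact signature should be checked against the package reference; assuming it mirrors `DataQualitySensor`, the same intent can be spelled either way:

```python
from truthound_airflow import DataQualitySensor, TruthoundSensor

# Either name the engine explicitly on the generic sensor...
explicit_engine = DataQualitySensor(
    task_id="wait_explicit_engine",
    data_path="/opt/airflow/data/upstream_users.parquet",
    engine_name="truthound",
)

# ...or make the Truthound-first choice explicit via the class itself.
truthound_first = TruthoundSensor(
    task_id="wait_truthound",
    data_path="/opt/airflow/data/upstream_users.parquet",
)
```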
Triggers¶
The Airflow package also includes trigger support so deferrable sensor behavior can stay aligned with the same first-party quality semantics. Treat triggers as the deferrable execution mechanism and sensors as the DAG authoring surface.
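The division of labor can be sketched in plain Python (illustrative only; the real trigger classes live in `truthound_airflow`): a trigger is an async poll loop that runs in the triggerer process and yields an event, while the sensor is the authoring surface that decides what the event means.

```python
import asyncio


async def quality_trigger(check, poke_interval: float, timeout: float) -> dict:
    """Illustrative async poll loop, not the real trigger implementation.

    Runs in the triggerer process, so no worker slot is held while waiting.
    """
    elapsed = 0.0
    while elapsed < timeout:
        if check():  # stand-in for a real quality validation call
            return {"status": "success"}
        await asyncio.sleep(poke_interval)
        elapsed += poke_interval
    return {"status": "timeout"}
```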
Usage Example¶
```python
from datetime import datetime

from airflow import DAG
from truthound_airflow import DataQualityCheckOperator, DataQualitySensor

with DAG(
    dag_id="quality_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
) as dag:
    wait_for_quality = DataQualitySensor(
        task_id="wait_for_quality",
        data_path="/opt/airflow/data/upstream_users.parquet",
        quality_threshold=0.99,
        poke_interval=300,
        timeout=7200,
    )

    process_data = DataQualityCheckOperator(
        task_id="process_data",
        data_path="/opt/airflow/data/upstream_users.parquet",
        rules=[{"column": "user_id", "type": "not_null"}],
    )

    wait_for_quality >> process_data
```
SensorConfig¶
Use SensorConfig when you want reusable defaults across several DAGs or sensors.
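The real `SensorConfig` API is defined upstream; as a plain-Python sketch of the pattern (all names here are illustrative), shared defaults can be merged with per-sensor overrides:

```python
from dataclasses import asdict, dataclass


@dataclass
class SensorDefaults:
    """Illustrative stand-in for SensorConfig, not the real API."""
    quality_threshold: float = 0.95
    poke_interval: int = 60
    timeout: int = 3600
    mode: str = "reschedule"


def sensor_kwargs(defaults: SensorDefaults, **overrides) -> dict:
    # Per-sensor overrides win over the shared defaults.
    return {**asdict(defaults), **overrides}
```

A DAG that mostly accepts the defaults then only spells out what differs, e.g. `sensor_kwargs(defaults, timeout=7200)`.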
Sensor Behavior¶
- Airflow pokes the sensor at the configured `poke_interval`.
- The source is normalized through the shared runtime.
- Validation executes against the configured threshold.
- The sensor returns success only when the threshold is satisfied.
- Timeout or repeated failure stays visible to the DAG like any other Airflow sensor failure.
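The steps above can be condensed into a sketch of the poke cycle (plain Python, not the real sensor implementation):

```python
import time


def poke_loop(validate, quality_threshold: float, poke_interval: float,
              timeout: float) -> bool:
    """Illustrative poke cycle: poll, validate, succeed on threshold.

    On timeout it fails loudly, like any other Airflow sensor failure.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # validate() stands in for normalizing the source and running
        # validation through the shared runtime.
        if validate() >= quality_threshold:
            return True
        time.sleep(poke_interval)
    raise TimeoutError("quality threshold not met before timeout")
```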
Modes¶
`reschedule` mode is recommended for long wait times because it improves worker efficiency.
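In `reschedule` mode the task is rescheduled between pokes instead of occupying a worker slot for the whole wait; enabling it is a one-line change to the earlier example:

```python
from truthound_airflow import DataQualitySensor

wait_for_quality = DataQualitySensor(
    task_id="wait_for_quality",
    data_path="/opt/airflow/data/upstream_users.parquet",
    quality_threshold=0.95,
    poke_interval=300,   # longer intervals suit reschedule mode
    timeout=7200,
    mode="reschedule",   # release the worker slot between pokes
)
```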