Airflow Sensors¶
Sensors that wait until data quality conditions are met.
DataQualitySensor¶
Waits until the data quality threshold is satisfied.
from packages.airflow.sensors import DataQualitySensor

sensor = DataQualitySensor(
    task_id="wait_for_quality",
    data_source="s3://bucket/data.parquet",
    quality_threshold=0.95,
    poke_interval=60,
    timeout=3600,
    mode="poke",
)
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `task_id` | str | - | Task ID |
| `data_source` | str | - | Data source path |
| `quality_threshold` | float | 0.95 | Pass rate threshold (0.0-1.0) |
| `poke_interval` | int | 60 | Retry interval (seconds) |
| `timeout` | int | 3600 | Timeout (seconds) |
| `mode` | str | "poke" | Sensor mode ("poke" or "reschedule") |
| `engine_name` | str | "truthound" | Engine name to use |
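To make the `quality_threshold` semantics concrete, here is a minimal sketch. It assumes the pass rate is the fraction of validation checks that passed; how the engine actually computes it is not stated here, and the helper names `pass_rate` and `meets_threshold` are illustrative, not part of the package.

```python
def pass_rate(passed: int, total: int) -> float:
    """Fraction of checks that passed (assumed definition, for illustration)."""
    if total <= 0:
        raise ValueError("total must be positive")
    if not 0 <= passed <= total:
        raise ValueError("passed must be between 0 and total")
    return passed / total


def meets_threshold(rate: float, quality_threshold: float = 0.95) -> bool:
    """True when the pass rate meets or exceeds the threshold (0.0-1.0)."""
    if not 0.0 <= quality_threshold <= 1.0:
        raise ValueError("quality_threshold must be within 0.0-1.0")
    return rate >= quality_threshold
```

Under this assumption, 19 passing checks out of 20 satisfies the default threshold of 0.95, while 94 out of 100 does not.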
Usage Example¶
from airflow import DAG
from packages.airflow.sensors import DataQualitySensor
from packages.airflow.operators import DataQualityCheckOperator
from datetime import datetime

with DAG(
    dag_id="quality_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    # Wait for data quality condition
    wait_for_quality = DataQualitySensor(
        task_id="wait_for_quality",
        data_source="s3://bucket/upstream_data.parquet",
        quality_threshold=0.99,
        poke_interval=300,  # Check every 5 minutes
        timeout=7200,  # 2 hour timeout
    )

    # Process downstream after quality condition met
    process_data = DataQualityCheckOperator(
        task_id="process_data",
        data_source="s3://bucket/upstream_data.parquet",
        auto_schema=True,
    )

    wait_for_quality >> process_data
TruthoundSensor¶
Truthound-specific Sensor:
from packages.airflow.sensors import TruthoundSensor

sensor = TruthoundSensor(
    task_id="truthound_sensor",
    data_source="s3://bucket/data.parquet",
    quality_threshold=0.95,
    auto_schema=True,
    parallel=True,
)
SensorConfig¶
Sensor configuration class:
from packages.airflow.sensors import DataQualitySensor, SensorConfig

# Shared settings that can be reused across sensors
config = SensorConfig(
    quality_threshold=0.95,
    poke_interval=60,
    timeout=3600,
    mode="reschedule",
    engine_name="truthound",
)

# Pass the configuration object instead of individual parameters
sensor = DataQualitySensor(
    task_id="sensor",
    data_source="s3://bucket/data.parquet",
    config=config,
)
Sensor Behavior¶
- The `poke()` method is called at `poke_interval` intervals
- Data quality validation is executed
- Returns `True` if pass rate meets or exceeds `quality_threshold`
- Waits until next poke if condition not met
- Task fails if `timeout` is exceeded
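The steps above can be sketched as a plain Python loop. This is a simplified illustration, not the package's or Airflow's implementation: `wait_for_quality`, `SensorTimeout`, and the injectable `sleep` and `clock` parameters are hypothetical names introduced here so the loop can be simulated without real waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical stand-in for the error raised when the sensor times out."""


def wait_for_quality(
    get_pass_rate: Callable[[], float],
    quality_threshold: float = 0.95,
    poke_interval: float = 60,
    timeout: float = 3600,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Poke until the pass rate reaches the threshold; return the poke count."""
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        # One poke: run validation and compare the pass rate to the threshold.
        if get_pass_rate() >= quality_threshold:
            return pokes
        # Condition not met: fail once the timeout has elapsed ...
        if clock() - started > timeout:
            raise SensorTimeout(f"quality threshold not met after {timeout} seconds")
        # ... otherwise wait until the next poke.
        sleep(poke_interval)


# Simulated run: a fake clock replaces real sleeping.
now = [0.0]


def fake_sleep(seconds: float) -> None:
    now[0] += seconds


rates = iter([0.50, 0.90, 0.97])
pokes = wait_for_quality(lambda: next(rates), sleep=fake_sleep, clock=lambda: now[0])
print(pokes, now[0])  # 3 pokes, 120.0 simulated seconds elapsed
```

In the simulated run the first two pokes fall short of 0.95, so the loop waits twice before the third poke succeeds.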
Modes¶
| Mode | Description |
|---|---|
| `poke` | Occupies worker slot while waiting |
| `reschedule` | Releases slot and reschedules |
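A rough estimate shows why the mode matters. The sketch below assumes each poke takes a fixed `poke_duration` seconds (a hypothetical figure) and ignores scheduler overhead when rescheduling; `slot_seconds` is an illustrative helper, not part of the package.

```python
import math


def slot_seconds(
    mode: str,
    wait_seconds: float,
    poke_interval: float,
    poke_duration: float,
) -> float:
    """Estimate how long a worker slot is occupied while the sensor waits."""
    if mode == "poke":
        # The slot is held for the entire wait, including the sleeps between pokes.
        return wait_seconds
    if mode == "reschedule":
        # The slot is held only while each poke runs.
        pokes = math.floor(wait_seconds / poke_interval) + 1
        return pokes * poke_duration
    raise ValueError(f"unknown mode: {mode!r}")


# A 2 hour wait, checked every 5 minutes, with pokes assumed to take 10 seconds.
held_poke = slot_seconds("poke", 7200, 300, 10)
held_reschedule = slot_seconds("reschedule", 7200, 300, 10)
print(held_poke, held_reschedule)  # 7200 250
```

Under these assumptions the slot is occupied for the full two hours in `poke` mode, but only for about four minutes in total in `reschedule` mode.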
reschedule mode is recommended for long wait times: