Airflow Operators¶
Airflow Operators for data quality validation, profiling, and schema learning.
DataQualityCheckOperator¶
Executes data quality validation against a data source.
```python
from packages.airflow.operators import DataQualityCheckOperator

check = DataQualityCheckOperator(
    task_id="quality_check",
    data_source="s3://bucket/data.parquet",
    auto_schema=True,
    fail_on_error=True,
    engine_name="truthound",
)
```
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| `task_id` | `str` | Task ID |
| `data_source` | `str` | Data source path |
| `auto_schema` | `bool` | Automatic schema generation |
| `rules` | `list` | List of validation rules (see the example below) |
| `fail_on_error` | `bool` | Raise an exception on failure |
| `engine_name` | `str` | Engine name to use |
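When `auto_schema` is off, validation rules can be passed explicitly. A minimal sketch; the rule-dictionary shape follows the `CheckOperatorConfig` example later on this page:

```python
from packages.airflow.operators import DataQualityCheckOperator

# Explicit rules instead of automatic schema generation; the rule
# dict shape mirrors the CheckOperatorConfig example below.
check_with_rules = DataQualityCheckOperator(
    task_id="quality_check_rules",
    data_source="s3://bucket/data.parquet",
    rules=[
        {"type": "not_null", "column": "id"},
    ],
    fail_on_error=True,
    engine_name="truthound",
)
```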
XCom Output¶
Validation results are pushed to XCom:
```python
# Use results in a subsequent task
def process_result(**context):
    result = context["ti"].xcom_pull(task_ids="quality_check")
    print(f"Status: {result['status']}")
    print(f"Passed: {result['passed_count']}")
```
DataQualityProfileOperator¶
Profiles a data source and computes column-level statistics.
```python
from packages.airflow.operators import DataQualityProfileOperator

profile = DataQualityProfileOperator(
    task_id="profile_data",
    data_source="s3://bucket/data.parquet",
    engine_name="truthound",
)
```
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| `task_id` | `str` | Task ID |
| `data_source` | `str` | Data source path |
| `engine_name` | `str` | Engine name to use |
XCom Output¶
Profile results are pushed to XCom:
```python
def analyze_profile(**context):
    profile = context["ti"].xcom_pull(task_ids="profile_data")
    for col in profile["columns"]:
        print(f"{col['column_name']}: {col['null_percentage']}% null")
```
DataQualityLearnOperator¶
Learns validation rules from baseline data.
```python
from packages.airflow.operators import DataQualityLearnOperator

learn = DataQualityLearnOperator(
    task_id="learn_schema",
    data_source="s3://bucket/baseline.parquet",
    engine_name="truthound",
)
```
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| `task_id` | `str` | Task ID |
| `data_source` | `str` | Baseline data path |
| `engine_name` | `str` | Engine name to use |
XCom Output¶
Learned rules are pushed to XCom:
```python
def use_learned_schema(**context):
    learn_result = context["ti"].xcom_pull(task_ids="learn_schema")
    for rule in learn_result["rules"]:
        print(f"{rule['column']}: {rule['rule_type']}")
```
TruthoundCheckOperator¶
A Truthound-specific check operator with support for parallel validation:
```python
from packages.airflow.operators import TruthoundCheckOperator

check = TruthoundCheckOperator(
    task_id="truthound_check",
    data_source="s3://bucket/data.parquet",
    auto_schema=True,
    parallel=True,
    max_workers=4,
)
```
BaseDataQualityOperator¶
Base class for implementing custom operators:
```python
from packages.airflow.operators import BaseDataQualityOperator

class MyCustomOperator(BaseDataQualityOperator):
    def execute(self, context):
        engine = self.get_engine()   # resolve the configured engine
        data = self.load_data()      # read from the data source
        result = engine.check(data, auto_schema=True)
        return self.serialize_result(result)  # return value is pushed to XCom
```
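The subclass is then instantiated like the built-in operators; this assumes the base class accepts the common `data_source` and `engine_name` parameters documented above:

```python
custom_check = MyCustomOperator(
    task_id="custom_check",
    data_source="s3://bucket/data.parquet",
    engine_name="truthound",
)
```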
Configuration Classes¶
OperatorConfig¶
```python
from packages.airflow.operators import OperatorConfig

config = OperatorConfig(
    engine_name="truthound",
    fail_on_error=True,
    timeout_seconds=3600,
)
```
CheckOperatorConfig¶
```python
from packages.airflow.operators import CheckOperatorConfig

config = CheckOperatorConfig(
    auto_schema=True,
    rules=[
        {"type": "not_null", "column": "id"},
    ],
    fail_on_error=True,
)
```
ProfileOperatorConfig¶
```python
from packages.airflow.operators import ProfileOperatorConfig

config = ProfileOperatorConfig(
    include_statistics=True,
)
```
LearnOperatorConfig¶
```python
from packages.airflow.operators import LearnOperatorConfig

config = LearnOperatorConfig(
    min_confidence=0.8,
)
```
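This page does not show how a config object attaches to an operator. A plausible sketch, assuming the operators accept a `config` keyword (check the operator signatures for the actual parameter):

```python
from packages.airflow.operators import CheckOperatorConfig, DataQualityCheckOperator

config = CheckOperatorConfig(auto_schema=True, fail_on_error=True)

# Assumption: operators take a `config` keyword argument.
check = DataQualityCheckOperator(
    task_id="configured_check",
    data_source="s3://bucket/data.parquet",
    config=config,
)
```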
Result Handlers¶
Custom processing of validation results:
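The handler API is not included in this excerpt. A minimal sketch of the idea, assuming a handler is a callable that receives the serialized validation result (the `result_handler` parameter name is hypothetical):

```python
from packages.airflow.operators import DataQualityCheckOperator


def alert_on_failure(result: dict) -> None:
    # Hypothetical handler: receives the serialized result (the same
    # shape as the XCom payload) and reacts to failures.
    if result.get("status") != "passed":
        print(f"Validation failed: only {result.get('passed_count')} checks passed")


# Assumption: the operator accepts a result handler callable; the
# `result_handler` parameter name is illustrative, not confirmed.
check = DataQualityCheckOperator(
    task_id="quality_check_with_handler",
    data_source="s3://bucket/data.parquet",
    auto_schema=True,
    result_handler=alert_on_failure,
)
```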