Airflow Hooks¶
Hooks for connection management and data loading.
DataQualityHook¶
Manages connections to data quality engines and exposes data loading, validation, profiling, and schema learning.
```python
from packages.airflow.hooks import DataQualityHook

hook = DataQualityHook(conn_id="my_data_connection")

# Data loading
data = hook.load_data("s3://bucket/data.parquet")

# Validation execution
result = hook.check(data, auto_schema=True)

# Profiling
profile = hook.profile(data)

# Schema learning
learn_result = hook.learn(data)
```
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| `conn_id` | `str` | Airflow connection ID |
| `engine_name` | `str` | Name of the engine to use |
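When a single Airflow connection can serve several engines, the engine can be selected explicitly. A minimal sketch (the engine name below is illustrative, not a documented value):

```python
from packages.airflow.hooks import DataQualityHook

# Select a specific engine instead of the connection's default.
# "truthound" is an illustrative engine name.
hook = DataQualityHook(conn_id="my_data_connection", engine_name="truthound")
```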
Methods¶
| Method | Description |
|---|---|
| `load_data(source)` | Load data from the given source |
| `check(data, ...)` | Run data validation |
| `profile(data)` | Profile the data |
| `learn(data)` | Learn a schema from the data |
| `get_engine()` | Return the underlying engine instance |
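Most workflows only need the wrapped methods above; `get_engine()` is the escape hatch when you need the engine object directly. A minimal sketch:

```python
from packages.airflow.hooks import DataQualityHook

hook = DataQualityHook(conn_id="my_data_connection")

# Access the underlying engine for features the hook does not wrap.
engine = hook.get_engine()
print(type(engine).__name__)
```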
TruthoundHook¶
Truthound-specific Hook:
```python
from packages.airflow.hooks import TruthoundHook

hook = TruthoundHook(conn_id="my_connection")

# Use Truthound-specific features
data = hook.load_data("s3://bucket/data.parquet")
result = hook.check(data, auto_schema=True, parallel=True)
```
ConnectionConfig¶
Connection configuration class:
```python
from packages.airflow.hooks import ConnectionConfig

config = ConnectionConfig(
    host="localhost",
    port=5432,
    login="user",
    password="pass",
    schema="database",
    extra={"ssl": True},
)
```
DataLoader¶
Data loading utility:
```python
from packages.airflow.hooks import DataLoader

loader = DataLoader()

# Load data from various sources
df = loader.load("s3://bucket/data.parquet")
df = loader.load("gs://bucket/data.csv")
df = loader.load("/path/to/local/file.parquet")
```
DataWriter¶
Data saving utility:
```python
from packages.airflow.hooks import DataWriter

writer = DataWriter()

# Save data to various destinations
writer.write(df, "s3://bucket/output.parquet")
writer.write(df, "/path/to/local/output.csv")
```
Using Hooks in Operators¶
```python
from packages.airflow.operators import BaseDataQualityOperator
from packages.airflow.hooks import DataQualityHook


class MyOperator(BaseDataQualityOperator):
    def __init__(self, conn_id, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id

    def execute(self, context):
        hook = DataQualityHook(conn_id=self.conn_id)
        data = hook.load_data(self.data_source)
        result = hook.check(data, auto_schema=True)
        return self.serialize_result(result)
```
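The custom operator is then scheduled like any other Airflow task. A minimal sketch of the DAG wiring (passing `data_source` assumes `BaseDataQualityOperator` accepts it, since `execute()` above reads `self.data_source`):

```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="data_quality_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # conn_id matches the Airflow connection configured below.
    check_data = MyOperator(
        task_id="check_data",
        conn_id="my_data_connection",
        data_source="s3://bucket/data.parquet",
    )
```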
Connection Setup¶
Configure the connection via the Airflow UI or CLI:
```bash
airflow connections add 'my_data_connection' \
    --conn-type 's3' \
    --conn-extra '{"aws_access_key_id": "...", "aws_secret_access_key": "..."}'
```
Or via an environment variable:
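Airflow resolves connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` (the connection ID uppercased). A minimal sketch using the JSON connection format supported by recent Airflow releases; the credential values are placeholders:

```bash
# One-line JSON definition of the same S3-style connection.
export AIRFLOW_CONN_MY_DATA_CONNECTION='{"conn_type": "s3", "extra": {"aws_access_key_id": "...", "aws_secret_access_key": "..."}}'
```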