Prefect Flows
Flow configuration classes and example Prefect flows for data quality pipelines.
Flow Configuration
FlowConfig
from packages.prefect.flows import FlowConfig

# Generic flow settings: a name, an overall timeout, and a retry policy.
config = FlowConfig(
    name="quality_flow",
    timeout_seconds=60 * 60,  # one hour
    retries=3,
    retry_delay_seconds=60,  # one minute
)
QualityFlowConfig
from packages.prefect.flows import QualityFlowConfig

# Quality-check settings for a flow; the validation engine is chosen by name.
config = QualityFlowConfig(
    engine_name="truthound",
    auto_schema=True,
    fail_on_error=True,
    parallel=True,
)
PipelineFlowConfig
from packages.prefect.flows import PipelineFlowConfig

# Stage names for the pipeline, passed together with the fail_fast flag.
PIPELINE_STAGES = ["load", "validate", "transform", "save"]
config = PipelineFlowConfig(stages=PIPELINE_STAGES, fail_fast=True)
Flow Examples
Basic Quality Validation Flow
import polars as pl
from prefect import flow, task

from packages.prefect.tasks import data_quality_check_task


@task
def load_data():
    """Load the dataset to validate from ``data.parquet``."""
    return pl.read_parquet("data.parquet")


@task
def save_result(result):
    """Persist the validation result (placeholder)."""
    # TODO: add result-saving logic here.


@flow(name="basic_quality_flow")
def basic_quality_flow():
    """Load data, run a quality check with ``auto_schema=True``, save and return the result."""
    data = load_data()
    result = data_quality_check_task(data, auto_schema=True)
    save_result(result)
    return result
Flow with Profiling
from prefect import flow

from packages.prefect.tasks import (
    data_quality_check_task,
    data_quality_profile_task,
)

# `load_data` is the @task defined in the basic validation flow example.


@flow(name="full_quality_flow")
def full_quality_flow():
    """Run a quality check and a profile on the same data and return both results."""
    data = load_data()

    # submit() hands each task to the flow's task runner and returns a future
    # immediately, so the check and the profile can run concurrently.
    check_future = data_quality_check_task.submit(data, auto_schema=True)
    profile_future = data_quality_profile_task.submit(data)

    # result() blocks until the corresponding task run has finished.
    check_result = check_future.result()
    profile_result = profile_future.result()
    return {
        "check": check_result,
        "profile": profile_result,
    }
Conditional Validation Flow
from prefect import flow, task

from packages.prefect.tasks import (
    strict_check_task,
    lenient_check_task,
)

# `load_data` is the @task defined in the basic validation flow example.


@task
def determine_mode(data, strict_row_limit=10000):
    """Choose a validation mode from the size of ``data``.

    Returns ``"strict"`` when ``len(data)`` is below ``strict_row_limit``
    and ``"lenient"`` otherwise.
    """
    return "strict" if len(data) < strict_row_limit else "lenient"


@flow(name="conditional_quality_flow")
def conditional_quality_flow():
    """Validate small data with the strict check and larger data with the lenient one."""
    data = load_data()
    mode = determine_mode(data)
    if mode == "strict":
        return strict_check_task(data)
    return lenient_check_task(data)
Learn and Validate Flow
from prefect import flow

from packages.prefect.tasks import (
    data_quality_learn_task,
    data_quality_check_task,
)

# `load_baseline_data` and `load_new_data` are placeholders for your own
# loading tasks (e.g. @task functions like `load_data` in the basic example).


@flow(name="learn_and_check_flow")
def learn_and_check_flow():
    """Learn rules from baseline data, then validate new data against those rules."""
    # Learn schema/rules from the baseline dataset.
    baseline = load_baseline_data()
    learn_result = data_quality_learn_task(baseline)

    # Validate the new data using the rules learned above.
    new_data = load_new_data()
    check_result = data_quality_check_task(
        new_data,
        rules=learn_result.rules,
    )
    return check_result
Subflows
from prefect import flow

from packages.prefect.tasks import data_quality_check_task

# Assumes a `load_data` task that accepts a source name. The zero-argument
# `load_data` from the basic example would need a `source` parameter added.


@flow(name="quality_subflow")
def quality_subflow(data):
    """Run a quality check with ``auto_schema=True`` on ``data``."""
    return data_quality_check_task(data, auto_schema=True)


@flow(name="main_flow")
def main_flow():
    """Validate two data sources, one ``quality_subflow`` call per source."""
    data1 = load_data("source1")
    data2 = load_data("source2")

    # Calling a flow from inside another flow creates a subflow run.
    result1 = quality_subflow(data1)
    result2 = quality_subflow(data2)
    return [result1, result2]
Scheduling
from prefect import flow

from packages.prefect.tasks import data_quality_check_task

# `load_data` is the @task defined in the basic validation flow example.


@flow(name="scheduled_quality_flow")
def scheduled_quality_flow():
    """Run a quality check with ``auto_schema=True``; the schedule is set at deployment."""
    data = load_data()
    result = data_quality_check_task(data, auto_schema=True)
    return result


# The schedule is attached when the flow is deployed, not in the flow code.
# Prefect 2.x block-based deployment (cron "0 0 * * *" = every day at 00:00):
# prefect deployment build ./flow.py:scheduled_quality_flow \
#     --cron "0 0 * * *" \
#     --name "daily-quality-check"
#
# `prefect deployment build` was removed in Prefect 3. There, and in later
# 2.x releases, the flow can instead be served on the same cron schedule:
# if __name__ == "__main__":
#     scheduled_quality_flow.serve(name="daily-quality-check", cron="0 0 * * *")
Notifications
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

from packages.prefect.tasks import data_quality_check_task

# `load_data` is the @task defined in the basic validation flow example.


@flow(name="flow_with_notification")
def flow_with_notification():
    """Run a quality check and send a Slack notification when it reports FAILED."""
    data = load_data()
    result = data_quality_check_task(data, auto_schema=True)

    # NOTE(review): if the check task raises on failure instead of returning a
    # FAILED result, this branch is never reached. Confirm the task's
    # fail-on-error behavior.
    if result.status.name == "FAILED":
        # Load the SlackWebhook block previously saved as "my-slack-webhook".
        slack = SlackWebhook.load("my-slack-webhook")
        slack.notify(f"Quality check failed: {result.failed_count} failures")
    return result