Skip to content

Dagster Assets

Decorators and factories for defining Assets with data quality validation.

@quality_checked_asset

Asset with automatic quality validation:

from packages.dagster.assets import quality_checked_asset

@quality_checked_asset(
    auto_schema=True,
    fail_on_error=True,
)
def my_data_asset():
    return pl.read_parquet("data.parquet")

Parameters

Parameter Type Description
auto_schema bool Automatic schema generation
rules list List of validation rules
fail_on_error bool Raise exception on failure
engine_name str Engine name to use

@profiled_asset

Asset with automatic profiling:

from packages.dagster.assets import profiled_asset

@profiled_asset
def profiled_data():
    return pl.read_parquet("data.parquet")

Profile results are stored in Asset metadata.

Asset Factories

create_quality_asset

Create quality validation Asset:

from packages.dagster.assets import create_quality_asset, QualityAssetConfig

config = QualityAssetConfig(
    auto_schema=True,
    fail_on_error=True,
    engine_name="truthound",
)

my_asset = create_quality_asset(
    name="my_quality_asset",
    compute_fn=lambda: pl.read_parquet("data.parquet"),
    config=config,
)

create_quality_check_asset

Asset that returns only validation results:

from packages.dagster.assets import create_quality_check_asset

check_asset = create_quality_check_asset(
    name="quality_check_result",
    data_source="s3://bucket/data.parquet",
)

Asset Configuration

QualityAssetConfig

from packages.dagster.assets import QualityAssetConfig

config = QualityAssetConfig(
    auto_schema=True,
    rules=[
        {"type": "not_null", "column": "id"},
        {"type": "unique", "column": "email"},
    ],
    fail_on_error=True,
    engine_name="truthound",
)

ProfileAssetConfig

from packages.dagster.assets import ProfileAssetConfig

config = ProfileAssetConfig(
    include_statistics=True,
    store_in_metadata=True,
)

QualityCheckMode

from packages.dagster.assets import QualityCheckMode

# BEFORE: Validate before returning data
# AFTER: Validate after returning data
# BOTH: Validate both before and after

@quality_checked_asset(mode=QualityCheckMode.BEFORE)
def my_asset():
    return load_data()

Asset Dependencies

from dagster import asset
from packages.dagster.assets import quality_checked_asset

@asset
def raw_data():
    return pl.read_parquet("raw.parquet")

@quality_checked_asset(auto_schema=True)
def clean_data(raw_data):
    # raw_data is quality validated
    return transform(raw_data)

Metadata

Quality validation results are stored in Asset metadata:

from dagster import asset, Output

@quality_checked_asset(auto_schema=True)
def my_asset():
    data = load_data()
    return Output(
        data,
        metadata={
            "row_count": len(data),
            "quality_status": "passed",
        },
    )

Usage in Definitions

from dagster import Definitions
from packages.dagster.assets import quality_checked_asset
from packages.dagster.resources import DataQualityResource

@quality_checked_asset(auto_schema=True)
def my_asset():
    return load_data()

defs = Definitions(
    assets=[my_asset],
    resources={"data_quality": DataQualityResource()},
)