Drift Detection (데이터 드리프트 탐지)
데이터 분포 변화를 통계적으로 감지합니다. baseline 데이터와 current 데이터를 비교하여 컬럼별 드리프트를 탐지합니다.
개요
- 14개 통계 방법: KS, PSI, Chi2, KL, JS, Wasserstein, Hellinger, Bhattacharyya, TV, Energy, MMD, CVM, Anderson-Darling, Auto
- 컬럼별 분석: 각 컬럼에 대해 독립적으로 드리프트 탐지
- Protocol 기반:
DriftDetectionEngine Protocol을 구현하는 엔진에서만 사용 가능
- Truthound 지원: TruthoundEngine이 기본적으로 모든 drift 방법을 지원
빠른 시작
from common.engines import TruthoundEngine
import polars as pl
engine = TruthoundEngine()
baseline = pl.read_parquet("baseline.parquet")
current = pl.read_parquet("current.parquet")
result = engine.detect_drift(baseline, current)
if result.is_drifted:
print(f"드리프트 감지! {result.drifted_count}/{result.total_columns} 컬럼")
for col in result.drifted_columns:
print(f" {col.column}: {col.method.name} stat={col.statistic:.4f}")
API
detect_drift(baseline, current, *, method, columns, threshold, **kwargs)
| 파라미터 |
타입 |
기본값 |
설명 |
baseline |
Any |
(필수) |
기준 데이터셋 |
current |
Any |
(필수) |
비교 대상 데이터셋 |
method |
str |
"auto" |
통계 방법 |
columns |
Sequence[str] | None |
None |
검사 대상 컬럼 (None=전체) |
threshold |
float | None |
None |
드리프트 판정 임계값 |
반환값: DriftResult
| 속성 |
타입 |
설명 |
status |
DriftStatus |
NO_DRIFT, DRIFT_DETECTED, WARNING, ERROR |
is_drifted |
bool |
드리프트 감지 여부 (property) |
drift_rate |
float |
드리프트 비율 (property) |
drifted_columns |
tuple[ColumnDrift] |
컬럼별 결과 |
total_columns |
int |
전체 컬럼 수 |
drifted_count |
int |
드리프트 감지 컬럼 수 |
method |
DriftMethod |
사용된 방법 |
execution_time_ms |
float |
실행 시간 |
ColumnDrift 속성
| 속성 |
타입 |
설명 |
column |
str |
컬럼명 |
method |
DriftMethod |
사용된 통계 방법 |
statistic |
float |
검정 통계량 |
p_value |
float | None |
p-value |
threshold |
float |
판정 임계값 |
is_drifted |
bool |
드리프트 여부 |
severity |
Severity |
심각도 |
baseline_stats |
dict |
기준 데이터 통계 |
current_stats |
dict |
현재 데이터 통계 |
통계 방법
연속형 데이터
| Method |
설명 |
특징 |
ks |
Kolmogorov-Smirnov |
가장 일반적, 분포 형태에 민감 |
wasserstein |
Earth Mover's Distance |
분포 간 "이동 비용" 측정 |
anderson_darling |
Anderson-Darling |
꼬리 분포에 민감 |
cvm |
Cramér-von Mises |
누적분포 차이의 제곱 적분 |
energy |
Energy Distance |
다변량 분포 비교 가능 |
범주형 데이터
| Method |
설명 |
특징 |
chi2 |
카이제곱 검정 |
범주형 분포 표준 검정 |
psi |
Population Stability Index |
신용평가 산업 표준 |
정보 이론 기반
| Method |
설명 |
특징 |
kl |
KL Divergence |
비대칭, 방향성 있음 |
js |
Jensen-Shannon |
KL의 대칭 버전 |
hellinger |
Hellinger Distance |
[0,1] 범위, 해석 용이 |
bhattacharyya |
Bhattacharyya |
분포 겹침 측정 |
tv |
Total Variation |
최대 확률 차이 |
커널 기반
| Method |
설명 |
특징 |
mmd |
Maximum Mean Discrepancy |
고차원 데이터에 적합 |
DriftConfig
from common.base import DriftConfig
# 기본 생성
config = DriftConfig(method="ks", threshold=0.05)
# 빌더 패턴
config = DriftConfig().with_method("psi").with_columns(("revenue",)).with_threshold(0.1)
# 직렬화
d = config.to_dict()
restored = DriftConfig.from_dict(d)
엔진 설정
from common.engines import TruthoundEngineConfig, TruthoundEngine
config = TruthoundEngineConfig().with_drift_defaults(
method="ks",
threshold=0.05,
)
engine = TruthoundEngine(config=config)
Feature Detection
from common.engines.base import supports_drift
engine = TruthoundEngine()
assert supports_drift(engine) is True
from common.engines import GreatExpectationsAdapter
ge = GreatExpectationsAdapter()
assert supports_drift(ge) is False
플랫폼 통합
Airflow
from truthound_airflow.operators.drift import DataQualityDriftOperator
op = DataQualityDriftOperator(
task_id="drift_check",
baseline_data_path="s3://bucket/baseline.parquet",
current_data_path="s3://bucket/current.parquet",
method="ks",
threshold=0.05,
fail_on_drift=True,
)
Dagster
from truthound_dagster.ops.drift import data_quality_drift_op, create_drift_op
# Op으로 직접 사용
result = data_quality_drift_op(context, baseline=baseline_df, current=current_df)
# 팩토리로 사전 구성된 Op 생성
my_drift_op = create_drift_op(method="psi", threshold=0.1)
Prefect
from truthound_prefect.tasks.drift import data_quality_drift_task
result = await data_quality_drift_task(
block=dq_block,
baseline_data_path="baseline.parquet",
current_data_path="current.parquet",
method="ks",
)
Mage
from truthound_mage.blocks.drift import DriftTransformer, create_drift_transformer
transformer = create_drift_transformer(method="ks", threshold=0.05)
result = transformer.execute(baseline_data, current_data)
Kestra
from truthound_kestra.scripts.drift import drift_detection_script
drift_detection_script(
baseline_data_path="baseline.parquet",
current_data_path="current.parquet",
method="ks",
)
dbt
# schema.yml
models:
- name: orders
tests:
- truthound_drift_mean:
column: revenue
baseline_model: ref('orders_baseline')
threshold: 0.1
- truthound_drift_null_rate:
column: email
baseline_model: ref('orders_baseline')
threshold: 0.05
직렬화
result_dict = result.to_dict()
result_json = result.to_json()
restored = DriftResult.from_dict(result_dict)
배치 처리
from common.engines import BatchExecutor
executor = BatchExecutor(engine, config)
batch_result = executor.drift_batch(baseline_list, current_list, method="ks")
batch_result = executor.drift_chunked(large_baseline, large_current)
멀티 엔진 집계
from common.engines import aggregate_drift_results
aggregated = aggregate_drift_results({
"engine1": drift_result_1,
"engine2": drift_result_2,
})