Skip to content

Anomaly Detection (이상탐지)

ML 기반 이상탐지 기능입니다. Isolation Forest, Z-Score, LOF, Ensemble 탐지기를 지원합니다.

개요

  • 4개 탐지기: Isolation Forest, Z-Score, LOF, Ensemble
  • 컬럼별 분석: 각 컬럼에 대해 독립적으로 이상치 점수 산출
  • Protocol 기반: AnomalyDetectionEngine Protocol을 구현하는 엔진에서만 사용 가능
  • Truthound 지원: TruthoundEngine이 기본적으로 모든 탐지기를 지원

빠른 시작

from common.engines import TruthoundEngine
import polars as pl

engine = TruthoundEngine()
data = pl.read_parquet("data.parquet")

result = engine.detect_anomalies(data)

if result.has_anomalies:
    print(f"이상치 감지! rate={result.anomaly_rate:.2%}")
    for score in result.anomalies:
        print(f"  {score.column}: score={score.score:.4f}, anomaly={score.is_anomaly}")

API

detect_anomalies(data, *, detector, columns, contamination, **kwargs)

파라미터 타입 기본값 설명
data Any (필수) 분석 대상 데이터셋
detector str "isolation_forest" 탐지기 이름
columns Sequence[str] | None None 검사 대상 컬럼 (None=전체)
contamination float 0.05 예상 이상치 비율 (0 < x < 0.5)

반환값: AnomalyResult

속성 타입 설명
status AnomalyStatus NORMAL, ANOMALY_DETECTED, WARNING, ERROR
has_anomalies bool 이상치 감지 여부 (property)
anomaly_rate float 이상치 비율 (property)
anomalies tuple[AnomalyScore] 컬럼별 이상치 점수
anomalous_row_count int 이상치 행 수
total_row_count int 전체 행 수
detector str 사용된 탐지기
execution_time_ms float 실행 시간

AnomalyScore 속성

속성 타입 설명
column str 컬럼명
score float 이상치 점수
threshold float 판정 임계값
is_anomaly bool 이상치 여부
detector str 사용된 탐지기

Detector 비교

Detector 알고리즘 장점 단점 용도
isolation_forest Isolation Forest 고차원, 대용량 효율적 하이퍼파라미터 민감 범용 이상탐지
z_score Z-Score 단순, 해석 용이 정규분포 가정 정규분포 데이터
lof Local Outlier Factor 밀도 기반, 국소 이상치 감지 계산 비용 높음 군집형 데이터
ensemble 다수결 앙상블 높은 정확도, 강건함 느림 정확도 우선

AnomalyConfig

from common.base import AnomalyConfig

config = AnomalyConfig(detector="ensemble", contamination=0.03)
config = config.with_detector("lof").with_columns(("amount",)).with_contamination(0.01)

d = config.to_dict()
restored = AnomalyConfig.from_dict(d)

엔진 설정

from common.engines import TruthoundEngineConfig, TruthoundEngine

config = TruthoundEngineConfig().with_anomaly_defaults(
    detector="isolation_forest",
    contamination=0.05,
)
engine = TruthoundEngine(config=config)

Feature Detection

from common.engines.base import supports_anomaly

assert supports_anomaly(TruthoundEngine()) is True
assert supports_anomaly(GreatExpectationsAdapter()) is False

플랫폼 통합

Airflow

from truthound_airflow.operators.anomaly import DataQualityAnomalyOperator

op = DataQualityAnomalyOperator(
    task_id="anomaly_check",
    data_path="s3://bucket/data.parquet",
    detector="isolation_forest",
    contamination=0.05,
    fail_on_anomaly=True,
)

Dagster

from truthound_dagster.ops.anomaly import data_quality_anomaly_op, create_anomaly_op

result = data_quality_anomaly_op(context, data=df)
my_op = create_anomaly_op(detector="z_score", contamination=0.03)

Prefect

from truthound_prefect.tasks.anomaly import data_quality_anomaly_task

result = await data_quality_anomaly_task(
    block=dq_block,
    data_path="data.parquet",
    detector="isolation_forest",
)

Mage

from truthound_mage.blocks.anomaly import AnomalyTransformer, create_anomaly_transformer

transformer = create_anomaly_transformer(detector="lof")
result = transformer.execute(data)

Kestra

from truthound_kestra.scripts.anomaly import anomaly_detection_script

anomaly_detection_script(data_path="data.parquet", detector="ensemble")

dbt

models:
  - name: transactions
    tests:
      - truthound_anomaly_zscore:
          column: amount
          threshold: 3.0
      - truthound_anomaly_iqr:
          column: amount
          factor: 1.5

직렬화

result_dict = result.to_dict()
result_json = result.to_json()
restored = AnomalyResult.from_dict(result_dict)

배치 처리

from common.engines import BatchExecutor

executor = BatchExecutor(engine, config)
batch_result = executor.anomaly_batch(datasets, detector="isolation_forest")
batch_result = executor.anomaly_chunked(large_data, detector="z_score")

멀티 엔진 집계

from common.engines import aggregate_anomaly_results, AggregationConfig, ResultAggregationStrategy

# CONSENSUS 전략: 다수 엔진이 동의하는 anomaly만 유지
config = AggregationConfig(strategy=ResultAggregationStrategy.CONSENSUS)
aggregated = aggregate_anomaly_results(engine_results, config=config)