Data Sources Guide¶
This guide covers connecting Truthound to various data backends through the Python API. It includes connection patterns, authentication methods, and practical workflows for database validation.
Quick Start¶
import truthound as th
from truthound.datasources import get_sql_datasource
# Quick data loading with th.read()
df = th.read("data.csv") # File path
df = th.read({"a": [1, 2, 3], "b": ["x", "y", "z"]}) # Dict data
df = th.read("large_data.parquet", sample_size=10000) # With sampling
df = th.read({"path": "data.csv", "delimiter": ";"}) # Config dict
# File-based validation
report = th.check("data.csv")
# Database validation
source = get_sql_datasource("mydb.db", table="users")
report = th.check(source=source)
# With query pushdown (runs on database server)
from truthound.datasources import PostgreSQLDataSource
source = PostgreSQLDataSource(table="users", host="localhost", database="mydb")
report = th.check(source=source, pushdown=True)
Common Workflows¶
Workflow 1: Multi-Source Validation Pipeline¶
import truthound as th
from truthound.datasources import PostgreSQLDataSource, BigQueryDataSource
# Define sources
sources = {
"postgres_users": PostgreSQLDataSource(
table="users", host="localhost", database="app"
),
"bigquery_events": BigQueryDataSource(
project="my-project", dataset="analytics", table="events"
),
}
# Validate all sources
results = {}
for name, source in sources.items():
report = th.check(source=source)
results[name] = report
print(f"{name}: {len(report.issues)} issues found")
Workflow 2: Cross-Database Comparison¶
import truthound as th
from truthound.validators import CrossTableRowCountValidator
from truthound.datasources import PostgreSQLDataSource, MySQLDataSource
# Source and target databases
source_db = PostgreSQLDataSource(table="users", host="source-db", database="app")
target_db = MySQLDataSource(table="users", host="target-db", database="app")
# Load data
source_data = source_db.to_polars_lazyframe().collect()
target_data = target_db.to_polars_lazyframe().collect()
# Compare row counts
validator = CrossTableRowCountValidator(
reference_data=source_data,
operator="==",
tolerance=0.0
)
issues = validator.validate(target_data.lazy())
Workflow 3: Query Mode Validation¶
import truthound as th
from truthound.datasources.sql import PostgreSQLDataSource
# Validate custom query results instead of entire table
source = PostgreSQLDataSource(
database="analytics",
host="localhost",
query="""
SELECT user_id, email, created_at
FROM users
WHERE status = 'active'
AND created_at > '2024-01-01'
"""
)
# Validate query results
report = th.check(source=source, validators=["null", "email"])
Workflow 4: Environment-Based Connection¶
import os
from truthound.datasources.sql import PostgreSQLDataSource
# Set environment variables
# export DB_HOST=localhost
# export DB_NAME=myapp
# export DB_USER=readonly
# export DB_PASSWORD=secret
source = PostgreSQLDataSource(
table="users",
host=os.environ["DB_HOST"],
database=os.environ["DB_NAME"],
user=os.environ["DB_USER"],
password=os.environ["DB_PASSWORD"],
)
# Or use from_env() where available
from truthound.datasources.sql import SnowflakeDataSource
source = SnowflakeDataSource.from_env(
table="users",
database="MY_DB",
env_prefix="SNOWFLAKE", # Reads SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, etc.
)
Overview¶
Truthound supports multiple data backends through a unified abstraction layer:
- DataSource: Abstraction for data storage and access
- ExecutionEngine: Abstraction for running validation operations
This architecture enables validators to work seamlessly across Polars, Pandas, SQL databases, and Spark.
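For example, the same engine calls work regardless of which backend produced the engine. A minimal sketch using only APIs demonstrated later in this guide:
import polars as pl
from truthound.datasources import PolarsDataSource
from truthound.datasources.sql import SQLiteDataSource

df = pl.DataFrame({"id": [1, 2, 3]})

# The same engine interface is obtained from a DataFrame source and a SQL source.
for source in (PolarsDataSource(df), SQLiteDataSource.from_dataframe(df, "items")):
    engine = source.get_execution_engine()
    print(type(engine).__name__, engine.count_rows())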
Important: Python API Only
DataSource integration is available only through the Python API. The CLI (truthound check, truthound scan, etc.) only supports file-based inputs (CSV, JSON, Parquet, NDJSON, JSONL). For SQL databases, Spark, or cloud data warehouses, you must use the Python API with the source= parameter as shown below.
Quick Start with DataSource¶
All core API functions (check, scan, mask, profile) support the source parameter for DataSource integration:
import truthound as th
from truthound.datasources import get_sql_datasource
# Create a DataSource
source = get_sql_datasource("mydb.db", table="users")
# Use with all core API functions
report = th.check(source=source) # Data validation
pii_report = th.scan(source=source) # PII detection
profile = th.profile(source=source) # Statistical profiling
masked_df = th.mask(source=source) # Data masking
# You can also combine with other options
report = th.check(
source=source,
validators=["null", "duplicate"],
min_severity="medium",
parallel=True,
)
Supported Data Sources¶
Truthound supports a comprehensive range of data backends:
| Category | Sources |
|---|---|
| DataFrame | Polars, Pandas, PySpark |
| Core SQL | SQLite, PostgreSQL, MySQL |
| Cloud DW | BigQuery, Snowflake, Redshift, Databricks |
| Enterprise | Oracle, SQL Server |
| File | CSV, JSON, Parquet, NDJSON, JSONL |
Polars (Primary)¶
Polars is the primary and most feature-complete backend.
import polars as pl
from truthound import check
from truthound.datasources import PolarsDataSource
df = pl.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
})
# Direct usage with check()
report = check(df, rules={...})
# Explicit DataSource usage
source = PolarsDataSource(df)
engine = source.get_execution_engine()
print(f"Row count: {engine.count_rows()}")
Pandas¶
Native Pandas support, with automatic conversion to Polars when an operation requires it.
import pandas as pd
from truthound.datasources import PandasDataSource
df = pd.DataFrame({
"id": [1, 2, 3],
"value": [100, 200, 300],
})
source = PandasDataSource(df)
engine = source.get_execution_engine()
print(f"Null counts: {engine.count_nulls_all()}")
SQL Databases¶
SQL sources with connection pooling and query pushdown optimization.
SQLite¶
from truthound.datasources.sql import SQLiteDataSource
# From file
source = SQLiteDataSource(table="users", database="data.db")
# From DataFrame (creates temporary DB)
import polars as pl
df = pl.DataFrame({"id": [1, 2], "name": ["A", "B"]})
source = SQLiteDataSource.from_dataframe(df, "test_table")
engine = source.get_execution_engine()
print(f"Distinct values: {engine.count_distinct('name')}")
PostgreSQL¶
from truthound.datasources.sql import PostgreSQLDataSource
source = PostgreSQLDataSource(
table="users",
host="localhost",
database="mydb",
user="postgres",
password="secret",
schema_name="public",
)
# Or using connection string
source = PostgreSQLDataSource.from_connection_string(
connection_string="postgresql://user:pass@localhost/db",
table="users",
)
MySQL¶
from truthound.datasources.sql import MySQLDataSource
source = MySQLDataSource(
table="customers",
host="localhost",
database="mydb",
user="root",
password="secret",
)
Cloud Data Warehouses¶
Google BigQuery¶
from truthound.datasources.sql import BigQueryDataSource
# Using service account credentials
source = BigQueryDataSource(
table="users",
project="my-gcp-project",
dataset="my_dataset",
credentials_path="/path/to/service-account.json",
)
# Using default credentials (ADC)
source = BigQueryDataSource(
table="users",
project="my-gcp-project",
dataset="my_dataset",
)
# Cost-aware query execution
engine = source.get_execution_engine()
results = source.execute_with_cost_check(
"SELECT * FROM large_table",
max_bytes=1_000_000_000, # 1GB limit
max_cost_usd=1.0, # $1 limit
)
Snowflake¶
from truthound.datasources.sql import SnowflakeDataSource
# Password authentication
source = SnowflakeDataSource(
table="users",
account="myaccount",
database="MY_DB",
schema="PUBLIC",
warehouse="COMPUTE_WH",
user="myuser",
password="mypassword",
)
# SSO authentication
source = SnowflakeDataSource(
table="users",
account="myaccount",
database="MY_DB",
authenticator="externalbrowser",
)
# Key-pair authentication
source = SnowflakeDataSource(
table="users",
account="myaccount",
database="MY_DB",
user="myuser",
private_key_path="/path/to/rsa_key.pem",
)
# From environment variables
source = SnowflakeDataSource.from_env(
table="users",
database="MY_DB",
env_prefix="SNOWFLAKE", # Reads SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, etc.
)
Amazon Redshift¶
from truthound.datasources.sql import RedshiftDataSource
# Password authentication
source = RedshiftDataSource(
table="users",
host="cluster.abc123.us-east-1.redshift.amazonaws.com",
database="mydb",
user="admin",
password="password",
schema="public",
)
# IAM authentication
source = RedshiftDataSource(
table="users",
host="cluster.abc123.us-east-1.redshift.amazonaws.com",
database="mydb",
cluster_identifier="my-cluster",
db_user="admin",
iam_auth=True,
)
# From environment variables
source = RedshiftDataSource.from_env(
table="users",
env_prefix="REDSHIFT",
)
Databricks SQL¶
from truthound.datasources.sql import DatabricksDataSource
# Personal Access Token (PAT)
source = DatabricksDataSource(
table="users",
host="adb-12345.azuredatabricks.net",
http_path="/sql/1.0/warehouses/abc123",
access_token="dapi...",
catalog="main", # Unity Catalog
schema="default",
)
# OAuth M2M
source = DatabricksDataSource(
table="users",
host="adb-12345.azuredatabricks.net",
http_path="/sql/1.0/warehouses/abc123",
client_id="...",
client_secret="...",
use_oauth=True,
catalog="main",
schema="default",
)
# Delta Lake features
history = source.get_table_history(limit=10)
source.optimize(zorder_by=["user_id"])
old_data = source.time_travel_query(version=5)
Enterprise Databases¶
Oracle Database¶
from truthound.datasources.sql import OracleDataSource
# Using service name
source = OracleDataSource(
table="USERS",
host="oracle.example.com",
service_name="ORCL",
user="myuser",
password="mypassword",
)
# Using SID
source = OracleDataSource(
table="USERS",
host="oracle.example.com",
sid="ORCL",
user="myuser",
password="mypassword",
)
# Using TNS name
source = OracleDataSource.from_tns(
table="USERS",
tns_name="MYDB",
user="myuser",
password="mypassword",
)
# Oracle Wallet (for cloud databases)
source = OracleDataSource(
table="USERS",
dsn="mydb_high",
user="ADMIN",
password="password",
wallet_location="/path/to/wallet",
)
Microsoft SQL Server¶
from truthound.datasources.sql import SQLServerDataSource
# SQL Server Authentication
source = SQLServerDataSource(
table="Users",
host="sqlserver.example.com",
database="MyDB",
user="sa",
password="password",
schema="dbo",
)
# Windows Authentication
source = SQLServerDataSource(
table="Users",
host="sqlserver.example.com",
database="MyDB",
trusted_connection=True,
)
# Azure SQL Database
source = SQLServerDataSource(
table="Users",
host="myserver.database.windows.net",
database="MyDB",
user="admin@myserver",
password="password",
encrypt=True,
)
# From connection string
source = SQLServerDataSource.from_connection_string(
table="Users",
connection_string="SERVER=myserver,1433;DATABASE=mydb;UID=user;PWD=pass",
)
Spark¶
PySpark support with automatic sampling for large datasets.
from pyspark.sql import SparkSession
from truthound.datasources import SparkDataSource
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("large_data.parquet")
# Create source with automatic sampling
source = SparkDataSource(df)
# Check if sampling is needed
if source.needs_sampling():
source = source.sample(n=100_000)
engine = source.get_execution_engine()
File-Based Sources¶
Direct loading from CSV, JSON, Parquet, NDJSON, and JSONL files.
from truthound.datasources import FileDataSource
# CSV
source = FileDataSource("data.csv")
# Parquet
source = FileDataSource("data.parquet")
# JSON
source = FileDataSource("data.json")
# NDJSON (newline-delimited JSON)
source = FileDataSource("data.ndjson")
# JSONL (JSON Lines - same as NDJSON)
source = FileDataSource("data.jsonl")
Factory Functions¶
th.read() - Convenience Data Loading¶
The th.read() function provides a simple way to load data from various sources:
import truthound as th
# File paths
df = th.read("data.csv")
df = th.read("data.parquet")
df = th.read("data.json")
# Raw data dict (column-oriented)
df = th.read({"a": [1, 2, 3], "b": ["x", "y", "z"]})
# Config dict with options
df = th.read({"path": "data.csv", "delimiter": ";"})
# With sampling for large datasets
df = th.read("large_data.csv", sample_size=10000)
# Polars DataFrame/LazyFrame passthrough
df = th.read(existing_df)
df = th.read(existing_lf)
Auto-Detection¶
The get_datasource() function auto-detects the input type:
from truthound import get_datasource
# Polars DataFrame
source = get_datasource(pl.DataFrame(...))
# Pandas DataFrame
source = get_datasource(pd.DataFrame(...))
# File path
source = get_datasource("data.csv")
# Dictionary
source = get_datasource({"a": [1, 2, 3], "b": ["x", "y", "z"]})
SQL Factory¶
from truthound import get_sql_datasource
# SQLite
source = get_sql_datasource("data.db", table="users")
# PostgreSQL
source = get_sql_datasource(
"postgresql://user:pass@localhost/db",
table="users",
)
Convenience Functions¶
from truthound.datasources import from_polars, from_pandas, from_file, from_dict
source = from_polars(polars_df)
source = from_pandas(pandas_df)
source = from_file("data.csv")
source = from_dict({"col1": [1, 2], "col2": ["a", "b"]})
Execution Engines¶
Execution engines handle the actual validation operations.
PolarsExecutionEngine¶
Primary engine using Polars' lazy evaluation.
from truthound.execution import PolarsExecutionEngine
engine = PolarsExecutionEngine(lazyframe)
# Core operations
engine.count_rows()
engine.get_columns()
engine.count_nulls("column")
engine.count_distinct("column")
# Statistics
engine.get_stats("numeric_column")
# Returns: {count, null_count, mean, min, max, sum, std, median, q25, q75}
# Aggregations
from truthound.execution import AggregationType
engine.aggregate({
"col1": AggregationType.SUM,
"col2": AggregationType.COUNT_DISTINCT,
})
SQLExecutionEngine¶
SQL engine with query pushdown.
from truthound.execution import SQLExecutionEngine
engine = SQLExecutionEngine(sql_datasource)
# Operations are pushed to the database
engine.count_matching("age > 30") # Pushed down as: SELECT COUNT(*) FROM <table> WHERE age > 30
engine.count_in_range("salary", min_value=50000, max_value=100000)
engine.count_duplicates(["email"])
PandasExecutionEngine¶
Native Pandas operations for compatibility.
from truthound.execution import PandasExecutionEngine
engine = PandasExecutionEngine(pandas_df)
engine.get_value_counts("category")
Size Limits and Sampling¶
Configuration¶
from truthound.datasources.base import DataSourceConfig
config = DataSourceConfig(
max_rows=10_000_000, # Maximum rows allowed
max_memory_mb=4096, # Maximum memory in MB
sample_size=100_000, # Default sample size
sample_seed=42, # Reproducible sampling
)
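How the config is attached depends on the source. As a sketch, assuming DataFrame-backed sources accept it through a config= parameter (mirroring the SQL example in Connection Pooling below):
from truthound.datasources import PolarsDataSource

# Assumption: DataFrame sources take the config via config=, like the SQL sources.
source = PolarsDataSource(large_df, config=config)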
Checking Limits¶
source = PolarsDataSource(large_df)
# Check if sampling is needed
if source.needs_sampling():
print("Data exceeds limits, sampling recommended")
source = source.sample(n=100_000)
# Get safe sample (auto-samples if needed)
safe_source = source.get_safe_sample()
Size Warnings¶
When using the check() API, you'll get warnings for large datasets:
from truthound import check
# Warning will be emitted if data exceeds limits
report = check(large_source, rules={...})
# WARNING: Data source exceeds recommended size limits...
Column Types¶
Unified column type representation across backends:
from truthound.datasources import ColumnType
# Numeric types
ColumnType.INTEGER
ColumnType.FLOAT
ColumnType.DECIMAL
# String types
ColumnType.STRING
ColumnType.TEXT
# Date/Time types
ColumnType.DATE
ColumnType.DATETIME
ColumnType.TIME
ColumnType.DURATION
# Boolean
ColumnType.BOOLEAN
# Binary
ColumnType.BINARY
# Complex types
ColumnType.LIST
ColumnType.STRUCT
ColumnType.JSON
# Other
ColumnType.NULL
ColumnType.UNKNOWN
Type Inspection¶
source = PolarsDataSource(df)
# Get schema
schema = source.schema
# {'id': ColumnType.INTEGER, 'name': ColumnType.STRING, ...}
# Get columns by type
numeric_cols = source.get_numeric_columns()
string_cols = source.get_string_columns()
datetime_cols = source.get_datetime_columns()
# Check specific column type
col_type = source.get_column_type("my_column")
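Type inspection pairs well with the execution engines. For example, a sketch that profiles every numeric column using only calls shown in this guide:
# Profile each numeric column via the engine's get_stats()
engine = source.get_execution_engine()
for col in source.get_numeric_columns():
    stats = engine.get_stats(col)
    print(col, stats["mean"], stats["null_count"])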
Connection Pooling (SQL)¶
SQL data sources use connection pooling for efficiency:
from truthound.datasources.sql import PostgreSQLDataSource, SQLDataSourceConfig
config = SQLDataSourceConfig(
pool_size=5, # Connection pool size
pool_timeout=30.0, # Timeout for acquiring connection
query_timeout=300.0, # Query execution timeout
fetch_size=10000, # Rows to fetch at a time
)
source = PostgreSQLDataSource(
table="users",
host="localhost",
config=config,
)
Spark-Specific Features¶
Partitioning¶
source = SparkDataSource(spark_df)
# Repartition for better distribution
source = source.repartition(num_partitions=10)
# Coalesce to reduce partitions
source = source.coalesce(num_partitions=2)
Persistence¶
# Persist in memory for repeated access
source = source.persist()
# Get execution plan
print(source.explain(extended=True))
# Unpersist when done
source = source.unpersist()
Safety Limits¶
Spark sources have conservative limits to prevent driver OOM:
from truthound.datasources.spark_source import SparkDataSourceConfig
config = SparkDataSourceConfig(
max_rows_for_local=100_000, # Max rows to collect to driver
persist_sampled=True, # Persist sampled data
force_sampling=False, # Always sample
)
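As a sketch, assuming SparkDataSource accepts these limits through a config= parameter like the SQL sources do:
# Assumption: the config is passed via config=, mirroring the SQL sources.
source = SparkDataSource(spark_df, config=config)
if source.needs_sampling():
    source = source.sample(n=config.max_rows_for_local)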
Using with Validators¶
DataSources integrate seamlessly with the validation API:
from truthound import check
from truthound.datasources import get_datasource
# Create source
source = get_datasource("data.csv")
# Use with check() API
report = check(
source, # Pass source directly
rules={
"id": ["not_null", "unique"],
"email": ["not_null", {"type": "regex", "pattern": r".*@.*"}],
},
)
# Or pass to check() with source parameter
report = check(
df, # Original data
source=source, # Explicit source
rules={...},
)
Best Practices¶
- Use Polars when possible - it's the most optimized backend
- Sample large datasets before validation to avoid memory issues
- Use SQL pushdown for database sources - operations run on the server
- Configure connection pools appropriately for your workload
- Check needs_sampling() before processing data of unknown size (see the combined sketch below)
- Use factory functions for convenience and type auto-detection
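A combined sketch of these practices, assuming file-backed sources expose the same needs_sampling()/sample() helpers shown earlier and that pushdown=True is available for SQL sources:
import truthound as th
from truthound.datasources import get_datasource, PostgreSQLDataSource

# File-backed data: sample before validating if it exceeds the configured limits
source = get_datasource("large_data.parquet")
if source.needs_sampling():
    source = source.sample(n=100_000)
report = th.check(source=source)

# Database-backed data: push operations down to the server
pg_source = PostgreSQLDataSource(table="users", host="localhost", database="app")
report = th.check(source=pg_source, pushdown=True)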
Architecture¶
┌───────────────────────────────────────────────────────────────────────────┐
│ Truthound Validators │
├───────────────────────────────────────────────────────────────────────────┤
│ Execution Engines │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Polars │ │ Pandas │ │ SQL │ │ Spark │ │
│ │ Engine │ │ Engine │ │ Engine │ │ Engine │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├───────────────────────────────────────────────────────────────────────────┤
│ Data Sources │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ DataFrame: Polars, Pandas, PySpark │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ Core SQL: SQLite, PostgreSQL, MySQL │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ Cloud DW: BigQuery, Snowflake, Redshift, Databricks │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ Enterprise: Oracle, SQL Server │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ File: CSV, Parquet, JSON, NDJSON │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────────────┘
Installation¶
Different data sources require different dependencies:
# Core (always available)
pip install truthound
# Cloud Data Warehouses
pip install truthound[bigquery] # Google BigQuery
pip install truthound[snowflake] # Snowflake
pip install truthound[redshift] # Amazon Redshift
pip install truthound[databricks] # Databricks SQL
# Enterprise Databases
pip install truthound[oracle] # Oracle Database
pip install truthound[sqlserver] # SQL Server
# DataFrame Libraries
pip install truthound[spark] # PySpark
# All enterprise sources
pip install truthound[enterprise]
Checking Available Sources¶
from truthound.datasources.sql import get_available_sources, check_source_available
# Get all sources and their availability
sources = get_available_sources()
for name, cls in sources.items():
status = "available" if cls is not None else "not installed"
print(f"{name}: {status}")
# Check specific source
if check_source_available("bigquery"):
from truthound.datasources.sql import BigQueryDataSource
# Use BigQuery...
NoSQL Data Sources¶
Truthound supports NoSQL databases for document-based and search-based data validation.
MongoDB¶
Installation:
Usage:
from truthound.datasources import from_mongodb
# Async MongoDB data source
source = await from_mongodb(
connection_string="mongodb://localhost:27017",
database="mydb",
collection="users",
)
async with source:
schema = await source.get_schema_async()
lf = await source.to_polars_lazyframe_async()
MongoDB Atlas:
import os
from truthound.datasources import from_atlas
# Set MONGODB_ATLAS_URI environment variable with your connection string
# export MONGODB_ATLAS_URI="mongodb+srv://<username>:<password>@<cluster>.mongodb.net"
source = await from_atlas(
connection_string=os.environ["MONGODB_ATLAS_URI"],
database="mydb",
collection="users",
)
Elasticsearch¶
Installation:
Usage:
from truthound.datasources import from_elasticsearch
source = await from_elasticsearch(
hosts=["http://localhost:9200"],
index="users",
query={"match_all": {}}, # Optional query filter
)
async with source:
lf = await source.to_polars_lazyframe_async()
Streaming Data Sources¶
Truthound supports streaming platforms for real-time data validation.
Apache Kafka¶
Installation:
Usage:
import truthound as th
from truthound.datasources import from_kafka
# Kafka consumer data source
source = await from_kafka(
bootstrap_servers="localhost:9092",
topic="user-events",
group_id="truthound-validator",
auto_offset_reset="earliest",
)
async with source:
# Consume messages as DataFrame
async for batch in source.consume_batches(batch_size=1000):
report = th.check(batch, validators=["null", "format"])
Confluent Kafka¶
Usage:
from truthound.datasources import from_confluent
source = await from_confluent(
bootstrap_servers="pkc-xxx.confluent.cloud:9092",
topic="user-events",
api_key="your-api-key",
api_secret="your-api-secret",
)
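Consumption follows the same pattern as the Kafka example above, assuming the Confluent source exposes the same consume_batches() interface:
import truthound as th

async with source:
    async for batch in source.consume_batches(batch_size=1000):
        report = th.check(batch, validators=["null", "format"])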
Async Data Sources¶
For I/O-bound workloads, Truthound provides async data source support with full async/await compatibility.
Async Protocols¶
from truthound.datasources import (
AsyncDataSourceProtocol,
AsyncConnectableProtocol,
AsyncStreamableProtocol,
AsyncQueryableProtocol,
)
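These protocols let you write validation helpers against an interface rather than a concrete source. A sketch, assuming only the async methods demonstrated elsewhere in this guide:
import truthound as th

async def validate_source(source: AsyncDataSourceProtocol):
    # Connect, load, and validate through the shared async interface.
    async with source:
        lf = await source.to_polars_lazyframe_async()
        return th.check(lf.collect())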
Async Factory Functions¶
from truthound.datasources import (
get_async_datasource,
from_mongodb,
from_elasticsearch,
from_kafka,
from_confluent,
from_atlas,
detect_async_datasource_type,
is_native_async_source,
)
# Auto-detect async data source
source = await get_async_datasource(
"mongodb://localhost:27017",
database="mydb",
collection="users",
)
Sync-Async Adapters¶
Convert between sync and async data sources:
from truthound.datasources import (
SyncToAsyncAdapter,
AsyncToSyncAdapter,
adapt_to_async,
adapt_to_sync,
is_async_source,
is_sync_source,
)
# Wrap sync source for async usage
sync_source = PolarsDataSource(df)
async_source = adapt_to_async(sync_source)
# Wrap async source for sync usage
async_source = await from_mongodb(...)
sync_source = adapt_to_sync(async_source)
Async Base Classes¶
import polars as pl

from truthound.datasources import (
    AsyncBaseDataSource,
    AsyncDataSourceConfig,
    AsyncConnectionPool,
)

class MyAsyncDataSource(AsyncBaseDataSource):
    async def connect(self) -> None:
        # Open the connection to the backing system.
        ...

    async def to_polars_lazyframe_async(self) -> pl.LazyFrame:
        # Materialize the data as a Polars LazyFrame.
        ...
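A hypothetical use of such a subclass, wrapped for the synchronous check() API with the adapter shown above (assumes the subclass takes no constructor arguments):
import truthound as th
from truthound.datasources import adapt_to_sync

source = MyAsyncDataSource()  # hypothetical subclass from the sketch above
report = th.check(source=adapt_to_sync(source))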
Optimized Pandas Conversion¶
For large Pandas DataFrames, use the optimized converter for better memory efficiency:
from truthound.datasources import (
OptimizedPandasDataSource,
OptimizedPandasConfig,
DataFrameOptimizer,
optimize_pandas_to_polars,
estimate_polars_memory,
get_optimal_chunk_size,
)
# Estimate memory before conversion
memory_estimate = estimate_polars_memory(pandas_df)
print(f"Estimated Polars memory: {memory_estimate / 1024**2:.1f} MB")
# Get optimal chunk size for conversion
chunk_size = get_optimal_chunk_size(pandas_df)
# Optimized conversion
config = OptimizedPandasConfig(
chunk_size=chunk_size,
use_pyarrow=True,
preserve_index=False,
)
source = OptimizedPandasDataSource(pandas_df, config=config)
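The resulting source can then be used like any other DataSource in this guide, assuming it exposes the same get_execution_engine() interface:
engine = source.get_execution_engine()
print(f"Null counts: {engine.count_nulls_all()}")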
Further Reading¶
- Data Sources Architecture - Design patterns, extensibility guide, and quality assessment
- Architecture Overview - Overall system architecture
- README - Main documentation