Data Sources¶
Multi-backend data source support for connecting to various data systems.
Overview¶
Truthound supports multiple data backends through the DataSource abstraction:
| Category | Sources |
|---|---|
| Files | CSV, Parquet, JSON, NDJSON |
| DataFrames | Polars, Pandas |
| SQL Databases | PostgreSQL, MySQL, SQLite, DuckDB |
| Cloud Warehouses | BigQuery, Snowflake, Redshift, Databricks |
| Enterprise | Oracle, SQL Server |
| Big Data | Apache Spark |
| NoSQL | MongoDB, Elasticsearch (async) |
| Streaming | Apache Kafka, Kinesis (async) |
Installation & Dependencies¶
Each data source requires specific Python packages. Core file formats (CSV, JSON, Parquet, NDJSON, JSONL) and SQLite are built-in with no additional dependencies.
| Category | Data Source | Required Package | Install Command |
|---|---|---|---|
| DataFrame | Polars | (built-in) | - |
| DataFrame | Pandas | pandas | `pip install truthound[pandas]` |
| DataFrame | Spark | pyspark | `pip install truthound[spark]` |
| Core SQL | SQLite | (built-in) | - |
| Core SQL | DuckDB | duckdb | `pip install truthound[duckdb]` |
| Core SQL | PostgreSQL | psycopg2 | `pip install truthound[postgresql]` |
| Core SQL | MySQL | mysql-connector-python | `pip install truthound[mysql]` |
| Cloud DW | BigQuery | google-cloud-bigquery | `pip install truthound[bigquery]` |
| Cloud DW | Snowflake | snowflake-connector-python | `pip install truthound[snowflake]` |
| Cloud DW | Redshift | redshift-connector | `pip install truthound[redshift]` |
| Cloud DW | Databricks | databricks-sql-connector | `pip install truthound[databricks]` |
| Enterprise | Oracle | oracledb | `pip install truthound[oracle]` |
| Enterprise | SQL Server | pyodbc | `pip install truthound[sqlserver]` |
| NoSQL | MongoDB | pymongo | `pip install truthound[mongodb]` |
| NoSQL | Elasticsearch | elasticsearch | `pip install truthound[elasticsearch]` |
| Streaming | Kafka | confluent-kafka | `pip install truthound[kafka]` |
| Streaming | Kinesis | aiobotocore | `pip install truthound[kinesis]` |
| File | CSV, JSON, Parquet, NDJSON, JSONL | (built-in) | - |
Install Multiple Dependencies¶
# Install all enterprise data sources
pip install truthound[enterprise]
# Install specific combinations
pip install truthound[postgresql,bigquery,spark]
# Install all optional dependencies
pip install truthound[all]
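Not sure whether a given extra is already installed? A quick standard-library check (plain Python, not a Truthound API) avoids import errors at validation time:

import importlib.util

# Verify the PostgreSQL driver is importable before building a source.
if importlib.util.find_spec("psycopg2") is None:
    raise RuntimeError("Missing driver: run `pip install truthound[postgresql]`")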
Source Code Locations¶
| Category | Data Source | Source File |
|---|---|---|
| DataFrame | Polars | datasources/polars_source.py |
| DataFrame | Pandas | datasources/pandas_source.py |
| DataFrame | Spark | datasources/spark_source.py |
| Core SQL | SQLite | datasources/sql/sqlite.py |
| Core SQL | DuckDB | datasources/sql/duckdb.py |
| Core SQL | PostgreSQL | datasources/sql/postgresql.py |
| Core SQL | MySQL | datasources/sql/mysql.py |
| Cloud DW | BigQuery | datasources/sql/bigquery.py |
| Cloud DW | Snowflake | datasources/sql/snowflake.py |
| Cloud DW | Redshift | datasources/sql/redshift.py |
| Cloud DW | Databricks | datasources/sql/databricks.py |
| Enterprise | Oracle | datasources/sql/oracle.py |
| Enterprise | SQL Server | datasources/sql/sqlserver.py |
| NoSQL | MongoDB | datasources/nosql/mongodb.py |
| NoSQL | Elasticsearch | datasources/nosql/elasticsearch.py |
| Streaming | Kafka | datasources/streaming/kafka.py |
| Streaming | Kinesis | datasources/streaming/kinesis.py |
| File | Adapters | datasources/adapters.py |
BaseDataSource¶
All data sources inherit from BaseDataSource.
Definition¶
from truthound.datasources.base import BaseDataSource
class BaseDataSource(ABC, Generic[ConfigT]):
    """Abstract base class for all data sources."""

    source_type: str = "base"

    @property
    def name(self) -> str:
        """Get the data source name."""

    @property
    @abstractmethod
    def schema(self) -> dict[str, ColumnType]:
        """Column name to type mapping."""

    @property
    def columns(self) -> list[str]:
        """Get list of column names."""

    @property
    def row_count(self) -> int | None:
        """Get row count if efficiently available."""

    @property
    def capabilities(self) -> set[DataSourceCapability]:
        """Get supported capabilities."""

    @abstractmethod
    def to_polars_lazyframe(self) -> pl.LazyFrame:
        """Convert to Polars LazyFrame."""

    @abstractmethod
    def get_execution_engine(self) -> BaseExecutionEngine:
        """Return execution engine."""

    def needs_sampling(self) -> bool:
        """Check if sampling is needed for large datasets."""

    @abstractmethod
    def sample(self, n: int, seed: int | None = None) -> BaseDataSource:
        """Return sampled data source."""
Capabilities¶
from truthound.datasources import DataSourceCapability
class DataSourceCapability(Enum):
    LAZY_EVALUATION = "lazy_evaluation"    # Supports lazy/deferred execution
    SQL_PUSHDOWN = "sql_pushdown"          # Can push operations to database
    SAMPLING = "sampling"                  # Supports efficient sampling
    STREAMING = "streaming"                # Supports streaming/chunked reads
    SCHEMA_INFERENCE = "schema_inference"  # Can infer schema without full scan
    ROW_COUNT = "row_count"                # Can get row count efficiently
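Capabilities let callers adapt to each backend at runtime. For example, using only the properties defined on BaseDataSource:

from truthound.datasources import DataSourceCapability

# Only ask for a row count when the backend can provide one cheaply.
if DataSourceCapability.ROW_COUNT in source.capabilities:
    print(f"rows: {source.row_count:,}")

# Sample only when the backend supports it efficiently.
if DataSourceCapability.SAMPLING in source.capabilities and source.needs_sampling():
    source = source.sample(n=50_000)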
ColumnType¶
from truthound.datasources import ColumnType
class ColumnType(Enum):
    # Numeric types
    INTEGER = "integer"
    FLOAT = "float"
    DECIMAL = "decimal"

    # String types
    STRING = "string"
    TEXT = "text"

    # Date/Time types
    DATE = "date"
    DATETIME = "datetime"
    TIME = "time"
    DURATION = "duration"

    # Boolean
    BOOLEAN = "boolean"

    # Binary
    BINARY = "binary"

    # Complex types
    LIST = "list"
    STRUCT = "struct"
    JSON = "json"

    # Other
    NULL = "null"
    UNKNOWN = "unknown"
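Since schema maps column names to ColumnType values, type-driven logic is a dictionary scan. For example, collecting the numeric columns of any source:

from truthound.datasources import ColumnType

# Gather every numeric column from the source's schema mapping.
NUMERIC = {ColumnType.INTEGER, ColumnType.FLOAT, ColumnType.DECIMAL}
numeric_columns = [col for col, dtype in source.schema.items() if dtype in NUMERIC]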
File Sources¶
FileDataSource¶
Automatically detects the file format from the file extension.
from truthound.datasources.polars_source import FileDataSource
# CSV
source = FileDataSource("data.csv")
# Parquet
source = FileDataSource("data.parquet")
# JSON
source = FileDataSource("data.json")
# NDJSON (newline-delimited JSON)
source = FileDataSource("data.ndjson")
FileDataSourceConfig¶
from truthound.datasources.polars_source import FileDataSourceConfig
config = FileDataSourceConfig(
    path="data.csv",
    name="my_data",
    max_rows=1_000_000,
    infer_schema_length=1000,
    n_rows=None,  # Limit rows to read
    encoding="utf-8",
)
source = FileDataSource(config=config)
DataFrame Sources¶
PolarsDataSource¶
from truthound.datasources.polars_source import PolarsDataSource
import polars as pl
# From DataFrame
df = pl.read_csv("data.csv")
source = PolarsDataSource(df)
# From LazyFrame
lf = pl.scan_csv("data.csv")
source = PolarsDataSource(lf)
PandasDataSource¶
from truthound.datasources.pandas_source import PandasDataSource
import pandas as pd
pdf = pd.read_csv("data.csv")
source = PandasDataSource(pdf)
DictDataSource¶
from truthound.datasources.polars_source import DictDataSource
data = {"col1": [1, 2, 3], "col2": ["a", "b", "c"]}
source = DictDataSource(data)
SQL Databases¶
SQLiteDataSource¶
from truthound.datasources.sql import SQLiteDataSource
# From table
source = SQLiteDataSource(
    database="mydb.db",
    table="users",
)

# With custom query
source = SQLiteDataSource(
    database="mydb.db",
    query="SELECT * FROM users WHERE active = 1",
)
DuckDBDataSource¶
DuckDB is an in-process analytical database with excellent Polars integration.
from truthound.datasources.sql import DuckDBDataSource
# From table
source = DuckDBDataSource(
    database="analytics.duckdb",
    table="events",
)

# With custom query
source = DuckDBDataSource(
    database="analytics.duckdb",
    query="SELECT * FROM events WHERE date > '2024-01-01'",
)

# In-memory database
source = DuckDBDataSource(
    database=":memory:",
    query="SELECT 1 as id, 'test' as value",
)
# Read directly from Parquet file (DuckDB feature)
source = DuckDBDataSource.from_parquet("data/*.parquet")
# Read from CSV file
source = DuckDBDataSource.from_csv("data.csv")
# Create from Polars DataFrame
import polars as pl
df = pl.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
source = DuckDBDataSource.from_dataframe(df, "my_table")
PostgreSQLDataSource¶
from truthound.datasources.sql import PostgreSQLDataSource
# Basic connection
source = PostgreSQLDataSource(
    table="users",
    host="localhost",
    port=5432,
    database="mydb",
    user="postgres",
    password="secret",
)

# With schema
source = PostgreSQLDataSource(
    table="users",
    schema="public",
    host="localhost",
    database="mydb",
    user="postgres",
)

# With connection URL
source = PostgreSQLDataSource.from_connection_string(
    connection_string="postgresql://user:pass@localhost:5432/mydb",
    table="users",
)

# With custom query
source = PostgreSQLDataSource(
    host="localhost",
    database="mydb",
    user="postgres",
    query="SELECT id, email FROM users WHERE created_at > '2024-01-01'",
)
MySQLDataSource¶
from truthound.datasources.sql import MySQLDataSource
source = MySQLDataSource(
    table="orders",
    host="localhost",
    port=3306,
    database="mydb",
    user="root",
    password="secret",
)
Cloud Data Warehouses¶
BigQueryDataSource¶
from truthound.datasources.sql import BigQueryDataSource
# Basic usage
source = BigQueryDataSource(
    project="my-gcp-project",
    dataset="analytics",
    table="events",
)

# With credentials file
source = BigQueryDataSource(
    project="my-gcp-project",
    dataset="analytics",
    table="events",
    credentials_path="/path/to/credentials.json",
)

# With custom query
source = BigQueryDataSource(
    project="my-gcp-project",
    query="SELECT * FROM `analytics.events` WHERE date > '2024-01-01'",
)
SnowflakeDataSource¶
from truthound.datasources.sql import SnowflakeDataSource
# Password authentication
source = SnowflakeDataSource(
    account="myaccount",
    user="myuser",
    password="mypassword",
    database="MYDB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    table="USERS",
)

# Key-pair authentication
source = SnowflakeDataSource(
    account="myaccount",
    user="myuser",
    private_key_path="/path/to/key.p8",
    database="MYDB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    table="USERS",
)
RedshiftDataSource¶
from truthound.datasources.sql import RedshiftDataSource
# Password authentication
source = RedshiftDataSource(
    host="cluster.region.redshift.amazonaws.com",
    port=5439,
    database="mydb",
    user="myuser",
    password="mypassword",
    table="users",
)

# IAM authentication
source = RedshiftDataSource(
    host="cluster.region.redshift.amazonaws.com",
    database="mydb",
    iam_role="arn:aws:iam::123456789:role/RedshiftRole",
    table="users",
)
DatabricksDataSource¶
from truthound.datasources.sql import DatabricksDataSource
source = DatabricksDataSource(
    server_hostname="adb-123456789.azuredatabricks.net",
    http_path="/sql/1.0/warehouses/abc123",
    access_token="dapi...",
    catalog="main",
    schema="default",
    table="users",
)
Enterprise Databases¶
OracleDataSource¶
from truthound.datasources.sql import OracleDataSource
# With service name
source = OracleDataSource(
    host="localhost",
    port=1521,
    service_name="ORCL",
    user="myuser",
    password="mypassword",
    table="USERS",
)
SQLServerDataSource¶
from truthound.datasources.sql import SQLServerDataSource
# SQL authentication
source = SQLServerDataSource(
    host="localhost",
    port=1433,
    database="mydb",
    user="sa",
    password="mypassword",
    table="users",
)

# Windows authentication
source = SQLServerDataSource(
    host="localhost",
    database="mydb",
    trusted_connection=True,
    table="users",
)
Apache Spark¶
SparkDataSource¶
from truthound.datasources.spark_source import SparkDataSource
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("truthound").getOrCreate()
spark_df = spark.read.parquet("data.parquet")
source = SparkDataSource(spark_df)
Factory Functions¶
get_datasource()¶
Automatically detects the source type from the input.
from truthound.datasources import get_datasource
# File path
source = get_datasource("data.csv") # FileDataSource
source = get_datasource("data.parquet") # FileDataSource
source = get_datasource("data.json") # FileDataSource
source = get_datasource("data.ndjson") # FileDataSource
# Polars DataFrame
source = get_datasource(polars_df) # PolarsDataSource
# Pandas DataFrame
source = get_datasource(pandas_df) # PandasDataSource
# Dictionary
source = get_datasource({"col": [1, 2, 3]}) # DictDataSource
Note:
`get_datasource()` does NOT support `.db` (SQLite) files directly. Use `get_sql_datasource()` for SQLite databases.
get_sql_datasource()¶
Creates a SQL data source from a connection URL or a database file path.
from truthound.datasources import get_sql_datasource
# SQLite database file (requires table parameter)
source = get_sql_datasource("mydb.db", table="users")
# PostgreSQL connection URL
source = get_sql_datasource(
    "postgresql://user:pass@localhost/db",
    table="users",
)
Usage with Core Functions¶
import truthound as th
from truthound.datasources.sql import PostgreSQLDataSource, BigQueryDataSource, SQLiteDataSource
# Check
source = PostgreSQLDataSource(table="users", host="localhost", database="mydb")
report = th.check(source=source)
# Learn schema from database
source = SQLiteDataSource(database="mydb.db", table="users")
schema = th.learn(source=source)
schema.save("schema.yaml")
# Scan for PII
source = BigQueryDataSource(project="...", dataset="...", table="customers")
pii_report = th.scan(source=source)
# Profile
source = PostgreSQLDataSource(table="orders", host="localhost", database="mydb")
profile = th.profile(source=source)
# Mask
source = PostgreSQLDataSource(table="users", host="localhost", database="mydb")
masked_df = th.mask(source=source, strategy="hash")
Query Pushdown¶
For SQL sources, Truthound can push validation predicates to the database:
from truthound.datasources.sql import PostgreSQLDataSource
source = PostgreSQLDataSource(
    table="large_table",
    host="localhost",
    database="mydb",
)
# Enable pushdown - validations run on the database server
report = th.check(source=source, pushdown=True)
# Example: null_check becomes:
# SELECT COUNT(*) FROM table WHERE column IS NULL
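Pushdown only helps when the source advertises it; a defensive pattern (reusing the capability enum from above) is:

from truthound.datasources import DataSourceCapability

# Fall back to local validation when the backend cannot push predicates down.
pushdown = DataSourceCapability.SQL_PUSHDOWN in source.capabilities
report = th.check(source=source, pushdown=pushdown)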
Sampling¶
For large datasets, use sampling to improve performance:
source = PostgreSQLDataSource(table="huge_table", host="localhost", database="mydb")
# Check if sampling is needed
if source.needs_sampling():
    print(f"Dataset has {source.row_count:,} rows, sampling recommended")
    source = source.sample(n=100_000, seed=42)
report = th.check(source=source)
Connection Pooling¶
SQL sources support connection pooling:
from truthound.datasources.sql.base import SQLDataSourceConfig
config = SQLDataSourceConfig(
    table="users",
    host="localhost",
    database="mydb",
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
)
source = PostgreSQLDataSource(config=config)
NoSQL Databases¶
MongoDBDataSource (Async)¶
from truthound.datasources import from_mongodb
# Connect to MongoDB
source = await from_mongodb(
    connection_string="mongodb://localhost:27017",
    database="mydb",
    collection="users",
)
async with source:
    lf = await source.to_polars_lazyframe_async()
    report = th.check(lf)
ElasticsearchDataSource (Async)¶
from truthound.datasources import from_elasticsearch
source = await from_elasticsearch(
    hosts=["http://localhost:9200"],
    index="logs",
    query={"match_all": {}},
)
async with source:
    lf = await source.to_polars_lazyframe_async()
Streaming Platforms¶
KafkaDataSource (Async)¶
from truthound.datasources import from_kafka
source = await from_kafka(
    bootstrap_servers="localhost:9092",
    topic="events",
    group_id="truthound-validators",
)
async with source:
    async for batch in source.consume_batches(batch_size=1000):
        report = th.check(batch)
Async Factory Functions¶
For async data sources (MongoDB, Elasticsearch, Kafka):
from truthound.datasources import get_async_datasource
# Auto-detect async source type
source = await get_async_datasource(
    "mongodb://localhost:27017",
    database="mydb",
    collection="users",
)
# Check if source is async
from truthound.datasources import is_async_source, is_sync_source
if is_async_source(source):
    async with source:
        lf = await source.to_polars_lazyframe_async()
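A small helper can normalize both kinds of sources into a LazyFrame. A sketch built only on the helpers documented above (the function name is illustrative):

import polars as pl
from truthound.datasources import is_async_source

async def load_lazyframe(source) -> pl.LazyFrame:
    # Async sources need a connection context and the async conversion path.
    if is_async_source(source):
        async with source:
            return await source.to_polars_lazyframe_async()
    return source.to_polars_lazyframe()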
See Also¶
- Data Sources Guide - Detailed configuration
- DataSources Architecture - Technical deep dive
- Performance Guide - Optimization tips