
Security Guide

Truthound provides a comprehensive security module that protects validator execution against threats such as SQL injection and ReDoS attacks.

Overview

Security Module Architecture:

┌─────────────────────────────────────────────────────────────────────┐
│                           Security Module                           │
└─────────────────────────────────┬───────────────────────────────────┘
                ┌─────────────────┴─────────────────┐
                ▼                                   ▼
┌───────────────────────────────┐   ┌───────────────────────────────┐
│   SQL Injection Prevention    │   │       ReDoS Protection        │
├───────────────────────────────┤   ├───────────────────────────────┤
│ • Query Validator             │   │ • Static Analyzer             │
│ • Parameterized Query         │   │ • ML Pattern Analyzer         │
│ • Whitelist Validator         │   │ • Pattern Optimizer           │
│ • Security Policy             │   │ • CVE Database                │
│ • Audit Logger                │   │ • CPU Monitor                 │
└───────────────────────────────┘   │ • Profiler                    │
                                    │ • RE2 Engine                  │
                                    └───────────────────────────────┘

1. SQL Injection Prevention

Prevents injection attacks during SQL datasource validation.

SecurityLevel

| Level      | Description                                  |
|------------|----------------------------------------------|
| STRICT     | Maximum security, minimal allowed operations |
| STANDARD   | Balanced security (default)                  |
| PERMISSIVE | Relaxed security for trusted environments    |

SecurityPolicy

from truthound.validators.security import (
    SecurityPolicy,
    SecurityLevel,
    SQLQueryValidator,
)

# Preset policies
strict_policy = SecurityPolicy.strict()
standard_policy = SecurityPolicy.standard()
permissive_policy = SecurityPolicy.permissive()

# Custom policy
policy = SecurityPolicy(
    level=SecurityLevel.STANDARD,
    max_query_length=10000,           # Maximum query length
    max_identifier_length=128,        # Maximum identifier length

    # Structural permissions
    allow_joins=True,                 # Allow JOIN
    allow_subqueries=True,            # Allow subqueries
    allow_aggregations=True,          # Allow aggregate functions
    allow_window_functions=True,      # Allow window functions
    allow_cte=True,                   # Allow WITH clause
    allow_union=False,                # Block UNION (injection vector)

    # Allowed statement types
    allowed_statements={"SELECT", "WITH"},

    # Blocked patterns (regex)
    blocked_patterns=[r"xp_cmdshell", r"sp_executesql"],

    # Blocked functions
    blocked_functions=[
        "SLEEP",
        "BENCHMARK",
        "LOAD_FILE",
        "INTO OUTFILE",
        "INTO DUMPFILE",
    ],

    # Whitelist (empty allows all)
    allowed_tables={"orders", "customers"},
    allowed_columns={"id", "name", "amount"},

    # Violation callback
    on_violation=lambda name, matched: print(f"Violation: {name}"),
)

SQLQueryValidator

from truthound.validators.security import (
    SQLQueryValidator,
    validate_sql_query,
    SQLInjectionError,
    QueryValidationError,
)

# Create validator
validator = SQLQueryValidator(policy=policy)

# Validate query
try:
    validator.validate("SELECT * FROM orders WHERE amount > 100")
    print("Query is safe")
except SQLInjectionError as e:
    print(f"Injection detected: {e.pattern}")
except QueryValidationError as e:
    print(f"Validation failed: {e}")

# Convenience function
validate_sql_query(
    "SELECT id, amount FROM orders",
    allowed_tables=["orders", "customers"],
)

Dangerous Pattern Detection

Built-in dangerous pattern registry:

| Category         | Pattern                         | Severity   |
|------------------|---------------------------------|------------|
| DDL              | CREATE, ALTER, DROP, TRUNCATE   | HIGH       |
| DCL              | GRANT, REVOKE, DENY             | HIGH       |
| DML Modification | INSERT, UPDATE, DELETE          | HIGH       |
| Execution        | EXEC, EXECUTE, CALL             | HIGH       |
| File             | LOAD_FILE, INTO OUTFILE         | HIGH       |
| Stacked Query    | ; SELECT, ; DROP                | HIGH       |
| UNION Injection  | UNION SELECT                    | MEDIUM     |
| Time-Based       | SLEEP, WAITFOR DELAY, BENCHMARK | HIGH       |
| Error-Based      | EXTRACTVALUE, UPDATEXML         | MEDIUM     |
| Boolean-Based    | OR 1=1, AND '1'='1'             | HIGH       |
| Comment          | --, /* */                       | LOW-MEDIUM |
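
A quick sketch of how these registry entries surface through SQLQueryValidator; the queries below are illustrative payloads, and each one should trip one of the categories above (the e.pattern attribute is used as shown in the earlier example):

from truthound.validators.security import (
    SecurityPolicy,
    SQLInjectionError,
    SQLQueryValidator,
)

validator = SQLQueryValidator(policy=SecurityPolicy.standard())

for query in [
    "SELECT id FROM orders; DROP TABLE orders",                # Stacked Query
    "SELECT id FROM orders UNION SELECT password FROM users",  # UNION Injection
    "SELECT * FROM orders WHERE '1'='1' OR 1=1",               # Boolean-Based
]:
    try:
        validator.validate(query)
    except SQLInjectionError as e:
        print(f"Blocked: {e.pattern}")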

SecureSQLBuilder

Fluent interface for building secure queries:

from truthound.validators.security import (
    SecureSQLBuilder,
    ParameterizedQuery,
)

builder = SecureSQLBuilder(
    allowed_tables=["orders", "customers"],
    policy=SecurityPolicy.standard(),
)

# Build query
query = (
    builder
    .select("orders", ["id", "amount", "status"])
    .join("customers", "orders.customer_id = customers.id")
    .where("amount > :min_amount")
    .where("status = :status")
    .group_by("status")
    .having("COUNT(*) > :min_count")
    .order_by("amount", desc=True)
    .limit(100)
    .offset(0)
    .build({
        "min_amount": 100,
        "status": "pending",
        "min_count": 5,
    })
)

# Execute with Polars SQL context (orders_lf and customers_lf are Polars LazyFrames)
import polars as pl
ctx = pl.SQLContext()
ctx.register("orders", orders_lf)
ctx.register("customers", customers_lf)

result = builder.execute(ctx, query)

ParameterizedQuery

from truthound.validators.security import ParameterizedQuery

query = ParameterizedQuery(
    template="SELECT * FROM orders WHERE amount > :min_amount AND status = :status",
    parameters={"min_amount": 100, "status": "pending"},
)

# Render (escape values)
rendered = query.render()
# SELECT * FROM orders WHERE amount > 100 AND status = 'pending'

Supported types and escaping:

| Type        | Escaping                                   |
|-------------|--------------------------------------------|
| None        | NULL                                       |
| bool        | TRUE/FALSE                                 |
| int, float  | As-is                                      |
| str         | Wrapped in single quotes; ' escaped as ''  |
| list, tuple | (val1, val2, ...)                          |
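
For illustration, a short sketch exercising these rules with ParameterizedQuery; the rendered values shown in the comments are inferred from the table above, not captured from a real run:

from truthound.validators.security import ParameterizedQuery

query = ParameterizedQuery(
    template=(
        "SELECT * FROM orders "
        "WHERE status IN :statuses AND archived = :archived AND deleted_at IS :deleted"
    ),
    parameters={
        "statuses": ["pending", "shipped"],  # -> ('pending', 'shipped')
        "archived": False,                   # -> FALSE
        "deleted": None,                     # -> NULL
    },
)
print(query.render())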

SchemaWhitelist

from truthound.validators.security import (
    SchemaWhitelist,
    WhitelistValidator,
)

# Define schema whitelist
whitelist = SchemaWhitelist()
whitelist.add_table("orders", ["id", "customer_id", "amount", "status"])
whitelist.add_table("customers", ["id", "name", "email"])

# Validate table/column
whitelist.validate_table("orders")  # OK
whitelist.validate_column("orders", "amount")  # OK
whitelist.validate_column("orders", "password")  # QueryValidationError

# Validate query
validator = WhitelistValidator(whitelist)
validator.validate_query("SELECT id, amount FROM orders")  # OK
validator.validate_query("SELECT password FROM users")  # Error

SecureQueryMixin

Secure query execution in validators:

from truthound.validators.security import SecureQueryMixin
from truthound.validators.base import Validator

class MyValidator(Validator, SecureQueryMixin):
    def __init__(self):
        super().__init__()
        self.set_security_policy(SecurityPolicy.strict())

    def validate(self, lf):
        # Build secure query
        query = self.build_secure_query(
            table="data",
            columns=["id", "value"],
            where="value > :threshold",
            parameters={"threshold": 100},
            allowed_tables=["data"],
        )

        # Execute secure query
        result = self.execute_secure_query(lf, query, table_name="data")
        return self.process_result(result)

QueryAuditLogger

Query execution audit logging:

import logging

from truthound.validators.security import QueryAuditLogger

logger = QueryAuditLogger(
    max_entries=10000,
    log_full_queries=False,  # Mask values
    python_logger=logging.getLogger("sql_audit"),
)

# Log query
logger.log_query(
    query="SELECT * FROM users WHERE email = 'test@example.com'",
    success=True,
    user="admin",
    context={"source": "api"},
)

# Query audit
recent = logger.get_recent(100)
failures = logger.get_failures(50)
by_hash = logger.get_by_hash("abc123...")

# Statistics
stats = logger.get_stats()
# {
#   "total_queries": 1000,
#   "successful": 950,
#   "failed": 50,
#   "success_rate": 0.95,
#   "unique_queries": 120,
# }

# Export to file
logger.export_to_file("audit.log")

2. ReDoS Protection

Prevents Regular Expression Denial of Service (ReDoS) attacks.

ReDoSRisk

| Level    | Description                            |
|----------|----------------------------------------|
| NONE     | No known vulnerabilities               |
| LOW      | Minimal concern, mostly safe           |
| MEDIUM   | Some risky patterns, caution needed    |
| HIGH     | Dangerous patterns detected, avoid use |
| CRITICAL | Known ReDoS pattern, reject            |
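
One way to act on these levels is to gate pattern acceptance on the analysis result; a minimal sketch using analyze_regex_complexity (documented below) and assuming ReDoSRisk is importable from the same module:

from truthound.validators.security import ReDoSRisk, analyze_regex_complexity

result = analyze_regex_complexity(r"(a+)+b")
# Reject anything at or above the HIGH risk level
if result.risk_level in (ReDoSRisk.HIGH, ReDoSRisk.CRITICAL):
    raise ValueError(f"Pattern rejected: {result.risk_level}")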

SafeRegexConfig

from truthound.validators.security import (
    SafeRegexConfig,
    RegexSafetyChecker,
    check_regex_safety,
)

# Presets
strict_config = SafeRegexConfig.strict()    # For untrusted patterns
lenient_config = SafeRegexConfig.lenient()  # For trusted patterns

# Custom configuration
config = SafeRegexConfig(
    max_pattern_length=1000,      # Maximum pattern length
    max_groups=20,                # Maximum capture groups
    max_quantifier_range=100,     # Maximum {n,m} range
    max_alternations=50,          # Maximum alternation branches
    max_nested_depth=10,          # Maximum nesting depth
    allow_backreferences=False,   # Allow backreferences
    allow_lookaround=True,        # Allow lookahead/lookbehind
    timeout_seconds=1.0,          # Matching timeout
    max_input_length=100_000,     # Maximum input length
)

RegexComplexityAnalyzer

Static analysis to detect dangerous patterns:

from truthound.validators.security import (
    RegexComplexityAnalyzer,
    analyze_regex_complexity,
)

analyzer = RegexComplexityAnalyzer(config)
result = analyzer.analyze(r"(a+)+b")

print(result.risk_level)          # ReDoSRisk.CRITICAL
print(result.complexity_score)    # High score
print(result.dangerous_constructs)  # ["nested_quantifiers"]
print(result.is_safe)             # False
print(result.recommendation)      # Safe alternative suggestion

# Convenience function
result = analyze_regex_complexity(r"(a+)+b")

Detected Dangerous Patterns

| Pattern         | Name                        | Risk     | Description                 |
|-----------------|-----------------------------|----------|-----------------------------|
| (a+)+           | nested_quantifiers          | CRITICAL | Exponential backtracking    |
| (a+){2,}        | nested_quantifiers_bounded  | CRITICAL | Bounded nested quantifiers  |
| ((a)+)+         | deeply_nested_quantifiers   | CRITICAL | Deeply nested quantifiers   |
| (a\|b)+         | alternation_with_quantifier | HIGH     | Alternation with quantifier |
| \1+             | quantified_backreference    | HIGH     | Quantified backreference    |
| .*.*            | adjacent_quantifiers        | MEDIUM   | Adjacent quantifiers        |
| (a\|b\|c\|...)+ | long_alternation_chain      | MEDIUM   | Long alternation chain      |
| .+.             | greedy_dot_conflict         | MEDIUM   | Greedy conflict             |
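
These entries can be checked programmatically with check_regex_safety (documented next); a short sketch iterating over a few of them:

from truthound.validators.security import check_regex_safety

for pattern in [r"(a+)+", r"(a|b)+", r".*.*"]:
    is_safe, result = check_regex_safety(pattern)
    print(f"{pattern}: safe={is_safe}, constructs={result.dangerous_constructs}")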

RegexSafetyChecker

from truthound.validators.security import (
    RegexSafetyChecker,
    check_regex_safety,
)

checker = RegexSafetyChecker(config)

# Safety check
is_safe, result = checker.check(r"^[a-z]+$")
if not is_safe:
    print(f"Unsafe: {result.dangerous_constructs}")

# Convenience function
is_safe, result = check_regex_safety(r"(a+)+b")

SafeRegexExecutor

Safe regex execution with timeout:

from truthound.validators.security import (
    SafeRegexExecutor,
    create_safe_regex,
    safe_match,
    safe_search,
)

# Create safe regex
executor = create_safe_regex(r"^[a-z]+$", config)

# Safe matching (with timeout)
match = executor.match("hello")
match = executor.search("test string")
matches = executor.findall("hello world")

# Convenience functions
match = safe_match(r"^[a-z]+$", "hello")
match = safe_search(r"[0-9]+", "test123")

ML-Based Risk Prediction

ReDoS risk prediction using machine learning:

from truthound.validators.security import (
    MLPatternAnalyzer,
    predict_redos_risk,
    FeatureExtractor,
)

# ML analyzer
analyzer = MLPatternAnalyzer()
result = analyzer.analyze(r"(a+)+b")

print(result.risk_probability)  # 0.95
print(result.confidence)        # 0.87
print(result.features)          # Extracted features

# Convenience function
risk_level = predict_redos_risk(r"(a+)+b")
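
A sketch turning the prediction into an accept/reject gate, reusing the analyzer above; the 0.8 and 0.5 thresholds are illustrative choices, not library defaults:

pattern = r"(a+)+b"
result = analyzer.analyze(pattern)
# Reject when the model is both confident and predicts high risk
if result.risk_probability > 0.8 and result.confidence > 0.5:
    raise ValueError(f"Pattern rejected: predicted ReDoS risk {result.risk_probability:.2f}")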

PatternOptimizer

Safely optimize dangerous patterns:

from truthound.validators.security import (
    PatternOptimizer,
    optimize_pattern,
    OptimizationRule,
)

optimizer = PatternOptimizer()
result = optimizer.optimize(r"(a+)+b")

print(result.original_pattern)    # (a+)+b
print(result.optimized_pattern)   # a+b
print(result.rules_applied)       # Applied rules
print(result.is_equivalent)       # Equivalence status

# Convenience function
optimized = optimize_pattern(r"(a+)+b")
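
Optimization may change matching semantics, so a cautious caller adopts the rewrite only when equivalence is confirmed; a minimal sketch reusing the optimizer above:

original = r"(a+)+b"
result = optimizer.optimize(original)
# Keep the original pattern unless the rewrite is provably equivalent
pattern = result.optimized_pattern if result.is_equivalent else original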

CVE Database

Known vulnerable pattern database:

from truthound.validators.security import (
    CVEDatabase,
    check_cve_vulnerability,
    CVEEntry,
)

db = CVEDatabase()

# CVE check
result = db.check(r"(a+)+b")
if result.is_vulnerable:
    print(f"CVE: {result.cve_id}")
    print(f"Severity: {result.severity}")
    print(f"Description: {result.description}")

# Convenience function
result = check_cve_vulnerability(r"pattern")

CPU Monitoring

Runtime resource monitoring:

import re

from truthound.validators.security import (
    CPUMonitor,
    execute_with_monitoring,
    ResourceLimits,
)

limits = ResourceLimits(
    max_cpu_percent=50.0,
    max_memory_mb=100,
    max_time_seconds=1.0,
)

monitor = CPUMonitor(limits)

# Execute with monitoring (pattern and input_text are illustrative values)
pattern, input_text = r"^[a-z]+$", "hello"
result = execute_with_monitoring(
    lambda: re.match(pattern, input_text),
    monitor=monitor,
)

if result.timed_out:
    print(f"Timeout after {result.elapsed_seconds}s")
print(f"CPU: {result.cpu_percent}%")
print(f"Memory: {result.memory_mb}MB")

Pattern Profiling

Regex performance profiling:

from truthound.validators.security import (
    PatternProfiler,
    profile_pattern,
    BenchmarkConfig,
)

config = BenchmarkConfig(
    iterations=1000,
    input_sizes=[100, 1000, 10000],
    timeout_per_iteration=0.1,
)

profiler = PatternProfiler(config)
result = profiler.profile(r"^[a-z]+$")

print(result.mean_time_ms)
print(result.std_time_ms)
print(result.complexity_class)  # O(n), O(n^2), O(2^n)
print(result.backtrack_count)

# Convenience function
result = profile_pattern(r"pattern")
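
The reported complexity class can drive a simple acceptance rule; a sketch reusing the profiler above and assuming the class strings match those listed ("O(n)", "O(n^2)", "O(2^n)"):

result = profiler.profile(r"(a+)+b")
if result.complexity_class != "O(n)":
    # Super-linear behavior: prefer RE2 or rewrite the pattern
    print(f"Rejecting {result.complexity_class} pattern")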

RE2 Engine

Regex engine with guaranteed linear-time matching (requires the google-re2 package):

from truthound.validators.security import (
    RE2Engine,
    safe_match_re2,
    safe_search_re2,
    is_re2_available,
    check_re2_compatibility,
)

# Check RE2 availability
if is_re2_available():
    # Compatibility check
    compatible, reason = check_re2_compatibility(r"pattern")
    if not compatible:
        print(f"Not compatible: {reason}")

    # Use RE2 engine
    engine = RE2Engine()
    match = engine.match(r"^[a-z]+$", "hello")

    # Convenience functions
    match = safe_match_re2(r"^[a-z]+$", "hello")
    match = safe_search_re2(r"[0-9]+", "test123")

Features not supported by RE2:

- Backreferences (\1, \2, ...)
- Lookahead ((?=...), (?!...))
- Lookbehind ((?<=...), (?<!...))
- Conditional patterns
- Atomic groups
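
Given these gaps, a common arrangement is to prefer RE2 when the pattern is compatible and fall back to the timeout-guarded executor otherwise; a sketch using the helpers documented above:

from truthound.validators.security import (
    is_re2_available,
    check_re2_compatibility,
    safe_match_re2,
    safe_match,
)

def match_safely(pattern: str, text: str):
    # Prefer the linear-time RE2 engine when the pattern avoids unsupported features
    if is_re2_available():
        compatible, _reason = check_re2_compatibility(pattern)
        if compatible:
            return safe_match_re2(pattern, text)
    # Otherwise fall back to the timeout-guarded backtracking executor
    return safe_match(pattern, text)

match_safely(r"^[a-z]+$", "hello")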


3. Integrated Usage

Applying Security in Validators

from truthound.validators.base import Validator
from truthound.validators.security import (
    SecureQueryMixin,
    SecurityPolicy,
    RegexSafetyChecker,
    SafeRegexConfig,
)

class SecurePatternValidator(Validator, SecureQueryMixin):
    def __init__(self, pattern: str):
        super().__init__()
        self.set_security_policy(SecurityPolicy.strict())

        # Pattern safety check
        checker = RegexSafetyChecker(SafeRegexConfig.strict())
        is_safe, result = checker.check(pattern)
        if not is_safe:
            raise ValueError(
                f"Unsafe pattern: {result.dangerous_constructs}"
            )

        self.pattern = pattern

    def validate(self, lf):
        # Use secure queries and patterns
        ...

Integration with Enterprise SDK

from truthound.validators.sdk.enterprise import EnterpriseSDKManager

manager = EnterpriseSDKManager()

# Execute with security features included (run inside an async context)
result = await manager.execute_validator(
    validator_class=SecurePatternValidator,
    data=my_dataframe,
)
