
Data Ingestion Pipeline

A schema-agnostic EAV (Entity-Attribute-Value) rotation pipeline for heterogeneous campaign data sources.

Primary Consumer

This pipeline serves the CampaignBrain AI Chat - an Anthropic Claude-powered natural language interface for voter and campaign data queries. The chat engine can:

  1. Query i360 voter data (keyed by SVID - State Voter ID)
  2. Query campaign engagement data via this pipeline (keyed by email/phone)

The EAV rotation provides a "generic lookup" layer that complements voter data with campaign context: donations, volunteer signups, event attendance, petition signatures, etc.

See CBAPP-INTEGRATION.md for chat integration patterns.

Overview

Campaign data arrives in dozens of formats: donor exports, volunteer signups, delegate lists, petition signers, event attendees. Each source has different column names, structures, and conventions. Traditional ETL requires custom schemas for each source—which doesn't scale.

The EAV rotation approach solves this by storing every cell as a row:

(source_path, source_id, row_idx, col_idx, field_name, value)

This provides:

  • Schema-free storage - Any CSV/XLSX loads without schema definition
  • Cross-source querying - Find all emails, phones, or names regardless of source format
  • Field normalization - Map Email, email, Email (Ptvfact) → email
  • Deduplication - Identify contacts appearing across multiple sources
  • Record rehydration - Reconstruct full records when needed

Architecture

src/cbmodels/ingest/
├── rotator.py      # Core rotation: wide → tall transformation
├── normalizer.py   # Field name canonicalization
├── storage.py      # DuckDB storage and query layer
└── cli.py          # CLI commands

Rotation (rotator.py)

Transforms each cell into a row:

# Input: CSV with columns [First Name, Email, Phone]
# Row 0: ["John", "john@example.com", "555-1234"]

# Output: 3 rotated rows
{"source_path": "donors.csv", "source_id": "abc123",
 "row_idx": 0, "col_idx": 0, "field_raw": "First Name",
 "field_name": "first_name", "value": "John"}

{"source_path": "donors.csv", "source_id": "abc123",
 "row_idx": 0, "col_idx": 1, "field_raw": "Email",
 "field_name": "email", "value": "john@example.com"}

{"source_path": "donors.csv", "source_id": "abc123",
 "row_idx": 0, "col_idx": 2, "field_raw": "Phone",
 "field_name": "phone", "value": "555-1234"}

Key design decisions:

  • source_id - SHA256 hash of the file path for stable references
  • field_raw - Original column name (preserved for debugging)
  • field_name - Normalized/canonical version for querying
  • value - Always stored as VARCHAR (type inference happens at query time)
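The essence of the transform fits in a few lines. The sketch below is illustrative only (the helper name rotate_csv is hypothetical, and the real rotator.py also handles XLSX, encoding detection, and batching), assuming pandas:

import hashlib
import pandas as pd

def rotate_csv(path, normalize=lambda name: name.strip().lower().replace(" ", "_")):
    """Yield one EAV dict per cell of the input CSV (hypothetical helper)."""
    # source_id: SHA256 of the file path, as described above
    source_id = hashlib.sha256(path.encode("utf-8")).hexdigest()
    df = pd.read_csv(path, dtype=str)  # keep every value as a string
    for row_idx, row in enumerate(df.itertuples(index=False, name=None)):
        for col_idx, (field_raw, value) in enumerate(zip(df.columns, row)):
            if pd.isna(value):
                continue  # skip empty cells
            yield {
                "source_path": path,
                "source_id": source_id,
                "row_idx": row_idx,
                "col_idx": col_idx,
                "field_raw": field_raw,
                "field_name": normalize(field_raw),
                "value": value,
            }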

Normalization (normalizer.py)

Maps raw column names to canonical fields using rule-based patterns:

CANONICAL_FIELDS = {
    "first_name": "Person's first/given name",
    "last_name": "Person's last/family name",
    "email": "Email address",
    "phone": "Primary phone number",
    "mobile": "Mobile/cell phone",
    "address": "Street address line 1",
    "city": "City name",
    "state": "State code (2-letter)",
    "zip": "ZIP/postal code",
    "state_voter_id": "State voter ID number",
    # ... 40+ canonical fields
}

The normalizer applies rules in priority order:

  1. AI mapping (optional) - Claude can map ambiguous names
  2. Rule-based fallback - Pattern matching for common variations
  3. Sanitized original - snake_case version of the raw name

Example transformations:

  • Email (Ptvfact) → email
  • First Name → first_name
  • Cell Phone → mobile
  • Zip Code → zip
  • State Voter ID → state_voter_id
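A minimal sketch of the rule-based fallback (step 2 above). The patterns shown are illustrative, not the full rule set in normalizer.py:

import re

# Illustrative pattern → canonical-field rules; order matters (mobile before phone).
FIELD_RULES = [
    (re.compile(r"e[-_ ]?mail", re.I), "email"),
    (re.compile(r"(cell|mobile)", re.I), "mobile"),
    (re.compile(r"phone", re.I), "phone"),
    (re.compile(r"first\s*name", re.I), "first_name"),
    (re.compile(r"zip", re.I), "zip"),
    (re.compile(r"state\s*voter\s*id", re.I), "state_voter_id"),
]

def normalize_field(field_raw: str) -> str:
    """Map a raw column name to a canonical field, falling back to snake_case."""
    for pattern, canonical in FIELD_RULES:
        if pattern.search(field_raw):
            return canonical
    # Sanitized original: snake_case version of the raw name
    return re.sub(r"[^a-z0-9]+", "_", field_raw.lower()).strip("_")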

Storage (storage.py)

DuckDB table with indexes for efficient querying:

CREATE TABLE rotated_data (
    source_path VARCHAR,
    source_id VARCHAR,
    row_idx INTEGER,
    col_idx INTEGER,
    field_raw VARCHAR,
    field_name VARCHAR,
    value VARCHAR
);

-- Indexes for common access patterns
CREATE INDEX idx_rotated_source ON rotated_data (source_id, row_idx);
CREATE INDEX idx_rotated_field ON rotated_data (field_name);
CREATE INDEX idx_rotated_value ON rotated_data (field_name, value);

Query capabilities:

  • find_by_email(email) - Find all records matching an email across sources
  • find_by_phone(phone) - Phone lookup with digit normalization
  • rehydrate_record(source_id, row_idx) - Reconstruct a full record
  • get_field_value_counts(field) - Distribution analysis
  • get_source_stats() - Per-file statistics
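The sketches below show how two of these lookups might be written against the table above, assuming the duckdb Python package; the method names mirror the list, but the bodies are illustrative rather than the actual storage.py implementation:

import duckdb

con = duckdb.connect("./data/rotated.db")

def find_by_email(email: str):
    """Return (source_path, source_id, row_idx) for every record with this email."""
    return con.execute(
        """
        SELECT DISTINCT source_path, source_id, row_idx
        FROM rotated_data
        WHERE field_name = 'email' AND lower(value) = lower(?)
        """,
        [email],
    ).fetchall()

def rehydrate_record(source_id: str, row_idx: int) -> dict:
    """Reconstruct a full record as {field_name: value}."""
    rows = con.execute(
        """
        SELECT field_name, value
        FROM rotated_data
        WHERE source_id = ? AND row_idx = ?
        ORDER BY col_idx
        """,
        [source_id, row_idx],
    ).fetchall()
    return dict(rows)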

CLI Usage

# Ingest all files from a directory
cbmodels rotate ingest ./data/sources/ --output-db ./data/rotated.db

# View statistics
cbmodels rotate stats ./data/rotated.db

# Lookup by email or phone
cbmodels rotate lookup ./data/rotated.db --email john@example.com
cbmodels rotate lookup ./data/rotated.db --phone "555-123-4567"

# Find cross-source duplicates
cbmodels rotate duplicates ./data/rotated.db --field email --min-sources 2

# Export to Parquet
cbmodels rotate export ./data/rotated.db --output ./data/rotated.parquet

Results

Tested with 46 real campaign files:

Metric                    Value
Files processed           46
Total cells               788,136
Unique field names        92 (normalized from 133 raw)
Cross-source duplicates   Contacts found in up to 9 sources

Sample duplicate detection:

Email: example@domain.com found in 9 source(s):
- Donors_2024.csv (row 1547)
- Volunteers.xlsx (row 23)
- Event_Attendees.csv (row 892)
- ...

Techniques Learned

1. EAV Trade-offs

Pros:

  • Zero schema maintenance
  • Handles any data shape
  • Natural for cross-source queries
  • Easy field normalization

Cons:

  • Storage amplification (~10x row count)
  • Record reconstruction is expensive (see the sketch below)
  • Type inference at query time
  • Complex aggregations need careful query design
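The reconstruction cost is easy to see: rebuilding a wide view of one source means a conditional aggregation over every cell row. A sketch, assuming the rotated_data table above and the illustrative source_id from the rotation example:

import duckdb

con = duckdb.connect("./data/rotated.db")

# Rebuild a wide (one row per record) view for a single source.
# Every cell row must be scanned and regrouped, which is why record
# reconstruction is the expensive direction in an EAV layout.
wide = con.execute(
    """
    SELECT
        row_idx,
        max(CASE WHEN field_name = 'first_name' THEN value END) AS first_name,
        max(CASE WHEN field_name = 'email'      THEN value END) AS email,
        max(CASE WHEN field_name = 'phone'      THEN value END) AS phone
    FROM rotated_data
    WHERE source_id = ?
    GROUP BY row_idx
    ORDER BY row_idx
    """,
    ["abc123"],
).df()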

2. Field Normalization Strategy

Rule-based normalization handles 90%+ of cases. The remaining edge cases (ambiguous names, domain-specific fields) can be handled by:

  • AI-powered mapping for initial discovery
  • Manual overrides via saved mapping files (see the sketch below)
  • Iterative refinement as new sources arrive
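One way a saved mapping override could look and be applied; the file path and JSON structure here are assumptions, not the pipeline's actual format:

import json
import re

# Hypothetical overrides file mapping raw column names to canonical fields, e.g.
# {"Email (Ptvfact)": "email", "Supporter Cell": "mobile"}
with open("./data/field_overrides.json") as f:
    OVERRIDES = json.load(f)

def normalize_with_overrides(field_raw: str) -> str:
    """Manual overrides win; otherwise fall back to rule-based normalization."""
    if field_raw in OVERRIDES:
        return OVERRIDES[field_raw]
    # Stand-in for the rule-based normalizer sketched in the Normalization section
    return re.sub(r"[^a-z0-9]+", "_", field_raw.lower()).strip("_")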

3. Deduplication via Contact Fields

Email is the strongest dedup key, followed by phone. Name matching is unreliable due to:

  • Nicknames (Robert/Bob/Bobby)
  • Typos and OCR errors
  • Common names with no distinguishing info

Multi-field matching (email + name + zip) provides higher confidence.
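A sketch of that multi-field scoring over two rehydrated records; the weights are illustrative, not tuned values:

def match_confidence(a: dict, b: dict) -> float:
    """Score how likely two rehydrated records refer to the same person.

    Illustrative weights: email is the strongest signal, then phone,
    then last name + ZIP together.
    """
    digits = lambda s: "".join(ch for ch in (s or "") if ch.isdigit())
    score = 0.0
    if a.get("email") and a.get("email", "").lower() == b.get("email", "").lower():
        score += 0.6
    if digits(a.get("phone")) and digits(a.get("phone")) == digits(b.get("phone")):
        score += 0.25
    if (a.get("zip") and a.get("zip") == b.get("zip")
            and a.get("last_name", "").lower() == b.get("last_name", "").lower()):
        score += 0.15
    return score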

4. DuckDB for Analytical Queries

DuckDB excels at:

  • Fast aggregations on rotated data
  • Efficient GROUP BY for duplicate detection
  • REGEXP for phone normalization
  • Parquet export for downstream tools
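For example, cross-source duplicate detection and digit-normalized phone lookup each fit in one query. A sketch against rotated_data (regexp_replace with the 'g' option is a DuckDB built-in):

import duckdb

con = duckdb.connect("./data/rotated.db")

# Emails that appear in two or more distinct source files.
dupes = con.execute(
    """
    SELECT lower(value) AS email,
           count(DISTINCT source_id) AS n_sources
    FROM rotated_data
    WHERE field_name = 'email'
    GROUP BY 1
    HAVING count(DISTINCT source_id) >= 2
    ORDER BY n_sources DESC
    """
).df()

# Phone lookup after stripping non-digits on both sides.
hits = con.execute(
    """
    SELECT source_path, row_idx
    FROM rotated_data
    WHERE field_name IN ('phone', 'mobile')
      AND regexp_replace(value, '[^0-9]', '', 'g') = regexp_replace(?, '[^0-9]', '', 'g')
    """,
    ["555-123-4567"],
).fetchall()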

Next Steps

Phase 1: AI Chat Integration

  • Expose lookup endpoints for cbapp chat engine
  • Add campaign operations to QueryIR schema
  • Enable "Find donors who volunteered" style queries

Phase 2: AI-Powered Field Mapping

  • Generate mapping prompts with field samples
  • Call Claude API for ambiguous field resolution
  • Cache and persist mappings for consistency

Phase 3: i360 Voter Linkage (Optional)

  • Match campaign contacts to i360 records by email/phone
  • SVID is i360-specific, not a universal key
  • Build linkage table for contacts with voter matches

Phase 4: Entity Resolution

  • Implement probabilistic matching (email, phone, name+address)
  • Build unified person graph across sources
  • Track match confidence scores

Phase 5: Enrichment API

  • Batch enrichment: given emails, return campaign engagement
  • Engagement scoring (recency, frequency, source breadth)
  • Source type classification (donor, volunteer, event, petition)

Phase 6: Incremental Updates

  • Track processed files by hash
  • Detect new/modified sources
  • Update rotated table incrementally
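One possible shape for that hash tracking, sketched only; nothing here is implemented yet and the helper names are hypothetical:

import hashlib
from pathlib import Path

def file_digest(path: Path) -> str:
    """SHA256 of the file contents, used to detect new or modified sources."""
    return hashlib.sha256(path.read_bytes()).hexdigest()

def changed_sources(data_dir: str, seen: dict) -> list:
    """Return files whose content hash is new or differs from the last run.

    `seen` maps file path -> previously recorded digest (persisted between runs).
    """
    changed = []
    for path in sorted(Path(data_dir).glob("*")):
        if path.suffix.lower() not in {".csv", ".xlsx", ".xls"}:
            continue
        if seen.get(str(path)) != file_digest(path):
            changed.append(path)
    return changed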

File Formats Supported

Format   Handler    Notes
CSV      pandas     Auto-detects encoding
XLSX     openpyxl   First sheet only
XLS      xlrd       Legacy Excel

Configuration

The pipeline uses sensible defaults but can be customized:

# Exclude patterns (e.g., large aggregated files)
--exclude-pattern "WinRed_ALL"

# Batch size for inserts
batch_size=10000

# Custom output path
--output-db ./data/custom.db

Troubleshooting

Slow ingestion:

  • Large Excel files with formatting are slow to parse
  • Consider pre-converting to CSV
  • Batch size of 10K balances memory vs. commit overhead

Missing normalizations:

  • Check cbmodels rotate stats for unmapped fields
  • Add rules to normalizer.py or use AI mapping

Phone matching misses:

  • Ensure phone fields are normalized to phone, mobile, etc.
  • Check the digit extraction regex for edge cases