Data Ingestion Pipeline¶
A schema-agnostic EAV (Entity-Attribute-Value) rotation pipeline for heterogeneous campaign data sources.
Primary Consumer¶
This pipeline serves the CampaignBrain AI Chat - an Anthropic Claude-powered natural language interface for voter and campaign data queries. The chat engine can:
- Query i360 voter data (keyed by SVID - State Voter ID)
- Query campaign engagement data via this pipeline (keyed by email/phone)
The EAV rotation provides a "generic lookup" layer that complements voter data with campaign context: donations, volunteer signups, event attendance, petition signatures, etc.
See CBAPP-INTEGRATION.md for chat integration patterns.
Overview¶
Campaign data arrives in dozens of formats: donor exports, volunteer signups, delegate lists, petition signers, event attendees. Each source has different column names, structures, and conventions. Traditional ETL requires custom schemas for each source—which doesn't scale.
The EAV rotation approach solves this by storing every cell as its own row; the Rotation section below shows the transformation in detail.
This provides:
- Schema-free storage - Any CSV/XLSX loads without schema definition
- Cross-source querying - Find all emails, phones, or names regardless of source format
- Field normalization - Map Email, email, Email (Ptvfact) → email
- Deduplication - Identify contacts appearing across multiple sources
- Record rehydration - Reconstruct full records when needed
Architecture¶
src/cbmodels/ingest/
├── rotator.py # Core rotation: wide → tall transformation
├── normalizer.py # Field name canonicalization
├── storage.py # DuckDB storage and query layer
└── cli.py # CLI commands
Rotation (rotator.py)¶
Transforms each cell into a row:
# Input: CSV with columns [First Name, Email, Phone]
# Row 0: ["John", "john@example.com", "555-1234"]
# Output: 3 rotated rows
{"source_path": "donors.csv", "source_id": "abc123",
"row_idx": 0, "col_idx": 0, "field_raw": "First Name",
"field_name": "first_name", "value": "John"}
{"source_path": "donors.csv", "source_id": "abc123",
"row_idx": 0, "col_idx": 1, "field_raw": "Email",
"field_name": "email", "value": "john@example.com"}
{"source_path": "donors.csv", "source_id": "abc123",
"row_idx": 0, "col_idx": 2, "field_raw": "Phone",
"field_name": "phone", "value": "555-1234"}
Key design decisions:
- source_id - SHA256 hash of the file path for stable references
- field_raw - Original column name (preserved for debugging)
- field_name - Normalized/canonical version for querying
- value - Always stored as VARCHAR (type inference happens at query time)
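A minimal sketch of this rotation step, assuming a pandas DataFrame as input; the function name and signature are illustrative, not the exact rotator.py API:
import hashlib
import pandas as pd

def rotate_dataframe(df: pd.DataFrame, source_path: str) -> list[dict]:
    """Turn every cell of a wide table into one rotated (EAV) row."""
    # Stable reference to the source file, as described above
    source_id = hashlib.sha256(source_path.encode("utf-8")).hexdigest()
    rows = []
    for row_idx, record in enumerate(df.itertuples(index=False)):
        for col_idx, field_raw in enumerate(df.columns):
            value = record[col_idx]
            if pd.isna(value):
                continue  # skip empty cells to limit storage amplification
            rows.append({
                "source_path": source_path,
                "source_id": source_id,
                "row_idx": row_idx,
                "col_idx": col_idx,
                "field_raw": field_raw,
                # Placeholder: canonical mapping is the normalizer's job
                "field_name": field_raw.strip().lower().replace(" ", "_"),
                "value": str(value),  # always VARCHAR; types inferred at query time
            })
    return rows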
Normalization (normalizer.py)¶
Maps raw column names to canonical fields using rule-based patterns:
CANONICAL_FIELDS = {
"first_name": "Person's first/given name",
"last_name": "Person's last/family name",
"email": "Email address",
"phone": "Primary phone number",
"mobile": "Mobile/cell phone",
"address": "Street address line 1",
"city": "City name",
"state": "State code (2-letter)",
"zip": "ZIP/postal code",
"state_voter_id": "State voter ID number",
# ... 40+ canonical fields
}
The normalizer applies rules in priority order (the rule-based step is sketched after the examples below):
1. AI mapping (optional) - Claude can map ambiguous names
2. Rule-based fallback - Pattern matching for common variations
3. Sanitized original - snake_case version of the raw name
Example transformations:
- Email (Ptvfact) → email
- First Name → first_name
- Cell Phone → mobile
- Zip Code → zip
- State Voter ID → state_voter_id
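A minimal sketch of steps 2 and 3 (rule-based matching with a sanitized fallback); the patterns below are illustrative, not the exact rule set in normalizer.py:
import re

# Illustrative priority-ordered rules (raw header pattern -> canonical field)
FIELD_RULES = [
    (re.compile(r"first[\s_]*name", re.I), "first_name"),
    (re.compile(r"last[\s_]*name", re.I), "last_name"),
    (re.compile(r"e[\s_-]*mail", re.I), "email"),
    (re.compile(r"cell|mobile", re.I), "mobile"),
    (re.compile(r"phone", re.I), "phone"),
    (re.compile(r"zip", re.I), "zip"),
    (re.compile(r"state[\s_]*voter[\s_]*id", re.I), "state_voter_id"),
]

def normalize_field(field_raw: str) -> str:
    """Map a raw column header to a canonical field name."""
    for pattern, canonical in FIELD_RULES:
        if pattern.search(field_raw):
            return canonical
    # Fallback: sanitized snake_case version of the original header
    return re.sub(r"[^a-z0-9]+", "_", field_raw.strip().lower()).strip("_")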
Storage (storage.py)¶
DuckDB table with indexes for efficient querying:
CREATE TABLE rotated_data (
source_path VARCHAR,
source_id VARCHAR,
row_idx INTEGER,
col_idx INTEGER,
field_raw VARCHAR,
field_name VARCHAR,
value VARCHAR
);
-- Indexes for common access patterns
CREATE INDEX idx_rotated_source ON rotated_data (source_id, row_idx);
CREATE INDEX idx_rotated_field ON rotated_data (field_name);
CREATE INDEX idx_rotated_value ON rotated_data (field_name, value);
Query capabilities (two are sketched below):
- find_by_email(email) - Find all records matching an email across sources
- find_by_phone(phone) - Phone lookup with digit normalization
- rehydrate_record(source_id, row_idx) - Reconstruct a full record
- get_field_value_counts(field) - Distribution analysis
- get_source_stats() - Per-file statistics
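A sketch of how two of these lookups can be expressed against the rotated_data schema above; the wrapper functions are illustrative, not the exact storage.py API:
import duckdb

def find_by_email(con: duckdb.DuckDBPyConnection, email: str) -> list[tuple]:
    """Return (source_path, source_id, row_idx) for every record with this email."""
    return con.execute(
        """
        SELECT DISTINCT source_path, source_id, row_idx
        FROM rotated_data
        WHERE field_name = 'email' AND lower(value) = lower(?)
        """,
        [email],
    ).fetchall()

def rehydrate_record(con: duckdb.DuckDBPyConnection, source_id: str, row_idx: int) -> dict:
    """Reconstruct a full record as {field_name: value} from its rotated cells."""
    rows = con.execute(
        """
        SELECT field_name, value FROM rotated_data
        WHERE source_id = ? AND row_idx = ?
        ORDER BY col_idx
        """,
        [source_id, row_idx],
    ).fetchall()
    return dict(rows)  # duplicate field names collapse to the last value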
CLI Usage¶
# Ingest all files from a directory
cbmodels rotate ingest ./data/sources/ --output-db ./data/rotated.db
# View statistics
cbmodels rotate stats ./data/rotated.db
# Lookup by email or phone
cbmodels rotate lookup ./data/rotated.db --email john@example.com
cbmodels rotate lookup ./data/rotated.db --phone "555-123-4567"
# Find cross-source duplicates
cbmodels rotate duplicates ./data/rotated.db --field email --min-sources 2
# Export to Parquet
cbmodels rotate export ./data/rotated.db --output ./data/rotated.parquet
Results¶
Tested with 46 real campaign files:
| Metric | Value |
|---|---|
| Files processed | 46 |
| Total cells | 788,136 |
| Unique field names | 92 (normalized from 133 raw) |
| Cross-source duplicates | Contacts found in up to 9 sources |
Sample duplicate detection:
Email: example@domain.com found in 9 source(s):
- Donors_2024.csv (row 1547)
- Volunteers.xlsx (row 23)
- Event_Attendees.csv (row 892)
- ...
Techniques Learned¶
1. EAV Trade-offs¶
Pros:
- Zero schema maintenance
- Handles any data shape
- Natural for cross-source queries
- Easy field normalization
Cons:
- Storage amplification (~10x row count)
- Record reconstruction is expensive
- Type inference at query time
- Complex aggregations need careful query design
2. Field Normalization Strategy¶
Rule-based normalization handles 90%+ of cases. The remaining edge cases (ambiguous names, domain-specific fields) can be handled by:
- AI-powered mapping for initial discovery
- Manual overrides via saved mapping files (sketched below)
- Iterative refinement as new sources arrive
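One possible shape for the override layer; the file name, format, and helpers are hypothetical, and normalize_field refers to the rule-based sketch in the Normalization section above:
import json
from pathlib import Path

def load_overrides(path: str = "field_overrides.json") -> dict[str, str]:
    """Load manually curated {raw header: canonical field} mappings, if present."""
    p = Path(path)
    return json.loads(p.read_text()) if p.exists() else {}

def normalize_with_overrides(field_raw: str, overrides: dict[str, str]) -> str:
    # Manual overrides win over rule-based matching
    if field_raw in overrides:
        return overrides[field_raw]
    return normalize_field(field_raw)  # rule-based sketch from the Normalization section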
3. Deduplication via Contact Fields¶
Email is the strongest dedup key, followed by phone. Name matching is unreliable due to:
- Nicknames (Robert/Bob/Bobby)
- Typos and OCR errors
- Common names with no distinguishing info
Multi-field matching (email + name + zip) provides higher confidence.
4. DuckDB for Analytical Queries¶
DuckDB excels at (see the query sketch below):
- Fast aggregations on rotated data
- Efficient GROUP BY for duplicate detection
- REGEXP for phone normalization
- Parquet export for downstream tools
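For illustration, the duplicate-detection and phone-normalization patterns above might look like this against the rotated_data table; the SQL is a sketch, not the exact queries behind the CLI:
import duckdb

con = duckdb.connect("./data/rotated.db")

# Cross-source duplicate detection: emails appearing in 2+ distinct sources
duplicates = con.execute(
    """
    SELECT lower(value) AS email, count(DISTINCT source_id) AS n_sources
    FROM rotated_data
    WHERE field_name = 'email'
    GROUP BY 1
    HAVING count(DISTINCT source_id) >= 2
    ORDER BY n_sources DESC
    """
).fetchall()

# Phone normalization at query time: keep digits only
phones = con.execute(
    """
    SELECT regexp_replace(value, '[^0-9]', '', 'g') AS digits, source_path, row_idx
    FROM rotated_data
    WHERE field_name IN ('phone', 'mobile')
    """
).fetchall()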
Next Steps¶
Phase 1: AI Chat Integration¶
- Expose lookup endpoints for cbapp chat engine
- Add campaign operations to QueryIR schema
- Enable "Find donors who volunteered" style queries
Phase 2: AI-Powered Field Mapping¶
- Generate mapping prompts with field samples
- Call Claude API for ambiguous field resolution
- Cache and persist mappings for consistency
Phase 3: i360 Voter Linkage (Optional)¶
- Match campaign contacts to i360 records by email/phone
- SVID is i360-specific, not a universal key
- Build linkage table for contacts with voter matches
Phase 4: Entity Resolution¶
- Implement probabilistic matching (email, phone, name+address)
- Build unified person graph across sources
- Track match confidence scores
Phase 5: Enrichment API¶
- Batch enrichment: given emails, return campaign engagement
- Engagement scoring (recency, frequency, source breadth)
- Source type classification (donor, volunteer, event, petition)
Phase 6: Incremental Updates¶
- Track processed files by hash
- Detect new/modified sources
- Update rotated table incrementally
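This phase is not implemented yet; a hypothetical sketch of the hash-based change tracking it describes, with all names illustrative:
import hashlib
from pathlib import Path

def file_hash(path: Path) -> str:
    """Content hash used to detect new or modified source files."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

def changed_sources(source_dir: str, seen: dict[str, str]) -> list[Path]:
    """Return files whose content hash is new or changed since the last run."""
    changed = []
    for path in sorted(Path(source_dir).glob("*")):
        if not path.is_file():
            continue
        digest = file_hash(path)
        if seen.get(str(path)) != digest:
            changed.append(path)
            seen[str(path)] = digest
    return changed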
File Formats Supported¶
| Format | Handler | Notes |
|---|---|---|
| CSV | pandas | Auto-detects encoding |
| XLSX | openpyxl | First sheet only |
| XLS | xlrd | Legacy Excel |
Configuration¶
The pipeline uses sensible defaults but can be customized:
# Exclude patterns (e.g., large aggregated files)
--exclude-pattern "WinRed_ALL"
# Batch size for inserts
batch_size=10000
# Custom output path
--output-db ./data/custom.db
Troubleshooting¶
Slow ingestion:
- Large Excel files with formatting are slow to parse
- Consider pre-converting to CSV
- Batch size of 10K balances memory vs. commit overhead
Missing normalizations:
- Check cbmodels rotate stats for unmapped fields
- Add rules to normalizer.py or use AI mapping
Phone matching misses:
- Ensure phone fields are normalized to phone, mobile, etc.
- Check digit extraction regex for edge cases