# Ingest Pipeline Design
## Overview

Use the Claude API to analyze source schemas and generate a declarative "Mapping Spec" (JSON) that drives the ingestion process.
## Flow

```
Source Schema (Snowflake)
        ↓
Schema Analyzer (Claude API)
        ↓
Mapping Spec (JSON)
        ↓
Ingest Engine
        ↓
DuckDB
```
## Mapping Spec Format

```json
{
  "version": "1.0",
  "source": {
    "provider": "snowflake",
    "database": "CONSERVATIVECONNECTOR",
    "schema": "KPI",
    "table": "GLOBAL_DONATIONS"
  },
  "target": {
    "database": "analysis.db",
    "table": "donations"
  },
  "columns": [
    {
      "source": "EMAIL",
      "target": "email",
      "source_type": "VARCHAR",
      "target_type": "VARCHAR",
      "transform": null,
      "index": "hash",
      "nullable": false
    },
    {
      "source": "DONATION_AMOUNT",
      "target": "amount",
      "source_type": "NUMBER(38,2)",
      "target_type": "DECIMAL(10,2)",
      "transform": null,
      "index": null,
      "nullable": true
    },
    {
      "source": "DONATION_DATE",
      "target": "donated_at",
      "source_type": "TIMESTAMP_NTZ",
      "target_type": "TIMESTAMP",
      "transform": "parse_timestamp",
      "index": "btree",
      "nullable": false
    }
  ],
  "indexes": [
    {"columns": ["email"], "type": "hash", "reason": "High cardinality lookup column"},
    {"columns": ["donated_at"], "type": "btree", "reason": "Range queries on date"}
  ],
  "chunking": {
    "strategy": "row_count",
    "size": 100000,
    "order_by": "DONATION_DATE"
  },
  "validation": {
    "row_count_match": true,
    "null_checks": ["email"],
    "unique_checks": []
  }
}
```
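For reference, one way to represent the spec in code is a small set of dataclasses. This is only a sketch: the field layout mirrors the JSON above and the `MappingSpec.from_json` constructor used later in this document, but the exact class names and types are assumptions, not a finalized API.

```python
from dataclasses import dataclass
from typing import Optional
import json


@dataclass
class ColumnMapping:
    source: str
    target: str
    source_type: str
    target_type: str
    transform: Optional[str] = None
    index: Optional[str] = None
    nullable: bool = True


@dataclass
class MappingSpec:
    version: str
    source: dict
    target: dict
    columns: list      # list of ColumnMapping
    indexes: list      # list of {"columns", "type", "reason"} dicts
    chunking: dict
    validation: dict

    @classmethod
    def from_json(cls, raw: str) -> "MappingSpec":
        data = json.loads(raw)
        data["columns"] = [ColumnMapping(**c) for c in data["columns"]]
        return cls(**data)
```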
## Claude Prompt Strategy

### Input to Claude

```text
Analyze this source schema and generate a mapping spec for DuckDB ingestion:

Source: CONSERVATIVECONNECTOR.KPI.GLOBAL_DONATIONS

Columns:
- EMAIL (VARCHAR, NOT NULL, 541353 distinct)
- CC_SOURCE (VARCHAR, 15 distinct values: ['WinRed', 'Revv', ...])
- DONATION_AMOUNT (NUMBER(38,2), range: 1.00 - 50000.00)
- DONATION_DATE (TIMESTAMP_NTZ)
- DONOR_ID (VARCHAR, nullable)

Sample data:
[first 5 rows as JSON]

Consider:
1. Optimal DuckDB types
2. Which columns benefit from indexing
3. Chunking strategy for 541K rows
4. Any type transformations needed
```
### Output from Claude

Claude returns the mapping spec JSON shown above.
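A minimal sketch of how this prompt might be assembled from schema metadata and sample rows (this corresponds to the `_build_prompt` helper in Phase 1 below). The column-statistics keys (`distinct`, `name`, `type`) are assumptions about what the upstream schema introspection provides.

```python
import json


def build_prompt(source_schema: dict, sample_data: list) -> str:
    """Render schema metadata and sample rows into the analysis prompt."""
    fq_name = "{database}.{schema}.{table}".format(**source_schema)
    lines = [
        "Analyze this source schema and generate a mapping spec for DuckDB ingestion:",
        f"Source: {fq_name}",
        "Columns:",
    ]
    for col in source_schema["columns"]:
        stats = f", {col['distinct']} distinct" if "distinct" in col else ""
        lines.append(f"- {col['name']} ({col['type']}{stats})")
    lines.append("Sample data:")
    lines.append(json.dumps(sample_data[:5], default=str))
    lines += [
        "Consider:",
        "1. Optimal DuckDB types",
        "2. Which columns benefit from indexing",
        "3. Chunking strategy",
        "4. Any type transformations needed",
    ]
    return "\n".join(lines)
```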
## Implementation Plan

### Phase 1: Schema Analyzer

```python
class SchemaAnalyzer:
    def __init__(self, anthropic_client):
        self.client = anthropic_client

    def analyze(self, source_schema: dict, sample_data: list) -> MappingSpec:
        # Ask Claude to propose a mapping spec for the source table
        prompt = self._build_prompt(source_schema, sample_data)
        response = self.client.messages.create(...)
        # response.content is a list of content blocks; the first holds the generated JSON
        return MappingSpec.from_json(response.content[0].text)
```
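The elided `messages.create(...)` call might be filled in roughly as follows; the model id and token limit are placeholders, and the prompt is assumed to ask Claude to return only the raw JSON document.

```python
response = self.client.messages.create(
    model="claude-sonnet-4-20250514",  # placeholder model id
    max_tokens=4096,
    messages=[{"role": "user", "content": prompt}],
)
```

In practice the returned text may still be wrapped in a Markdown code fence, so stripping a leading ```` ```json ```` fence before calling `MappingSpec.from_json` is a reasonable defensive step.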
### Phase 2: Ingest Engine

```python
from pathlib import Path

import duckdb


class IngestEngine:
    def __init__(self, db_path: Path):
        self.conn = duckdb.connect(str(db_path))

    def execute(self, spec: MappingSpec, data_path: Path):
        # 1. Create table from spec
        self._create_table(spec)
        # 2. Load in chunks
        for chunk in self._chunk_data(data_path, spec.chunking):
            self._insert_chunk(chunk, spec)
        # 3. Create indexes
        for idx in spec.indexes:
            self._create_index(idx)
        # 4. Validate
        self._validate(spec)
```
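Sketches of some of the helpers referenced above, assuming the spec dataclasses from earlier and DuckDB's SQL surface. The signatures take an extra argument or two compared with the skeleton purely for clarity; they are illustrations, not the final implementation.

```python
def _create_table(self, spec: MappingSpec) -> None:
    cols = ", ".join(
        f"{c.target} {c.target_type}{'' if c.nullable else ' NOT NULL'}"
        for c in spec.columns
    )
    self.conn.execute(f"CREATE TABLE IF NOT EXISTS {spec.target['table']} ({cols})")

def _create_index(self, idx: dict, table: str) -> None:
    # DuckDB has a single secondary index type (ART), so the "hash"/"btree"
    # hints only document intent; the DDL is identical either way.
    name = f"idx_{table}_{'_'.join(idx['columns'])}"
    self.conn.execute(f"CREATE INDEX {name} ON {table} ({', '.join(idx['columns'])})")

def _validate(self, spec: MappingSpec, expected_rows: int) -> None:
    table = spec.target["table"]
    if spec.validation.get("row_count_match"):
        actual = self.conn.execute(f"SELECT count(*) FROM {table}").fetchone()[0]
        assert actual == expected_rows, f"row count mismatch: {actual} vs {expected_rows}"
    for col in spec.validation.get("null_checks", []):
        nulls = self.conn.execute(
            f"SELECT count(*) FROM {table} WHERE {col} IS NULL"
        ).fetchone()[0]
        assert nulls == 0, f"column {col} has {nulls} NULL values"
```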
### Phase 3: CLI Integration

```bash
# Generate mapping spec
cbmodels ingest analyze -t KPI.GLOBAL_DONATIONS -o donations_spec.json

# Review/edit spec manually if needed

# Execute ingestion
cbmodels ingest run donations_spec.json --db analysis.db
```
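A sketch of how the two subcommands might be wired up, assuming the `cbmodels` CLI uses click; the command and option names follow the examples above, and the bodies are elided.

```python
import click


@click.group()
def ingest():
    """Ingestion subcommands."""


@ingest.command()
@click.option("-t", "--table", required=True, help="Source table, e.g. KPI.GLOBAL_DONATIONS")
@click.option("-o", "--output", type=click.Path(), required=True, help="Path for the generated spec JSON")
def analyze(table, output):
    """Analyze a source table and write a mapping spec."""
    ...


@ingest.command()
@click.argument("spec_path", type=click.Path(exists=True))
@click.option("--db", default="analysis.db", help="Target DuckDB file")
def run(spec_path, db):
    """Execute the ingestion described by SPEC_PATH."""
    ...
```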
## Index Type Recommendations
| Pattern | Index Type | DuckDB Support |
|---|---|---|
| Equality lookups (email, id) | Hash | Via ART index |
| Range queries (dates, amounts) | B-tree | Via ART index |
| Full-text search | None | Use FTS extension |
| Low cardinality (status, type) | None | Column compression handles it |
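Concretely, DuckDB exposes a single secondary index type (ART), so the "hash" and "btree" hints in the spec map to the same `CREATE INDEX` statement. A quick illustration against the donations table defined earlier:

```python
import duckdb

conn = duckdb.connect("analysis.db")
# Both hints produce the same ART index under the hood
conn.execute("CREATE INDEX idx_donations_email ON donations (email)")
conn.execute("CREATE INDEX idx_donations_donated_at ON donations (donated_at)")
```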
## Chunking Strategies

| Table Size (rows) | Strategy | Chunk Size (rows) |
|---|---|---|
| < 100K | Single load | All |
| 100K - 1M | Row count | 100K |
| 1M - 10M | Row count | 500K |
| > 10M | Partition by date/key | 1M |
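A minimal sketch of the row-count strategy, assuming the source extract has been staged as a local Parquet file; each slice is ordered by the spec's `order_by` column so chunk boundaries are deterministic. The repeated ORDER BY/OFFSET scan is fine for illustration but worth optimizing for the larger tiers.

```python
import duckdb


def iter_chunks(conn: duckdb.DuckDBPyConnection, data_path: str, chunking: dict):
    """Yield Arrow tables of at most chunking['size'] rows from a Parquet extract."""
    total = conn.execute(
        f"SELECT count(*) FROM read_parquet('{data_path}')"
    ).fetchone()[0]
    size = chunking["size"]
    for offset in range(0, total, size):
        # Requires pyarrow; each yielded chunk can be inserted directly into DuckDB
        yield conn.execute(
            f"""
            SELECT * FROM read_parquet('{data_path}')
            ORDER BY {chunking['order_by']}
            LIMIT {size} OFFSET {offset}
            """
        ).arrow()
```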
## Questions for Implementation
- Should we cache mapping specs for reuse?
- How to handle schema changes (drift detection)?
- Incremental loads (upsert vs append)?
- Error handling for partial loads?