Ingest Pipeline Design

Overview

Use the Claude API to analyze source schemas and generate a declarative "Mapping Spec" (JSON) that drives the ingestion process.

Flow

Source Schema (Snowflake)
  -> Schema Analyzer (Claude API)
  -> Mapping Spec (JSON)
  -> Ingest Engine
  -> DuckDB

Mapping Spec Format

{
  "version": "1.0",
  "source": {
    "provider": "snowflake",
    "database": "CONSERVATIVECONNECTOR",
    "schema": "KPI",
    "table": "GLOBAL_DONATIONS"
  },
  "target": {
    "database": "analysis.db",
    "table": "donations"
  },
  "columns": [
    {
      "source": "EMAIL",
      "target": "email",
      "source_type": "VARCHAR",
      "target_type": "VARCHAR",
      "transform": null,
      "index": "hash",
      "nullable": false
    },
    {
      "source": "DONATION_AMOUNT",
      "target": "amount",
      "source_type": "NUMBER(38,2)",
      "target_type": "DECIMAL(10,2)",
      "transform": null,
      "index": null,
      "nullable": true
    },
    {
      "source": "DONATION_DATE",
      "target": "donated_at",
      "source_type": "TIMESTAMP_NTZ",
      "target_type": "TIMESTAMP",
      "transform": "parse_timestamp",
      "index": "btree",
      "nullable": false
    }
  ],
  "indexes": [
    {"columns": ["email"], "type": "hash", "reason": "High cardinality lookup column"},
    {"columns": ["donated_at"], "type": "btree", "reason": "Range queries on date"}
  ],
  "chunking": {
    "strategy": "row_count",
    "size": 100000,
    "order_by": "DONATION_DATE"
  },
  "validation": {
    "row_count_match": true,
    "null_checks": ["email"],
    "unique_checks": []
  }
}
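
Phases 1 and 2 below assume a MappingSpec object that mirrors this JSON. A minimal sketch, where the exact field layout and helper names are assumptions rather than part of the spec:

import json
from dataclasses import dataclass
from pathlib import Path

@dataclass
class MappingSpec:
    """In-memory form of the mapping spec JSON above."""
    version: str
    source: dict
    target: dict
    columns: list[dict]
    indexes: list[dict]
    chunking: dict
    validation: dict

    @classmethod
    def from_json(cls, raw: str) -> "MappingSpec":
        # Assumes the top-level JSON keys match the dataclass fields exactly.
        return cls(**json.loads(raw))

    @classmethod
    def from_file(cls, path: Path) -> "MappingSpec":
        return cls.from_json(path.read_text())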

Claude Prompt Strategy

Input to Claude

Analyze this source schema and generate a mapping spec for DuckDB ingestion:

Source: CONSERVATIVECONNECTOR.KPI.GLOBAL_DONATIONS
Columns:
- EMAIL (VARCHAR, NOT NULL, 541353 distinct)
- CC_SOURCE (VARCHAR, 15 distinct values: ['WinRed', 'Revv', ...])
- DONATION_AMOUNT (NUMBER(38,2), range: 1.00 - 50000.00)
- DONATION_DATE (TIMESTAMP_NTZ)
- DONOR_ID (VARCHAR, nullable)

Sample data:
[first 5 rows as JSON]

Consider:
1. Optimal DuckDB types
2. Which columns benefit from indexing
3. Chunking strategy for 541K rows
4. Any type transformations needed

Output from Claude

The mapping spec JSON above.
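
A minimal sketch of how the analyzer could assemble the prompt above from introspected metadata, written as the body of SchemaAnalyzer._build_prompt referenced in Phase 1 below. The shape of source_schema (table name, row count, per-column stats strings) is an assumption:

import json

def _build_prompt(self, source_schema: dict, sample_data: list) -> str:
    # Assumed shape: source_schema = {"table": "...", "row_count": 541353,
    # "columns": [{"name": "EMAIL", "type": "VARCHAR", "stats": "NOT NULL, 541353 distinct"}, ...]}
    lines = [
        "Analyze this source schema and generate a mapping spec for DuckDB ingestion:",
        "",
        f"Source: {source_schema['table']}",
        "Columns:",
    ]
    lines += [f"- {c['name']} ({c['type']}, {c['stats']})" for c in source_schema["columns"]]
    lines += [
        "",
        "Sample data:",
        json.dumps(sample_data[:5], default=str),
        "",
        "Consider:",
        "1. Optimal DuckDB types",
        "2. Which columns benefit from indexing",
        f"3. Chunking strategy for {source_schema['row_count']} rows",
        "4. Any type transformations needed",
    ]
    return "\n".join(lines)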

Implementation Plan

Phase 1: Schema Analyzer

class SchemaAnalyzer:
    def __init__(self, anthropic_client):
        self.client = anthropic_client

    def analyze(self, source_schema: dict, sample_data: list) -> MappingSpec:
        prompt = self._build_prompt(source_schema, sample_data)
        response = self.client.messages.create(...)
        return MappingSpec.from_json(response.content)
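
The messages.create(...) call is left elided above. A hedged sketch of that helper using the Anthropic Python SDK might look like the following; the model id, token budget, and the assumption that Claude returns raw JSON are placeholders:

import json

def _call_claude(self, prompt: str) -> dict:
    response = self.client.messages.create(
        model="claude-sonnet-4-20250514",  # placeholder model id
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}],
    )
    # The SDK returns a list of content blocks; take the text of the first
    # block and parse it as the mapping spec JSON.
    return json.loads(response.content[0].text)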

Phase 2: Ingest Engine

class IngestEngine:
    def __init__(self, db_path: Path):
        self.conn = duckdb.connect(str(db_path))

    def execute(self, spec: MappingSpec, data_path: Path):
        # 1. Create table from spec
        self._create_table(spec)

        # 2. Load in chunks
        for chunk in self._chunk_data(data_path, spec.chunking):
            self._insert_chunk(chunk, spec)

        # 3. Create indexes
        for idx in spec.indexes:
            self._create_index(idx)

        # 4. Validate
        self._validate(spec)
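
A minimal sketch of _create_table, assuming spec.columns is the list of column dicts from the JSON and that the target types can be interpolated directly into DDL:

def _create_table(self, spec: MappingSpec) -> None:
    # One "name TYPE [NOT NULL]" fragment per column in the spec.
    col_defs = ", ".join(
        f'{col["target"]} {col["target_type"]}'
        + ("" if col["nullable"] else " NOT NULL")
        for col in spec.columns
    )
    self.conn.execute(
        f'CREATE TABLE IF NOT EXISTS {spec.target["table"]} ({col_defs})'
    )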

Phase 3: CLI Integration

# Generate mapping spec
cbmodels ingest analyze -t KPI.GLOBAL_DONATIONS -o donations_spec.json

# Review/edit spec manually if needed

# Execute ingestion
cbmodels ingest run donations_spec.json --db analysis.db
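
The cbmodels command itself is not specified beyond the usage above. A hypothetical wiring of the subcommands with argparse; the --data flag is an assumption, since the design does not yet say how the exported data reaches the engine:

import argparse
from pathlib import Path

def main() -> None:
    # MappingSpec / IngestEngine come from the modules sketched above.
    parser = argparse.ArgumentParser(prog="cbmodels")
    ingest = parser.add_subparsers(dest="command", required=True).add_parser("ingest")
    action = ingest.add_subparsers(dest="action", required=True)

    analyze = action.add_parser("analyze")
    analyze.add_argument("-t", "--table", required=True)
    analyze.add_argument("-o", "--output", type=Path, required=True)

    run = action.add_parser("run")
    run.add_argument("spec", type=Path)
    run.add_argument("--db", type=Path, default=Path("analysis.db"))
    run.add_argument("--data", type=Path, required=True)  # hypothetical: exported source data file

    args = parser.parse_args()
    if args.action == "analyze":
        pass  # introspect the source table, call SchemaAnalyzer, write spec to args.output
    elif args.action == "run":
        spec = MappingSpec.from_file(args.spec)
        IngestEngine(args.db).execute(spec, data_path=args.data)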

Index Type Recommendations

Pattern                          Index Type   DuckDB Support
Equality lookups (email, id)     Hash         Via ART index
Range queries (dates, amounts)   B-tree       Via ART index
Full-text search                 None         Use FTS extension
Low cardinality (status, type)   None         Column compression handles it
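
Because DuckDB backs every index with ART, the hash/btree distinction in the spec only decides whether an index is created at all. A hedged sketch of _create_index; the extra table argument and the naming convention are assumptions (the Phase 2 loop would need to pass the target table along):

def _create_index(self, idx: dict, table: str) -> None:
    # Anything other than a hash/btree recommendation means: no index.
    if idx.get("type") not in ("hash", "btree"):
        return
    cols = ", ".join(idx["columns"])
    name = f"idx_{table}_{'_'.join(idx['columns'])}"
    # DuckDB's CREATE INDEX always produces an ART index under the hood.
    self.conn.execute(f"CREATE INDEX {name} ON {table} ({cols})")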

Chunking Strategies

Table Size (rows)   Strategy                 Chunk Size
< 100K              Single load              All
100K - 1M           Row count                100K
1M - 10M            Row count                500K
> 10M               Partition by date/key    1M
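
A minimal sketch of a row_count chunker, assuming the exported data lands as a Parquet file DuckDB can read directly. LIMIT/OFFSET over an ordered scan mirrors the size and order_by fields in the spec; a production version would likely stream rather than rescan per chunk:

from pathlib import Path
from typing import Iterator

def _chunk_data(self, data_path: Path, chunking: dict) -> Iterator[list]:
    size = chunking["size"]
    order_by = chunking.get("order_by")
    offset = 0
    while True:
        query = f"SELECT * FROM read_parquet('{data_path}')"  # assumption: Parquet export
        if order_by:
            query += f" ORDER BY {order_by}"
        query += f" LIMIT {size} OFFSET {offset}"
        rows = self.conn.execute(query).fetchall()
        if not rows:
            return
        yield rows
        offset += size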

Questions for Implementation

  1. Should we cache mapping specs for reuse?
  2. How to handle schema changes (drift detection)?
  3. Incremental loads (upsert vs append)?
  4. Error handling for partial loads?