
External Data Sources Integration

Overview

This document maps the lochness project architecture and identifies components for building an external data extraction library for cbmodels.

Lochness Project Map

/home/bisenbek/projects/lochness/
├── src/cli/                      # Interactive CLI (Claude-powered)
│   ├── main.py                   # Entry point with Rich UI
│   ├── ai_interface.py           # Claude API integration
│   ├── query_executor.py         # Snowflake query execution
│   ├── query_generator.py        # SQL template rendering
│   └── utils/
│       ├── session_manager.py    # Session lifecycle
│       └── normalization_simple.py  # Field mapping
├── nessie/                       # Pipeline orchestration engine
│   └── src/
│       ├── pipeline/             # Core pipeline engine
│       │   ├── pipeline.py       # Pipeline definition
│       │   └── executor/         # Stage execution
│       └── stages/
│           ├── sql/              # SQL executor
│           ├── python/           # Python sandbox
│           └── observable/       # Export to Observable
└── v2/
    ├── snow-maker.py             # Snowflake connector (KEY FILE)
    └── .env                      # Credentials

Core Components for Reuse

1. Snowflake Connector (v2/snow-maker.py)

What it does:
- Reads SQL from stdin
- Connects to Snowflake using environment variables
- Executes the query, returns JSON Lines (one object per row)
- Handles type conversion (Decimal → float, datetime → ISO string)

Integration pattern:

# Current usage in lochness
echo "SELECT * FROM table" | ./run-query-cc.sh
# Returns: {"col1": "val1", "col2": 123}\n{"col1": "val2", "col2": 456}

For cbmodels: can be adapted to pull data into an intermediate format before the DuckDB load.
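
The same pattern is easy to reproduce in Python. Below is a minimal sketch of a stdin-to-JSON-Lines connector, assuming the standard snowflake-connector-python package; the environment-variable names are assumptions, the real ones live in v2/.env.

# Sketch of the snow-maker.py pattern: SQL on stdin, JSON Lines on stdout.
# Env-var names are assumptions; check v2/.env for the actual names.
import json
import os
import sys
from datetime import date, datetime
from decimal import Decimal

import snowflake.connector


def to_jsonable(value):
    """Convert Snowflake types that json.dumps cannot serialize."""
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    return value


def main() -> None:
    sql = sys.stdin.read()
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
    )
    try:
        cursor = conn.cursor(snowflake.connector.DictCursor)
        cursor.execute(sql)
        for row in cursor:  # DictCursor yields one dict per row
            print(json.dumps({k: to_jsonable(v) for k, v in row.items()}))
    finally:
        conn.close()


if __name__ == "__main__":
    main()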

2. Query Templates (src/cli/templates/queries/)

Template                       Purpose
cost_analysis.sql              Find losing list owners (bad monsters)
revenue_opportunities.sql      Find high performers (good monsters)
overlap_analysis.sql           Contact overlap between lists
donor_analysis.sql             Donor value attribution
p2p_sentiment_analysis.sql     P2P messaging performance
quarantine_aware_analysis.sql  Suppression impact

Pattern: Parameterized SQL with {months_back}, {limit}, {threshold} placeholders.
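
Rendering such a template is plain string substitution. A minimal sketch, with a hypothetical render_query helper; the template directory and placeholder names come from the descriptions above.

from pathlib import Path

# Template directory from the project map above.
TEMPLATE_DIR = Path("src/cli/templates/queries")


def render_query(template_name: str, **params) -> str:
    """Fill {placeholder} slots in a SQL template with concrete values."""
    template = (TEMPLATE_DIR / template_name).read_text()
    return template.format(**params)


# Illustrative parameter values only.
sql = render_query("cost_analysis.sql", months_back=6, limit=25, threshold=0.0)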

3. Session Manager (src/cli/utils/session_manager.py)

Structure:

sessions/{session_id}/
├── queries/     # Generated SQL
├── data/        # Query results (CSV)
├── results/     # Execution metadata (JSON)
└── reports/     # Final reports

For cbmodels: Reuse for tracking data extraction runs.
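
A sketch of creating that layout for a new extraction run; the directory names come from the structure above, but the session-id scheme is an assumption.

import uuid
from pathlib import Path


def create_session(root: Path = Path("sessions")) -> Path:
    """Create the sessions/{session_id}/ layout described above."""
    session_dir = root / uuid.uuid4().hex[:8]  # session-id scheme is an assumption
    for sub in ("queries", "data", "results", "reports"):
        (session_dir / sub).mkdir(parents=True, exist_ok=True)
    return session_dir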

4. Nessie Pipeline Engine

Key classes:
- Pipeline - Define multi-stage workflows
- PipelineExecutor - Execute with dependency resolution
- SQLStageExecutor - Run Snowflake queries
- PythonStageExecutor - Sandboxed Python transforms

Example:

from nessie import Pipeline

pipeline = Pipeline()
pipeline.add_sql_stage("extract", "SELECT * FROM source_table")
pipeline.add_python_stage("transform", """
    df = extract  # input from previous stage
    result = df.groupby('category').sum()
""", requires=["extract"])
pipeline.execute()

Proposed Library Architecture

Goal

Create cbmodels.sources - a library for extracting data from external sources into an intermediate format loadable by DuckDB.

Design

src/cbmodels/sources/
├── __init__.py
├── base.py           # Abstract DataSource class
├── snowflake.py      # Snowflake extractor (from lochness)
├── formats.py        # Intermediate formats (Parquet, JSON, CSV)
├── pipeline.py       # Multi-stage extraction (from Nessie)
└── registry.py       # Source registration

Core Abstractions

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

@dataclass
class ExtractionResult:
    """Result of a data extraction."""
    source_name: str
    table_name: str
    row_count: int
    output_path: Path
    format: str  # 'parquet', 'json', 'csv'
    schema: dict  # column name -> type
    extracted_at: datetime

class DataSource(ABC):
    """Abstract base for external data sources."""

    @abstractmethod
    def connect(self) -> None:
        """Establish connection to source."""
        pass

    @abstractmethod
    def extract_table(
        self,
        table: str,
        output_dir: Path,
        format: str = "parquet"
    ) -> ExtractionResult:
        """Extract a table to intermediate format."""
        pass

    @abstractmethod
    def execute_query(
        self,
        query: str,
        output_dir: Path,
        name: str = "query_result"
    ) -> ExtractionResult:
        """Execute custom query and save results."""
        pass

    @abstractmethod
    def list_tables(self) -> list[str]:
        """List available tables."""
        pass

Snowflake Implementation

class SnowflakeSource(DataSource):
    """Extract data from Snowflake warehouse."""

    def __init__(
        self,
        account: str,
        user: str,
        password: str,
        warehouse: str,
        database: str,
        schema: str = "PUBLIC",
        role: str | None = None
    ):
        self.config = {
            "account": account,
            "user": user,
            "password": password,
            "warehouse": warehouse,
            "database": database,
            "schema": schema,
            "role": role,
        }
        self.conn = None

    def connect(self) -> None:
        import snowflake.connector
        self.conn = snowflake.connector.connect(**self.config)

    def extract_table(self, table, output_dir, format="parquet"):
        query = f'SELECT * FROM {table}'
        return self.execute_query(query, output_dir, table, format)

    def execute_query(self, query, output_dir, name, format="parquet"):
        from snowflake.connector import DictCursor  # dict rows keyed by column name
        cursor = self.conn.cursor(DictCursor)
        cursor.execute(query)

        # Stream to intermediate format
        if format == "parquet":
            return self._write_parquet(cursor, output_dir, name)
        elif format == "json":
            return self._write_jsonl(cursor, output_dir, name)
        # ...
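
The writer helpers are not shown above; one possible implementation of the JSON Lines path, reusing the Decimal/datetime handling from snow-maker.py, is sketched below. The helper names are hypothetical (SnowflakeSource would call it as self._write_jsonl), and ExtractionResult is the dataclass defined earlier.

# Hypothetical JSON Lines writer for SnowflakeSource.
import json
from datetime import date, datetime, timezone
from decimal import Decimal
from pathlib import Path


def _to_jsonable(value):
    """Coerce Snowflake types that json.dumps cannot serialize."""
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    return value


def write_jsonl(cursor, output_dir: Path, name: str) -> ExtractionResult:
    """Stream DictCursor rows to <output_dir>/<name>.jsonl and describe the result."""
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{name}.jsonl"
    columns = [desc[0] for desc in cursor.description]

    row_count = 0
    with output_path.open("w") as fh:
        for row in cursor:  # DictCursor yields one dict per row
            fh.write(json.dumps({k: _to_jsonable(v) for k, v in row.items()}) + "\n")
            row_count += 1

    return ExtractionResult(
        source_name="snowflake",
        table_name=name,
        row_count=row_count,
        output_path=output_path,
        format="json",
        schema={col: "unknown" for col in columns},  # real type mapping deferred
        extracted_at=datetime.now(timezone.utc),
    )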

DuckDB Loader

def load_extraction(
    db_path: Path,
    extraction: ExtractionResult,
    table_name: str | None = None
) -> None:
    """Load extracted data into DuckDB."""
    import duckdb

    table = table_name or extraction.table_name
    conn = duckdb.connect(str(db_path))

    if extraction.format == "parquet":
        conn.execute(f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT * FROM read_parquet('{extraction.output_path}')
        """)
    elif extraction.format == "json":
        conn.execute(f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT * FROM read_json_auto('{extraction.output_path}')
        """)

CLI Integration

# Extract from Snowflake
cbmodels extract snowflake \
  --table ACCOUNTING.EOM_LIST_OWNERS_REPORT \
  --output ./data/extracts/ \
  --format parquet

# Load into DuckDB
cbmodels load ./data/extracts/EOM_LIST_OWNERS_REPORT.parquet \
  --db ./data/analysis.db \
  --table eom_report

# Or pipeline: extract + load + analyze
cbmodels pipeline run extract_and_analyze.yaml

Files to Copy/Adapt from Lochness

Source File                            Target                    Adaptation Needed
v2/snow-maker.py                       sources/snowflake.py      Wrap in class, add streaming
src/cli/utils/session_manager.py       sources/session.py        Simplify for extraction tracking
nessie/src/pipeline/pipeline.py        sources/pipeline.py       Adapt for extraction workflows
nessie/src/stages/sql/sql_executor.py  sources/executors/sql.py  Generalize beyond Snowflake

Intermediate Format Recommendations

Format       Pros                         Cons                       Use When
Parquet      Columnar, compressed, typed  Binary, needs library      Large datasets, analytics
JSON Lines   Human readable, streaming    Larger size, no types      Small datasets, debugging
CSV          Universal, simple            No types, escaping issues  Interop with other tools

Recommendation: Default to Parquet for production, JSON Lines for development.

Data Flow

External Source (Snowflake)
  → DataSource.extract_table()
  → Intermediate Format (Parquet/JSON)
  → load_extraction() → DuckDB
  → cbmodels.build() → Analysis Model
  → cbmodels-api → Segment Analysis

Next Steps

  1. Phase 1: Create cbmodels.sources.snowflake by adapting snow-maker.py
  2. Phase 2: Add Parquet/JSON output formats
  3. Phase 3: Build CLI commands (cbmodels extract, cbmodels load)
  4. Phase 4: Add pipeline orchestration for multi-table extraction
  5. Phase 5: Support additional sources (PostgreSQL, BigQuery, etc.)

Key Snowflake Tables (from Lochness)

CONSERVATIVECONNECTOR.ACCOUNTING.EOM_LIST_OWNERS_REPORT
  - Commission, send cost, click value per list owner

CONSERVATIVECONNECTOR.PIPELINE.LIST_EMAIL_MATRIX
  - Contact-to-list ownership mapping

CONSERVATIVECONNECTOR.PIPELINE.DONATIONS_RELOADED
  - Donor transactions

CONSERVATIVECONNECTOR.PIPELINE.P2P_MERGED_DETAILS_REPORT_REALIZED
  - P2P messaging metrics