# External Data Sources Integration

## Overview
This document maps the lochness project architecture and identifies components for building an external data extraction library for cbmodels.
## Lochness Project Map
/home/bisenbek/projects/lochness/
├── src/cli/ # Interactive CLI (Claude-powered)
│ ├── main.py # Entry point with Rich UI
│ ├── ai_interface.py # Claude API integration
│ ├── query_executor.py # Snowflake query execution
│ ├── query_generator.py # SQL template rendering
│ └── utils/
│ ├── session_manager.py # Session lifecycle
│ └── normalization_simple.py # Field mapping
├── nessie/ # Pipeline orchestration engine
│ └── src/
│ ├── pipeline/ # Core pipeline engine
│ │ ├── pipeline.py # Pipeline definition
│ │ └── executor/ # Stage execution
│ └── stages/
│ ├── sql/ # SQL executor
│ ├── python/ # Python sandbox
│ └── observable/ # Export to Observable
└── v2/
├── snow-maker.py # Snowflake connector (KEY FILE)
└── .env # Credentials
## Core Components for Reuse

### 1. Snowflake Connector (v2/snow-maker.py)

What it does:

- Reads SQL from stdin
- Connects to Snowflake using environment variables
- Executes the query and returns JSON lines (one object per row)
- Handles type conversion (Decimal → float, datetime → ISO string)
Integration pattern:
# Current usage in lochness
echo "SELECT * FROM table" | ./run-query-cc.sh
# Returns: {"col1": "val1", "col2": 123}\n{"col1": "val2", "col2": 456}
For cbmodels: can be adapted to pull data into an intermediate format before loading into DuckDB.
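The same pattern is simple to reproduce. The sketch below approximates the behavior described above (read SQL from stdin, emit one JSON object per row); it is not the actual snow-maker.py, and the environment variable names are assumptions.

```python
# Minimal sketch of the stdin -> JSON-lines pattern described above.
# Not the actual snow-maker.py; environment variable names are assumptions.
import json
import os
import sys
from datetime import date, datetime
from decimal import Decimal

import snowflake.connector
from snowflake.connector import DictCursor


def _to_jsonable(value):
    """Convert Snowflake types that json.dumps cannot serialize natively."""
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    return value


def main() -> None:
    query = sys.stdin.read()
    conn = snowflake.connector.connect(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
    )
    try:
        cursor = conn.cursor(DictCursor)
        cursor.execute(query)
        for row in cursor:  # one JSON object per line
            print(json.dumps({k: _to_jsonable(v) for k, v in row.items()}))
    finally:
        conn.close()


if __name__ == "__main__":
    main()
```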
### 2. Query Templates (src/cli/templates/queries/)

| Template | Purpose |
|---|---|
| `cost_analysis.sql` | Find losing list owners (bad monsters) |
| `revenue_opportunities.sql` | Find high performers (good monsters) |
| `overlap_analysis.sql` | Contact overlap between lists |
| `donor_analysis.sql` | Donor value attribution |
| `p2p_sentiment_analysis.sql` | P2P messaging performance |
| `quarantine_aware_analysis.sql` | Suppression impact |
Pattern: parameterized SQL with `{months_back}`, `{limit}`, and `{threshold}` placeholders.
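Rendering such a template is plain string formatting. The snippet below is illustrative: the template path matches the layout above, but the SQL fragment in the comment is hypothetical.

```python
# Hypothetical rendering of a parameterized template; placeholder names match
# those listed above, the SQL body itself is illustrative.
from pathlib import Path

template = Path("src/cli/templates/queries/cost_analysis.sql").read_text()
# e.g. "... WHERE sent_at >= DATEADD(month, -{months_back}, CURRENT_DATE) LIMIT {limit}"
sql = template.format(months_back=6, limit=50, threshold=0.0)
```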
### 3. Session Manager (src/cli/utils/session_manager.py)
Structure:
sessions/{session_id}/
├── queries/ # Generated SQL
├── data/ # Query results (CSV)
├── results/ # Execution metadata (JSON)
└── reports/ # Final reports
For cbmodels: Reuse for tracking data extraction runs.
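A pared-down version for extraction tracking could keep the same directory layout. The class below is a sketch; the names are assumptions, not the existing session_manager.py API.

```python
# Sketch of a simplified session tracker for extraction runs; mirrors the
# directory layout above. Class and method names are assumptions.
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path


class ExtractionSession:
    def __init__(self, root: Path = Path("sessions")):
        stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        self.session_id = f"{stamp}_{uuid.uuid4().hex[:8]}"
        self.path = root / self.session_id
        for sub in ("queries", "data", "results", "reports"):
            (self.path / sub).mkdir(parents=True, exist_ok=True)

    def record_result(self, name: str, metadata: dict) -> None:
        """Write execution metadata for one extraction."""
        out = self.path / "results" / f"{name}.json"
        out.write_text(json.dumps(metadata, indent=2, default=str))
```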
### 4. Nessie Pipeline Engine
Key classes:
- Pipeline - Define multi-stage workflows
- PipelineExecutor - Execute with dependency resolution
- SQLStageExecutor - Run Snowflake queries
- PythonStageExecutor - Sandboxed Python transforms
Example:
from nessie import Pipeline
pipeline = Pipeline()
pipeline.add_sql_stage("extract", "SELECT * FROM source_table")
pipeline.add_python_stage("transform", """
df = extract # input from previous stage
result = df.groupby('category').sum()
""", requires=["extract"])
pipeline.execute()
## Proposed Library Architecture

### Goal

Create `cbmodels.sources`, a library for extracting data from external sources into an intermediate format loadable by DuckDB.

### Design
src/cbmodels/sources/
├── __init__.py
├── base.py # Abstract DataSource class
├── snowflake.py # Snowflake extractor (from lochness)
├── formats.py # Intermediate formats (Parquet, JSON, CSV)
├── pipeline.py # Multi-stage extraction (from Nessie)
└── registry.py # Source registration
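To show how `registry.py` could tie the pieces together, here is a hypothetical source registry; this is a design sketch, not existing code.

```python
# Hypothetical source registry: maps a name like "snowflake" to a DataSource
# subclass so the CLI can look sources up dynamically. Design sketch only.
_SOURCES: dict[str, type] = {}


def register_source(name: str):
    """Class decorator that registers a DataSource implementation."""
    def decorator(cls: type) -> type:
        _SOURCES[name] = cls
        return cls
    return decorator


def get_source(name: str) -> type:
    """Look up a registered source class by name."""
    return _SOURCES[name]
```

A `@register_source("snowflake")` decorator on `SnowflakeSource` would then let a command like `cbmodels extract snowflake ...` resolve the class by name.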
### Core Abstractions

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path


@dataclass
class ExtractionResult:
    """Result of a data extraction."""

    source_name: str
    table_name: str
    row_count: int
    output_path: Path
    format: str        # 'parquet', 'json', 'csv'
    schema: dict       # column name -> type
    extracted_at: datetime


class DataSource(ABC):
    """Abstract base for external data sources."""

    @abstractmethod
    def connect(self) -> None:
        """Establish a connection to the source."""

    @abstractmethod
    def extract_table(
        self,
        table: str,
        output_dir: Path,
        format: str = "parquet",
    ) -> ExtractionResult:
        """Extract a full table to the intermediate format."""

    @abstractmethod
    def execute_query(
        self,
        query: str,
        output_dir: Path,
        name: str = "query_result",
        format: str = "parquet",
    ) -> ExtractionResult:
        """Execute a custom query and save the results."""

    @abstractmethod
    def list_tables(self) -> list[str]:
        """List available tables."""
### Snowflake Implementation

class SnowflakeSource(DataSource):
    """Extract data from a Snowflake warehouse."""

    def __init__(
        self,
        account: str,
        user: str,
        password: str,
        warehouse: str,
        database: str,
        schema: str = "PUBLIC",
        role: str | None = None,
    ):
        self.config = {
            "account": account,
            "user": user,
            "password": password,
            "warehouse": warehouse,
            "database": database,
            "schema": schema,
            **({"role": role} if role else {}),
        }
        self.conn = None

    def connect(self) -> None:
        import snowflake.connector

        self.conn = snowflake.connector.connect(**self.config)

    def extract_table(self, table, output_dir, format="parquet"):
        query = f"SELECT * FROM {table}"
        return self.execute_query(query, output_dir, table, format)

    def execute_query(self, query, output_dir, name, format="parquet"):
        from snowflake.connector import DictCursor

        cursor = self.conn.cursor(DictCursor)
        cursor.execute(query)
        # Stream to intermediate format
        if format == "parquet":
            return self._write_parquet(cursor, output_dir, name)
        elif format == "json":
            return self._write_jsonl(cursor, output_dir, name)
        # ...
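One possible implementation of the Parquet writer, assuming pyarrow is available and reusing the `ExtractionResult` dataclass from Core Abstractions; this is a sketch, not lochness code. For large result sets it could be switched to batched writes with `pyarrow.parquet.ParquetWriter`.

```python
# Possible Parquet-writing helper (sketch). Loads the full result set into an
# Arrow table before writing, which is fine for moderate result sizes.
# ExtractionResult is the dataclass defined in Core Abstractions above.
from datetime import datetime, timezone
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq


def write_parquet(cursor, output_dir: Path, name: str) -> ExtractionResult:
    rows = cursor.fetchall()                # list of dicts (DictCursor)
    table = pa.Table.from_pylist(rows)      # infer an Arrow schema from the rows
    output_path = Path(output_dir) / f"{name}.parquet"
    output_path.parent.mkdir(parents=True, exist_ok=True)
    pq.write_table(table, output_path)
    return ExtractionResult(
        source_name="snowflake",
        table_name=name,
        row_count=table.num_rows,
        output_path=output_path,
        format="parquet",
        schema={field.name: str(field.type) for field in table.schema},
        extracted_at=datetime.now(timezone.utc),
    )
```

`SnowflakeSource._write_parquet` could simply delegate to this function.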
### DuckDB Loader

def load_extraction(
    db_path: Path,
    extraction: ExtractionResult,
    table_name: str | None = None,
) -> None:
    """Load extracted data into DuckDB."""
    import duckdb

    table = table_name or extraction.table_name
    conn = duckdb.connect(str(db_path))
    if extraction.format == "parquet":
        conn.execute(f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT * FROM read_parquet('{extraction.output_path}')
        """)
    elif extraction.format == "json":
        conn.execute(f"""
            CREATE OR REPLACE TABLE {table} AS
            SELECT * FROM read_json_auto('{extraction.output_path}')
        """)
    conn.close()
### CLI Integration
# Extract from Snowflake
cbmodels extract snowflake \
--table ACCOUNTING.EOM_LIST_OWNERS_REPORT \
--output ./data/extracts/ \
--format parquet
# Load into DuckDB
cbmodels load ./data/extracts/EOM_LIST_OWNERS_REPORT.parquet \
--db ./data/analysis.db \
--table eom_report
# Or pipeline: extract + load + analyze
cbmodels pipeline run extract_and_analyze.yaml
## Files to Copy/Adapt from Lochness

| Source File | Target | Adaptation Needed |
|---|---|---|
| `v2/snow-maker.py` | `sources/snowflake.py` | Wrap in a class, add streaming |
| `src/cli/utils/session_manager.py` | `sources/session.py` | Simplify for extraction tracking |
| `nessie/src/pipeline/pipeline.py` | `sources/pipeline.py` | Adapt for extraction workflows |
| `nessie/src/stages/sql/sql_executor.py` | `sources/executors/sql.py` | Generalize beyond Snowflake |
## Intermediate Format Recommendations
| Format | Pros | Cons | Use When |
|---|---|---|---|
| Parquet | Columnar, compressed, typed | Binary, needs library | Large datasets, analytics |
| JSON Lines | Human readable, streaming | Larger size, no types | Small datasets, debugging |
| CSV | Universal, simple | No types, escaping issues | Interop with other tools |
Recommendation: Default to Parquet for production, JSON Lines for development.
## Data Flow
External Source (Snowflake)
↓
DataSource.extract_table()
↓
Intermediate Format (Parquet/JSON)
↓
load_extraction() → DuckDB
↓
cbmodels.build() → Analysis Model
↓
cbmodels-api → Segment Analysis
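Putting the pieces together, the flow above maps to a short script. The snippet below assumes the `SnowflakeSource` and `load_extraction` sketches from earlier sections; the environment variable names are assumptions, while the database, schema, and table names come from the list below.

```python
# End-to-end sketch: extract from Snowflake to Parquet, then load into DuckDB.
# Assumes SnowflakeSource and load_extraction as sketched above.
import os
from pathlib import Path

source = SnowflakeSource(
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    database="CONSERVATIVECONNECTOR",
    schema="ACCOUNTING",
)
source.connect()

extraction = source.extract_table(
    "EOM_LIST_OWNERS_REPORT",
    output_dir=Path("data/extracts"),
    format="parquet",
)

load_extraction(Path("data/analysis.db"), extraction, table_name="eom_report")
```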
## Next Steps

- Phase 1: Create `cbmodels.sources.snowflake` by adapting `snow-maker.py`
- Phase 2: Add Parquet/JSON output formats
- Phase 3: Build CLI commands (`cbmodels extract`, `cbmodels load`)
- Phase 4: Add pipeline orchestration for multi-table extraction
- Phase 5: Support additional sources (PostgreSQL, BigQuery, etc.)
## Key Snowflake Tables (from Lochness)

- `CONSERVATIVECONNECTOR.ACCOUNTING.EOM_LIST_OWNERS_REPORT`: commission, send cost, and click value per list owner
- `CONSERVATIVECONNECTOR.PIPELINE.LIST_EMAIL_MATRIX`: contact-to-list ownership mapping
- `CONSERVATIVECONNECTOR.PIPELINE.DONATIONS_RELOADED`: donor transactions
- `CONSERVATIVECONNECTOR.PIPELINE.P2P_MERGED_DETAILS_REPORT_REALIZED`: P2P messaging metrics