Data Products Notebook

Working document for understanding cbintel data products, schemas, and the path toward entity-centric knowledge intelligence.


Overview

All job outputs follow a common pattern:

  1. Local working directory: /tmp/cbintel-jobs/{job_id}/
  2. Remote storage: files.nominate.ai in bucket cbintel-jobs
  3. Public URLs: https://files.nominate.ai/api/v1/public/cbintel-jobs/{job_id}/{filename}
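
A minimal helper mirroring this path convention (the function names are hypothetical; the job_id and filename values are illustrative):

def public_url(job_id: str, filename: str) -> str:
    return f"https://files.nominate.ai/api/v1/public/cbintel-jobs/{job_id}/{filename}"

def local_path(job_id: str, filename: str) -> str:
    return f"/tmp/cbintel-jobs/{job_id}/{filename}"

public_url("job_abc123", "crawl_result.json")
# → https://files.nominate.ai/api/v1/public/cbintel-jobs/job_abc123/crawl_result.json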

Common JobResult Schema

from dataclasses import dataclass
from typing import Any

@dataclass
class JobResult:
    result_url: str | None       # Primary result file URL
    summary: dict[str, Any]      # Job-specific summary data
    output_paths: list[str]      # All uploaded file URLs
    metrics: dict[str, Any]      # Performance/size metrics

Three-Stage Architecture

The system evolves through three stages, each building on the previous:

┌─────────────────────────────────────────────────────────────────────────────┐
│                           STAGE 1: TACTICAL                                  │
│                     (Immediate Data Products)                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Query ──► Jobs API ──► Data Products                                       │
│              │                                                               │
│              ├── crawl     → pages, synthesis                                │
│              ├── lazarus   → historical snapshots                            │
│              ├── screenshot → visual captures                                │
│              ├── vectl     → embeddings, search                              │
│              └── transcript → structured video content                       │
│                                                                              │
│   Output: Job-specific files in files.nominate.ai                           │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│                      STAGE 2: KNOWLEDGE INTEGRATION                          │
│              (Lightweight connections in vectl clusters)                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Job Outputs ──► Processing Pipeline ──► Knowledge Chunks                   │
│                        │                                                     │
│                        ├── Clean & chunk content                             │
│                        ├── OCR for PDFs/images                               │
│                        ├── Entity extraction (NER)                           │
│                        ├── Generate embeddings                               │
│                        └── Index in vectl clusters                           │
│                                                                              │
│   Storage: vectl stores organized by topic/domain                           │
│   Links: Parent/child batch relationships                                    │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│                    STAGE 3: KNOWLEDGE INTELLIGENCE                           │
│         (Entities with time dimension, connections, chatability)             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Knowledge Chunks ──► Entity Resolution ──► Knowledge Graph                 │
│                              │                                               │
│                              ├── Merge mentions → canonical entities         │
│                              ├── Track time dimension (daily updates)        │
│                              ├── Connect entities via relationships          │
│                              ├── Historical context from lazarus             │
│                              └── Conversational interface                    │
│                                                                              │
│   Capabilities:                                                              │
│   - "What happened to [entity] last week?"                                   │
│   - "Show me everything related to [topic]"                                  │
│   - "Track [entity] over time"                                               │
│   - "Attach this document to [entity]"                                       │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Existing Building Blocks

From crawwwl/crawl.sh - Post-Processing Pipeline

The crawl pipeline already has post-processing steps that feed into integration:

# Post Analysis (lines 799-804)
${EVALUATE} ${BATCHID} ${DATA}           # evaluate_results.py
${EVALUATE_SIMPLE} ${BATCHID} ${DATA}    # evaluate_results_simple.py
${EVALUATE_ENHANCED} ${BATCHID} ${DATA}  # evaluate_results_enhanced.py
${INTEGRATE} ${BATCHID} question_answering  # knowledge_integrator.py
${EXTEND} ${BATCHID} --execute-children     # extend_results.py

From seedbed/lib/state_manager.py - State Schema

The seedbed project's session state schema already anticipates entities:

from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class ContentChunk:
    chunk_id: str
    content: str
    quality_score: float
    source_url: str
    placeholders: List[Dict[str, Any]] = None
    entities: List[Dict[str, Any]] = None  # ← Already has entities field!

Knowledge Graph Structure (from state_schema.json):

{
  "knowledge_graph": {
    "entities": [],
    "relationships": []
  }
}

Session States: initialized → active → completed | failed | interrupted

From extend_results.py - Gap Analysis & Iteration

Extension types for iterative knowledge building:

  • depth - More detail on existing topics
  • breadth - Related topics
  • clarification - Resolve ambiguity
  • update - Newer information

Child batch generation creates parent/child relationships between crawls.
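
A sketch of what a generated child batch spec could look like. The field names here are assumptions for illustration; only the extension types and the parent/child relationship come from extend_results.py:

# Hypothetical child batch specs derived from a gap analysis of a parent crawl
parent_batch = {"batch_id": "batch_xyz", "query": "OpenAI board crisis November 2023"}

child_batches = [
    {
        "parent_batch_id": parent_batch["batch_id"],
        "extension_type": "depth",     # more detail on an existing topic
        "query": "OpenAI board members November 2023",
    },
    {
        "parent_batch_id": parent_batch["batch_id"],
        "extension_type": "update",    # newer information
        "query": "OpenAI leadership latest developments",
    },
]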


Job Types & Data Products

1. Crawl (/api/v1/jobs/crawl)

Purpose: AI-powered web crawling and knowledge synthesis

Input:

{
  "query": "search query",
  "prompt_type": "investigative|question_answering|technical_research|competitive_analysis|trend_analysis",
  "max_urls": 100,
  "skip_embeddings": false,
  "geo": "us:ca"
}
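
A minimal sketch of submitting this payload; the base URL (and any auth header) is an assumption, only the endpoint path and payload fields come from this notebook:

import requests

BASE_URL = "https://api.example.com"  # placeholder — actual host not documented here

resp = requests.post(
    f"{BASE_URL}/api/v1/jobs/crawl",
    json={
        "query": "OpenAI board crisis November 2023",
        "prompt_type": "question_answering",
        "max_urls": 100,
        "skip_embeddings": False,
        "geo": "us:ca",
    },
    timeout=30,
)
resp.raise_for_status()
job = resp.json()  # response shape (e.g. a job_id field) is an assumption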

Output Files:

| File | Description |
|------|-------------|
| crawl_result.json | Primary result with synthesis |
| {batch_id}/ | Directory with raw page data |

Directory Structure:

{job_id}/
├── crawl_result.json          # Main result
└── {batch_id}/
    ├── page_001.json          # Individual page data
    ├── page_002.json
    └── synthesis.json         # AI synthesis

Entity Extraction Points:

  • Page titles → potential entity names
  • Links → relationships between entities
  • Synthesis → key entities mentioned


2. Lazarus (/api/v1/jobs/lazarus)

Purpose: Historical web archive retrieval (Wayback Machine, Common Crawl)

Input:

{
  "domain": "example.com",
  "providers": ["wayback"],
  "from_date": "20200101",
  "to_date": "20231231",
  "sample_size": 100,
  "use_pipeline_v2": true,
  "timeout": 300,
  "geo": "us"
}

Time Dimension Value:

  • Snapshots provide the historical state of entities
  • Can track how entity mentions/descriptions changed over time
  • Enables "what did [entity] look like in 2020?" queries
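
A hedged sketch of folding Wayback-style snapshot timestamps into time_series points for an entity; the snapshot list shape is an assumption about the lazarus output, while the time_series fields follow the Entity Schema below:

from collections import Counter
from datetime import datetime

snapshots = [
    {"timestamp": "20200114083000", "url": "https://example.com/about"},
    {"timestamp": "20200114190212", "url": "https://example.com/team"},
    {"timestamp": "20211102120000", "url": "https://example.com/about"},
]

# One data point per day the domain was captured
daily_counts = Counter(
    datetime.strptime(s["timestamp"], "%Y%m%d%H%M%S").date() for s in snapshots
)

time_series = [
    {"date": d.isoformat(), "mention_count": n, "sentiment_avg": None, "key_events": []}
    for d, n in sorted(daily_counts.items())
]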


3. Screenshots (/api/v1/jobs/screenshots)

Purpose: Browser automation for visual capture

Input:

{
  "urls": ["https://example.com"],
  "full_page": true,
  "capture_dom": false,
  "viewport_width": 1920,
  "viewport_height": 1080,
  "format": "png|jpeg|pdf",
  "geo": "de"
}

OCR Integration Point:

  • Screenshots can be OCR'd via CBAI
  • Extract text from visual content
  • Useful for capturing dynamic/JS-heavy pages
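
A sketch of where OCR output would enter the Stage 2 chunking step. pytesseract stands in here because the CBAI interface isn't documented in this notebook; the screenshot path is hypothetical:

from PIL import Image
import pytesseract  # stand-in OCR engine, not the CBAI path

def ocr_screenshot(path: str) -> str:
    return pytesseract.image_to_string(Image.open(path))

text = ocr_screenshot("/tmp/cbintel-jobs/job_def456/capture_001.png")  # hypothetical file
chunk = {"content": text, "source_url": "https://example.com"}  # feeds Stage 2 chunking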


4. Vectl (/api/v1/jobs/vectl)

Purpose: Vector embeddings and semantic search

Input:

{
  "operation": "embed|search|batch_embed|index",
  "texts": ["text to embed"],
  "query": "search query",
  "store_name": "default",
  "k": 10
}

Central Role in Architecture:

  • Stage 2: all content chunks get embedded here
  • Clustering: similar content naturally groups
  • Search: semantic search across all knowledge
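
A sketch of the Stage 2 calls against this endpoint: batch-embed content chunks into a topic store, then search it. The base URL is an assumption; the store name follows the cluster naming shown under Storage Architecture:

import requests

BASE_URL = "https://api.example.com"  # placeholder — actual host not documented here

chunks = ["Sam Altman was fired as CEO...", "Microsoft offered to hire..."]

# Embed all chunks into a topic-specific store
requests.post(f"{BASE_URL}/api/v1/jobs/vectl", json={
    "operation": "batch_embed",
    "texts": chunks,
    "store_name": "topic_openai",
}, timeout=30)

# Later: semantic search across the same store
requests.post(f"{BASE_URL}/api/v1/jobs/vectl", json={
    "operation": "search",
    "query": "Who fired Sam Altman?",
    "store_name": "topic_openai",
    "k": 10,
}, timeout=30)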


5. Transcript (/api/v1/jobs/transcript)

Purpose: YouTube video transcript processing and summarization

Input:

{
  "url": "https://youtube.com/watch?v=xxx",
  "content_type": "debate|lecture|tutorial|interview|conversation|general",
  "skip_classification": false
}

Entity-Rich Content Types:

| Type | Natural Entities |
|------|------------------|
| debate | Speakers, arguments, positions |
| lecture | Instructor, concepts, references |
| interview | Interviewer, interviewee, quotes |
| conversation | Participants, topics |


Entity Model

The Entity is the core of Stage 3. An entity can be:

  • Person: politician, researcher, CEO, speaker
  • Organization: company, government, NGO
  • Place: country, city, location
  • Thing: product, technology, concept
  • Event: election, debate, conference, crisis
  • Topic: climate change, AI regulation, etc.

Entity Schema

┌─────────────────────────────────────────────────────────────────┐
│                          ENTITY                                  │
├─────────────────────────────────────────────────────────────────┤
│ id: uuid                                                         │
│ canonical_name: str              # "OpenAI", "Joe Biden"        │
│ aliases: [str]                   # ["Open AI", "OPENAI"]        │
│ entity_type: person | org | place | thing | event | topic       │
│                                                                  │
│ first_seen: timestamp            # When entity entered system   │
│ last_updated: timestamp          # Most recent mention          │
│                                                                  │
│ mentions: [                      # All occurrences              │
│   {                                                              │
│     source_id: job_id,                                          │
│     source_type: crawl | lazarus | transcript,                  │
│     source_url: str,                                            │
│     captured_at: timestamp,                                      │
│     context: str,                # Surrounding text              │
│     sentiment: float             # -1.0 to 1.0                  │
│   }                                                              │
│ ]                                                                │
│                                                                  │
│ connections: [                   # Relationships to other entities│
│   {                                                              │
│     target_entity_id: uuid,                                     │
│     relationship_type: str,      # "works_for", "mentions", etc │
│     strength: float,             # 0.0 to 1.0                   │
│     first_seen: timestamp,                                      │
│     last_seen: timestamp                                        │
│   }                                                              │
│ ]                                                                │
│                                                                  │
│ time_series: [                   # Historical data points       │
│   {                                                              │
│     date: date,                                                  │
│     mention_count: int,                                         │
│     sentiment_avg: float,                                       │
│     key_events: [str]                                           │
│   }                                                              │
│ ]                                                                │
│                                                                  │
│ embedding: [float] * 768         # Semantic representation      │
└─────────────────────────────────────────────────────────────────┘
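
The same schema as a Python sketch (the uuid default and the dict-typed mentions/connections/time_series records are simplifications for illustration):

import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class Entity:
    canonical_name: str
    entity_type: str                                    # person | org | place | thing | event | topic
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    aliases: list[str] = field(default_factory=list)
    first_seen: datetime | None = None
    last_updated: datetime | None = None
    mentions: list[dict[str, Any]] = field(default_factory=list)
    connections: list[dict[str, Any]] = field(default_factory=list)
    time_series: list[dict[str, Any]] = field(default_factory=list)
    embedding: list[float] | None = None                # 768-dim semantic vector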

Entity Resolution Pipeline

Raw Text ──► NER Extraction ──► Mention Candidates ──► Resolution ──► Canonical Entity
                │                      │                    │
                │                      │                    └── Match to existing?
                │                      │                        ├── Yes → Add mention
                │                      │                        └── No  → Create new
                │                      │
                │                      └── Coreference resolution
                │                          ("he", "the company", "it")
                └── spaCy / LLM-based extraction
                    - PERSON, ORG, GPE, EVENT, etc.
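
A minimal sketch of the resolution step ("exact match + alias", as called out under Next Steps). The in-memory registry is a stand-in for the DuckDB entity store described later:

import uuid

registry: dict[str, dict] = {}  # lowercased canonical name or alias → entity record

def resolve_mention(text: str, entity_type: str, mention: dict) -> dict:
    key = text.strip().lower()
    entity = registry.get(key)
    if entity is None:                    # no match → create a new canonical entity
        entity = {
            "id": str(uuid.uuid4()),
            "canonical_name": text,
            "entity_type": entity_type,
            "aliases": [],
            "mentions": [],
        }
        registry[key] = entity
    entity["mentions"].append(mention)    # match → add the mention to the existing entity
    return entity

def add_alias(canonical_name: str, alias: str) -> None:
    entity = registry[canonical_name.strip().lower()]
    entity["aliases"].append(alias)
    registry[alias.strip().lower()] = entity   # the alias now resolves to the same entity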

Data Flow: From Job to Entity

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Stage 1   │     │   Stage 2   │     │   Stage 2   │     │   Stage 3   │
│  Job Runs   │ ──► │   Clean &   │ ──► │   Extract   │ ──► │  Resolve &  │
│             │     │   Chunk     │     │  Entities   │     │   Connect   │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
      │                   │                   │                   │
      ▼                   ▼                   ▼                   ▼
 crawl_result.json   content_chunks[]    mentions[]         entities[]
 transcript.json     quality_scores      entity_types       connections[]
 snapshots.json      embeddings          context            time_series[]

Concrete Example

Input: Crawl job for "OpenAI board crisis November 2023"

Stage 1 Output:

  • 50 pages crawled
  • synthesis.json with summary

Stage 2 Processing:

# Content chunking
chunks = [
    {"chunk_id": "c1", "content": "Sam Altman was fired as CEO...", "quality_score": 0.87},
    {"chunk_id": "c2", "content": "Microsoft offered to hire...", "quality_score": 0.91},
    ...
]

# Entity extraction
entities_found = [
    {"text": "Sam Altman", "type": "PERSON", "chunk_id": "c1"},
    {"text": "OpenAI", "type": "ORG", "chunk_id": "c1"},
    {"text": "Microsoft", "type": "ORG", "chunk_id": "c2"},
    {"text": "Satya Nadella", "type": "PERSON", "chunk_id": "c2"},
]

Stage 3 Resolution:

# Resolved entities with connections
entities = {
    "sam_altman": Entity(
        canonical_name="Sam Altman",
        entity_type="person",
        connections=[
            {"target": "openai", "relationship": "former_ceo_of"},
            {"target": "microsoft", "relationship": "offered_position_by"},
        ]
    ),
    "openai": Entity(
        canonical_name="OpenAI",
        entity_type="org",
        connections=[
            {"target": "sam_altman", "relationship": "fired"},
            {"target": "microsoft", "relationship": "partnership"},
        ]
    )
}


Storage Architecture

Stage 1: Job Files (files.nominate.ai)

cbintel-jobs/
├── job_abc123/
│   ├── crawl_result.json
│   └── batch_xyz/
└── job_def456/
    ├── result.json
    └── subtitles.srt

Stage 2: Vectl Clusters

vectl-stores/
├── topic_openai/              # Topic-specific index
│   ├── index.bin              # HNSW index
│   └── metadata.json          # Chunk → source mapping
├── topic_climate/
└── global/                    # Cross-topic search

Stage 3: Entity Store (DuckDB)

-- Core entity table
CREATE TABLE entities (
    id UUID PRIMARY KEY,
    canonical_name TEXT NOT NULL,
    entity_type TEXT NOT NULL,
    aliases TEXT[],
    first_seen TIMESTAMP,
    last_updated TIMESTAMP,
    embedding FLOAT[768]
);

-- Mentions table (append-only log)
CREATE TABLE mentions (
    id UUID PRIMARY KEY,
    entity_id UUID REFERENCES entities(id),
    source_id TEXT,
    source_type TEXT,
    source_url TEXT,
    captured_at TIMESTAMP,
    context TEXT,
    sentiment FLOAT
);

-- Connections table
CREATE TABLE connections (
    source_entity_id UUID REFERENCES entities(id),
    target_entity_id UUID REFERENCES entities(id),
    relationship_type TEXT,
    strength FLOAT,
    first_seen TIMESTAMP,
    last_seen TIMESTAMP,
    PRIMARY KEY (source_entity_id, target_entity_id, relationship_type)
);

-- Time series rollups (daily aggregates)
CREATE TABLE entity_daily (
    entity_id UUID REFERENCES entities(id),
    date DATE,
    mention_count INT,
    sentiment_avg FLOAT,
    sources TEXT[],
    PRIMARY KEY (entity_id, date)
);
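
A hedged sketch of the daily rollup using the duckdb Python API against the tables above; the database file name is an assumption:

import duckdb

con = duckdb.connect("entity_store.duckdb")  # hypothetical store location

# Aggregate today's slice of the append-only mentions log into entity_daily
con.execute("""
    INSERT OR REPLACE INTO entity_daily (entity_id, date, mention_count, sentiment_avg, sources)
    SELECT
        entity_id,
        CAST(captured_at AS DATE)      AS date,
        COUNT(*)                       AS mention_count,
        AVG(sentiment)                 AS sentiment_avg,
        ARRAY_AGG(DISTINCT source_id)  AS sources
    FROM mentions
    WHERE CAST(captured_at AS DATE) = CURRENT_DATE
    GROUP BY entity_id, CAST(captured_at AS DATE)
""")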

Triggers & Automation

Daily Knowledge Updates

┌─────────────────────────────────────────────────────────────────┐
│                    DAILY TRIGGER PIPELINE                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. Query triggers from watched entities/topics                  │
│     ├── "OpenAI news today"                                      │
│     ├── "[watched person] latest"                                │
│     └── "[tracked topic] updates"                                │
│                                                                  │
│  2. Run crawl jobs (geo-distributed)                             │
│                                                                  │
│  3. Process new content                                          │
│     ├── Chunk & embed                                            │
│     ├── Extract entities                                         │
│     └── Resolve & connect                                        │
│                                                                  │
│  4. Update time series                                           │
│     ├── Increment mention counts                                 │
│     ├── Calculate sentiment drift                                │
│     └── Flag significant changes                                 │
│                                                                  │
│  5. Generate alerts (optional)                                   │
│     └── "Entity X mentioned 5x more than usual"                  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
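
A sketch of the alert check in step 5: compare today's mention count against a trailing average in entity_daily. The 5x threshold mirrors the example alert above; the 30-day baseline and database file name are assumptions:

import duckdb

con = duckdb.connect("entity_store.duckdb")  # hypothetical store location

def unusually_active(entity_id: str, today_count: int) -> bool:
    baseline = con.execute(
        """
        SELECT AVG(mention_count) FROM entity_daily
        WHERE entity_id = ?
          AND date >= CURRENT_DATE - INTERVAL 30 DAY
          AND date < CURRENT_DATE
        """,
        [entity_id],
    ).fetchone()[0]
    return baseline is not None and today_count >= 5 * baseline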

Historical Backfill (Lazarus)

Entity Created ──► Check for historical presence ──► Lazarus query
                   ┌─────────────────────────────────────┘
            Historical snapshots ──► Extract mentions ──► Backfill time_series

Chatability: The Conversational Interface

Stage 3 enables natural language interaction with the knowledge base:

User: "What's been happening with OpenAI lately?"

System:
1. Resolve "OpenAI" → entity_id
2. Query recent mentions (last 7 days)
3. Summarize key events
4. Show connected entities that changed

Response: "In the past week, OpenAI has been mentioned in 47 sources...
Key events:
- GPT-5 announcement rumors (Jan 3)
- Sam Altman testimony to Congress (Jan 5)
Connected entities with activity: Microsoft, Anthropic, Google DeepMind"

---

User: "Attach this PDF to the Sam Altman entity"

System:
1. Upload PDF
2. OCR & extract text
3. Create new mention linked to entity
4. Extract any new connections from content

Response: "Added document as source for Sam Altman.
Extracted 3 new connections: Reid Hoffman, Y Combinator, Worldcoin"
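
A sketch of how the first exchange above could be served from the DuckDB store: resolve the name to an entity_id, then pull the last seven days of mentions. The database file name is an assumption:

import duckdb

con = duckdb.connect("entity_store.duckdb")  # hypothetical store location

def resolve(name: str):
    """Exact canonical-name match, falling back to the aliases array."""
    return con.execute(
        """
        SELECT id FROM entities
        WHERE canonical_name = ? OR list_contains(aliases, ?)
        LIMIT 1
        """,
        [name, name],
    ).fetchone()

def recent_mentions(entity_id):
    return con.execute(
        """
        SELECT source_type, source_url, captured_at, context
        FROM mentions
        WHERE entity_id = ? AND captured_at >= now() - INTERVAL 7 DAY
        ORDER BY captured_at DESC
        """,
        [entity_id],
    ).fetchall()

row = resolve("OpenAI")
mentions = recent_mentions(row[0]) if row else []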

Implementation Phases

Phase 1: Complete Stage 1 (Current)

  • Job types working (crawl, lazarus, screenshots, vectl, transcript)
  • File storage operational
  • API endpoints functional
  • Standardize output schemas across jobs

Phase 2: Build Stage 2 Pipeline

  • Integrate existing crawl.sh post-processing into cbintel workers
  • Add entity extraction (spaCy or LLM-based)
  • Auto-embed all content chunks via vectl
  • Create topic-based vectl clusters
  • Parent/child batch tracking

Phase 3: Entity Resolution & Storage

  • Design DuckDB entity schema
  • Build entity resolution logic
  • Create connection inference
  • Time series aggregation jobs

Phase 4: Chatability

  • Natural language query parsing
  • Entity-aware RAG pipeline
  • Attachment/annotation interface
  • Alert/notification system

Design Principles

  1. Let data drive architecture - Don't over-engineer; let patterns emerge from actual use
  2. Keep it lightweight - Entities are just indexed pointers, not heavy objects
  3. Always collecting - As long as we're gathering data on a topic, there's always something to integrate
  4. Time is first-class - Every piece of knowledge has a temporal dimension
  5. Search engine that builds itself - Each query adds to the knowledge base

Next Steps

  • Prototype entity extraction on existing crawl outputs
  • Design vectl cluster naming convention
  • Create DuckDB schema and test with sample data
  • Build simple entity resolution (exact match + alias)
  • Add daily trigger for one watched topic as proof of concept