Data Products Notebook¶
Working document for understanding cbintel data products, schemas, and the path toward entity-centric knowledge intelligence.
Overview¶
All job outputs follow a common pattern:
1. Local working directory: /tmp/cbintel-jobs/{job_id}/
2. Remote storage: files.nominate.ai in bucket cbintel-jobs
3. Public URLs: https://files.nominate.ai/api/v1/public/cbintel-jobs/{job_id}/{filename}
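As a quick orientation, a minimal sketch of how the three locations relate for a single output file (the job id and filename are hypothetical; the URL template is the one above):

job_id = "job_abc123"        # hypothetical
filename = "crawl_result.json"

local_path = f"/tmp/cbintel-jobs/{job_id}/{filename}"
public_url = f"https://files.nominate.ai/api/v1/public/cbintel-jobs/{job_id}/{filename}"
# /tmp/cbintel-jobs/job_abc123/crawl_result.json
# https://files.nominate.ai/api/v1/public/cbintel-jobs/job_abc123/crawl_result.json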
Common JobResult Schema¶
from dataclasses import dataclass
from typing import Any

@dataclass
class JobResult:
    result_url: str | None      # Primary result file URL
    summary: dict[str, Any]     # Job-specific summary data
    output_paths: list[str]     # All uploaded file URLs
    metrics: dict[str, Any]     # Performance/size metrics
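For example, a finished crawl job might populate the schema like this (all values are illustrative):

result = JobResult(
    result_url="https://files.nominate.ai/api/v1/public/cbintel-jobs/job_abc123/crawl_result.json",
    summary={"pages_crawled": 50, "synthesis": True},
    output_paths=["https://files.nominate.ai/api/v1/public/cbintel-jobs/job_abc123/crawl_result.json"],
    metrics={"duration_s": 312.4, "total_bytes": 1482113},
)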
Three-Stage Architecture¶
The system evolves through three stages, each building on the previous:
┌─────────────────────────────────────────────────────────────────────────────┐
│ STAGE 1: TACTICAL │
│ (Immediate Data Products) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Query ──► Jobs API ──► Data Products │
│ │ │
│ ├── crawl → pages, synthesis │
│ ├── lazarus → historical snapshots │
│ ├── screenshot → visual captures │
│ ├── vectl → embeddings, search │
│ └── transcript → structured video content │
│ │
│ Output: Job-specific files in files.nominate.ai │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ STAGE 2: KNOWLEDGE INTEGRATION │
│ (Lightweight connections in vectl clusters) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Job Outputs ──► Processing Pipeline ──► Knowledge Chunks │
│ │ │
│ ├── Clean & chunk content │
│ ├── OCR for PDFs/images │
│ ├── Entity extraction (NER) │
│ ├── Generate embeddings │
│ └── Index in vectl clusters │
│ │
│ Storage: vectl stores organized by topic/domain │
│ Links: Parent/child batch relationships │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ STAGE 3: KNOWLEDGE INTELLIGENCE │
│ (Entities with time dimension, connections, chatability) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Knowledge Chunks ──► Entity Resolution ──► Knowledge Graph │
│ │ │
│ ├── Merge mentions → canonical entities │
│ ├── Track time dimension (daily updates) │
│ ├── Connect entities via relationships │
│ ├── Historical context from lazarus │
│ └── Conversational interface │
│ │
│ Capabilities: │
│ - "What happened to [entity] last week?" │
│ - "Show me everything related to [topic]" │
│ - "Track [entity] over time" │
│ - "Attach this document to [entity]" │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Existing Building Blocks¶
From crawwwl/crawl.sh - Post-Processing Pipeline¶
The crawl pipeline already has post-processing steps that feed into integration:
# Post Analysis (lines 799-804)
${EVALUATE} ${BATCHID} ${DATA} # evaluate_results.py
${EVALUATE_SIMPLE} ${BATCHID} ${DATA} # evaluate_results_simple.py
${EVALUATE_ENHANCED} ${BATCHID} ${DATA} # evaluate_results_enhanced.py
${INTEGRATE} ${BATCHID} question_answering # knowledge_integrator.py
${EXTEND} ${BATCHID} --execute-children # extend_results.py
From seedbed/lib/state_manager.py - State Schema¶
The seedbed project defines a session state that already anticipates entities:
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class ContentChunk:
    chunk_id: str
    content: str
    quality_score: float
    source_url: str
    placeholders: List[Dict[str, Any]] = None
    entities: List[Dict[str, Any]] = None  # ← Already has entities field!
Knowledge Graph Structure (from state_schema.json):
Session States: initialized → active → completed | failed | interrupted
From extend_results.py - Gap Analysis & Iteration¶
Extension types for iterative knowledge building:
- depth - More detail on existing topics
- breadth - Related topics
- clarification - Resolve ambiguity
- update - Newer information
Child batch generation creates parent/child relationships between crawls.
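A hedged sketch of what a parent/child link could look like once it reaches cbintel (the field names are assumptions for illustration, not the actual extend_results.py schema):

# Hypothetical child-batch record produced by an extension pass.
child_batch = {
    "batch_id": "batch_child_001",
    "parent_batch_id": "batch_xyz",
    "extension_type": "depth",   # depth | breadth | clarification | update
    "query": "follow-up query generated from gap analysis",
}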
Job Types & Data Products¶
1. Crawl (/api/v1/jobs/crawl)¶
Purpose: AI-powered web crawling and knowledge synthesis
Input:
{
"query": "search query",
"prompt_type": "investigative|question_answering|technical_research|competitive_analysis|trend_analysis",
"max_urls": 100,
"skip_embeddings": false,
"geo": "us:ca"
}
Output Files:
| File | Description |
|------|-------------|
| crawl_result.json | Primary result with synthesis |
| {batch_id}/ | Directory with raw page data |
Directory Structure:
{job_id}/
├── crawl_result.json # Main result
└── {batch_id}/
├── page_001.json # Individual page data
├── page_002.json
└── synthesis.json # AI synthesis
Entity Extraction Points:
- Page titles → potential entity names
- Links → relationships between entities
- Synthesis → key entities mentioned
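A minimal sketch of harvesting these candidates from a batch directory (the "title" and "links" keys are assumptions about the page_NNN.json layout, not a documented schema):

import json
from pathlib import Path

def collect_candidates(batch_dir: str) -> list[dict]:
    # Walk a crawl batch and collect entity candidates from page titles and links.
    candidates = []
    for page_file in sorted(Path(batch_dir).glob("page_*.json")):
        page = json.loads(page_file.read_text())
        if page.get("title"):                      # assumed key
            candidates.append({"text": page["title"], "hint": "page_title", "source": page_file.name})
        for link in page.get("links", []):         # assumed key
            candidates.append({"text": link, "hint": "link", "source": page_file.name})
    return candidates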
2. Lazarus (/api/v1/jobs/lazarus)¶
Purpose: Historical web archive retrieval (Wayback Machine, Common Crawl)
Input:
{
"domain": "example.com",
"providers": ["wayback"],
"from_date": "20200101",
"to_date": "20231231",
"sample_size": 100,
"use_pipeline_v2": true,
"timeout": 300,
"geo": "us"
}
Time Dimension Value:
- Snapshots provide the historical state of entities
- Can track how entity mentions/descriptions changed over time
- Enables "what did [entity] look like in 2020?" queries
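A hedged sketch of backfilling an entity's history by reusing the request shape above (the payload fields are the documented ones; the API base URL is an assumption):

import requests

CBINTEL_API = "https://cbintel.example"   # assumption: base URL not documented here

payload = {
    "domain": "example.com",
    "providers": ["wayback"],
    "from_date": "20200101",
    "to_date": "20231231",
    "sample_size": 100,
    "use_pipeline_v2": True,
    "timeout": 300,
    "geo": "us",
}
resp = requests.post(f"{CBINTEL_API}/api/v1/jobs/lazarus", json=payload, timeout=30)
resp.raise_for_status()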
3. Screenshots (/api/v1/jobs/screenshots)¶
Purpose: Browser automation for visual capture
Input:
{
"urls": ["https://example.com"],
"full_page": true,
"capture_dom": false,
"viewport_width": 1920,
"viewport_height": 1080,
"format": "png|jpeg|pdf",
"geo": "de"
}
OCR Integration Point:
- Screenshots can be OCR'd via CBAI
- Extract text from visual content
- Useful for capturing dynamic/JS-heavy pages
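As a placeholder for the CBAI call (whose API isn't shown in this notebook), the OCR step could be sketched with pytesseract as a stand-in:

from PIL import Image
import pytesseract

def ocr_screenshot(path: str) -> str:
    # Stand-in for the CBAI OCR call: extract text from a captured screenshot.
    return pytesseract.image_to_string(Image.open(path))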
4. Vectl (/api/v1/jobs/vectl)¶
Purpose: Vector embeddings and semantic search
Input:
{
"operation": "embed|search|batch_embed|index",
"texts": ["text to embed"],
"query": "search query",
"store_name": "default",
"k": 10
}
Central Role in Architecture:
- Stage 2: All content chunks get embedded here
- Clustering: Similar content naturally groups
- Search: Semantic search across all knowledge
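A hedged sketch of pushing Stage 2 chunks into a topic store using the input shape above (the base URL and the topic-store naming are assumptions):

import requests

CBINTEL_API = "https://cbintel.example"   # assumption: base URL not documented here

chunks = ["Sam Altman was fired as CEO...", "Microsoft offered to hire..."]
payload = {
    "operation": "batch_embed",
    "texts": chunks,
    "store_name": "topic_openai",          # assumed topic-store naming
}
resp = requests.post(f"{CBINTEL_API}/api/v1/jobs/vectl", json=payload, timeout=30)
resp.raise_for_status()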
5. Transcript (/api/v1/jobs/transcript)¶
Purpose: YouTube video transcript processing and summarization
Input:
{
"url": "https://youtube.com/watch?v=xxx",
"content_type": "debate|lecture|tutorial|interview|conversation|general",
"skip_classification": false
}
Entity-Rich Content Types:
| Type | Natural Entities |
|------|-----------------|
| debate | Speakers, arguments, positions |
| lecture | Instructor, concepts, references |
| interview | Interviewer, interviewee, quotes |
| conversation | Participants, topics |
Entity Model¶
The Entity is the core of Stage 3. An entity can be:
- Person: politician, researcher, CEO, speaker
- Organization: company, government, NGO
- Place: country, city, location
- Thing: product, technology, concept
- Event: election, debate, conference, crisis
- Topic: climate change, AI regulation, etc.
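These types map naturally onto a small enum (a sketch mirroring the entity_type values used in the schema below; not an existing module):

from enum import Enum

class EntityType(str, Enum):
    PERSON = "person"
    ORG = "org"
    PLACE = "place"
    THING = "thing"
    EVENT = "event"
    TOPIC = "topic"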
Entity Schema¶
┌─────────────────────────────────────────────────────────────────┐
│ ENTITY │
├─────────────────────────────────────────────────────────────────┤
│ id: uuid │
│ canonical_name: str # "OpenAI", "Joe Biden" │
│ aliases: [str] # ["Open AI", "OPENAI"] │
│ entity_type: person | org | place | thing | event | topic │
│ │
│ first_seen: timestamp # When entity entered system │
│ last_updated: timestamp # Most recent mention │
│ │
│ mentions: [ # All occurrences │
│ { │
│ source_id: job_id, │
│ source_type: crawl | lazarus | transcript, │
│ source_url: str, │
│ captured_at: timestamp, │
│ context: str, # Surrounding text │
│ sentiment: float # -1.0 to 1.0 │
│ } │
│ ] │
│ │
│ connections: [ # Relationships to other entities│
│ { │
│ target_entity_id: uuid, │
│ relationship_type: str, # "works_for", "mentions", etc │
│ strength: float, # 0.0 to 1.0 │
│ first_seen: timestamp, │
│ last_seen: timestamp │
│ } │
│ ] │
│ │
│ time_series: [ # Historical data points │
│ { │
│ date: date, │
│ mention_count: int, │
│ sentiment_avg: float, │
│ key_events: [str] │
│ } │
│ ] │
│ │
│ embedding: [float] * 768 # Semantic representation │
└─────────────────────────────────────────────────────────────────┘
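The same schema, sketched as Python dataclasses (a direct translation of the box above, not an existing cbintel module):

from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Mention:
    source_id: str            # job_id
    source_type: str          # crawl | lazarus | transcript
    source_url: str
    captured_at: datetime
    context: str              # surrounding text
    sentiment: float          # -1.0 to 1.0

@dataclass
class Connection:
    target_entity_id: str
    relationship_type: str    # "works_for", "mentions", etc.
    strength: float           # 0.0 to 1.0
    first_seen: datetime
    last_seen: datetime

@dataclass
class Entity:
    id: str
    canonical_name: str
    entity_type: str          # person | org | place | thing | event | topic
    aliases: list[str] = field(default_factory=list)
    first_seen: datetime | None = None
    last_updated: datetime | None = None
    mentions: list[Mention] = field(default_factory=list)
    connections: list[Connection] = field(default_factory=list)
    time_series: list[dict] = field(default_factory=list)   # daily rollups
    embedding: list[float] = field(default_factory=list)    # 768-dim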
Entity Resolution Pipeline¶
Raw Text ──► NER Extraction ──► Mention Candidates ──► Resolution ──► Canonical Entity
│ │ │
│ │ └── Match to existing?
│ │ ├── Yes → Add mention
│ │ └── No → Create new
│ │
│ └── Coreference resolution
│ ("he", "the company", "it")
│
└── spaCy / LLM-based extraction
- PERSON, ORG, GPE, EVENT, etc.
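A minimal sketch of the extraction step with spaCy (en_core_web_sm is the standard small English model; an LLM-based extractor would be a drop-in alternative):

import spacy

nlp = spacy.load("en_core_web_sm")

def extract_mentions(text: str, chunk_id: str) -> list[dict]:
    # Run NER and keep only the entity labels we care about.
    doc = nlp(text)
    return [
        {"text": ent.text, "type": ent.label_, "chunk_id": chunk_id}
        for ent in doc.ents
        if ent.label_ in {"PERSON", "ORG", "GPE", "EVENT"}
    ]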
Data Flow: From Job to Entity¶
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Stage 1 │ │ Stage 2 │ │ Stage 2 │ │ Stage 3 │
│ Job Runs │ ──► │ Clean & │ ──► │ Extract │ ──► │ Resolve & │
│ │ │ Chunk │ │ Entities │ │ Connect │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │
▼ ▼ ▼ ▼
crawl_result.json content_chunks[] mentions[] entities[]
transcript.json quality_scores entity_types connections[]
snapshots.json embeddings context time_series[]
Concrete Example¶
Input: Crawl job for "OpenAI board crisis November 2023"
Stage 1 Output:
- 50 pages crawled
- synthesis.json with summary
Stage 2 Processing:
# Content chunking
chunks = [
{"chunk_id": "c1", "content": "Sam Altman was fired as CEO...", "quality_score": 0.87},
{"chunk_id": "c2", "content": "Microsoft offered to hire...", "quality_score": 0.91},
...
]
# Entity extraction
entities_found = [
{"text": "Sam Altman", "type": "PERSON", "chunk_id": "c1"},
{"text": "OpenAI", "type": "ORG", "chunk_id": "c1"},
{"text": "Microsoft", "type": "ORG", "chunk_id": "c2"},
{"text": "Satya Nadella", "type": "PERSON", "chunk_id": "c2"},
]
Stage 3 Resolution:
# Resolved entities with connections
entities = {
"sam_altman": Entity(
canonical_name="Sam Altman",
entity_type="person",
connections=[
{"target": "openai", "relationship": "former_ceo_of"},
{"target": "microsoft", "relationship": "offered_position_by"},
]
),
"openai": Entity(
canonical_name="OpenAI",
entity_type="org",
connections=[
{"target": "sam_altman", "relationship": "fired"},
{"target": "microsoft", "relationship": "partnership"},
]
)
}
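The resolution step itself can start as plain exact-match plus alias lookup, as planned in Next Steps (a sketch; the normalization rule is an assumption, and entities is assumed to hold objects shaped like the Entity sketch above):

def normalize(name: str) -> str:
    # Crude normalization: lowercase and collapse whitespace (assumption, not a spec).
    return " ".join(name.lower().split())

def resolve(mention_text: str, entities: dict) -> str | None:
    # Return the key of an existing entity, or None if a new one should be created.
    needle = normalize(mention_text)
    for key, entity in entities.items():
        names = [entity.canonical_name, *getattr(entity, "aliases", [])]
        if any(normalize(n) == needle for n in names):
            return key
    return None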
Storage Architecture¶
Stage 1: Job Files (files.nominate.ai)¶
cbintel-jobs/
├── job_abc123/
│ ├── crawl_result.json
│ └── batch_xyz/
└── job_def456/
├── result.json
└── subtitles.srt
Stage 2: Vectl Clusters¶
vectl-stores/
├── topic_openai/ # Topic-specific index
│ ├── index.bin # HNSW index
│ └── metadata.json # Chunk → source mapping
├── topic_climate/
└── global/ # Cross-topic search
Stage 3: Entity Store (DuckDB)¶
-- Core entity table
CREATE TABLE entities (
id UUID PRIMARY KEY,
canonical_name TEXT NOT NULL,
entity_type TEXT NOT NULL,
aliases TEXT[],
first_seen TIMESTAMP,
last_updated TIMESTAMP,
embedding FLOAT[768]
);
-- Mentions table (append-only log)
CREATE TABLE mentions (
id UUID PRIMARY KEY,
entity_id UUID REFERENCES entities(id),
source_id TEXT,
source_type TEXT,
source_url TEXT,
captured_at TIMESTAMP,
context TEXT,
sentiment FLOAT
);
-- Connections table
CREATE TABLE connections (
source_entity_id UUID REFERENCES entities(id),
target_entity_id UUID REFERENCES entities(id),
relationship_type TEXT,
strength FLOAT,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
PRIMARY KEY (source_entity_id, target_entity_id, relationship_type)
);
-- Time series rollups (daily aggregates)
CREATE TABLE entity_daily (
entity_id UUID REFERENCES entities(id),
date DATE,
mention_count INT,
sentiment_avg FLOAT,
sources TEXT[],
PRIMARY KEY (entity_id, date)
);
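A hedged sketch of using this schema from Python via the duckdb client (database path and values are illustrative):

import duckdb

con = duckdb.connect("entity_store.duckdb")

# Register a new entity (embedding left NULL here).
con.execute(
    "INSERT INTO entities (id, canonical_name, entity_type, aliases, first_seen, last_updated) "
    "VALUES (gen_random_uuid(), ?, ?, ?, now(), now())",
    ["OpenAI", "org", ["Open AI", "OPENAI"]],
)

# Most-mentioned entities over the last week.
recent = con.execute(
    "SELECT e.canonical_name, count(*) AS mentions_7d "
    "FROM mentions m JOIN entities e ON m.entity_id = e.id "
    "WHERE m.captured_at > now() - INTERVAL 7 DAY "
    "GROUP BY e.canonical_name ORDER BY mentions_7d DESC"
).fetchall()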
Triggers & Automation¶
Daily Knowledge Updates¶
┌─────────────────────────────────────────────────────────────────┐
│ DAILY TRIGGER PIPELINE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Query triggers from watched entities/topics │
│ ├── "OpenAI news today" │
│ ├── "[watched person] latest" │
│ └── "[tracked topic] updates" │
│ │
│ 2. Run crawl jobs (geo-distributed) │
│ │
│ 3. Process new content │
│ ├── Chunk & embed │
│ ├── Extract entities │
│ └── Resolve & connect │
│ │
│ 4. Update time series │
│ ├── Increment mention counts │
│ ├── Calculate sentiment drift │
│ └── Flag significant changes │
│ │
│ 5. Generate alerts (optional) │
│ └── "Entity X mentioned 5x more than usual" │
│ │
└─────────────────────────────────────────────────────────────────┘
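A hedged sketch of steps 1-2, submitting one crawl per watched topic (the watch list, base URL, scheduler wiring, and response shape are all assumptions):

import requests

CBINTEL_API = "https://cbintel.example"   # assumption: base URL not documented here
WATCHED_TOPICS = ["OpenAI news today", "AI regulation updates"]

def run_daily_crawls() -> list[str]:
    # Submit one crawl job per watched topic; return the job ids for later processing.
    job_ids = []
    for query in WATCHED_TOPICS:
        payload = {"query": query, "prompt_type": "trend_analysis", "max_urls": 100, "geo": "us:ca"}
        resp = requests.post(f"{CBINTEL_API}/api/v1/jobs/crawl", json=payload, timeout=30)
        resp.raise_for_status()
        job_ids.append(resp.json().get("job_id"))   # assumed response field
    return job_ids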
Historical Backfill (Lazarus)¶
Entity Created ──► Check for historical presence ──► Lazarus query
│
┌─────────────────────────────────────┘
▼
Historical snapshots ──► Extract mentions ──► Backfill time_series
Chatability: The Conversational Interface¶
Stage 3 enables natural language interaction with the knowledge base:
User: "What's been happening with OpenAI lately?"
System:
1. Resolve "OpenAI" → entity_id
2. Query recent mentions (last 7 days)
3. Summarize key events
4. Show connected entities that changed
Response: "In the past week, OpenAI has been mentioned in 47 sources...
Key events:
- GPT-5 announcement rumors (Jan 3)
- Sam Altman testimony to Congress (Jan 5)
Connected entities with activity: Microsoft, Anthropic, Google DeepMind"
---
User: "Attach this PDF to the Sam Altman entity"
System:
1. Upload PDF
2. OCR & extract text
3. Create new mention linked to entity
4. Extract any new connections from content
Response: "Added document as source for Sam Altman.
Extracted 3 new connections: Reid Hoffman, Y Combinator, Worldcoin"
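A minimal sketch of steps 1-2 of the first exchange, on top of the DuckDB schema in Stage 3 (summarization and response generation are omitted):

import duckdb

def recent_activity(con: duckdb.DuckDBPyConnection, name: str) -> list[tuple]:
    # Step 1: resolve the name (canonical or alias) to an entity id.
    row = con.execute(
        "SELECT id FROM entities WHERE canonical_name = ? OR list_contains(aliases, ?)",
        [name, name],
    ).fetchone()
    if row is None:
        return []
    # Step 2: pull the last 7 days of mentions for that entity.
    return con.execute(
        "SELECT source_url, captured_at, context FROM mentions "
        "WHERE entity_id = ? AND captured_at > now() - INTERVAL 7 DAY "
        "ORDER BY captured_at DESC",
        [row[0]],
    ).fetchall()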
Implementation Phases¶
Phase 1: Complete Stage 1 (Current)¶
- Job types working (crawl, lazarus, screenshots, vectl, transcript)
- File storage operational
- API endpoints functional
- Standardize output schemas across jobs
Phase 2: Build Stage 2 Pipeline¶
- Integrate existing crawl.sh post-processing into cbintel workers
- Add entity extraction (spaCy or LLM-based)
- Auto-embed all content chunks via vectl
- Create topic-based vectl clusters
- Parent/child batch tracking
Phase 3: Entity Resolution & Storage¶
- Design DuckDB entity schema
- Build entity resolution logic
- Create connection inference
- Time series aggregation jobs
Phase 4: Chatability¶
- Natural language query parsing
- Entity-aware RAG pipeline
- Attachment/annotation interface
- Alert/notification system
Design Principles¶
- Let data drive architecture - Don't over-engineer; let patterns emerge from actual use
- Keep it lightweight - Entities are just indexed pointers, not heavy objects
- Always collecting - As long as we're gathering data on a topic, there's always something to integrate
- Time is first-class - Every piece of knowledge has a temporal dimension
- Search engine that builds itself - Each query adds to the knowledge base
Next Steps¶
- Prototype entity extraction on existing crawl outputs
- Design vectl cluster naming convention
- Create DuckDB schema and test with sample data
- Build simple entity resolution (exact match + alias)
- Add daily trigger for one watched topic as proof of concept