cbintel.knowledge - Entity-Centric Knowledge Management¶
The cbintel.knowledge package provides entity extraction, resolution, and storage for building knowledge graphs from intelligence data.
Architecture Overview¶
flowchart TB
    subgraph Input["Data Sources"]
        CRAWL[Crawl Jobs]
        LAZARUS[Lazarus Jobs]
        TRANSCRIPT[Transcript Jobs]
        MANUAL[Manual Text]
    end

    subgraph Pipeline["Knowledge Pipeline"]
        EXTRACT[EntityExtractor<br/>LLM-based NER]
        RESOLVE[EntityResolver<br/>Match & Dedupe]
        STORE[EntityStore<br/>DuckDB]
    end

    subgraph Output["Knowledge Graph"]
        ENTITIES[(Entities)]
        MENTIONS[(Mentions)]
        CONNECTIONS[(Connections)]
        TIMESERIES[(Time Series)]
    end

    CRAWL --> EXTRACT
    LAZARUS --> EXTRACT
    TRANSCRIPT --> EXTRACT
    MANUAL --> EXTRACT
    EXTRACT -->|ExtractedEntity| RESOLVE
    RESOLVE -->|Entity + Mention| STORE
    STORE --> ENTITIES
    STORE --> MENTIONS
    STORE --> CONNECTIONS
    STORE --> TIMESERIES
Core Concepts¶
Entity Types¶
mindmap
  root((Entity Types))
    PERSON
      Politicians
      Executives
      Researchers
    ORG
      Companies
      Governments
      NGOs
    PLACE
      Countries
      Cities
      Regions
    THING
      Products
      Technologies
      Documents
    EVENT
      Elections
      Conferences
      Incidents
    TOPIC
      Debates
      Policies
      Trends
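The six types map to an EntityType enum used throughout the examples below. A minimal sketch of its likely shape; the string values here are assumptions, and the authoritative definition lives in cbintel.knowledge:

from enum import Enum

class EntityType(str, Enum):
    """Sketch only: member values are assumptions."""
    PERSON = "person"   # politicians, executives, researchers
    ORG = "org"         # companies, governments, NGOs
    PLACE = "place"     # countries, cities, regions
    THING = "thing"     # products, technologies, documents
    EVENT = "event"     # elections, conferences, incidents
    TOPIC = "topic"     # debates, policies, trends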
Data Model¶
erDiagram
    ENTITY ||--o{ MENTION : "has"
    ENTITY ||--o{ CONNECTION : "source"
    ENTITY ||--o{ CONNECTION : "target"
    ENTITY ||--o{ ENTITY_DAILY : "tracked"

    ENTITY {
        uuid id PK
        string tenant_id
        string canonical_name
        enum entity_type
        list aliases
        string description
        json metadata
        list embedding
        int mention_count
        int connection_count
        datetime first_seen
        datetime last_updated
    }

    MENTION {
        uuid id PK
        uuid entity_id FK
        string source_id
        string source_type
        string source_url
        string text
        enum entity_type
        string context
        float sentiment
        float confidence
        datetime created_at
    }

    CONNECTION {
        uuid id PK
        uuid source_entity_id FK
        uuid target_entity_id FK
        string relationship_type
        float strength
        datetime first_seen
        datetime last_seen
        int mention_count
    }

    ENTITY_DAILY {
        uuid entity_id FK
        date point_date
        int mention_count
        float avg_sentiment
        list source_types
    }
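In code, the ENTITY record surfaces as the Entity model used in the examples below. A rough sketch of its likely shape, assuming Pydantic-style defaults; field names follow the ER diagram, but the defaults and optionality are assumptions:

from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, Field

from cbintel.knowledge import EntityType

class Entity(BaseModel):
    """Sketch only: defaults and optionality are assumptions, not the real model."""
    id: UUID = Field(default_factory=uuid4)
    tenant_id: str = ""
    canonical_name: str
    entity_type: EntityType
    aliases: list[str] = Field(default_factory=list)
    description: Optional[str] = None
    metadata: dict = Field(default_factory=dict)
    embedding: Optional[list[float]] = None     # used for semantic matching
    mention_count: int = 0
    connection_count: int = 0
    first_seen: Optional[datetime] = None
    last_updated: Optional[datetime] = None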
Components¶
KnowledgeService¶
High-level orchestrator that runs the full pipeline: extraction, resolution, and storage.
from cbintel.knowledge import KnowledgeService

async with KnowledgeService(tenant_id="my_tenant") as service:
    # Process text directly
    result = await service.process_text(
        text="Sam Altman returned as CEO of OpenAI...",
        source_id="article_123",
        source_type="crawl",
        source_url="https://example.com/article"
    )

    # Or process job output
    result = await service.process_job_output(
        job_id="job_456",
        job_type="crawl",
        content=job_output_dict
    )

    # Query entities
    entities = service.search_entities("OpenAI")
    entity = service.get_entity_by_name("Sam Altman")

    # Get entity details
    mentions = service.get_entity_mentions(entity.id)
    connections = service.get_entity_connections(entity.id)
    timeline = service.get_entity_timeline(entity.id, days=30)

    # Trending analysis
    trending = service.get_trending_entities(days=7, limit=10)
EntityExtractor¶
LLM-based Named Entity Recognition (NER), with an optional second pass that extracts relationships between the entities found.
sequenceDiagram
    participant Text
    participant Extractor
    participant LLM as CBAI (Claude)
    participant Result

    Text->>Extractor: extract(text)
    Extractor->>LLM: Phase 1: NER Prompt
    LLM-->>Extractor: Entities JSON
    Extractor->>Extractor: Parse entities

    alt Multiple entities found
        Extractor->>LLM: Phase 2: Relationship Prompt
        LLM-->>Extractor: Relationships JSON
        Extractor->>Extractor: Enrich entities
    end

    Extractor-->>Result: List[ExtractedEntity]
from cbintel.knowledge import EntityExtractor

extractor = EntityExtractor(provider="claude")

entities = await extractor.extract(
    text="Elon Musk announced that Tesla will...",
    extract_relationships=True
)

for entity in entities:
    print(f"{entity.text} ({entity.entity_type.value})")
    for rel in entity.relationships:
        print(f"  -> {rel['target']} ({rel['relationship_type']})")
EntityResolver¶
Matches extracted mentions to canonical entities.
flowchart TD
    INPUT[ExtractedEntity] --> EXACT{Exact Name<br/>Match?}
    EXACT -->|Yes| FOUND[Return Entity]
    EXACT -->|No| ALIAS{Alias<br/>Match?}
    ALIAS -->|Yes| FOUND
    ALIAS -->|No| EMBED{Embedding<br/>Similarity?}
    EMBED -->|>= 0.85| FOUND
    EMBED -->|< 0.85| CREATE[Create New Entity]
    CREATE --> FOUND
    FOUND --> MENTION[Create Mention]
    MENTION --> UPDATE[Update Aliases]
    UPDATE --> RETURN[Return Entity, Mention]
from cbintel.knowledge import EntityResolver, EntityStore

store = EntityStore(tenant_id="my_tenant")
resolver = EntityResolver(store=store, similarity_threshold=0.85)

entity, mention = await resolver.resolve(
    extracted=extracted_entity,
    source_id="doc_123",
    source_type="manual"
)
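Internally the resolver follows the flowchart above. A simplified sketch of that cascade; the store helpers used here (find_by_exact_name, find_by_alias, nearest_by_embedding) are illustrative stand-ins, not the actual API:

from cbintel.knowledge import Entity, EntityStore

async def resolve_sketch(store: EntityStore, extracted, threshold: float = 0.85):
    """Outline of the match cascade; helper names are hypothetical."""
    # 1. Exact canonical-name match
    entity = store.find_by_exact_name(extracted.text)      # hypothetical helper
    # 2. Alias match
    if entity is None:
        entity = store.find_by_alias(extracted.text)       # hypothetical helper
    # 3. Embedding similarity, accepted at or above the threshold
    if entity is None:
        candidate, score = store.nearest_by_embedding(extracted.text)  # hypothetical
        if candidate is not None and score >= threshold:
            entity = candidate
    # 4. No match: create a new entity (when auto_create is enabled)
    if entity is None:
        entity = Entity(
            canonical_name=extracted.text,
            entity_type=extracted.entity_type,
        )
        store.create_entity(entity)
    return entity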
EntityStore¶
DuckDB-based storage with tenant isolation.
from cbintel.knowledge import EntityStore, Entity, EntityType

store = EntityStore(tenant_id="my_tenant", db_path="./knowledge.duckdb")

# Create entity
entity = Entity(
    canonical_name="OpenAI",
    entity_type=EntityType.ORG,
    aliases=["Open AI", "OpenAI Inc"]
)
store.create_entity(entity)

# Search
results = store.search_entities("open", limit=10)

# Time series
timeline = store.get_time_series(entity.id, start_date, end_date)

# Statistics
stats = store.get_stats()
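Because every record is scoped to a tenant, two stores opened on the same database file see disjoint data. A quick illustration, assuming search_entities scopes results to the store's own tenant:

from cbintel.knowledge import Entity, EntityStore, EntityType

# Two handles on the same file, different tenants
store_a = EntityStore(tenant_id="tenant_a", db_path="./knowledge.duckdb")
store_b = EntityStore(tenant_id="tenant_b", db_path="./knowledge.duckdb")

store_a.create_entity(Entity(canonical_name="Acme Corp", entity_type=EntityType.ORG))

assert store_a.search_entities("Acme")       # visible within tenant_a
assert not store_b.search_entities("Acme")   # not visible to tenant_b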
Processing Pipeline¶
flowchart LR
    subgraph Stage1["Stage 1: Extraction"]
        T1[Raw Text] --> E1[NER]
        E1 --> E2[Relationship<br/>Enrichment]
    end

    subgraph Stage2["Stage 2: Resolution"]
        E2 --> R1[Exact Match]
        R1 --> R2[Alias Match]
        R2 --> R3[Embedding<br/>Similarity]
        R3 --> R4[Auto-Create]
    end

    subgraph Stage3["Stage 3: Storage"]
        R4 --> S1[Store Entity]
        S1 --> S2[Store Mention]
        S2 --> S3[Store Connection]
        S3 --> S4[Update<br/>Time Series]
    end
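KnowledgeService runs all three stages for you; wiring them up by hand looks roughly like this, using only the APIs shown above:

from cbintel.knowledge import EntityExtractor, EntityResolver, EntityStore

store = EntityStore(tenant_id="my_tenant")
extractor = EntityExtractor(provider="claude")
resolver = EntityResolver(store=store, similarity_threshold=0.85)

# Stage 1: extraction (with relationship enrichment)
extracted = await extractor.extract(
    text="Sam Altman returned as CEO of OpenAI...",
    extract_relationships=True
)

# Stages 2 and 3: resolve each entity and persist it
for item in extracted:
    entity, mention = await resolver.resolve(
        extracted=item,
        source_id="doc_123",
        source_type="manual"
    )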
Job Integration¶
The service integrates with all cbintel job types:
| Job Type | Content Extracted |
|---|---|
| crawl | Synthesis + page summaries |
| lazarus | Snapshot text (2000 chars/snapshot) |
| transcript | Transcript text + structured summary |
| vectl | Text content field |
| screenshots | (Future: OCR text) |
# Automatic extraction from job results
result = await service.process_job_output(
    job_id="crawl_abc123",
    job_type="crawl",
    content={
        "synthesis": "Key findings about...",
        "pages": [
            {"summary": "Page 1 discusses..."},
            {"summary": "Page 2 covers..."}
        ]
    }
)
Query Examples¶
Find Related Entities¶
# Get all connections for an entity
connections = service.get_entity_connections(entity.id)

# Get related entities with their connections
related = service.get_related_entities(
    entity.id,
    relationship_type="works_for",
    limit=20
)

for other_entity, connection in related:
    print(f"{other_entity.canonical_name}: {connection.relationship_type}")
Trending Analysis¶
# Get entities with most mentions in the last 7 days
trending = service.get_trending_entities(
    entity_type=EntityType.PERSON,
    days=7,
    limit=10
)

for entity, mention_count in trending:
    print(f"{entity.canonical_name}: {mention_count} mentions")
Entity Timeline¶
# Get daily mention counts
timeline = service.get_entity_timeline(entity.id, days=30)

for point in timeline:
    print(f"{point.point_date}: {point.mention_count} mentions, "
          f"sentiment: {point.avg_sentiment:.2f}")
Database Schema¶
The EntityStore uses DuckDB with tenant-isolated tables:
-- Entities table
CREATE TABLE entities (
    id UUID PRIMARY KEY,
    tenant_id VARCHAR NOT NULL,
    canonical_name VARCHAR NOT NULL,
    entity_type VARCHAR NOT NULL,
    aliases VARCHAR[],
    description VARCHAR,
    metadata JSON,
    embedding DOUBLE[],
    mention_count INTEGER DEFAULT 0,
    connection_count INTEGER DEFAULT 0,
    first_seen TIMESTAMP,
    last_updated TIMESTAMP
);

-- Mentions table
CREATE TABLE mentions (
    id UUID PRIMARY KEY,
    entity_id UUID REFERENCES entities(id),
    source_id VARCHAR NOT NULL,
    source_type VARCHAR NOT NULL,
    source_url VARCHAR,
    text VARCHAR NOT NULL,
    entity_type VARCHAR,
    context VARCHAR,
    sentiment DOUBLE,
    confidence DOUBLE,
    created_at TIMESTAMP
);

-- Connections table
CREATE TABLE connections (
    id UUID PRIMARY KEY,
    source_entity_id UUID REFERENCES entities(id),
    target_entity_id UUID REFERENCES entities(id),
    relationship_type VARCHAR NOT NULL,
    strength DOUBLE DEFAULT 1.0,
    first_seen TIMESTAMP,
    last_seen TIMESTAMP,
    mention_count INTEGER DEFAULT 1
);

-- Daily aggregates
CREATE TABLE entity_daily (
    entity_id UUID REFERENCES entities(id),
    point_date DATE NOT NULL,
    mention_count INTEGER DEFAULT 0,
    avg_sentiment DOUBLE,
    source_types VARCHAR[],
    PRIMARY KEY (entity_id, point_date)
);
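For ad-hoc analysis you can also query the schema directly with the duckdb client. For example, a rough equivalent of get_trending_entities over the tables above (the file path and tenant filter are illustrative):

import duckdb

con = duckdb.connect("./knowledge.duckdb")

# Top 10 entities by mentions over the last 7 days, scoped to one tenant
rows = con.execute("""
    SELECT e.canonical_name, SUM(d.mention_count) AS mentions
    FROM entity_daily d
    JOIN entities e ON e.id = d.entity_id
    WHERE d.point_date >= CURRENT_DATE - INTERVAL 7 DAY
      AND e.tenant_id = 'my_tenant'
    GROUP BY e.canonical_name
    ORDER BY mentions DESC
    LIMIT 10
""").fetchall()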
Configuration¶
# Default provider is Claude (recommended for accuracy)
service = KnowledgeService(
tenant_id="my_tenant",
db_path="./data/knowledge.duckdb", # Optional, uses default if None
provider="claude" # or "ollama" for local inference
)
# Resolver settings
resolver = EntityResolver(
store=store,
similarity_threshold=0.85, # Embedding match threshold
auto_create=True, # Create entities for unmatched mentions
use_embeddings=True # Enable semantic matching
)
Future Enhancements¶
- OCR Integration: Extract entities from screenshots via CBAI OCR
- Graph Visualization: NetworkX/D3.js entity graph rendering
- Alerts: Notify on entity activity spikes
- Chat Interface: Natural language queries over entity graph
- Cross-tenant Analytics: Aggregate insights across tenants