# cbintel API Reference

Programmatic API reference for all cbintel sub-services.
## cbintel.crawl - AI Web Crawling

### CrawlPipeline

```python
from cbintel.crawl import CrawlPipeline, CrawlConfig

config = CrawlConfig(
    max_urls=50,
    max_depth=3,
    ai_model="claude-3-5-sonnet-20241022",
)

pipeline = CrawlPipeline(config)
result = await pipeline.run("What is the current state of AI regulation?")

print(result.synthesis)  # Final AI synthesis
for page in result.pages:
    print(f"{page.url}: score={page.score}")
```
#### Configuration

| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_urls` | `int` | `50` | Maximum URLs to process |
| `max_depth` | `int` | `3` | Maximum crawl depth |
| `ai_model` | `str` | `claude-3-5-sonnet` | AI model for evaluation |
| `min_score` | `float` | `6.0` | Minimum relevance score (0-10) |
| `search_provider` | `str` | `duckduckgo` | Search engine |
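A fuller configuration, as a sketch: it assumes each row above maps directly to a `CrawlConfig` keyword argument of the listed type.

```python
from cbintel.crawl import CrawlConfig

# Sketch only: all documented parameters passed explicitly (default values shown)
config = CrawlConfig(
    max_urls=50,
    max_depth=3,
    ai_model="claude-3-5-sonnet-20241022",
    min_score=6.0,                  # pages scoring below 6.0/10 are dropped
    search_provider="duckduckgo",
)
```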
## cbintel.lazarus - Historical Archives

### CDXClient

Query the Internet Archive and Common Crawl CDX APIs.

```python
from datetime import datetime

from cbintel.lazarus import CDXClient

client = CDXClient(source="ia")  # or "cc" for Common Crawl

# Query snapshots for a URL
records = await client.query(
    "https://example.com",
    from_date=datetime(2020, 1, 1),
    to_date=datetime(2024, 1, 1),
    limit=100,
)

for record in records:
    print(f"{record.timestamp}: {record.status}")
    content = await record.fetch()  # Get archived content
```
### URLDiscovery

Discover archived URLs using gau.

```python
from cbintel.lazarus import URLDiscovery

discovery = URLDiscovery()

# Discover URLs for a domain
result = await discovery.discover(
    "example.com",
    sources=["wayback", "commoncrawl"],
    limit=1000,
)

print(f"Found {result.total_urls} URLs")
for url in result.sample(50):  # Random sample
    print(url)
```
### ArchiveClient

High-level archive operations.

```python
from datetime import datetime

from cbintel.lazarus import ArchiveClient

client = ArchiveClient()

# Get snapshots of a URL
snapshots = await client.get_snapshots(
    "https://example.com/page",
    from_date=datetime(2020, 1, 1),
)

# Process an entire domain
async for snapshot in client.process_domain("example.com"):
    print(f"{snapshot.timestamp}: {snapshot.url}")

# Compare two versions (url, timestamp1, timestamp2 defined elsewhere)
diff = await client.compare_versions(url, timestamp1, timestamp2)
```
### TemporalAnalyzer

Analyze content changes over time.

```python
from cbintel.lazarus import TemporalAnalyzer

analyzer = TemporalAnalyzer()

# Build a timeline (snapshots e.g. from ArchiveClient.get_snapshots)
timeline = await analyzer.build_timeline(snapshots)
print(f"Timeline: {timeline.start_date} to {timeline.end_date}")

# Detect changes
for change in timeline.changes:
    print(f"{change.date}: {change.change_type} ({change.similarity:.2f})")

# Get statistics
stats = analyzer.get_stats(snapshots)
print(f"Total snapshots: {stats.total_snapshots}")
print(f"Years covered: {stats.year_distribution}")
```
## cbintel.vectl - Vector Search

### EmbeddingService

Generate text embeddings using Ollama.

```python
from cbintel.vectl import EmbeddingService

service = EmbeddingService(
    model="nomic-embed-text",
    base_url="http://127.0.0.1:11434",
)

# Single text
vector = await service.embed("Hello, world!")
print(f"Dimensions: {len(vector)}")  # 768

# Batch embedding
vectors = await service.embed_batch([
    "First document",
    "Second document",
    "Third document",
])
```
### VectorStore

Store and search vectors with K-means clustering.

```python
from cbintel.vectl import VectorStore

store = VectorStore(path="./my-index")

# Add vectors (e.g. produced by EmbeddingService.embed)
await store.add("doc1", vector1, metadata={"source": "file1.txt"})
await store.add("doc2", vector2, metadata={"source": "file2.txt"})

# Search
results = await store.search(query_vector, top_k=10)
for result in results:
    print(f"{result.id}: {result.score:.3f}")

# Persist
await store.save()
```
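The K-means clustering serves to prune the search space: rather than scoring every stored vector, a query is first routed to its nearest cluster. A toy illustration of the idea (not `VectorStore`'s actual internals):

```python
import numpy as np

# query: (d,); vectors: (n, d); labels: (n,) cluster assignments;
# centroids: (k, d) from K-means. Returns indices of top_k candidates.
def clustered_search(query, vectors, labels, centroids, top_k=10):
    # 1. Route the query to its nearest centroid
    nearest = np.argmin(np.linalg.norm(centroids - query, axis=1))
    # 2. Score only the vectors assigned to that cluster
    candidates = np.where(labels == nearest)[0]
    dists = np.linalg.norm(vectors[candidates] - query, axis=1)
    return candidates[np.argsort(dists)[:top_k]]
```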
### SemanticSearch

End-to-end semantic search.

```python
from cbintel.vectl import SemanticSearch

search = SemanticSearch("./my-index")

# Index documents
await search.index_file("document.txt")
await search.index_directory("./docs")

# Search with a text query
results = await search.search("machine learning algorithms", top_k=10)
for result in results:
    print(f"{result.text[:100]}... (score: {result.score:.3f})")
```
### DocumentIndex

Simple document indexing interface.

```python
from cbintel.vectl import DocumentIndex

index = DocumentIndex("./my-corpus")

# Index with automatic chunking
await index.add_document("doc1", "Long document text...", chunk_size=512)

# Search
matches = await index.search("query text", top_k=5)
```
## cbintel.screenshots - Browser Automation

### ScreenshotService

Capture web page screenshots.

```python
from cbintel.screenshots import ScreenshotService, CaptureConfig

config = CaptureConfig(
    browser="chromium",  # or "firefox", "webkit"
    viewport_width=1920,
    viewport_height=1080,
)

async with ScreenshotService(config) as service:
    # Full-page screenshot
    capture = await service.screenshot(
        "https://example.com",
        full_page=True,
    )
    capture.save("screenshot.png")

    # Element screenshot
    capture = await service.screenshot(
        "https://example.com",
        selector="#header",
    )

    # With DOM capture
    capture = await service.screenshot(
        "https://example.com",
        capture_dom=True,
    )
    print(capture.dom)  # HTML content
```
### PDFService

Generate PDFs from web pages.

```python
from pathlib import Path

from cbintel.screenshots import PDFService, PDFConfig

config = PDFConfig(
    format="A4",
    landscape=False,
    scale=1.0,
    margin_top="1cm",
    margin_bottom="1cm",
)

async with PDFService(config) as service:
    result = await service.generate(
        "https://example.com",
        output=Path("document.pdf"),
    )
    print(f"Generated: {len(result.pdf)} bytes")
```
### DOMService

Extract DOM elements with positions.

```python
from cbintel.screenshots import DOMService, DOMConfig

config = DOMConfig(
    browser="chromium",
    viewport_width=1920,
    viewport_height=1080,
)

async with DOMService(config) as service:
    # Extract elements by selector
    elements = await service.extract(
        "https://example.com",
        selector="a, button",
        include_positions=True,
    )
    for el in elements:
        print(f"{el.tag}: {el.text} at ({el.x}, {el.y})")

    # Extract all links
    links = await service.extract_links("https://example.com")

    # Extract interactive elements
    interactive = await service.extract_interactive("https://example.com")

    # Get full page HTML
    html = await service.get_html("https://example.com")
```
## cbintel.cluster - VPN Cluster API

### REST API

The cluster API runs as a FastAPI service on port 9002. See vpn-cluster-api.md for full endpoint documentation.
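A minimal sketch of calling the service over HTTP; the `/devices` route here is hypothetical, so check vpn-cluster-api.md for the real paths.

```python
import httpx

# Hypothetical route for illustration only; see vpn-cluster-api.md
# for the endpoints actually exposed by the FastAPI service.
async with httpx.AsyncClient(base_url="http://127.0.0.1:9002") as client:
    response = await client.get("/devices")
    response.raise_for_status()
    print(response.json())
```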
### DeviceService

Manage OpenWRT devices programmatically.

```python
from cbintel.cluster.services.device_service import DeviceService, DeviceRegistryManager

# Initialize
registry = DeviceRegistryManager("/var/lib/vpn-banks/device-registry.json")
service = DeviceService(registry)

# Ping a device
result = await service.ping_device(1)
print(f"Latency: {result.avg_ms}ms")

# Get external IP with geolocation
ip_info = await service.get_external_ip(1)
print(f"Exit: {ip_info.ip} ({ip_info.country}, {ip_info.city})")

# Run a speedtest
speed = await service.run_speedtest(1)
print(f"Down: {speed.download_mbps} Mbps, Up: {speed.upload_mbps} Mbps")

# Execute a command
result = await service.execute_command(1, "uptime")
print(result.stdout)

# Get system info
info = await service.get_system_info(1)
print(f"Uptime: {info.uptime}s, Load: {info.load_avg}")
```
### BankService

Manage geographic VPN pools.

```python
from cbintel.cluster.services.bank_service import BankService

service = BankService()

# Create a bank
bank = await service.create_bank(
    name="California",
    workers=[1, 2, 3],
    filter="us:ca",
)
print(f"Bank endpoint: {bank.endpoint}")

# List banks
banks = await service.list_banks()

# Get bank status
status = await service.get_bank("california")

# Delete a bank
await service.delete_bank("california")
```
## cbintel.ai - AI Clients

### AnthropicClient

```python
from cbintel.ai import AnthropicClient

client = AnthropicClient(model="claude-3-5-sonnet-20241022")

# document_text: the text to summarize, loaded elsewhere
response = await client.complete(
    "Summarize this document:",
    context=document_text,
)
```
### OllamaClient

```python
from cbintel.ai import OllamaClient

client = OllamaClient(
    model="llama3.2",
    base_url="http://127.0.0.1:11434",
)
response = await client.complete("What is machine learning?")
```
### CBAIClient (Unified)

```python
from cbintel.ai import CBAIClient

# Automatic provider selection
client = CBAIClient()
response = await client.complete(
    "Analyze this text",
    model="claude-3-5-sonnet",  # or "llama3.2" for Ollama
)
```
## cbintel.net - Network Operations

### HTTPClient

```python
from cbintel.net import HTTPClient

async with HTTPClient() as client:
    # Simple GET
    response = await client.get("https://example.com")

    # With proxy
    response = await client.get(
        "https://example.com",
        proxy="http://127.0.0.1:8894",
    )

    # POST with JSON
    response = await client.post(
        "https://api.example.com",
        json={"key": "value"},
    )
```
### SearchClient

```python
from cbintel.net import SearchClient

client = SearchClient(provider="duckduckgo")
results = await client.search("AI regulation", max_results=20)
for result in results:
    print(f"{result.title}: {result.url}")
```
### URLCleaner

```python
from cbintel.net import URLCleaner

cleaner = URLCleaner()

# Clean a URL (remove tracking params, normalize)
clean = cleaner.clean("https://example.com?utm_source=twitter&id=123")

# Extract the domain
domain = cleaner.extract_domain("https://sub.example.com/path")
```
## cbintel.io - I/O Operations

### HTMLProcessor

```python
from cbintel.io import HTMLProcessor

processor = HTMLProcessor()

# html: a raw HTML string fetched elsewhere

# Extract main content
content = processor.extract_content(html)

# Convert to markdown
markdown = processor.to_markdown(html)

# Extract links
links = processor.extract_links(html, base_url="https://example.com")
```
### SessionManager

```python
from cbintel.io import SessionManager

manager = SessionManager(base_path="./sessions")

# Create a session
session = manager.create("crawl-2024-01-15")

# Save data
session.save("results.json", results)
session.save("pages/page1.md", content)

# Load a session
session = manager.load("crawl-2024-01-15")
data = session.load("results.json")
```
## Error Handling

All services raise exceptions from a shared hierarchy:

```python
from cbintel.exceptions import (
    CbintelError,        # Base exception
    NetworkError,        # HTTP/connection errors
    AIError,             # LLM API errors
    StorageError,        # File/database errors
    ConfigurationError,  # Missing config/env vars
)

try:
    result = await client.complete(prompt)
except AIError as e:
    print(f"AI error: {e}")
except NetworkError as e:
    print(f"Network error: {e}")
```
## Async Patterns

All cbintel services use async/await:

```python
import asyncio

from cbintel.screenshots import ScreenshotService
from cbintel.vectl import SemanticSearch

async def main():
    # Initialize services
    search = SemanticSearch("./index")
    async with ScreenshotService() as screenshots:
        # Parallel operations
        results = await asyncio.gather(
            search.search("query"),
            screenshots.screenshot("https://example.com"),
        )
        search_results, capture = results

asyncio.run(main())
```