CBAI Integration Plan¶
Non-Destructive Migration from Direct Claude/Ollama Calls to Unified AI Service
Created: 2026-01-06
Executive Summary¶
This document outlines a non-destructive approach to integrating the new CBAI (Campaign Brain AI) unified service into cbapp. The migration replaces direct Anthropic and Ollama API calls with calls to ai.nominate.ai, providing:
- Unified provider management - Single service handles Claude, Ollama, Mistral
- Full tool use support - `/api/v1/chat/tools` enables 100% traffic routing
- Usage tracking - Centralized metrics across all tenants
- Simplified configuration - No per-tenant API key distribution
- Future flexibility - Easy provider switching without code changes
- Zero user impact - Transparent migration with identical behavior
Current Architecture¶
AI Integration Points¶
| Location | Purpose | Current Implementation |
|---|---|---|
| `src/api/routes/cb_chat.py:178-186` | Chat API | Direct `Anthropic()` client |
| `src/api/routes/cb_chat.py:480` | Claude calls | `client.messages.create()` |
| `src/lib/cbchat/engine.py:85-89` | ChatEngine | Direct `Anthropic()` client |
| `src/lib/cbchat/engine.py:214` | Engine calls | `client.messages.create()` |
| `src/api/routes/embeddings.py:188` | Embeddings | Direct httpx to Ollama |
| `src/api/config.py:61-62` | Config | `ANTHROPIC_API_KEY`, etc. |
Current Dependencies¶
```python
# Direct Anthropic usage
from anthropic import Anthropic

client = Anthropic(api_key=settings.ANTHROPIC_API_KEY)
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    system=system_prompt,
    messages=messages,
    tools=tools,  # Tool use support
)
```

```python
# Direct Ollama usage
async with httpx.AsyncClient(timeout=30.0) as client:
    response = await client.post(
        f"{ollama_url}/api/embeddings",
        json={"model": model, "prompt": text},
    )
```
CBAI API Overview¶
Base URL: https://ai.nominate.ai
Endpoints¶
| Endpoint | Method | Purpose |
|---|---|---|
| `/api/v1/chat` | POST | Chat completion (Ollama/Claude) |
| `/api/v1/chat/tools` | POST | Chat completion with tool use |
| `/api/v1/embed` | POST | Text embeddings (768 dim) |
| `/api/v1/summarize` | POST | Text summarization |
| `/api/v1/topics` | POST | Topic extraction |
| `/api/v1/ocr` | POST | Document OCR |
| `/api/v1/health` | GET | Provider health status |
| `/api/v1/usage` | GET | Usage metrics |
Chat API¶
```
# Request
POST /api/v1/chat?provider=claude
{
    "messages": [{"role": "user", "content": "..."}],
    "model": "claude-sonnet-4-5-20250929",  # Optional, uses default
    "max_tokens": 2048,
    "temperature": 0.7,
    "stream": false
}

# Response
{
    "content": "...",
    "model": "claude-sonnet-4-5-20250929",
    "usage": {"input_tokens": 100, "output_tokens": 50},
    "finish_reason": "stop"
}
```
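For a quick smoke test, a request from Python might look like this (a sketch assuming only `httpx`; the payload mirrors the schema above):
```python
import asyncio

import httpx


async def cbai_chat_demo() -> None:
    """Minimal chat request against CBAI, mirroring the schema above."""
    async with httpx.AsyncClient(base_url="https://ai.nominate.ai", timeout=60.0) as client:
        response = await client.post(
            "/api/v1/chat",
            params={"provider": "claude"},
            json={
                "messages": [{"role": "user", "content": "Say hello."}],
                "max_tokens": 256,
                "temperature": 0.7,
                "stream": False,
            },
        )
        response.raise_for_status()
        data = response.json()
        print(data["content"], data["usage"])


asyncio.run(cbai_chat_demo())
```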
Embeddings API¶
```
# Request
POST /api/v1/embed
{
    "text": "text to embed" | ["text1", "text2"],  # Single or batch
    "model": "nomic-embed-text"  # Optional
}

# Response
{
    "embeddings": [[0.1, 0.2, ...]],  # Always 2D array
    "model": "nomic-embed-text",
    "dimensions": 768
}
```
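And the batch form of the embed call, as a sketch under the same assumptions:
```python
import asyncio

import httpx


async def cbai_embed_demo() -> None:
    """Batch embed two strings; the response is always a 2D array."""
    async with httpx.AsyncClient(base_url="https://ai.nominate.ai", timeout=60.0) as client:
        response = await client.post(
            "/api/v1/embed",
            json={"text": ["text1", "text2"], "model": "nomic-embed-text"},
        )
        response.raise_for_status()
        data = response.json()
        assert data["dimensions"] == 768
        assert len(data["embeddings"]) == 2


asyncio.run(cbai_embed_demo())
```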
Integration Strategy¶
Design Principles¶
- Feature Flag Control - Toggle between direct and CBAI modes
- Adapter Pattern - Maintain existing interfaces, swap implementation
- Graceful Degradation - Fall back to direct calls if CBAI unavailable
- Zero Breaking Changes - All existing code continues to work
- Incremental Rollout - Enable per-tenant or globally
Phase 1: Create CBAI Client Module¶
Create the `src/lib/cbai/` module with:
```
src/lib/cbai/
├── __init__.py      # Re-exports CBAIClient, CBAIConfig
├── client.py        # Main CBAI client
├── chat.py          # Chat adapter
├── embeddings.py    # Embeddings adapter
└── config.py        # CBAI configuration
```
src/lib/cbai/config.py¶
"""CBAI configuration and feature flags."""
from pydantic import Field
from pydantic_settings import BaseSettings
class CBAIConfig(BaseSettings):
"""Configuration for CBAI integration."""
# Feature flags
CBAI_ENABLED: bool = Field(default=False, description="Enable CBAI integration")
CBAI_CHAT_ENABLED: bool = Field(default=True, description="Use CBAI for chat")
CBAI_EMBED_ENABLED: bool = Field(default=True, description="Use CBAI for embeddings")
# Service configuration
CBAI_BASE_URL: str = Field(default="https://ai.nominate.ai", description="CBAI base URL")
CBAI_TIMEOUT: int = Field(default=60, description="Request timeout in seconds")
# Fallback behavior
CBAI_FALLBACK_ENABLED: bool = Field(default=True, description="Fall back to direct calls on error")
model_config = {"env_file": ".env", "extra": "allow"}
src/lib/cbai/client.py¶
"""CBAI unified client."""
import httpx
import logging
from typing import Any
from .config import CBAIConfig
logger = logging.getLogger(__name__)
class CBAIClient:
"""Client for CBAI unified AI service."""
def __init__(self, config: CBAIConfig | None = None):
self.config = config or CBAIConfig()
self._http_client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
if self._http_client is None:
self._http_client = httpx.AsyncClient(
base_url=self.config.CBAI_BASE_URL,
timeout=self.config.CBAI_TIMEOUT,
)
return self._http_client
async def chat(
self,
messages: list[dict],
provider: str = "claude",
model: str | None = None,
max_tokens: int = 2048,
temperature: float = 0.7,
system: str | None = None,
tools: list[dict] | None = None,
stream: bool = False,
) -> dict:
"""
Send chat request to CBAI.
Note: Tool use requires special handling - CBAI may need extension
to support Anthropic's tool format.
"""
client = await self._get_client()
# Prepend system message if provided
if system:
messages = [{"role": "system", "content": system}] + messages
payload = {
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": stream,
}
if model:
payload["model"] = model
# Use /chat/tools endpoint if tools provided
if tools:
return await self.chat_with_tools(
messages=messages,
tools=tools,
system=system,
model=model,
max_tokens=max_tokens,
temperature=temperature,
)
response = await client.post(
f"/api/v1/chat",
params={"provider": provider},
json=payload,
)
response.raise_for_status()
return response.json()
async def chat_with_tools(
self,
messages: list[dict],
tools: list[dict],
system: str | None = None,
model: str | None = None,
max_tokens: int = 2048,
temperature: float = 0.7,
tool_choice: str | dict = "auto",
) -> dict:
"""
Chat with tool use support via /api/v1/chat/tools.
This endpoint supports the full Anthropic tool use format.
"""
client = await self._get_client()
payload = {
"messages": messages,
"tools": tools,
"tool_choice": tool_choice,
"max_tokens": max_tokens,
"temperature": temperature,
}
if system:
payload["system"] = system
if model:
payload["model"] = model
response = await client.post("/api/v1/chat/tools", json=payload)
response.raise_for_status()
return response.json()
async def embed(
self,
text: str | list[str],
model: str = "nomic-embed-text",
) -> dict:
"""Generate embeddings via CBAI."""
client = await self._get_client()
response = await client.post(
"/api/v1/embed",
json={"text": text, "model": model},
)
response.raise_for_status()
return response.json()
async def health(self) -> dict:
"""Check CBAI health status."""
client = await self._get_client()
response = await client.get("/api/v1/health")
response.raise_for_status()
return response.json()
async def close(self):
"""Close HTTP client."""
if self._http_client:
await self._http_client.aclose()
self._http_client = None
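A usage sketch for the client (assumes `src/lib/cbai/__init__.py` re-exports `CBAIClient`; error handling elided):
```python
import asyncio

from src.lib.cbai import CBAIClient


async def main() -> None:
    client = CBAIClient()
    try:
        # Plain chat; routes to /api/v1/chat
        reply = await client.chat(
            messages=[{"role": "user", "content": "Summarize our plan."}],
            system="You are a campaign data assistant.",
        )
        print(reply["content"])

        # Embeddings; routes to /api/v1/embed
        vectors = await client.embed(["text1", "text2"])
        print(vectors["dimensions"])  # 768
    finally:
        await client.close()


asyncio.run(main())
```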
Phase 2: Create Adapters¶
Chat Adapter¶
"""Chat adapter with CBAI/direct switching."""
from anthropic import Anthropic
from ..cbai import CBAIClient, CBAIConfig
class ChatAdapter:
"""Adapter for chat completions - switches between CBAI and direct Anthropic."""
def __init__(self, anthropic_api_key: str, cbai_config: CBAIConfig | None = None):
self.anthropic_api_key = anthropic_api_key
self.cbai_config = cbai_config or CBAIConfig()
self._anthropic_client: Anthropic | None = None
self._cbai_client: CBAIClient | None = None
def _get_anthropic_client(self) -> Anthropic:
if self._anthropic_client is None:
self._anthropic_client = Anthropic(api_key=self.anthropic_api_key)
return self._anthropic_client
def _get_cbai_client(self) -> CBAIClient:
if self._cbai_client is None:
self._cbai_client = CBAIClient(self.cbai_config)
return self._cbai_client
async def create_message(
self,
model: str,
max_tokens: int,
system: str,
messages: list[dict],
tools: list[dict] | None = None,
) -> dict:
"""
Create chat message - routes to CBAI or direct Anthropic.
Returns dict matching Anthropic response structure for compatibility.
"""
use_cbai = (
self.cbai_config.CBAI_ENABLED
and self.cbai_config.CBAI_CHAT_ENABLED
)
if use_cbai:
try:
cbai = self._get_cbai_client()
# Use /chat/tools endpoint if tools provided
if tools:
result = await cbai.chat_with_tools(
messages=messages,
tools=tools,
system=system,
model=model,
max_tokens=max_tokens,
)
else:
result = await cbai.chat(
messages=messages,
provider="claude",
model=model,
max_tokens=max_tokens,
system=system,
)
# Transform CBAI response to match Anthropic structure
return self._transform_cbai_response(result)
except Exception as e:
if self.cbai_config.CBAI_FALLBACK_ENABLED:
logger.warning(f"CBAI error, falling back to direct: {e}")
else:
raise
# Direct Anthropic call
client = self._get_anthropic_client()
response = client.messages.create(
model=model,
max_tokens=max_tokens,
system=system,
messages=messages,
tools=tools,
)
return self._anthropic_to_dict(response)
def _transform_cbai_response(self, cbai_response: dict) -> dict:
"""Transform CBAI response to match Anthropic response structure."""
return {
"content": [{"type": "text", "text": cbai_response["content"]}],
"model": cbai_response.get("model", ""),
"stop_reason": cbai_response.get("finish_reason", "stop"),
"usage": {
"input_tokens": cbai_response.get("usage", {}).get("input_tokens", 0),
"output_tokens": cbai_response.get("usage", {}).get("output_tokens", 0),
},
}
def _anthropic_to_dict(self, response) -> dict:
"""Convert Anthropic response object to dict."""
return {
"content": response.content,
"model": response.model,
"stop_reason": response.stop_reason,
"usage": {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
},
}
Embeddings Adapter¶
"""Embeddings adapter with CBAI/direct switching."""
import httpx
from ..cbai import CBAIClient, CBAIConfig
class EmbeddingsAdapter:
"""Adapter for embeddings - switches between CBAI and direct Ollama."""
def __init__(self, ollama_base_url: str, cbai_config: CBAIConfig | None = None):
self.ollama_base_url = ollama_base_url
self.cbai_config = cbai_config or CBAIConfig()
self._cbai_client: CBAIClient | None = None
def _get_cbai_client(self) -> CBAIClient:
if self._cbai_client is None:
self._cbai_client = CBAIClient(self.cbai_config)
return self._cbai_client
async def create_embedding(
self,
text: str,
model: str = "nomic-embed-text:latest",
) -> list[float]:
"""Create embedding - routes to CBAI or direct Ollama."""
use_cbai = (
self.cbai_config.CBAI_ENABLED
and self.cbai_config.CBAI_EMBED_ENABLED
)
if use_cbai:
try:
cbai = self._get_cbai_client()
result = await cbai.embed(text=text, model=model.replace(":latest", ""))
# CBAI returns {"embeddings": [[...]], "dimensions": 768}
return result["embeddings"][0]
except Exception as e:
if self.cbai_config.CBAI_FALLBACK_ENABLED:
logger.warning(f"CBAI embed error, falling back to direct: {e}")
else:
raise
# Direct Ollama call
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.ollama_base_url}/api/embeddings",
json={"model": model, "prompt": text},
)
response.raise_for_status()
return response.json()["embedding"]
async def create_batch_embeddings(
self,
texts: list[str],
model: str = "nomic-embed-text:latest",
) -> list[list[float]]:
"""Create batch embeddings - CBAI supports native batching."""
use_cbai = (
self.cbai_config.CBAI_ENABLED
and self.cbai_config.CBAI_EMBED_ENABLED
)
if use_cbai:
try:
cbai = self._get_cbai_client()
result = await cbai.embed(text=texts, model=model.replace(":latest", ""))
return result["embeddings"]
except Exception as e:
if self.cbai_config.CBAI_FALLBACK_ENABLED:
logger.warning(f"CBAI batch embed error, falling back: {e}")
else:
raise
# Direct Ollama (sequential)
embeddings = []
for text in texts:
embedding = await self.create_embedding(text, model)
embeddings.append(embedding)
return embeddings
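An illustrative call site for the batch path (the local Ollama URL is only needed for the fallback branch; names here are examples):
```python
import asyncio


async def index_documents() -> None:
    """Illustrative only: embed a small batch through the adapter."""
    adapter = EmbeddingsAdapter(ollama_base_url="http://localhost:11434")
    vectors = await adapter.create_batch_embeddings(["doc one", "doc two"])
    assert all(len(v) == 768 for v in vectors)  # nomic-embed-text is 768-dim


asyncio.run(index_documents())
```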
Phase 3: Integration Points¶
Update cb_chat.py¶
```python
# Before (direct):
def get_anthropic_client() -> Anthropic:
    settings = get_settings()
    return Anthropic(api_key=settings.ANTHROPIC_API_KEY)


# After (adapter):
def get_chat_adapter() -> ChatAdapter:
    settings = get_settings()
    return ChatAdapter(
        anthropic_api_key=settings.ANTHROPIC_API_KEY,
        cbai_config=CBAIConfig(),
    )
```
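Call sites then swap the synchronous `client.messages.create(...)` for the adapter's awaitable method. A FastAPI-style sketch, with the route path, payload shape, and system prompt purely illustrative:
```python
from fastapi import APIRouter

router = APIRouter()


@router.post("/api/cb-chat/message")
async def chat_message(payload: dict) -> dict:
    """Hypothetical handler: the only change is calling the adapter."""
    adapter = get_chat_adapter()
    response = await adapter.create_message(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system="You are a campaign data assistant.",
        messages=payload["messages"],
    )
    # response is a plain dict shaped like an Anthropic message
    return {"reply": response["content"], "stop_reason": response["stop_reason"]}
```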
Update embeddings.py¶
```python
# Before (direct):
async def create_embedding_with_retry(text: str, model: str) -> list[float]:
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{ollama_url}/api/embeddings", ...)


# After (adapter):
_embeddings_adapter: EmbeddingsAdapter | None = None


def get_embeddings_adapter() -> EmbeddingsAdapter:
    global _embeddings_adapter
    if _embeddings_adapter is None:
        settings = get_settings()
        _embeddings_adapter = EmbeddingsAdapter(
            ollama_base_url=settings.OLLAMA_BASE_URL,
            cbai_config=CBAIConfig(),
        )
    return _embeddings_adapter


async def create_embedding_with_retry(text: str, model: str) -> list[float]:
    adapter = get_embeddings_adapter()
    return await adapter.create_embedding(text, model)
```
Phase 4: Configuration¶
Add to .env:
```
# CBAI Integration (optional - disabled by default)
CBAI_ENABLED=false
CBAI_BASE_URL=https://ai.nominate.ai
CBAI_CHAT_ENABLED=true
CBAI_EMBED_ENABLED=true
CBAI_FALLBACK_ENABLED=true
CBAI_TIMEOUT=60
```
Add to src/api/config.py:
```python
class Settings(BaseSettings):
    # ... existing settings ...

    # CBAI Integration
    CBAI_ENABLED: bool = False
    CBAI_BASE_URL: str = "https://ai.nominate.ai"
    CBAI_CHAT_ENABLED: bool = True
    CBAI_EMBED_ENABLED: bool = True
    CBAI_FALLBACK_ENABLED: bool = True
    CBAI_TIMEOUT: int = 60
```
Migration Steps¶
Step 1: Create Module Structure¶
- Scaffold `src/lib/cbai/` as laid out in Phase 1
Step 2: Implement Client and Adapters¶
- Create `config.py`, `client.py`, `chat.py`, `embeddings.py`
- Add comprehensive error handling
- Include logging for debugging
Step 3: Add Feature Flags¶
- Add CBAI settings to `src/api/config.py`
- Document in `.env.example`
Step 4: Update Integration Points¶
- Modify `cb_chat.py` to use ChatAdapter
- Modify `embeddings.py` to use EmbeddingsAdapter
- Keep direct code paths for fallback
Step 5: Testing¶
- Unit tests for adapters
- Integration tests with CBAI
- E2E tests with feature flag on/off
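A starting point for the adapter unit tests (a sketch; assumes `pytest-asyncio` and the module paths from Phase 1):
```python
import pytest

from src.lib.cbai import CBAIClient, CBAIConfig
from src.lib.cbai.embeddings import EmbeddingsAdapter


@pytest.mark.asyncio
async def test_embedding_routes_to_cbai_when_enabled(monkeypatch):
    """With the flag on, the adapter should return CBAI's first vector."""

    async def fake_embed(self, text, model="nomic-embed-text"):
        return {"embeddings": [[0.1, 0.2]], "model": model, "dimensions": 2}

    monkeypatch.setattr(CBAIClient, "embed", fake_embed)
    config = CBAIConfig(CBAI_ENABLED=True, CBAI_EMBED_ENABLED=True)
    adapter = EmbeddingsAdapter(ollama_base_url="http://localhost:11434", cbai_config=config)

    assert await adapter.create_embedding("hello") == [0.1, 0.2]
```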
Step 6: Staged Rollout¶
- Enable on testsite first (`CBAI_ENABLED=true`)
- Monitor for 1 week
- Enable on remaining tenants
Tool Use Support (Full!)¶
CBAI provides complete tool use support via `POST /api/v1/chat/tools`:
Endpoint¶
`POST /api/v1/chat/tools`
Request Schema¶
```
{
    "messages": [
        {"role": "user", "content": "What campaign data do we have?"},
        # Supports content blocks for tool interactions:
        # - TextBlock: {"type": "text", "text": "..."}
        # - ToolUseBlock: {"type": "tool_use", "id": "...", "name": "...", "input": {...}}
        # - ToolResultBlock: {"type": "tool_result", "tool_use_id": "...", "content": "..."}
    ],
    "tools": [
        {
            "name": "list_campaign_sources",
            "description": "List all campaign data sources and their fields",
            "input_schema": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    ],
    "tool_choice": "auto",  # auto | any | none | {"type": "tool", "name": "..."}
    "system": "You are a campaign data assistant...",
    "model": "claude-sonnet-4-5-20250929",  # Optional
    "max_tokens": 2048,
    "temperature": 0.7,
    "stream": false
}
```
Response Schema¶
```
{
    "content": [
        {"type": "text", "text": "Let me check your campaign data..."},
        {"type": "tool_use", "id": "toolu_01abc", "name": "list_campaign_sources", "input": {}}
    ],
    "model": "claude-sonnet-4-5-20250929",
    "stop_reason": "tool_use",  # "end_turn" | "tool_use" | "max_tokens"
    "usage": {"input_tokens": 150, "output_tokens": 50}
}
```
Tool Use Flow¶
- Send messages with a `tools` array
- Check `stop_reason` in the response
- If `"tool_use"`: execute the requested tools, send back `tool_result` blocks
- Repeat until `stop_reason` is `"end_turn"`, as sketched below
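A minimal sketch of that loop using the `CBAIClient` from Phase 1 (the tool list and executor are placeholders):
```python
import asyncio
import json

from src.lib.cbai import CBAIClient

TOOLS = [
    {
        "name": "list_campaign_sources",
        "description": "List all campaign data sources and their fields",
        "input_schema": {"type": "object", "properties": {}, "required": []},
    }
]


def run_tool(name: str, tool_input: dict) -> str:
    """Placeholder executor: dispatch to real tool implementations here."""
    return json.dumps({"sources": ["voters", "donations"]})


async def tool_loop(user_text: str) -> str:
    client = CBAIClient()
    messages = [{"role": "user", "content": user_text}]
    try:
        while True:
            reply = await client.chat_with_tools(messages=messages, tools=TOOLS)
            if reply["stop_reason"] != "tool_use":
                # end_turn (or max_tokens): collect the text blocks and stop
                return "".join(b["text"] for b in reply["content"] if b["type"] == "text")
            # Echo the assistant turn, then answer each tool_use with a tool_result
            messages.append({"role": "assistant", "content": reply["content"]})
            results = [
                {"type": "tool_result", "tool_use_id": b["id"], "content": run_tool(b["name"], b["input"])}
                for b in reply["content"]
                if b["type"] == "tool_use"
            ]
            messages.append({"role": "user", "content": results})
    finally:
        await client.close()


print(asyncio.run(tool_loop("What campaign data do we have?")))
```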
Streaming Support¶
When `stream: true`, the endpoint returns newline-delimited JSON:
```
{"type": "text", "text": "Let me "}
{"type": "text", "text": "check..."}
{"type": "tool_use", "id": "toolu_01abc", "name": "list_campaign_sources", "input": {}}
{"type": "message_complete", "stop_reason": "tool_use", "usage": {...}}
```
This means 100% of cbapp chat traffic can route through CBAI!
Other Considerations¶
Streaming Support¶
Current Status: CBAI supports streaming (stream: true), but cbapp doesn't currently use streaming.
Impact: None for current implementation.
Future: Could improve UX with streaming responses in chat interface.
Model Selection¶
CBAI Defaults:
- Ollama: mistral-small3.2:latest
- Claude: claude-sonnet-4-5-20250929
cbapp Current:
- Claude: claude-sonnet-4-20250514
- Ollama embed: nomic-embed-text:latest
Note: Model versions differ slightly - verify compatibility during testing.
Rollback Plan¶
If issues occur after enabling CBAI:
- Immediate: Set `CBAI_ENABLED=false` in `.env`
- Restart: `sudo systemctl restart {tenant}-api`
- Verify: Check `/api/cb-chat/health` returns `anthropic_configured: true`
No code changes required - adapter automatically routes to direct calls.
Success Metrics¶
| Metric | Target | Measurement |
|---|---|---|
| Response latency | ≤10% increase | Compare CBAI vs direct timing |
| Error rate | ≤1% | Monitor CBAI health endpoint |
| User experience | No change | No UI/behavior differences |
| Token accuracy | 100% match | Compare usage tracking |
Timeline¶
| Phase | Duration | Deliverable |
|---|---|---|
| 1. Module creation | 1 day | src/lib/cbai/ implemented |
| 2. Adapter integration | 1 day | Integration points updated |
| 3. Testing | 2 days | Unit + integration tests passing |
| 4. Testsite rollout | 1 week | Monitoring on testsite |
| 5. Full rollout | 1 day | All tenants enabled |
Questions for AI Team¶
- Tool Use: Can CBAI be extended to support Anthropic's tool use format? ANSWERED: Yes! `/api/v1/chat/tools` fully supports it!
- Authentication: Should we add tenant ID headers for usage tracking?
- Rate Limits: Are there per-tenant or global rate limits?
- SLA: What's the expected uptime for ai.nominate.ai?
- Model Updates: How will model version updates be communicated?
This plan enables incremental adoption of CBAI while maintaining full backward compatibility. The user experience remains unchanged, but infrastructure becomes more manageable.