
CBAI Integration Plan

Non-Destructive Migration from Direct Claude/Ollama Calls to Unified AI Service

Created: 2026-01-06


Executive Summary

This document outlines a non-destructive approach to integrating the new CBAI (Campaign Brain AI) unified service into cbapp. The migration replaces direct Anthropic and Ollama API calls with calls to ai.nominate.ai, providing:

  • Unified provider management - Single service handles Claude, Ollama, Mistral
  • Full tool use support - /api/v1/chat/tools enables 100% traffic routing
  • Usage tracking - Centralized metrics across all tenants
  • Simplified configuration - No per-tenant API key distribution
  • Future flexibility - Easy provider switching without code changes
  • Zero user impact - Transparent migration with identical behavior

Current Architecture

AI Integration Points

Location                             Purpose        Current Implementation
src/api/routes/cb_chat.py:178-186    Chat API       Direct Anthropic() client
src/api/routes/cb_chat.py:480        Claude calls   client.messages.create()
src/lib/cbchat/engine.py:85-89       ChatEngine     Direct Anthropic() client
src/lib/cbchat/engine.py:214         Engine calls   client.messages.create()
src/api/routes/embeddings.py:188     Embeddings     Direct httpx to Ollama
src/api/config.py:61-62              Config         ANTHROPIC_API_KEY, etc.

Current Dependencies

# Direct Anthropic usage
from anthropic import Anthropic
client = Anthropic(api_key=settings.ANTHROPIC_API_KEY)
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    system=system_prompt,
    messages=messages,
    tools=tools,  # Tool use support
)

# Direct Ollama usage
async with httpx.AsyncClient(timeout=30.0) as client:
    response = await client.post(
        f"{ollama_url}/api/embeddings",
        json={"model": model, "prompt": text},
    )

CBAI API Overview

Base URL: https://ai.nominate.ai

Endpoints

Endpoint              Method   Purpose
/api/v1/chat          POST     Chat completion (Ollama/Claude)
/api/v1/chat/tools    POST     Chat completion with tool use
/api/v1/embed         POST     Text embeddings (768 dim)
/api/v1/summarize     POST     Text summarization
/api/v1/topics        POST     Topic extraction
/api/v1/ocr           POST     Document OCR
/api/v1/health        GET      Provider health status
/api/v1/usage         GET      Usage metrics

Chat API

# Request
POST /api/v1/chat?provider=claude
{
    "messages": [{"role": "user", "content": "..."}],
    "model": "claude-sonnet-4-5-20250929",  # Optional, uses default
    "max_tokens": 2048,
    "temperature": 0.7,
    "stream": false
}

# Response
{
    "content": "...",
    "model": "claude-sonnet-4-5-20250929",
    "usage": {"input_tokens": 100, "output_tokens": 50},
    "finish_reason": "stop"
}

Embeddings API

# Request
POST /api/v1/embed
{
    "text": "text to embed" | ["text1", "text2"],  # Single or batch
    "model": "nomic-embed-text"  # Optional
}

# Response
{
    "embeddings": [[0.1, 0.2, ...]],  # Always 2D array
    "model": "nomic-embed-text",
    "dimensions": 768
}
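
As a quick smoke test before any adapter work, both endpoints can be exercised directly with httpx (a sketch; it assumes the service is reachable without authentication, which is still an open question for the AI team):

import asyncio

import httpx


async def smoke_test() -> None:
    async with httpx.AsyncClient(base_url="https://ai.nominate.ai", timeout=30.0) as client:
        # Chat: single-turn request routed to Claude
        chat = await client.post(
            "/api/v1/chat",
            params={"provider": "claude"},
            json={"messages": [{"role": "user", "content": "ping"}], "max_tokens": 16},
        )
        chat.raise_for_status()
        print(chat.json()["content"])

        # Embed: a single string should still come back as a 2D array
        embed = await client.post(
            "/api/v1/embed",
            json={"text": "text to embed", "model": "nomic-embed-text"},
        )
        embed.raise_for_status()
        assert embed.json()["dimensions"] == 768


asyncio.run(smoke_test())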

Integration Strategy

Design Principles

  1. Feature Flag Control - Toggle between direct and CBAI modes
  2. Adapter Pattern - Maintain existing interfaces, swap implementation
  3. Graceful Degradation - Fall back to direct calls if CBAI unavailable
  4. Zero Breaking Changes - All existing code continues to work
  5. Incremental Rollout - Enable per-tenant or globally

Phase 1: Create CBAI Client Module

Create src/lib/cbai/ module with:

src/lib/cbai/
├── __init__.py
├── client.py      # Main CBAI client
├── chat.py        # Chat adapter
├── embeddings.py  # Embeddings adapter
└── config.py      # CBAI configuration
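
A minimal __init__.py sketch to round out the layout (re-exporting only the client and config keeps the adapter modules free to import them without circular imports):

src/lib/cbai/__init__.py

"""CBAI unified AI service client."""
from .client import CBAIClient
from .config import CBAIConfig

__all__ = ["CBAIClient", "CBAIConfig"]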

src/lib/cbai/config.py

"""CBAI configuration and feature flags."""
from pydantic import Field
from pydantic_settings import BaseSettings


class CBAIConfig(BaseSettings):
    """Configuration for CBAI integration."""

    # Feature flags
    CBAI_ENABLED: bool = Field(default=False, description="Enable CBAI integration")
    CBAI_CHAT_ENABLED: bool = Field(default=True, description="Use CBAI for chat")
    CBAI_EMBED_ENABLED: bool = Field(default=True, description="Use CBAI for embeddings")

    # Service configuration
    CBAI_BASE_URL: str = Field(default="https://ai.nominate.ai", description="CBAI base URL")
    CBAI_TIMEOUT: int = Field(default=60, description="Request timeout in seconds")

    # Fallback behavior
    CBAI_FALLBACK_ENABLED: bool = Field(default=True, description="Fall back to direct calls on error")

    model_config = {"env_file": ".env", "extra": "allow"}

src/lib/cbai/client.py

"""CBAI unified client."""
import httpx
import logging
from typing import Any

from .config import CBAIConfig

logger = logging.getLogger(__name__)


class CBAIClient:
    """Client for CBAI unified AI service."""

    def __init__(self, config: CBAIConfig | None = None):
        self.config = config or CBAIConfig()
        self._http_client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        if self._http_client is None:
            self._http_client = httpx.AsyncClient(
                base_url=self.config.CBAI_BASE_URL,
                timeout=self.config.CBAI_TIMEOUT,
            )
        return self._http_client

    async def chat(
        self,
        messages: list[dict],
        provider: str = "claude",
        model: str | None = None,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: str | None = None,
        tools: list[dict] | None = None,
        stream: bool = False,
    ) -> dict:
        """
        Send chat request to CBAI.

        Note: Tool use requires special handling - CBAI may need extension
        to support Anthropic's tool format.
        """
        client = await self._get_client()

        # Prepend system message if provided
        if system:
            messages = [{"role": "system", "content": system}] + messages

        payload = {
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "stream": stream,
        }
        if model:
            payload["model"] = model

        # Use /chat/tools endpoint if tools provided
        if tools:
            return await self.chat_with_tools(
                messages=messages,
                tools=tools,
                system=system,
                model=model,
                max_tokens=max_tokens,
                temperature=temperature,
            )

        response = await client.post(
            f"/api/v1/chat",
            params={"provider": provider},
            json=payload,
        )
        response.raise_for_status()
        return response.json()

    async def chat_with_tools(
        self,
        messages: list[dict],
        tools: list[dict],
        system: str | None = None,
        model: str | None = None,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        tool_choice: str | dict = "auto",
    ) -> dict:
        """
        Chat with tool use support via /api/v1/chat/tools.

        This endpoint supports the full Anthropic tool use format.
        """
        client = await self._get_client()

        payload = {
            "messages": messages,
            "tools": tools,
            "tool_choice": tool_choice,
            "max_tokens": max_tokens,
            "temperature": temperature,
        }
        if system:
            payload["system"] = system
        if model:
            payload["model"] = model

        response = await client.post("/api/v1/chat/tools", json=payload)
        response.raise_for_status()
        return response.json()

    async def embed(
        self,
        text: str | list[str],
        model: str = "nomic-embed-text",
    ) -> dict:
        """Generate embeddings via CBAI."""
        client = await self._get_client()

        response = await client.post(
            "/api/v1/embed",
            json={"text": text, "model": model},
        )
        response.raise_for_status()
        return response.json()

    async def health(self) -> dict:
        """Check CBAI health status."""
        client = await self._get_client()
        response = await client.get("/api/v1/health")
        response.raise_for_status()
        return response.json()

    async def close(self):
        """Close HTTP client."""
        if self._http_client:
            await self._http_client.aclose()
            self._http_client = None

Phase 2: Create Adapters

Chat Adapter

"""Chat adapter with CBAI/direct switching."""
from anthropic import Anthropic
from ..cbai import CBAIClient, CBAIConfig

class ChatAdapter:
    """Adapter for chat completions - switches between CBAI and direct Anthropic."""

    def __init__(self, anthropic_api_key: str, cbai_config: CBAIConfig | None = None):
        self.anthropic_api_key = anthropic_api_key
        self.cbai_config = cbai_config or CBAIConfig()
        self._anthropic_client: Anthropic | None = None
        self._cbai_client: CBAIClient | None = None

    def _get_anthropic_client(self) -> Anthropic:
        if self._anthropic_client is None:
            self._anthropic_client = Anthropic(api_key=self.anthropic_api_key)
        return self._anthropic_client

    def _get_cbai_client(self) -> CBAIClient:
        if self._cbai_client is None:
            self._cbai_client = CBAIClient(self.cbai_config)
        return self._cbai_client

    async def create_message(
        self,
        model: str,
        max_tokens: int,
        system: str,
        messages: list[dict],
        tools: list[dict] | None = None,
    ) -> dict:
        """
        Create chat message - routes to CBAI or direct Anthropic.

        Returns dict matching Anthropic response structure for compatibility.
        """
        use_cbai = (
            self.cbai_config.CBAI_ENABLED
            and self.cbai_config.CBAI_CHAT_ENABLED
        )

        if use_cbai:
            try:
                cbai = self._get_cbai_client()

                # Use /chat/tools endpoint if tools provided
                if tools:
                    result = await cbai.chat_with_tools(
                        messages=messages,
                        tools=tools,
                        system=system,
                        model=model,
                        max_tokens=max_tokens,
                    )
                else:
                    result = await cbai.chat(
                        messages=messages,
                        provider="claude",
                        model=model,
                        max_tokens=max_tokens,
                        system=system,
                    )
                # Transform CBAI response to match Anthropic structure
                return self._transform_cbai_response(result)

            except Exception as e:
                if self.cbai_config.CBAI_FALLBACK_ENABLED:
                    logger.warning(f"CBAI error, falling back to direct: {e}")
                else:
                    raise

        # Direct Anthropic call (sync client, matching current cbapp usage;
        # this is also the fallback path)
        client = self._get_anthropic_client()
        kwargs: dict = {
            "model": model,
            "max_tokens": max_tokens,
            "system": system,
            "messages": messages,
        }
        if tools:
            kwargs["tools"] = tools  # omit entirely when no tools are defined
        response = client.messages.create(**kwargs)
        return self._anthropic_to_dict(response)

    def _transform_cbai_response(self, cbai_response: dict) -> dict:
        """Transform a CBAI response to match Anthropic's response structure."""
        content = cbai_response["content"]
        # /api/v1/chat returns content as a string; /api/v1/chat/tools
        # already returns a list of content blocks
        if isinstance(content, str):
            content = [{"type": "text", "text": content}]
        usage = cbai_response.get("usage", {})
        return {
            "content": content,
            "model": cbai_response.get("model", ""),
            # /chat/tools reports stop_reason, /chat reports finish_reason
            "stop_reason": cbai_response.get(
                "stop_reason", cbai_response.get("finish_reason", "stop")
            ),
            "usage": {
                "input_tokens": usage.get("input_tokens", 0),
                "output_tokens": usage.get("output_tokens", 0),
            },
        }

    def _anthropic_to_dict(self, response) -> dict:
        """Convert an Anthropic response object to a plain dict."""
        return {
            # Serialize content blocks so both code paths return plain dicts
            "content": [block.model_dump() for block in response.content],
            "model": response.model,
            "stop_reason": response.stop_reason,
            "usage": {
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens,
            },
        }

Embeddings Adapter

"""Embeddings adapter with CBAI/direct switching."""
import httpx
from ..cbai import CBAIClient, CBAIConfig

class EmbeddingsAdapter:
    """Adapter for embeddings - switches between CBAI and direct Ollama."""

    def __init__(self, ollama_base_url: str, cbai_config: CBAIConfig | None = None):
        self.ollama_base_url = ollama_base_url
        self.cbai_config = cbai_config or CBAIConfig()
        self._cbai_client: CBAIClient | None = None

    def _get_cbai_client(self) -> CBAIClient:
        if self._cbai_client is None:
            self._cbai_client = CBAIClient(self.cbai_config)
        return self._cbai_client

    async def create_embedding(
        self,
        text: str,
        model: str = "nomic-embed-text:latest",
    ) -> list[float]:
        """Create embedding - routes to CBAI or direct Ollama."""
        use_cbai = (
            self.cbai_config.CBAI_ENABLED
            and self.cbai_config.CBAI_EMBED_ENABLED
        )

        if use_cbai:
            try:
                cbai = self._get_cbai_client()
                result = await cbai.embed(text=text, model=model.replace(":latest", ""))
                # CBAI returns {"embeddings": [[...]], "dimensions": 768}
                return result["embeddings"][0]

            except Exception as e:
                if self.cbai_config.CBAI_FALLBACK_ENABLED:
                    logger.warning(f"CBAI embed error, falling back to direct: {e}")
                else:
                    raise

        # Direct Ollama call
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{self.ollama_base_url}/api/embeddings",
                json={"model": model, "prompt": text},
            )
            response.raise_for_status()
            return response.json()["embedding"]

    async def create_batch_embeddings(
        self,
        texts: list[str],
        model: str = "nomic-embed-text:latest",
    ) -> list[list[float]]:
        """Create batch embeddings - CBAI supports native batching."""
        use_cbai = (
            self.cbai_config.CBAI_ENABLED
            and self.cbai_config.CBAI_EMBED_ENABLED
        )

        if use_cbai:
            try:
                cbai = self._get_cbai_client()
                result = await cbai.embed(text=texts, model=model.replace(":latest", ""))
                return result["embeddings"]
            except Exception as e:
                if self.cbai_config.CBAI_FALLBACK_ENABLED:
                    logger.warning(f"CBAI batch embed error, falling back: {e}")
                else:
                    raise

        # Direct Ollama (sequential)
        embeddings = []
        for text in texts:
            embedding = await self.create_embedding(text, model)
            embeddings.append(embedding)
        return embeddings

Phase 3: Integration Points

Update cb_chat.py

# Before (direct):
def get_anthropic_client() -> Anthropic:
    settings = get_settings()
    return Anthropic(api_key=settings.ANTHROPIC_API_KEY)

# After (adapter):
def get_chat_adapter() -> ChatAdapter:
    settings = get_settings()
    return ChatAdapter(
        anthropic_api_key=settings.ANTHROPIC_API_KEY,
        cbai_config=CBAIConfig(),
    )
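
At the call site, the route handler then awaits the adapter instead of calling client.messages.create() directly (a sketch; the handler shape, build_system_prompt, and CAMPAIGN_TOOLS are illustrative stand-ins for the existing cb_chat.py code):

@router.post("/message")
async def send_message(request: ChatRequest):
    adapter = get_chat_adapter()
    response = await adapter.create_message(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        system=build_system_prompt(request),  # hypothetical helper
        messages=request.messages,
        tools=CAMPAIGN_TOOLS,  # hypothetical existing tool definitions
    )
    # The adapter returns a plain dict in Anthropic's shape regardless of
    # which backend served the request
    return {"content": response["content"], "usage": response["usage"]}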

Update embeddings.py

# Before (direct):
async def create_embedding_with_retry(text: str, model: str) -> list[float]:
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{ollama_url}/api/embeddings", ...)

# After (adapter):
_embeddings_adapter: EmbeddingsAdapter | None = None

def get_embeddings_adapter() -> EmbeddingsAdapter:
    global _embeddings_adapter
    if _embeddings_adapter is None:
        settings = get_settings()
        _embeddings_adapter = EmbeddingsAdapter(
            ollama_base_url=settings.OLLAMA_BASE_URL,
            cbai_config=CBAIConfig(),
        )
    return _embeddings_adapter

async def create_embedding_with_retry(text: str, model: str) -> list[float]:
    adapter = get_embeddings_adapter()
    return await adapter.create_embedding(text, model)

Phase 4: Configuration

Add to .env:

# CBAI Integration (optional - disabled by default)
CBAI_ENABLED=false
CBAI_BASE_URL=https://ai.nominate.ai
CBAI_CHAT_ENABLED=true
CBAI_EMBED_ENABLED=true
CBAI_FALLBACK_ENABLED=true
CBAI_TIMEOUT=60

Add to src/api/config.py:

class Settings(BaseSettings):
    # ... existing settings ...

    # CBAI Integration
    CBAI_ENABLED: bool = False
    CBAI_BASE_URL: str = "https://ai.nominate.ai"
    CBAI_CHAT_ENABLED: bool = True
    CBAI_EMBED_ENABLED: bool = True
    CBAI_FALLBACK_ENABLED: bool = True
    CBAI_TIMEOUT: int = 60

Migration Steps

Step 1: Create Module Structure

mkdir -p src/lib/cbai
touch src/lib/cbai/__init__.py

Step 2: Implement Client and Adapters

  • Create config.py, client.py, chat.py, embeddings.py
  • Add comprehensive error handling
  • Include logging for debugging

Step 3: Add Feature Flags

  • Add CBAI settings to src/api/config.py
  • Document in .env.example

Step 4: Update Integration Points

  • Modify cb_chat.py to use ChatAdapter
  • Modify embeddings.py to use EmbeddingsAdapter
  • Keep direct code paths for fallback

Step 5: Testing

  • Unit tests for adapters (see the sketch after this list)
  • Integration tests with CBAI
  • E2E tests with feature flag on/off
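
A minimal unit-test sketch for the fallback path (assumes pytest-asyncio; the CBAI call is forced to fail and the direct Anthropic path is stubbed so no network call is made):

import pytest

from src.lib.cbai.chat import ChatAdapter
from src.lib.cbai.config import CBAIConfig


@pytest.mark.asyncio
async def test_chat_falls_back_when_cbai_errors(monkeypatch):
    config = CBAIConfig(CBAI_ENABLED=True, CBAI_FALLBACK_ENABLED=True)
    adapter = ChatAdapter(anthropic_api_key="test-key", cbai_config=config)

    async def boom(*args, **kwargs):
        raise RuntimeError("CBAI down")

    # Force the CBAI path to fail...
    monkeypatch.setattr(adapter._get_cbai_client(), "chat", boom)
    # ...and stub out the direct path
    monkeypatch.setattr(
        adapter._get_anthropic_client().messages, "create",
        lambda **kwargs: None,
    )
    monkeypatch.setattr(adapter, "_anthropic_to_dict", lambda response: {"content": []})

    result = await adapter.create_message(
        model="claude-sonnet-4-20250514",
        max_tokens=16,
        system="test",
        messages=[{"role": "user", "content": "hi"}],
    )
    assert result == {"content": []}  # served by the fallback path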

Step 6: Staged Rollout

  1. Enable on testsite first (CBAI_ENABLED=true)
  2. Monitor for 1 week
  3. Enable on remaining tenants

Tool Use Support (Full!)

CBAI provides complete tool use support via POST /api/v1/chat/tools:

Endpoint

POST /api/v1/chat/tools

Request Schema

{
    "messages": [
        {"role": "user", "content": "What campaign data do we have?"},
        # Supports content blocks for tool interactions:
        # - TextBlock: {"type": "text", "text": "..."}
        # - ToolUseBlock: {"type": "tool_use", "id": "...", "name": "...", "input": {...}}
        # - ToolResultBlock: {"type": "tool_result", "tool_use_id": "...", "content": "..."}
    ],
    "tools": [
        {
            "name": "list_campaign_sources",
            "description": "List all campaign data sources and their fields",
            "input_schema": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    ],
    "tool_choice": "auto",  # auto | any | none | {"type": "tool", "name": "..."}
    "system": "You are a campaign data assistant...",
    "model": "claude-sonnet-4-5-20250929",  # Optional
    "max_tokens": 2048,
    "temperature": 0.7,
    "stream": false
}

Response Schema

{
    "content": [
        {"type": "text", "text": "Let me check your campaign data..."},
        {"type": "tool_use", "id": "toolu_01abc", "name": "list_campaign_sources", "input": {}}
    ],
    "model": "claude-sonnet-4-5-20250929",
    "stop_reason": "tool_use",  # "end_turn" | "tool_use" | "max_tokens"
    "usage": {"input_tokens": 150, "output_tokens": 50}
}

Tool Use Flow

  1. Send messages with tools array
  2. Check stop_reason in response
  3. If "tool_use": Execute requested tools, send back tool_result blocks
  4. Repeat until stop_reason is "end_turn" (see the loop sketch below)
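
A sketch of that loop built on CBAIClient (execute_tool is a hypothetical stand-in for cbapp's real tool dispatch):

async def run_tool_loop(
    cbai: CBAIClient,
    messages: list[dict],
    tools: list[dict],
    system: str,
) -> dict:
    """Drive the tool use loop until the model ends its turn."""
    while True:
        response = await cbai.chat_with_tools(messages=messages, tools=tools, system=system)
        if response["stop_reason"] != "tool_use":
            return response

        # Echo the assistant turn, then answer each tool_use block
        messages.append({"role": "assistant", "content": response["content"]})
        tool_results = [
            {
                "type": "tool_result",
                "tool_use_id": block["id"],
                "content": execute_tool(block["name"], block["input"]),  # hypothetical
            }
            for block in response["content"]
            if block["type"] == "tool_use"
        ]
        messages.append({"role": "user", "content": tool_results})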

Streaming Tool Responses

When stream: true, returns newline-delimited JSON:

{"type": "text", "text": "Let me "}
{"type": "text", "text": "check..."}
{"type": "tool_use", "id": "toolu_01abc", "name": "list_campaign_sources", "input": {}}
{"type": "message_complete", "stop_reason": "tool_use", "usage": {...}}

This means 100% of cbapp chat traffic can route through CBAI!


Other Considerations

Streaming Support

Current Status: CBAI supports streaming (stream: true), but cbapp doesn't currently use streaming.

Impact: None for current implementation.

Future: Could improve UX with streaming responses in chat interface.

Model Selection

CBAI Defaults:

  • Ollama: mistral-small3.2:latest
  • Claude: claude-sonnet-4-5-20250929

cbapp Current:

  • Claude: claude-sonnet-4-20250514
  • Ollama embed: nomic-embed-text:latest

Note: cbapp currently pins an older Claude model than the CBAI default - verify output compatibility during testing, or pass the model explicitly in requests.


Rollback Plan

If issues occur after enabling CBAI:

  1. Immediate: Set CBAI_ENABLED=false in .env
  2. Restart: sudo systemctl restart {tenant}-api
  3. Verify: Check /api/cb-chat/health returns anthropic_configured: true

No code changes required - adapter automatically routes to direct calls.


Success Metrics

Metric              Target           Measurement
Response latency    ≤10% increase    Compare CBAI vs direct timing
Error rate          ≤1%              Monitor CBAI health endpoint
User experience     No change        No UI/behavior differences
Token accuracy      100% match       Compare usage tracking
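
For the latency metric, the same request can be timed through both paths by toggling the flag (a rough harness sketch; production monitoring should sample real traffic instead):

import statistics
import time

from src.lib.cbai.chat import ChatAdapter
from src.lib.cbai.config import CBAIConfig


async def median_latency(adapter: ChatAdapter, n: int = 20) -> float:
    """Median seconds per request for one adapter configuration."""
    samples = []
    for _ in range(n):
        start = time.perf_counter()
        await adapter.create_message(
            model="claude-sonnet-4-20250514",
            max_tokens=64,
            system="You are a campaign data assistant.",
            messages=[{"role": "user", "content": "ping"}],
        )
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


async def compare_latency(api_key: str) -> None:
    direct = ChatAdapter(api_key, CBAIConfig(CBAI_ENABLED=False))
    via_cbai = ChatAdapter(api_key, CBAIConfig(CBAI_ENABLED=True, CBAI_FALLBACK_ENABLED=False))
    d, c = await median_latency(direct), await median_latency(via_cbai)
    print(f"direct={d:.2f}s  cbai={c:.2f}s  increase={(c - d) / d:.0%}")

Disabling fallback on the CBAI adapter ensures a failure surfaces as an error rather than a silently timed direct call.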

Timeline

Phase                    Duration   Deliverable
1. Module creation       1 day      src/lib/cbai/ implemented
2. Adapter integration   1 day      Integration points updated
3. Testing               2 days     Unit + integration tests passing
4. Testsite rollout      1 week     Monitoring on testsite
5. Full rollout          1 day      All tenants enabled

Questions for AI Team

  1. Tool Use: Can CBAI be extended to support Anthropic's tool use format? ANSWERED: Yes! /api/v1/chat/tools fully supports it!
  2. Authentication: Should we add tenant ID headers for usage tracking?
  3. Rate Limits: Are there per-tenant or global rate limits?
  4. SLA: What's the expected uptime for ai.nominate.ai?
  5. Model Updates: How will model version updates be communicated?


This plan enables incremental adoption of CBAI while maintaining full backward compatibility. The user experience remains unchanged, but infrastructure becomes more manageable.