CBOS JSON Streaming Architecture
Problem Statement
The current CBOS architecture uses `script -f` to capture terminal output from interactive Claude Code sessions running in GNU Screen. This approach has fundamental issues:
- Terminal redraws - Screen repaints multiple lines in place, so the capture is not a linear transcript
- ANSI escape codes - Hard to strip reliably
- State detection - Relies on fragile pattern matching
- Synchronization - Hard to know when output is "complete"
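A small example shows why stripping is fragile. The sketch below removes CSI escape sequences from a spinner line that was overwritten in place. The regex and the sample capture are illustrative assumptions, not CBOS code. The escapes disappear, but the carriage return and the overwritten text survive, so the "clean" output no longer matches what was on screen.

```python
import re

# Matches CSI sequences such as "\x1b[31m" (color) or "\x1b[2K" (erase line).
# Other escape families (OSC window titles, charset switches) are NOT covered,
# which is part of why regex stripping is fragile.
CSI_RE = re.compile(r"\x1b\[[0-9;?]*[A-Za-z]")


def strip_csi(text: str) -> str:
    """Remove CSI escape sequences, leaving every other character intact."""
    return CSI_RE.sub("", text)


# A status line overwritten in place: color, carriage return, erase line, redraw.
captured = "\x1b[33mThinking...\x1b[0m\r\x1b[2KDone."
cleaned = strip_csi(captured)
print(repr(cleaned))  # 'Thinking...\rDone.' : the overwritten text survives
```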
Solution: Pseudo-Interactive JSON Mode
Instead of running Claude interactively and scraping output, run repeated non-interactive invocations that resume the same session.
Key Claude Code CLI Flags
```
-p, --print                      # Non-interactive mode (single prompt, single response)
--output-format FORMAT           # text | json | stream-json
--resume SESSION_ID              # Resume a previous conversation
--continue                       # Continue the most recent conversation
--dangerously-skip-permissions   # No permission prompts (for automation)
```
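For each turn, these flags combine into one argument vector. The first turn omits `--resume`, and every later turn passes the session ID captured from the previous run. This is a minimal sketch: `build_claude_argv` is a hypothetical helper and `abc123` is a placeholder ID. Passing a list to the subprocess API, rather than a shell string, avoids quoting problems with arbitrary prompt text.

```python
from typing import Optional


def build_claude_argv(
    prompt: str,
    session_id: Optional[str] = None,
    skip_permissions: bool = False,
    claude_command: str = "claude",
) -> list[str]:
    """Build the argument vector for one non-interactive Claude invocation."""
    argv = [claude_command, "-p", prompt, "--output-format", "stream-json"]
    if session_id:
        # Reattach this invocation to an earlier conversation.
        argv += ["--resume", session_id]
    if skip_permissions:
        # Automation only: every tool call runs without approval.
        argv.append("--dangerously-skip-permissions")
    return argv


first = build_claude_argv("List the files in src/")
follow_up = build_claude_argv("Now summarize them", session_id="abc123")
print(first)
print(follow_up)
```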
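JSON Event Types
When run with `--output-format stream-json`, Claude Code writes newline-delimited JSON, one event per line. The events below are a simplified illustration. Exact field names and nesting vary between CLI versions. For example, recent versions report the first event as `{"type":"system","subtype":"init",...}` and carry tool calls as content blocks inside `assistant` messages. Check the schema against the installed version.
```json
{"type":"init","session_id":"abc123","cwd":"/project"}
{"type":"user","message":"Your prompt here"}
{"type":"assistant","message":{"content":"I'll help you..."}}
{"type":"tool_use","tool":{"name":"Read","input":{"path":"/file.py"}}}
{"type":"tool_result","result":"file contents..."}
{"type":"result","subtype":"success","cost_usd":0.01,"duration_ms":5000}
```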
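A consumer of this stream needs to do three things: split on newlines, parse each line, and remember the session ID for the next `--resume`. This is a minimal sketch: `parse_event_line` and `extract_session_id` are hypothetical helpers. Reading `session_id` from any event, rather than only from `init`, hedges against schema differences between CLI versions.

```python
import json
from typing import Optional


def parse_event_line(line: str) -> Optional[dict]:
    """Parse one line of stream-json output; return None for blank lines."""
    line = line.strip()
    if not line:
        return None
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        event = None
    if not isinstance(event, dict):
        # Keep unexpected output visible instead of silently dropping it.
        return {"type": "raw", "content": line}
    return event


def extract_session_id(event: dict) -> Optional[str]:
    """Return the top-level session ID if this event carries one."""
    sid = event.get("session_id")
    return sid if isinstance(sid, str) else None


stream = [
    '{"type":"init","session_id":"abc123","cwd":"/project"}',
    '',
    '{"type":"assistant","message":{"content":"I\'ll help you..."}}',
    'not json',
    '{"type":"result","subtype":"success","cost_usd":0.01,"duration_ms":5000}',
]
session_id = None
event_types = []
for raw in stream:
    event = parse_event_line(raw)
    if event is None:
        continue
    session_id = extract_session_id(event) or session_id
    event_types.append(event["type"])
print(event_types, session_id)
```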
New Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ CBOS TUI                                                        │
│ WebSocket client, displays parsed JSON events                   │
└──────────────────────────────┬──────────────────────────────────┘
                               │ ws://localhost:32205/ws/stream
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│ CBOS API Server                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ JSONSessionManager                                        │  │
│  │ - Manages session state (idle, running, complete, error)  │  │
│  │ - Spawns Claude CLI processes                             │  │
│  │ - Parses stream-json output                               │  │
│  │ - Broadcasts events to WebSocket clients                  │  │
│  └───────────────────────────────────────────────────────────┘  │
└──────────────────────────────┬──────────────────────────────────┘
                               │ subprocess
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│ Claude CLI Processes                                            │
│ claude -p "prompt" --output-format stream-json --resume ID      │
│                                                                 │
│ Each invocation is short-lived, session persists via resume     │
└─────────────────────────────────────────────────────────────────┘
```
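The API server broadcasts events to WebSocket clients. One way to do this is a fan-out hub with one queue per connected client. This is an assumption about how CBOS might implement it, not part of the design. `Broadcaster` is a hypothetical name, and dropping the oldest message for a slow client is one policy among several (disconnecting slow clients is another).

```python
import asyncio


class Broadcaster:
    """Fan out server messages to every connected WebSocket client.

    Each client gets its own bounded queue, so one slow client cannot block
    the reader of the Claude subprocess; when a client's queue is full, its
    oldest message is dropped.
    """

    def __init__(self, max_queue: int = 1000):
        self._max_queue = max_queue
        self._queues: set[asyncio.Queue] = set()

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._max_queue)
        self._queues.add(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._queues.discard(queue)

    def publish(self, message: dict) -> None:
        for queue in self._queues:
            if queue.full():
                queue.get_nowait()  # drop this client's oldest message
            queue.put_nowait(message)


async def demo() -> list[dict]:
    hub = Broadcaster(max_queue=2)
    client = hub.subscribe()
    for n in range(3):
        hub.publish({"type": "claude_event", "seq": n})
    return [client.get_nowait() for _ in range(client.qsize())]


received = asyncio.run(demo())
print([m["seq"] for m in received])  # [1, 2]
```

A WebSocket handler would call `subscribe()` on connect, forward messages from its queue until the socket closes, and then call `unsubscribe()`.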
Session States
| State | Description |
|---|---|
| `idle` | Waiting for user to send a prompt |
| `running` | Claude process is executing |
| `waiting_permission` | Tool needs approval (if not using `--dangerously-skip-permissions`) |
| `complete` | Response finished, ready for next prompt |
| `error` | Process exited with error |
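The table implies a small state machine. The sketch below makes the allowed transitions explicit, so invalid moves fail loudly. The transition set is inferred from the table and is an assumption, not a spec. For example, it assumes that both `complete` and `error` accept a new prompt. `TRANSITIONS` and `transition` are hypothetical names.

```python
from enum import Enum


class SessionState(Enum):
    IDLE = "idle"
    RUNNING = "running"
    WAITING_PERMISSION = "waiting_permission"
    COMPLETE = "complete"
    ERROR = "error"


# Assumed transitions: a prompt starts a run from any resting state,
# and a run ends in complete or error (possibly pausing for approval).
TRANSITIONS: dict[SessionState, set[SessionState]] = {
    SessionState.IDLE: {SessionState.RUNNING},
    SessionState.RUNNING: {
        SessionState.WAITING_PERMISSION,
        SessionState.COMPLETE,
        SessionState.ERROR,
    },
    SessionState.WAITING_PERMISSION: {SessionState.RUNNING, SessionState.ERROR},
    SessionState.COMPLETE: {SessionState.RUNNING},
    SessionState.ERROR: {SessionState.RUNNING},
}


def transition(current: SessionState, new: SessionState) -> SessionState:
    """Return the new state, or raise if the move is not allowed."""
    if new not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new


state = SessionState.IDLE
for step in (SessionState.RUNNING, SessionState.COMPLETE, SessionState.RUNNING):
    state = transition(state, step)
print(state.value)  # running
```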
API Changes
New Endpoints
```
POST /sessions/{slug}/invoke
  Body:    {"prompt": "Your message", "options": {...}}
  Returns: as soon as the invocation has started; events are streamed via WebSocket

GET /sessions/{slug}/events
  Returns: recent parsed JSON events for this session
```
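The two endpoints need only a little state: a background task per session for `invoke`, and a bounded buffer of recent events for `events`. This is a framework-agnostic sketch, and several parts are assumptions:
- `SessionEndpoints` is a hypothetical class name.
- The `{"status": ...}` response bodies are invented for illustration.
- The 500-event cap is an arbitrary default.
- A second prompt is rejected while one is still running.
- `options` is ignored.

```python
import asyncio
from collections import deque
from typing import Awaitable, Callable


class SessionEndpoints:
    """Framework-agnostic logic behind the two endpoints (hypothetical class)."""

    def __init__(self, runner: Callable[[str, str], Awaitable[None]], keep: int = 500):
        self._runner = runner  # coroutine function that performs one invocation
        self._keep = keep      # events retained per session
        self._events: dict[str, deque] = {}
        self._tasks: dict[str, asyncio.Task] = {}

    def record(self, slug: str, event: dict) -> None:
        """Store an event; the deque drops the oldest once `keep` is reached."""
        self._events.setdefault(slug, deque(maxlen=self._keep)).append(event)

    def post_invoke(self, slug: str, body: dict) -> dict:
        """POST /sessions/{slug}/invoke: start the run and return immediately."""
        task = self._tasks.get(slug)
        if task is not None and not task.done():
            return {"status": "busy"}
        self._tasks[slug] = asyncio.create_task(self._runner(slug, body["prompt"]))
        return {"status": "started"}

    def get_events(self, slug: str, limit: int = 50) -> list[dict]:
        """GET /sessions/{slug}/events: up to `limit` recent events, oldest first."""
        events = list(self._events.get(slug, ()))
        return events[-limit:] if limit > 0 else []


async def demo() -> tuple[dict, dict, list[dict]]:
    async def fake_runner(slug: str, prompt: str) -> None:
        # Stand-in for JSONSessionManager.invoke(); `endpoints` is bound below.
        endpoints.record(slug, {"type": "assistant", "message": {"content": f"echo: {prompt}"}})

    endpoints = SessionEndpoints(fake_runner, keep=3)
    first = endpoints.post_invoke("AUTH", {"prompt": "hi"})
    second = endpoints.post_invoke("AUTH", {"prompt": "again"})  # first run still pending
    await asyncio.sleep(0)  # let the background task run
    return first, second, endpoints.get_events("AUTH")


first, second, events = asyncio.run(demo())
print(first, second, events)
```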
WebSocket Protocol Updates
```jsonc
// Server -> Client: Parsed Claude events
{
  "type": "claude_event",
  "session": "AUTH",
  "event": {
    "type": "assistant",
    "message": {"content": "I'll read that file..."}
  },
  "timestamp": 1704326400.123
}

// Server -> Client: Session state change
{
  "type": "session_state",
  "session": "AUTH",
  "state": "running"
}

// Client -> Server: Send prompt
{
  "type": "invoke",
  "session": "AUTH",
  "prompt": "Yes, please continue"
}
```
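Both directions of this protocol reduce to small JSON envelopes. The sketch below builds the two server messages and validates the one client message. The function names are hypothetical, and rejecting unknown client message types is an assumption the design does not state.

```python
import json
import time
from typing import Optional


def claude_event_message(session: str, event: dict, timestamp: Optional[float] = None) -> str:
    """Server -> client: wrap a parsed Claude event."""
    return json.dumps({
        "type": "claude_event",
        "session": session,
        "event": event,
        "timestamp": time.time() if timestamp is None else timestamp,
    })


def session_state_message(session: str, state: str) -> str:
    """Server -> client: announce a session state change."""
    return json.dumps({"type": "session_state", "session": session, "state": state})


def parse_client_message(raw: str) -> dict:
    """Client -> server: validate an incoming message; only "invoke" is defined."""
    msg = json.loads(raw)
    if not isinstance(msg, dict) or msg.get("type") != "invoke":
        raise ValueError("unsupported client message")
    if not isinstance(msg.get("session"), str) or not isinstance(msg.get("prompt"), str):
        raise ValueError("invoke requires string 'session' and 'prompt' fields")
    return msg


incoming = parse_client_message(
    '{"type": "invoke", "session": "AUTH", "prompt": "Yes, please continue"}'
)
outgoing = claude_event_message("AUTH", {"type": "assistant"}, timestamp=1704326400.123)
print(incoming["prompt"])
print(outgoing)
```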
Implementation
JSONSessionManager
```python
import asyncio
import json
import os
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import AsyncIterator, Awaitable, Callable, Optional

# Callbacks receive one protocol message dict (claude_event or session_state).
EventCallback = Callable[[dict], Awaitable[None]]

# asyncio's default 64 KiB line limit is easy to exceed with a single
# stream-json line (e.g. a tool result containing a whole file).
STREAM_LINE_LIMIT = 16 * 1024 * 1024


class SessionState(Enum):
    IDLE = "idle"
    RUNNING = "running"
    WAITING_PERMISSION = "waiting_permission"  # in the state table; not set by invoke() yet
    COMPLETE = "complete"
    ERROR = "error"


@dataclass
class JSONSession:
    slug: str
    path: str
    session_id: Optional[str] = None  # Claude's internal session ID
    state: SessionState = SessionState.IDLE
    events: list[dict] = field(default_factory=list)
    process: Optional[asyncio.subprocess.Process] = None


class JSONSessionManager:
    """Manages Claude Code sessions using JSON streaming mode"""

    def __init__(self, claude_command: str = "claude"):
        self.claude_command = claude_command
        self._sessions: dict[str, JSONSession] = {}
        self._callbacks: list[EventCallback] = []

    def create_session(self, slug: str, path: str) -> JSONSession:
        """Create a new session (just metadata, no process yet)"""
        session = JSONSession(slug=slug, path=path)
        self._sessions[slug] = session
        return session

    async def invoke(
        self,
        slug: str,
        prompt: str,
        skip_permissions: bool = True,
    ) -> AsyncIterator[dict]:
        """
        Invoke Claude with a prompt and stream JSON events.

        This spawns a Claude process, parses its stream-json output,
        broadcasts each event to the registered callbacks, and yields
        events as they arrive.
        """
        session = self._sessions.get(slug)
        if not session:
            raise ValueError(f"Session {slug} not found")
        if session.state == SessionState.RUNNING:
            raise ValueError(f"Session {slug} is already running")

        # Build command
        cmd = [
            self.claude_command,
            "-p", prompt,
            "--output-format", "stream-json",
        ]
        if skip_permissions:
            cmd.append("--dangerously-skip-permissions")
        # Resume existing session or start new
        if session.session_id:
            cmd.extend(["--resume", session.session_id])

        session.state = SessionState.RUNNING
        await self._emit_state_change(session)
        process = None
        stderr_task = None
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=session.path,
                env={**os.environ, "NO_COLOR": "1"},
                limit=STREAM_LINE_LIMIT,
            )
            session.process = process
            # Drain stderr concurrently: if its pipe filled while we only read
            # stdout, the child would block and stdout would never reach EOF.
            stderr_task = asyncio.create_task(process.stderr.read())

            # Stream stdout line by line
            async for raw in process.stdout:
                line = raw.decode("utf-8", errors="replace").strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    event = None
                if not isinstance(event, dict):
                    # Non-JSON output (shouldn't happen, but handle gracefully)
                    event = {"type": "raw", "content": line}
                session.events.append(event)
                # Take the session ID from any event that carries one: "init"
                # here, "system"/"init" or "result" events in some CLI versions.
                if isinstance(event.get("session_id"), str):
                    session.session_id = event["session_id"]
                await self._emit_event(session, event)
                yield event

            # Wait for process to complete
            stderr = await stderr_task
            await process.wait()
            if process.returncode == 0:
                session.state = SessionState.COMPLETE
            else:
                session.state = SessionState.ERROR
                error = {"type": "error", "message": stderr.decode("utf-8", errors="replace")}
                await self._emit_event(session, error)
                yield error
        except Exception as e:
            session.state = SessionState.ERROR
            error = {"type": "error", "message": str(e)}
            await self._emit_event(session, error)
            yield error
        finally:
            # If the consumer stopped iterating early, don't leave Claude running.
            if stderr_task is not None and not stderr_task.done():
                stderr_task.cancel()
            if process is not None and process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # already exited
                await process.wait()
            if session.state == SessionState.RUNNING:
                session.state = SessionState.ERROR  # interrupted mid-run
            session.process = None
            await self._emit_state_change(session)

    async def _emit_event(self, session: JSONSession, event: dict):
        """Notify callbacks of a new event"""
        await self._broadcast({
            "type": "claude_event",
            "session": session.slug,
            "event": event,
            "timestamp": time.time(),
        })

    async def _emit_state_change(self, session: JSONSession):
        """Notify callbacks of state change"""
        await self._broadcast({
            "type": "session_state",
            "session": session.slug,
            "state": session.state.value,
        })

    async def _broadcast(self, message: dict):
        for callback in self._callbacks:
            try:
                await callback(message)
            except Exception:
                pass  # one failing client must not break the stream

    def on_event(self, callback: EventCallback):
        """Register callback for events"""
        self._callbacks.append(callback)
```
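The streaming loop in `invoke()` can be tested without a Claude installation by applying the same technique to a stand-in process. This is a minimal sketch: `stream_ndjson` and `FAKE_CLI` are hypothetical names, and the fake program only imitates the simplified event shapes listed earlier. It keeps the three details that matter for real output: line-by-line reading, a raised line limit, and draining stderr concurrently.

```python
import asyncio
import json
import sys
from typing import AsyncIterator

# Stand-in for the Claude CLI: a tiny Python program that prints NDJSON.
FAKE_CLI = (
    "import json\n"
    "for e in [{'type': 'init', 'session_id': 'abc123'},"
    " {'type': 'assistant', 'message': {'content': 'hi'}},"
    " {'type': 'result', 'subtype': 'success'}]:\n"
    "    print(json.dumps(e), flush=True)\n"
)


async def stream_ndjson(*cmd: str) -> AsyncIterator[dict]:
    """Yield one dict per JSON line that a subprocess writes to stdout."""
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        limit=1 << 20,  # allow long lines, e.g. large tool results
    )
    # Drain stderr concurrently so a full pipe cannot stall the child.
    stderr_task = asyncio.create_task(process.stderr.read())
    async for raw in process.stdout:
        if raw.strip():
            yield json.loads(raw)
    stderr = await stderr_task
    if await process.wait() != 0:
        raise RuntimeError(stderr.decode(errors="replace"))


async def main() -> list[str]:
    seen = []
    async for event in stream_ndjson(sys.executable, "-c", FAKE_CLI):
        seen.append(event["type"])
    return seen


event_types = asyncio.run(main())
print(event_types)  # ['init', 'assistant', 'result']
```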
Migration Path
- Keep existing screen-based sessions - Don't break what works
- Add new JSON-based session type - `SessionType.SCREEN` vs `SessionType.JSON`
- Feature flag - Let users choose which mode per session
- Gradual rollout - Default to JSON for new sessions
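The per-session choice can be modeled with the `SessionType` enum named in the migration steps. This is a minimal sketch with two assumptions: the feature flag is a single boolean (`json_enabled`), and an explicit user request wins over the default. `choose_session_type` is a hypothetical helper.

```python
from enum import Enum
from typing import Optional


class SessionType(Enum):
    SCREEN = "screen"  # existing GNU Screen + script -f capture
    JSON = "json"      # new non-interactive stream-json invocations


def choose_session_type(requested: Optional[str], json_enabled: bool) -> SessionType:
    """Pick the mode for a new session.

    `json_enabled` stands in for the feature flag; with the flag on, new
    sessions default to JSON unless the user explicitly asks for screen.
    """
    if requested is not None:
        chosen = SessionType(requested)  # raises ValueError on unknown names
        if chosen is SessionType.JSON and not json_enabled:
            raise ValueError("JSON sessions are disabled by the feature flag")
        return chosen
    return SessionType.JSON if json_enabled else SessionType.SCREEN


print(choose_session_type(None, json_enabled=True))      # SessionType.JSON
print(choose_session_type("screen", json_enabled=True))  # SessionType.SCREEN
print(choose_session_type(None, json_enabled=False))     # SessionType.SCREEN
```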
Pros and Cons
Pros
- ✅ Structured, parseable output
- ✅ Clear event types (assistant, tool_use, result)
- ✅ No ANSI code stripping needed
- ✅ Explicit state transitions
- ✅ Session ID tracking built-in
- ✅ Works well with WebSocket broadcasting
Cons
- ❌ Each interaction spawns a new process
- ❌ Slight latency between interactions
- ❌ May not work with all Claude Code features (MCP, etc.)
- ❌ `--dangerously-skip-permissions` bypasses safety prompts
Alternative: Hybrid Mode
Keep screen-based sessions for the interactive TUI experience, but add JSON mode for:
- Programmatic API access
- Automated workflows
- CI/CD integration
Next Steps
- Test `claude -p --output-format stream-json` on your system (some CLI versions reject `stream-json` in print mode unless `--verbose` is also passed)
- Verify `--resume` works to continue sessions
- Implement `JSONSessionManager` in CBOS
- Add new WebSocket event types
- Update TUI to handle JSON events