# API Mesh - Project Initialization¶
A lightweight, Python-native distributed WebSocket network that proxies FastAPI services.
Project Overview¶
API Mesh creates a self-organizing network of WebSocket workers that wrap existing FastAPI services. Workers discover each other via broadcast, exchange API Cards describing their capabilities, and route messages to the appropriate service. A standalone Gateway provides external WebSocket access for web clients.
Core Concepts¶
- API Worker: Wraps a FastAPI service, parses its OpenAPI spec, handles proxied requests
- API Card: Service advertisement containing endpoints, schemas, and metadata
- Message Protocol: JSON envelope for all inter-worker communication
- Discovery: UDP broadcast for LAN-based worker discovery
- Gateway: External WebSocket entry point for clients
Project Structure¶
apimesh/
├── pyproject.toml
├── README.md
├── src/
│ └── apimesh/
│ ├── __init__.py
│ ├── py.typed
│ ├── models/
│ │ ├── __init__.py
│ │ ├── card.py # APICard, EndpointSpec
│ │ ├── messages.py # WSMessage, APIRequest, APIResponse
│ │ └── registry.py # ServiceRegistry
│ ├── worker/
│ │ ├── __init__.py
│ │ ├── worker.py # APIWorker main class
│ │ ├── parser.py # OpenAPI spec parser
│ │ └── proxy.py # HTTP proxy client
│ ├── discovery/
│ │ ├── __init__.py
│ │ ├── broadcast.py # UDP broadcast discovery
│ │ └── protocol.py # Discovery message handling
│ ├── gateway/
│ │ ├── __init__.py
│ │ ├── server.py # FastAPI WebSocket gateway
│ │ └── router.py # Message routing logic
│ ├── network/
│ │ ├── __init__.py
│ │ ├── mesh.py # WebSocket mesh connections
│ │ └── handlers.py # Message type handlers
│ ├── cli.py # CLI entry points
│ └── config.py # Configuration management
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_models/
│ ├── test_worker/
│ ├── test_discovery/
│ └── test_gateway/
└── examples/
├── sample_api/ # Example FastAPI to wrap
│ └── main.py
├── run_worker.py
└── run_gateway.py
Dependencies¶
pyproject.toml¶
[project]
name = "apimesh"
version = "0.1.0"
description = "Distributed WebSocket proxy network for FastAPI services"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"websockets>=13.0",
"httpx>=0.27.0",
"pydantic>=2.9.0",
"pydantic-settings>=2.6.0",
"structlog>=24.0.0",
"typer>=0.12.0",
"rich>=13.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.24.0",
"pytest-cov>=5.0.0",
"ruff>=0.7.0",
"mypy>=1.13.0",
"pre-commit>=4.0.0",
]
[project.scripts]
apimesh = "apimesh.cli:app"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/apimesh"]
[tool.ruff]
target-version = "py311"
line-length = 88
src = ["src", "tests"]
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # Pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"ARG", # flake8-unused-arguments
"SIM", # flake8-simplify
]
ignore = ["E501"] # line length handled by formatter
[tool.ruff.lint.isort]
known-first-party = ["apimesh"]
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_ignores = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "-ra -q"
Core Models¶
src/apimesh/models/card.py¶
"""API Card models for service advertisement."""
from datetime import datetime, timezone
from typing import Any
from uuid import UUID, uuid4
from pydantic import BaseModel, Field, HttpUrl
class ParameterSpec(BaseModel):
"""OpenAPI parameter specification."""
name: str
location: str = Field(alias="in") # query, path, header, cookie
required: bool = False
schema_: dict[str, Any] = Field(default_factory=dict, alias="schema")
model_config = {"populate_by_name": True}
class EndpointSpec(BaseModel):
"""Specification for a single API endpoint."""
path: str
method: str
operation_id: str | None = None
summary: str | None = None
description: str | None = None
parameters: list[ParameterSpec] = Field(default_factory=list)
request_body: dict[str, Any] | None = None
responses: dict[str, Any] = Field(default_factory=dict)
tags: list[str] = Field(default_factory=list)
class APICard(BaseModel):
"""Service advertisement broadcast by workers.
Contains all information needed to route requests to this service.
"""
worker_id: UUID = Field(default_factory=uuid4)
service_name: str
service_version: str = "1.0.0"
base_url: HttpUrl
openapi_url: HttpUrl
endpoints: list[EndpointSpec] = Field(default_factory=list)
schemas: dict[str, Any] = Field(default_factory=dict)
tags: list[str] = Field(default_factory=list)
heartbeat: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
def refresh_heartbeat(self) -> None:
"""Update heartbeat timestamp."""
self.heartbeat = datetime.now(timezone.utc)
def get_endpoint(self, method: str, path: str) -> EndpointSpec | None:
"""Find endpoint by method and path."""
method_upper = method.upper()
for endpoint in self.endpoints:
if endpoint.method == method_upper and endpoint.path == path:
return endpoint
return None
def has_endpoint(self, method: str, path: str) -> bool:
"""Check if endpoint exists."""
return self.get_endpoint(method, path) is not None
src/apimesh/models/messages.py¶
"""WebSocket message protocol models."""
from datetime import datetime, timezone
from enum import StrEnum
from typing import Any, Literal
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
class MessageType(StrEnum):
"""Types of messages in the mesh network."""
REQUEST = "request"
RESPONSE = "response"
DISCOVER = "discover"
ANNOUNCE = "announce"
HEARTBEAT = "heartbeat"
ERROR = "error"
class WSMessage(BaseModel):
"""Base WebSocket message envelope.
All communication in the mesh uses this envelope format.
"""
id: UUID = Field(default_factory=uuid4)
type: MessageType
source: str # worker_id as string
target: str # worker_id, service_name, or "*" for broadcast
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
correlation_id: UUID | None = None # For request/response matching
payload: dict[str, Any] = Field(default_factory=dict)
def create_response(
self,
source: str,
payload: dict[str, Any],
*,
error: bool = False,
) -> "WSMessage":
"""Create a response message to this message."""
return WSMessage(
type=MessageType.ERROR if error else MessageType.RESPONSE,
source=source,
target=self.source,
correlation_id=self.id,
payload=payload,
)
class APIRequest(BaseModel):
"""Request payload for API calls through the mesh."""
method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"]
path: str
path_params: dict[str, Any] = Field(default_factory=dict)
query_params: dict[str, Any] = Field(default_factory=dict)
body: Any = None
headers: dict[str, str] = Field(default_factory=dict)
timeout: float = 30.0
class APIResponse(BaseModel):
"""Response payload for API calls through the mesh."""
status_code: int
headers: dict[str, str] = Field(default_factory=dict)
body: Any = None
elapsed_ms: float = 0.0
class ErrorPayload(BaseModel):
"""Error information payload."""
code: str
message: str
details: dict[str, Any] = Field(default_factory=dict)
src/apimesh/models/registry.py¶
"""Service registry for tracking available workers."""
from datetime import datetime, timedelta, timezone
from uuid import UUID
from pydantic import BaseModel, Field
from apimesh.models.card import APICard
class ServiceRegistry(BaseModel):
"""Registry of known services in the mesh.
Tracks API Cards and manages service discovery state.
"""
services: dict[str, APICard] = Field(default_factory=dict) # worker_id -> card
by_name: dict[str, set[str]] = Field(default_factory=dict) # name -> worker_ids
ttl_seconds: int = 60 # Time before stale entries are removed
def register(self, card: APICard) -> None:
"""Register or update a service."""
worker_id = str(card.worker_id)
card.refresh_heartbeat()
# Update main registry
self.services[worker_id] = card
# Update name index
if card.service_name not in self.by_name:
self.by_name[card.service_name] = set()
self.by_name[card.service_name].add(worker_id)
def unregister(self, worker_id: str) -> APICard | None:
"""Remove a service from the registry."""
card = self.services.pop(worker_id, None)
if card:
name_set = self.by_name.get(card.service_name)
if name_set:
name_set.discard(worker_id)
if not name_set:
del self.by_name[card.service_name]
return card
def get_by_id(self, worker_id: str) -> APICard | None:
"""Get service by worker ID."""
return self.services.get(worker_id)
def get_by_name(self, service_name: str) -> list[APICard]:
"""Get all services with given name."""
worker_ids = self.by_name.get(service_name, set())
return [self.services[wid] for wid in worker_ids if wid in self.services]
def get_one_by_name(self, service_name: str) -> APICard | None:
"""Get one service by name (for simple routing)."""
cards = self.get_by_name(service_name)
return cards[0] if cards else None
def cleanup_stale(self) -> list[str]:
"""Remove stale entries. Returns list of removed worker IDs."""
now = datetime.now(timezone.utc)
cutoff = now - timedelta(seconds=self.ttl_seconds)
stale = [
wid for wid, card in self.services.items() if card.heartbeat < cutoff
]
for wid in stale:
self.unregister(wid)
return stale
def list_services(self) -> list[str]:
"""List all unique service names."""
return list(self.by_name.keys())
def list_workers(self) -> list[str]:
"""List all worker IDs."""
return list(self.services.keys())
Worker Implementation¶
src/apimesh/worker/parser.py¶
"""OpenAPI specification parser."""
from typing import Any
import httpx
from apimesh.models.card import APICard, EndpointSpec, ParameterSpec
async def fetch_openapi_spec(openapi_url: str) -> dict[str, Any]:
"""Fetch OpenAPI JSON specification from URL."""
async with httpx.AsyncClient() as client:
response = await client.get(openapi_url, timeout=10.0)
response.raise_for_status()
return response.json()
def parse_parameters(params: list[dict[str, Any]]) -> list[ParameterSpec]:
"""Parse OpenAPI parameters into ParameterSpec models."""
return [ParameterSpec.model_validate(p) for p in params]
def parse_endpoints(spec: dict[str, Any]) -> list[EndpointSpec]:
"""Parse all endpoints from OpenAPI spec."""
endpoints: list[EndpointSpec] = []
paths = spec.get("paths", {})
for path, methods in paths.items():
for method, details in methods.items():
if method.upper() not in {"GET", "POST", "PUT", "PATCH", "DELETE"}:
continue
endpoint = EndpointSpec(
path=path,
method=method.upper(),
operation_id=details.get("operationId"),
summary=details.get("summary"),
description=details.get("description"),
parameters=parse_parameters(details.get("parameters", [])),
request_body=details.get("requestBody"),
responses=details.get("responses", {}),
tags=details.get("tags", []),
)
endpoints.append(endpoint)
return endpoints
def extract_service_info(spec: dict[str, Any]) -> tuple[str, str]:
"""Extract service name and version from spec."""
info = spec.get("info", {})
title = info.get("title", "unknown-service")
version = info.get("version", "1.0.0")
# Convert title to slug format
service_name = title.lower().replace(" ", "-")
return service_name, version
def extract_schemas(spec: dict[str, Any]) -> dict[str, Any]:
"""Extract component schemas from spec."""
components = spec.get("components", {})
return components.get("schemas", {})
def extract_tags(spec: dict[str, Any]) -> list[str]:
"""Extract tag names from spec."""
tags = spec.get("tags", [])
return [t.get("name", "") for t in tags if t.get("name")]
async def parse_openapi_to_card(
base_url: str,
openapi_path: str = "/openapi.json",
) -> APICard:
"""Parse OpenAPI spec into an APICard.
Args:
base_url: Base URL of the API (e.g., "http://localhost:8000")
openapi_path: Path to OpenAPI JSON (default: /openapi.json)
Returns:
APICard populated from the OpenAPI specification
"""
openapi_url = f"{base_url.rstrip('/')}{openapi_path}"
spec = await fetch_openapi_spec(openapi_url)
service_name, version = extract_service_info(spec)
return APICard(
service_name=service_name,
service_version=version,
base_url=base_url, # type: ignore[arg-type]
openapi_url=openapi_url, # type: ignore[arg-type]
endpoints=parse_endpoints(spec),
schemas=extract_schemas(spec),
tags=extract_tags(spec),
)
src/apimesh/worker/proxy.py¶
"""HTTP proxy client for making requests to wrapped services."""
import re
import time
from typing import Any
import httpx
from apimesh.models.messages import APIRequest, APIResponse
def build_url(base_url: str, path: str, path_params: dict[str, Any]) -> str:
"""Build full URL with path parameters substituted."""
# Replace {param} placeholders with actual values
resolved_path = path
for key, value in path_params.items():
resolved_path = re.sub(
rf"\{{{key}\}}",
str(value),
resolved_path,
)
return f"{base_url.rstrip('/')}{resolved_path}"
async def proxy_request(
base_url: str,
request: APIRequest,
*,
client: httpx.AsyncClient | None = None,
) -> APIResponse:
"""Proxy an API request to the target service.
Args:
base_url: Base URL of the target service
request: The API request to proxy
client: Optional httpx client (creates one if not provided)
Returns:
APIResponse with status, headers, and body
"""
url = build_url(base_url, request.path, request.path_params)
should_close = client is None
client = client or httpx.AsyncClient()
start_time = time.perf_counter()
try:
response = await client.request(
method=request.method,
url=url,
params=request.query_params or None,
json=request.body if request.body is not None else None,
headers=request.headers or None,
timeout=request.timeout,
)
elapsed_ms = (time.perf_counter() - start_time) * 1000
# Parse response body
try:
body = response.json()
except Exception:
body = response.text or None
return APIResponse(
status_code=response.status_code,
headers=dict(response.headers),
body=body,
elapsed_ms=elapsed_ms,
)
finally:
if should_close:
await client.aclose()
class ProxyClient:
"""Reusable proxy client with connection pooling."""
def __init__(self, base_url: str) -> None:
self.base_url = base_url
self._client: httpx.AsyncClient | None = None
async def __aenter__(self) -> "ProxyClient":
self._client = httpx.AsyncClient()
return self
async def __aexit__(self, *args: Any) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def request(self, request: APIRequest) -> APIResponse:
"""Make a proxied request."""
return await proxy_request(
self.base_url,
request,
client=self._client,
)
src/apimesh/worker/worker.py¶
"""API Worker - wraps a FastAPI service and handles mesh communication."""
import asyncio
from typing import Any
from uuid import UUID
import structlog
from websockets.client import connect as ws_connect
from websockets.exceptions import ConnectionClosed
from apimesh.config import WorkerSettings
from apimesh.models.card import APICard
from apimesh.models.messages import (
APIRequest,
APIResponse,
ErrorPayload,
MessageType,
WSMessage,
)
from apimesh.worker.parser import parse_openapi_to_card
from apimesh.worker.proxy import ProxyClient
logger = structlog.get_logger()
class APIWorker:
"""Worker that wraps a FastAPI service and joins the mesh.
Responsibilities:
- Parse target API's OpenAPI spec
- Connect to mesh network
- Broadcast API Card for discovery
- Handle incoming requests and proxy to target
- Send responses back through mesh
"""
def __init__(
self,
target_url: str,
settings: WorkerSettings | None = None,
) -> None:
self.target_url = target_url
self.settings = settings or WorkerSettings()
self.card: APICard | None = None
self._proxy: ProxyClient | None = None
self._running = False
self._tasks: list[asyncio.Task[Any]] = []
@property
def worker_id(self) -> UUID | None:
"""Get worker ID from card."""
return self.card.worker_id if self.card else None
@property
def worker_id_str(self) -> str:
"""Get worker ID as string."""
return str(self.worker_id) if self.worker_id else "uninitialized"
async def initialize(self) -> None:
"""Initialize worker by parsing target API."""
logger.info("Initializing worker", target_url=self.target_url)
self.card = await parse_openapi_to_card(
self.target_url,
self.settings.openapi_path,
)
logger.info(
"Worker initialized",
worker_id=self.worker_id_str,
service_name=self.card.service_name,
endpoint_count=len(self.card.endpoints),
)
async def start(self) -> None:
"""Start the worker and connect to mesh."""
if not self.card:
await self.initialize()
self._running = True
self._proxy = ProxyClient(self.target_url)
await self._proxy.__aenter__()
# Start background tasks
self._tasks = [
asyncio.create_task(self._mesh_loop()),
asyncio.create_task(self._heartbeat_loop()),
]
logger.info("Worker started", worker_id=self.worker_id_str)
async def stop(self) -> None:
"""Stop the worker gracefully."""
self._running = False
# Cancel background tasks
for task in self._tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Close proxy client
if self._proxy:
await self._proxy.__aexit__(None, None, None)
self._proxy = None
logger.info("Worker stopped", worker_id=self.worker_id_str)
async def _mesh_loop(self) -> None:
"""Main loop for mesh WebSocket communication."""
gateway_url = self.settings.gateway_ws_url
while self._running:
try:
async with ws_connect(gateway_url) as websocket:
logger.info("Connected to gateway", url=gateway_url)
# Announce ourselves
await self._send_announce(websocket)
# Listen for messages
async for raw_message in websocket:
if not self._running:
break
await self._handle_message(websocket, raw_message)
except ConnectionClosed:
logger.warning("Gateway connection closed, reconnecting...")
except Exception as e:
logger.error("Mesh connection error", error=str(e))
if self._running:
await asyncio.sleep(self.settings.reconnect_delay)
async def _heartbeat_loop(self) -> None:
"""Periodic heartbeat broadcast."""
while self._running:
await asyncio.sleep(self.settings.heartbeat_interval)
if self.card:
self.card.refresh_heartbeat()
async def _send_announce(self, websocket: Any) -> None:
"""Send announcement with our API Card."""
if not self.card:
return
message = WSMessage(
type=MessageType.ANNOUNCE,
source=self.worker_id_str,
target="*",
payload={"card": self.card.model_dump(mode="json")},
)
await websocket.send(message.model_dump_json())
logger.debug("Sent announce", service=self.card.service_name)
async def _handle_message(self, websocket: Any, raw: str | bytes) -> None:
"""Handle incoming WebSocket message."""
try:
data = raw if isinstance(raw, str) else raw.decode()
message = WSMessage.model_validate_json(data)
except Exception as e:
logger.warning("Invalid message received", error=str(e))
return
# Check if message is for us
if message.target not in (self.worker_id_str, self.card.service_name if self.card else "", "*"):
return
logger.debug(
"Received message",
type=message.type,
source=message.source,
)
match message.type:
case MessageType.REQUEST:
await self._handle_request(websocket, message)
case MessageType.DISCOVER:
await self._send_announce(websocket)
case MessageType.HEARTBEAT:
pass # Just acknowledge we're alive
case _:
logger.debug("Ignoring message type", type=message.type)
async def _handle_request(self, websocket: Any, message: WSMessage) -> None:
"""Handle an API request message."""
if not self._proxy:
return
try:
# Parse request from payload
request = APIRequest.model_validate(message.payload.get("request", {}))
# Proxy to target API
response = await self._proxy.request(request)
# Send response
response_message = message.create_response(
source=self.worker_id_str,
payload={"response": response.model_dump(mode="json")},
)
except Exception as e:
logger.error("Request handling error", error=str(e))
error = ErrorPayload(
code="PROXY_ERROR",
message=str(e),
)
response_message = message.create_response(
source=self.worker_id_str,
payload={"error": error.model_dump()},
error=True,
)
await websocket.send(response_message.model_dump_json())
Discovery Implementation¶
src/apimesh/discovery/broadcast.py¶
"""UDP broadcast discovery for LAN worker discovery."""
import asyncio
import json
import socket
from typing import Any
import structlog
from apimesh.models.card import APICard
logger = structlog.get_logger()
BROADCAST_PORT = 5678
BROADCAST_ADDR = "255.255.255.255"
BUFFER_SIZE = 65535
class BroadcastDiscovery:
"""UDP broadcast-based discovery for LAN environments.
Workers broadcast their API Cards periodically.
Other workers listen and update their registries.
"""
def __init__(
self,
port: int = BROADCAST_PORT,
broadcast_addr: str = BROADCAST_ADDR,
) -> None:
self.port = port
self.broadcast_addr = broadcast_addr
self._socket: socket.socket | None = None
self._running = False
def _create_socket(self) -> socket.socket:
"""Create and configure broadcast socket."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Allow receiving our own broadcasts (for testing)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass # Not available on all platforms
sock.setblocking(False)
return sock
async def start_listening(
self,
callback: Any, # Callable[[APICard], Awaitable[None]]
) -> None:
"""Start listening for broadcast announcements.
Args:
callback: Async function called with each discovered APICard
"""
self._socket = self._create_socket()
self._socket.bind(("", self.port))
self._running = True
logger.info("Discovery listening", port=self.port)
loop = asyncio.get_event_loop()
while self._running:
try:
# Non-blocking receive
data, addr = await loop.run_in_executor(
None,
lambda: self._socket.recvfrom(BUFFER_SIZE) if self._socket else (b"", ("", 0)),
)
if data:
await self._handle_broadcast(data, addr, callback)
except BlockingIOError:
await asyncio.sleep(0.1)
except Exception as e:
if self._running:
logger.error("Discovery receive error", error=str(e))
await asyncio.sleep(1.0)
async def _handle_broadcast(
self,
data: bytes,
addr: tuple[str, int],
callback: Any,
) -> None:
"""Process received broadcast data."""
try:
payload = json.loads(data.decode())
card = APICard.model_validate(payload)
logger.debug(
"Discovered service",
service=card.service_name,
from_addr=addr[0],
)
await callback(card)
except Exception as e:
logger.warning(
"Invalid broadcast data",
error=str(e),
from_addr=addr[0],
)
async def broadcast(self, card: APICard) -> None:
"""Broadcast our API Card."""
if not self._socket:
self._socket = self._create_socket()
data = card.model_dump_json().encode()
try:
self._socket.sendto(data, (self.broadcast_addr, self.port))
logger.debug("Broadcast sent", service=card.service_name)
except Exception as e:
logger.error("Broadcast send error", error=str(e))
def stop(self) -> None:
"""Stop discovery."""
self._running = False
if self._socket:
self._socket.close()
self._socket = None
Gateway Implementation¶
src/apimesh/gateway/server.py¶
"""FastAPI WebSocket Gateway for external access to the mesh."""
import asyncio
from contextlib import asynccontextmanager
from typing import Any
from uuid import uuid4
import structlog
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from apimesh.config import GatewaySettings
from apimesh.gateway.router import MessageRouter
from apimesh.models.card import APICard
from apimesh.models.messages import MessageType, WSMessage
from apimesh.models.registry import ServiceRegistry
logger = structlog.get_logger()
class ConnectionManager:
"""Manages WebSocket connections for workers and clients."""
def __init__(self) -> None:
self.workers: dict[str, WebSocket] = {} # worker_id -> websocket
self.clients: dict[str, WebSocket] = {} # client_id -> websocket
self.pending: dict[str, asyncio.Future[WSMessage]] = {} # correlation_id -> future
async def connect_worker(self, websocket: WebSocket, worker_id: str) -> None:
"""Register a worker connection."""
await websocket.accept()
self.workers[worker_id] = websocket
logger.info("Worker connected", worker_id=worker_id)
async def connect_client(self, websocket: WebSocket) -> str:
"""Register a client connection."""
await websocket.accept()
client_id = str(uuid4())
self.clients[client_id] = websocket
logger.info("Client connected", client_id=client_id)
return client_id
def disconnect_worker(self, worker_id: str) -> None:
"""Remove a worker connection."""
self.workers.pop(worker_id, None)
logger.info("Worker disconnected", worker_id=worker_id)
def disconnect_client(self, client_id: str) -> None:
"""Remove a client connection."""
self.clients.pop(client_id, None)
logger.info("Client disconnected", client_id=client_id)
async def send_to_worker(self, worker_id: str, message: WSMessage) -> bool:
"""Send message to specific worker."""
websocket = self.workers.get(worker_id)
if not websocket:
return False
try:
await websocket.send_text(message.model_dump_json())
return True
except Exception as e:
logger.error("Failed to send to worker", worker_id=worker_id, error=str(e))
return False
async def send_to_client(self, client_id: str, message: WSMessage) -> bool:
"""Send message to specific client."""
websocket = self.clients.get(client_id)
if not websocket:
return False
try:
await websocket.send_text(message.model_dump_json())
return True
except Exception as e:
logger.error("Failed to send to client", client_id=client_id, error=str(e))
return False
async def broadcast_to_workers(self, message: WSMessage) -> int:
"""Broadcast message to all workers."""
sent = 0
for worker_id in list(self.workers.keys()):
if await self.send_to_worker(worker_id, message):
sent += 1
return sent
def create_gateway(settings: GatewaySettings | None = None) -> FastAPI:
"""Create and configure the gateway FastAPI application."""
settings = settings or GatewaySettings()
registry = ServiceRegistry(ttl_seconds=settings.registry_ttl)
connections = ConnectionManager()
router = MessageRouter(registry, connections)
@asynccontextmanager
async def lifespan(app: FastAPI) -> Any:
# Start cleanup task
cleanup_task = asyncio.create_task(cleanup_loop())
logger.info("Gateway started", host=settings.host, port=settings.port)
yield
cleanup_task.cancel()
logger.info("Gateway stopped")
async def cleanup_loop() -> None:
"""Periodic cleanup of stale registry entries."""
while True:
await asyncio.sleep(30)
stale = registry.cleanup_stale()
if stale:
logger.info("Cleaned stale workers", count=len(stale))
app = FastAPI(
title="API Mesh Gateway",
description="WebSocket gateway for the API Mesh network",
version="0.1.0",
lifespan=lifespan,
)
@app.get("/health")
async def health() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "healthy"}
@app.get("/services")
async def list_services() -> dict[str, Any]:
"""List all registered services."""
return {
"services": registry.list_services(),
"workers": registry.list_workers(),
}
@app.get("/services/{service_name}")
async def get_service(service_name: str) -> dict[str, Any]:
"""Get details for a specific service."""
cards = registry.get_by_name(service_name)
return {
"service_name": service_name,
"instances": [card.model_dump(mode="json") for card in cards],
}
@app.websocket("/ws/worker")
async def worker_websocket(websocket: WebSocket) -> None:
"""WebSocket endpoint for mesh workers."""
worker_id: str | None = None
try:
# Wait for initial announce to get worker_id
await websocket.accept()
raw = await websocket.receive_text()
message = WSMessage.model_validate_json(raw)
if message.type != MessageType.ANNOUNCE:
await websocket.close(code=4000, reason="Expected ANNOUNCE")
return
# Extract card and register
card = APICard.model_validate(message.payload.get("card", {}))
worker_id = str(card.worker_id)
registry.register(card)
connections.workers[worker_id] = websocket
logger.info(
"Worker registered",
worker_id=worker_id,
service=card.service_name,
)
# Handle messages
while True:
raw = await websocket.receive_text()
await router.route_worker_message(raw, worker_id)
except WebSocketDisconnect:
pass
except Exception as e:
logger.error("Worker WebSocket error", error=str(e))
finally:
if worker_id:
connections.disconnect_worker(worker_id)
registry.unregister(worker_id)
@app.websocket("/ws/client")
async def client_websocket(websocket: WebSocket) -> None:
"""WebSocket endpoint for external clients."""
client_id = await connections.connect_client(websocket)
try:
while True:
raw = await websocket.receive_text()
await router.route_client_message(raw, client_id)
except WebSocketDisconnect:
pass
except Exception as e:
logger.error("Client WebSocket error", error=str(e))
finally:
connections.disconnect_client(client_id)
# Store references for access
app.state.registry = registry
app.state.connections = connections
app.state.router = router
return app
src/apimesh/gateway/router.py¶
"""Message routing logic for the gateway."""
import asyncio
from typing import TYPE_CHECKING
import structlog
from apimesh.models.card import APICard
from apimesh.models.messages import ErrorPayload, MessageType, WSMessage
from apimesh.models.registry import ServiceRegistry
if TYPE_CHECKING:
from apimesh.gateway.server import ConnectionManager
logger = structlog.get_logger()
class MessageRouter:
"""Routes messages between workers and clients."""
def __init__(
self,
registry: ServiceRegistry,
connections: "ConnectionManager",
) -> None:
self.registry = registry
self.connections = connections
self._pending: dict[str, tuple[str, asyncio.Future[WSMessage]]] = {}
async def route_worker_message(self, raw: str, source_worker_id: str) -> None:
"""Route a message from a worker."""
try:
message = WSMessage.model_validate_json(raw)
except Exception as e:
logger.warning("Invalid worker message", error=str(e))
return
match message.type:
case MessageType.ANNOUNCE:
card = APICard.model_validate(message.payload.get("card", {}))
self.registry.register(card)
logger.debug("Worker announced", service=card.service_name)
case MessageType.RESPONSE | MessageType.ERROR:
# Route response back to waiting client
if message.correlation_id:
await self._handle_response(message)
case MessageType.HEARTBEAT:
# Update registry heartbeat
card = self.registry.get_by_id(source_worker_id)
if card:
card.refresh_heartbeat()
case _:
logger.debug("Unhandled worker message type", type=message.type)
async def route_client_message(self, raw: str, client_id: str) -> None:
"""Route a message from a client."""
try:
message = WSMessage.model_validate_json(raw)
except Exception as e:
logger.warning("Invalid client message", error=str(e))
return
if message.type != MessageType.REQUEST:
logger.debug("Ignoring non-request from client", type=message.type)
return
# Find target worker
target = message.target
card: APICard | None = None
# Try as worker_id first, then as service name
card = self.registry.get_by_id(target)
if not card:
card = self.registry.get_one_by_name(target)
if not card:
# Send error back to client
error_response = message.create_response(
source="gateway",
payload={
"error": ErrorPayload(
code="SERVICE_NOT_FOUND",
message=f"No service found: {target}",
).model_dump()
},
error=True,
)
await self.connections.send_to_client(client_id, error_response)
return
worker_id = str(card.worker_id)
# Store pending request for response routing
future: asyncio.Future[WSMessage] = asyncio.Future()
self._pending[str(message.id)] = (client_id, future)
# Forward to worker
message.target = worker_id
sent = await self.connections.send_to_worker(worker_id, message)
if not sent:
# Worker disconnected
self._pending.pop(str(message.id), None)
error_response = message.create_response(
source="gateway",
payload={
"error": ErrorPayload(
code="WORKER_UNAVAILABLE",
message=f"Worker unavailable: {worker_id}",
).model_dump()
},
error=True,
)
await self.connections.send_to_client(client_id, error_response)
async def _handle_response(self, message: WSMessage) -> None:
"""Handle a response message from a worker."""
correlation_id = str(message.correlation_id) if message.correlation_id else None
if not correlation_id:
return
pending = self._pending.pop(correlation_id, None)
if not pending:
logger.warning("No pending request for response", correlation_id=correlation_id)
return
client_id, _ = pending
# Forward response to client
await self.connections.send_to_client(client_id, message)
Configuration¶
src/apimesh/config.py¶
"""Configuration management for API Mesh components."""
from pydantic_settings import BaseSettings, SettingsConfigDict
class WorkerSettings(BaseSettings):
"""Configuration for API Workers."""
model_config = SettingsConfigDict(
env_prefix="APIMESH_WORKER_",
env_file=".env",
)
# Target API
openapi_path: str = "/openapi.json"
# Gateway connection
gateway_ws_url: str = "ws://localhost:8080/ws/worker"
# Timing
heartbeat_interval: float = 30.0
reconnect_delay: float = 5.0
class GatewaySettings(BaseSettings):
"""Configuration for the Gateway server."""
model_config = SettingsConfigDict(
env_prefix="APIMESH_GATEWAY_",
env_file=".env",
)
# Server
host: str = "0.0.0.0"
port: int = 8080
# Registry
registry_ttl: int = 90 # Seconds before stale workers removed
class DiscoverySettings(BaseSettings):
"""Configuration for UDP discovery."""
model_config = SettingsConfigDict(
env_prefix="APIMESH_DISCOVERY_",
env_file=".env",
)
port: int = 5678
broadcast_addr: str = "255.255.255.255"
announce_interval: float = 30.0
CLI¶
src/apimesh/cli.py¶
"""Command-line interface for API Mesh."""
import asyncio
import structlog
import typer
import uvicorn
from apimesh.config import GatewaySettings, WorkerSettings
from apimesh.gateway.server import create_gateway
from apimesh.worker.worker import APIWorker
app = typer.Typer(
name="apimesh",
help="Distributed WebSocket proxy network for FastAPI services",
)
# Configure structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
@app.command()
def gateway(
host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind to"),
port: int = typer.Option(8080, "--port", "-p", help="Port to bind to"),
) -> None:
"""Start the API Mesh Gateway server."""
settings = GatewaySettings(host=host, port=port)
gateway_app = create_gateway(settings)
typer.echo(f"Starting Gateway on {host}:{port}")
uvicorn.run(gateway_app, host=host, port=port)
@app.command()
def worker(
target: str = typer.Argument(..., help="Target API URL (e.g., http://localhost:8000)"),
gateway_url: str = typer.Option(
"ws://localhost:8080/ws/worker",
"--gateway",
"-g",
help="Gateway WebSocket URL",
),
) -> None:
"""Start an API Worker wrapping a target FastAPI service."""
settings = WorkerSettings(gateway_ws_url=gateway_url)
api_worker = APIWorker(target, settings)
async def run() -> None:
await api_worker.start()
try:
# Keep running until interrupted
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
finally:
await api_worker.stop()
typer.echo(f"Starting Worker for {target}")
try:
asyncio.run(run())
except KeyboardInterrupt:
typer.echo("\nWorker stopped")
@app.command()
def inspect(
target: str = typer.Argument(..., help="Target API URL to inspect"),
) -> None:
"""Inspect a FastAPI service's OpenAPI spec."""
from rich.console import Console
from rich.table import Table
from apimesh.worker.parser import parse_openapi_to_card
console = Console()
async def run() -> None:
card = await parse_openapi_to_card(target)
console.print(f"\n[bold]Service:[/bold] {card.service_name} v{card.service_version}")
console.print(f"[bold]Base URL:[/bold] {card.base_url}")
console.print(f"[bold]Tags:[/bold] {', '.join(card.tags) or 'None'}")
table = Table(title="\nEndpoints")
table.add_column("Method", style="cyan")
table.add_column("Path", style="green")
table.add_column("Operation ID")
table.add_column("Summary")
for endpoint in card.endpoints:
table.add_row(
endpoint.method,
endpoint.path,
endpoint.operation_id or "-",
endpoint.summary or "-",
)
console.print(table)
asyncio.run(run())
if __name__ == "__main__":
app()
Package Init Files¶
src/apimesh/init.py¶
"""API Mesh - Distributed WebSocket proxy network for FastAPI services."""
from apimesh.config import GatewaySettings, WorkerSettings
from apimesh.gateway.server import create_gateway
from apimesh.models.card import APICard, EndpointSpec
from apimesh.models.messages import APIRequest, APIResponse, MessageType, WSMessage
from apimesh.models.registry import ServiceRegistry
from apimesh.worker.worker import APIWorker
__version__ = "0.1.0"
__all__ = [
# Core classes
"APIWorker",
"create_gateway",
# Models
"APICard",
"EndpointSpec",
"WSMessage",
"APIRequest",
"APIResponse",
"MessageType",
"ServiceRegistry",
# Config
"WorkerSettings",
"GatewaySettings",
]
src/apimesh/models/init.py¶
"""API Mesh data models."""
from apimesh.models.card import APICard, EndpointSpec, ParameterSpec
from apimesh.models.messages import (
APIRequest,
APIResponse,
ErrorPayload,
MessageType,
WSMessage,
)
from apimesh.models.registry import ServiceRegistry
__all__ = [
"APICard",
"EndpointSpec",
"ParameterSpec",
"WSMessage",
"APIRequest",
"APIResponse",
"ErrorPayload",
"MessageType",
"ServiceRegistry",
]
src/apimesh/worker/init.py¶
"""API Mesh worker components."""
from apimesh.worker.parser import parse_openapi_to_card
from apimesh.worker.proxy import ProxyClient, proxy_request
from apimesh.worker.worker import APIWorker
__all__ = [
"APIWorker",
"parse_openapi_to_card",
"ProxyClient",
"proxy_request",
]
src/apimesh/gateway/init.py¶
"""API Mesh gateway components."""
from apimesh.gateway.router import MessageRouter
from apimesh.gateway.server import ConnectionManager, create_gateway
__all__ = [
"create_gateway",
"ConnectionManager",
"MessageRouter",
]
src/apimesh/discovery/init.py¶
"""API Mesh discovery components."""
from apimesh.discovery.broadcast import BroadcastDiscovery
__all__ = [
"BroadcastDiscovery",
]
src/apimesh/network/init.py¶
Example Usage¶
examples/sample_api/main.py¶
"""Sample FastAPI application for testing API Mesh."""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI(
title="Sample Users API",
description="A sample API for testing API Mesh",
version="1.0.0",
)
class User(BaseModel):
id: int
name: str
email: str
class UserCreate(BaseModel):
name: str
email: str
# In-memory storage
users_db: dict[int, User] = {
1: User(id=1, name="Alice", email="alice@example.com"),
2: User(id=2, name="Bob", email="bob@example.com"),
}
next_id = 3
@app.get("/users", tags=["users"])
async def list_users() -> list[User]:
"""List all users."""
return list(users_db.values())
@app.get("/users/{user_id}", tags=["users"])
async def get_user(user_id: int) -> User:
"""Get a specific user by ID."""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
return users_db[user_id]
@app.post("/users", tags=["users"], status_code=201)
async def create_user(user: UserCreate) -> User:
"""Create a new user."""
global next_id
new_user = User(id=next_id, name=user.name, email=user.email)
users_db[next_id] = new_user
next_id += 1
return new_user
@app.delete("/users/{user_id}", tags=["users"])
async def delete_user(user_id: int) -> dict[str, str]:
"""Delete a user."""
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
del users_db[user_id]
return {"status": "deleted"}
@app.get("/health")
async def health() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
examples/run_worker.py¶
"""Example script to run an API Worker."""
import asyncio
from apimesh import APIWorker, WorkerSettings
async def main() -> None:
"""Run a worker wrapping the sample API."""
settings = WorkerSettings(
gateway_ws_url="ws://localhost:8080/ws/worker",
)
worker = APIWorker(
target_url="http://localhost:8000",
settings=settings,
)
await worker.start()
print(f"Worker started: {worker.worker_id}")
print(f"Service: {worker.card.service_name if worker.card else 'unknown'}")
try:
# Keep running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nShutting down...")
finally:
await worker.stop()
if __name__ == "__main__":
asyncio.run(main())
examples/run_gateway.py¶
"""Example script to run the Gateway server."""
import uvicorn
from apimesh import GatewaySettings, create_gateway
def main() -> None:
"""Run the gateway server."""
settings = GatewaySettings(
host="0.0.0.0",
port=8080,
)
app = create_gateway(settings)
print(f"Starting Gateway on {settings.host}:{settings.port}")
uvicorn.run(app, host=settings.host, port=settings.port)
if __name__ == "__main__":
main()
examples/client_example.py¶
"""Example WebSocket client for API Mesh."""
import asyncio
import json
from websockets.client import connect as ws_connect
from apimesh.models.messages import APIRequest, MessageType, WSMessage
async def main() -> None:
"""Connect as a client and make requests through the mesh."""
gateway_url = "ws://localhost:8080/ws/client"
async with ws_connect(gateway_url) as websocket:
print("Connected to gateway")
# Create a request to list users
request = APIRequest(
method="GET",
path="/users",
)
message = WSMessage(
type=MessageType.REQUEST,
source="example-client",
target="sample-users-api", # Service name from OpenAPI spec
payload={"request": request.model_dump(mode="json")},
)
# Send request
await websocket.send(message.model_dump_json())
print(f"Sent request: {message.type}")
# Wait for response
raw = await websocket.recv()
response = WSMessage.model_validate_json(raw)
print(f"Response type: {response.type}")
print(f"Response payload: {json.dumps(response.payload, indent=2)}")
if __name__ == "__main__":
asyncio.run(main())
Tests¶
tests/conftest.py¶
"""Pytest configuration and fixtures."""
import pytest
from httpx import ASGITransport, AsyncClient
from apimesh.gateway.server import create_gateway
@pytest.fixture
def gateway_app():
"""Create a gateway app for testing."""
return create_gateway()
@pytest.fixture
async def gateway_client(gateway_app):
"""Create an async test client for the gateway."""
transport = ASGITransport(app=gateway_app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
tests/test_models/test_card.py¶
"""Tests for API Card models."""
from apimesh.models.card import APICard, EndpointSpec
def test_api_card_creation():
"""Test basic APICard creation."""
card = APICard(
service_name="test-api",
base_url="http://localhost:8000",
openapi_url="http://localhost:8000/openapi.json",
)
assert card.service_name == "test-api"
assert card.worker_id is not None
assert card.heartbeat is not None
def test_endpoint_lookup():
"""Test endpoint lookup methods."""
endpoint = EndpointSpec(
path="/users",
method="GET",
operation_id="list_users",
)
card = APICard(
service_name="test-api",
base_url="http://localhost:8000",
openapi_url="http://localhost:8000/openapi.json",
endpoints=[endpoint],
)
assert card.has_endpoint("GET", "/users")
assert not card.has_endpoint("POST", "/users")
found = card.get_endpoint("GET", "/users")
assert found is not None
assert found.operation_id == "list_users"
tests/test_models/test_messages.py¶
"""Tests for message protocol models."""
from apimesh.models.messages import APIRequest, MessageType, WSMessage
def test_message_creation():
"""Test WSMessage creation with defaults."""
msg = WSMessage(
type=MessageType.REQUEST,
source="worker-1",
target="worker-2",
)
assert msg.id is not None
assert msg.timestamp is not None
assert msg.payload == {}
def test_create_response():
"""Test creating a response to a message."""
request = WSMessage(
type=MessageType.REQUEST,
source="client",
target="worker",
payload={"data": "test"},
)
response = request.create_response(
source="worker",
payload={"result": "success"},
)
assert response.type == MessageType.RESPONSE
assert response.source == "worker"
assert response.target == "client"
assert response.correlation_id == request.id
def test_api_request_serialization():
"""Test APIRequest model."""
request = APIRequest(
method="POST",
path="/users/{id}",
path_params={"id": 123},
body={"name": "Test"},
)
data = request.model_dump()
assert data["method"] == "POST"
assert data["path_params"]["id"] == 123
Quick Start¶
# Terminal 1: Start the sample API
cd examples/sample_api
uvicorn main:app --port 8000
# Terminal 2: Start the Gateway
apimesh gateway --port 8080
# Terminal 3: Start a Worker
apimesh worker http://localhost:8000 --gateway ws://localhost:8080/ws/worker
# Terminal 4: Inspect the wrapped API
apimesh inspect http://localhost:8000
# Terminal 5: Run client example
python examples/client_example.py
Architecture Diagram¶
┌─────────────────────────┐
│ External Clients │
│ (Web/Mobile/Services) │
└───────────┬─────────────┘
│ WebSocket
┌───────────▼─────────────┐
│ GATEWAY │
│ ─────────────────── │
│ • Client connections │
│ • Service registry │
│ • Message routing │
│ • REST API for status │
└───────────┬─────────────┘
│
┌───────────────────────────┼───────────────────────────┐
│ │ │
┌───────────▼───────────┐ ┌───────────▼───────────┐ ┌───────────▼───────────┐
│ API WORKER │ │ API WORKER │ │ API WORKER │
│ ───────────────── │ │ ───────────────── │ │ ───────────────── │
│ Wraps: Users API │ │ Wraps: Orders API │ │ Wraps: Auth API │
│ ID: worker-abc │ │ ID: worker-def │ │ ID: worker-ghi │
└───────────┬───────────┘ └───────────┬───────────┘ └───────────┬───────────┘
│ HTTP │ HTTP │ HTTP
┌───────────▼───────────┐ ┌───────────▼───────────┐ ┌───────────▼───────────┐
│ FastAPI Service │ │ FastAPI Service │ │ FastAPI Service │
│ localhost:8001 │ │ localhost:8002 │ │ localhost:8003 │
└───────────────────────┘ └───────────────────────┘ └───────────────────────┘
Next Steps / Roadmap¶
- Load Balancing: Add round-robin or least-connections routing when multiple workers serve the same API
- Authentication: Pass-through auth headers, or implement mesh-level auth
- Request Validation: Validate requests against OpenAPI schemas before proxying
- Metrics: Add Prometheus metrics for request counts, latencies, errors
- Distributed Discovery: Replace UDP broadcast with Redis pub/sub or etcd for multi-network support
- Circuit Breaker: Implement circuit breaker pattern for failing services
- Request Tracing: Add distributed tracing with OpenTelemetry