Skip to content

# API Mesh - Project Initialization

A lightweight, Python-native distributed WebSocket network that proxies FastAPI services.

Project Overview

API Mesh creates a self-organizing network of WebSocket workers that wrap existing FastAPI services. Workers discover each other via broadcast, exchange API Cards describing their capabilities, and route messages to the appropriate service. A standalone Gateway provides external WebSocket access for web clients.

Core Concepts

  • API Worker: Wraps a FastAPI service, parses its OpenAPI spec, handles proxied requests
  • API Card: Service advertisement containing endpoints, schemas, and metadata
  • Message Protocol: JSON envelope for all inter-worker communication
  • Discovery: UDP broadcast for LAN-based worker discovery
  • Gateway: External WebSocket entry point for clients

Project Structure

apimesh/
├── pyproject.toml
├── README.md
├── src/
│   └── apimesh/
│       ├── __init__.py
│       ├── py.typed
│       ├── models/
│       │   ├── __init__.py
│       │   ├── card.py          # APICard, EndpointSpec
│       │   ├── messages.py      # WSMessage, APIRequest, APIResponse
│       │   └── registry.py      # ServiceRegistry
│       ├── worker/
│       │   ├── __init__.py
│       │   ├── worker.py        # APIWorker main class
│       │   ├── parser.py        # OpenAPI spec parser
│       │   └── proxy.py         # HTTP proxy client
│       ├── discovery/
│       │   ├── __init__.py
│       │   ├── broadcast.py     # UDP broadcast discovery
│       │   └── protocol.py      # Discovery message handling
│       ├── gateway/
│       │   ├── __init__.py
│       │   ├── server.py        # FastAPI WebSocket gateway
│       │   └── router.py        # Message routing logic
│       ├── network/
│       │   ├── __init__.py
│       │   ├── mesh.py          # WebSocket mesh connections
│       │   └── handlers.py      # Message type handlers
│       ├── cli.py               # CLI entry points
│       └── config.py            # Configuration management
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_models/
│   ├── test_worker/
│   ├── test_discovery/
│   └── test_gateway/
└── examples/
    ├── sample_api/              # Example FastAPI to wrap
    │   └── main.py
    ├── run_worker.py
    └── run_gateway.py

Dependencies

pyproject.toml

[project]
name = "apimesh"
version = "0.1.0"
description = "Distributed WebSocket proxy network for FastAPI services"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.115.0",
    "uvicorn[standard]>=0.32.0",
    "websockets>=13.0",
    "httpx>=0.27.0",
    "pydantic>=2.9.0",
    "pydantic-settings>=2.6.0",
    "structlog>=24.0.0",
    "typer>=0.12.0",
    "rich>=13.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.24.0",
    "pytest-cov>=5.0.0",
    "ruff>=0.7.0",
    "mypy>=1.13.0",
    "pre-commit>=4.0.0",
]

[project.scripts]
apimesh = "apimesh.cli:app"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/apimesh"]

[tool.ruff]
target-version = "py311"
line-length = 88
src = ["src", "tests"]

[tool.ruff.lint]
select = [
    "E",      # pycodestyle errors
    "W",      # pycodestyle warnings
    "F",      # Pyflakes
    "I",      # isort
    "B",      # flake8-bugbear
    "C4",     # flake8-comprehensions
    "UP",     # pyupgrade
    "ARG",    # flake8-unused-arguments
    "SIM",    # flake8-simplify
]
ignore = ["E501"]  # line length handled by formatter

[tool.ruff.lint.isort]
known-first-party = ["apimesh"]

[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_ignores = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "-ra -q"

Core Models

src/apimesh/models/card.py

"""API Card models for service advertisement."""

from datetime import datetime, timezone
from typing import Any
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, HttpUrl


class ParameterSpec(BaseModel):
    """OpenAPI parameter specification."""

    name: str
    location: str = Field(alias="in")  # query, path, header, cookie
    required: bool = False
    schema_: dict[str, Any] = Field(default_factory=dict, alias="schema")

    model_config = {"populate_by_name": True}


class EndpointSpec(BaseModel):
    """Specification for a single API endpoint."""

    path: str
    method: str
    operation_id: str | None = None
    summary: str | None = None
    description: str | None = None
    parameters: list[ParameterSpec] = Field(default_factory=list)
    request_body: dict[str, Any] | None = None
    responses: dict[str, Any] = Field(default_factory=dict)
    tags: list[str] = Field(default_factory=list)


class APICard(BaseModel):
    """Service advertisement broadcast by workers.

    Contains all information needed to route requests to this service.
    """

    worker_id: UUID = Field(default_factory=uuid4)
    service_name: str
    service_version: str = "1.0.0"
    base_url: HttpUrl
    openapi_url: HttpUrl
    endpoints: list[EndpointSpec] = Field(default_factory=list)
    schemas: dict[str, Any] = Field(default_factory=dict)
    tags: list[str] = Field(default_factory=list)
    heartbeat: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    def refresh_heartbeat(self) -> None:
        """Update heartbeat timestamp."""
        self.heartbeat = datetime.now(timezone.utc)

    def get_endpoint(self, method: str, path: str) -> EndpointSpec | None:
        """Find endpoint by method and path."""
        method_upper = method.upper()
        for endpoint in self.endpoints:
            if endpoint.method == method_upper and endpoint.path == path:
                return endpoint
        return None

    def has_endpoint(self, method: str, path: str) -> bool:
        """Check if endpoint exists."""
        return self.get_endpoint(method, path) is not None

src/apimesh/models/messages.py

"""WebSocket message protocol models."""

from datetime import datetime, timezone
from enum import StrEnum
from typing import Any, Literal
from uuid import UUID, uuid4

from pydantic import BaseModel, Field


class MessageType(StrEnum):
    """Types of messages in the mesh network."""

    REQUEST = "request"
    RESPONSE = "response"
    DISCOVER = "discover"
    ANNOUNCE = "announce"
    HEARTBEAT = "heartbeat"
    ERROR = "error"


class WSMessage(BaseModel):
    """Base WebSocket message envelope.

    All communication in the mesh uses this envelope format.
    """

    id: UUID = Field(default_factory=uuid4)
    type: MessageType
    source: str  # worker_id as string
    target: str  # worker_id, service_name, or "*" for broadcast
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    correlation_id: UUID | None = None  # For request/response matching
    payload: dict[str, Any] = Field(default_factory=dict)

    def create_response(
        self,
        source: str,
        payload: dict[str, Any],
        *,
        error: bool = False,
    ) -> "WSMessage":
        """Create a response message to this message."""
        return WSMessage(
            type=MessageType.ERROR if error else MessageType.RESPONSE,
            source=source,
            target=self.source,
            correlation_id=self.id,
            payload=payload,
        )


class APIRequest(BaseModel):
    """Request payload for API calls through the mesh."""

    method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"]
    path: str
    path_params: dict[str, Any] = Field(default_factory=dict)
    query_params: dict[str, Any] = Field(default_factory=dict)
    body: Any = None
    headers: dict[str, str] = Field(default_factory=dict)
    timeout: float = 30.0


class APIResponse(BaseModel):
    """Response payload for API calls through the mesh."""

    status_code: int
    headers: dict[str, str] = Field(default_factory=dict)
    body: Any = None
    elapsed_ms: float = 0.0


class ErrorPayload(BaseModel):
    """Error information payload."""

    code: str
    message: str
    details: dict[str, Any] = Field(default_factory=dict)

src/apimesh/models/registry.py

"""Service registry for tracking available workers."""

from datetime import datetime, timedelta, timezone
from uuid import UUID

from pydantic import BaseModel, Field

from apimesh.models.card import APICard


class ServiceRegistry(BaseModel):
    """Registry of known services in the mesh.

    Tracks API Cards and manages service discovery state.
    """

    services: dict[str, APICard] = Field(default_factory=dict)  # worker_id -> card
    by_name: dict[str, set[str]] = Field(default_factory=dict)  # name -> worker_ids
    ttl_seconds: int = 60  # Time before stale entries are removed

    def register(self, card: APICard) -> None:
        """Register or update a service."""
        worker_id = str(card.worker_id)
        card.refresh_heartbeat()

        # Update main registry
        self.services[worker_id] = card

        # Update name index
        if card.service_name not in self.by_name:
            self.by_name[card.service_name] = set()
        self.by_name[card.service_name].add(worker_id)

    def unregister(self, worker_id: str) -> APICard | None:
        """Remove a service from the registry."""
        card = self.services.pop(worker_id, None)
        if card:
            name_set = self.by_name.get(card.service_name)
            if name_set:
                name_set.discard(worker_id)
                if not name_set:
                    del self.by_name[card.service_name]
        return card

    def get_by_id(self, worker_id: str) -> APICard | None:
        """Get service by worker ID."""
        return self.services.get(worker_id)

    def get_by_name(self, service_name: str) -> list[APICard]:
        """Get all services with given name."""
        worker_ids = self.by_name.get(service_name, set())
        return [self.services[wid] for wid in worker_ids if wid in self.services]

    def get_one_by_name(self, service_name: str) -> APICard | None:
        """Get one service by name (for simple routing)."""
        cards = self.get_by_name(service_name)
        return cards[0] if cards else None

    def cleanup_stale(self) -> list[str]:
        """Remove stale entries. Returns list of removed worker IDs."""
        now = datetime.now(timezone.utc)
        cutoff = now - timedelta(seconds=self.ttl_seconds)

        stale = [
            wid for wid, card in self.services.items() if card.heartbeat < cutoff
        ]
        for wid in stale:
            self.unregister(wid)

        return stale

    def list_services(self) -> list[str]:
        """List all unique service names."""
        return list(self.by_name.keys())

    def list_workers(self) -> list[str]:
        """List all worker IDs."""
        return list(self.services.keys())

Worker Implementation

src/apimesh/worker/parser.py

"""OpenAPI specification parser."""

from typing import Any

import httpx

from apimesh.models.card import APICard, EndpointSpec, ParameterSpec


async def fetch_openapi_spec(openapi_url: str) -> dict[str, Any]:
    """Fetch OpenAPI JSON specification from URL."""
    async with httpx.AsyncClient() as client:
        response = await client.get(openapi_url, timeout=10.0)
        response.raise_for_status()
        return response.json()


def parse_parameters(params: list[dict[str, Any]]) -> list[ParameterSpec]:
    """Parse OpenAPI parameters into ParameterSpec models."""
    return [ParameterSpec.model_validate(p) for p in params]


def parse_endpoints(spec: dict[str, Any]) -> list[EndpointSpec]:
    """Parse all endpoints from OpenAPI spec."""
    endpoints: list[EndpointSpec] = []
    paths = spec.get("paths", {})

    for path, methods in paths.items():
        for method, details in methods.items():
            if method.upper() not in {"GET", "POST", "PUT", "PATCH", "DELETE"}:
                continue

            endpoint = EndpointSpec(
                path=path,
                method=method.upper(),
                operation_id=details.get("operationId"),
                summary=details.get("summary"),
                description=details.get("description"),
                parameters=parse_parameters(details.get("parameters", [])),
                request_body=details.get("requestBody"),
                responses=details.get("responses", {}),
                tags=details.get("tags", []),
            )
            endpoints.append(endpoint)

    return endpoints


def extract_service_info(spec: dict[str, Any]) -> tuple[str, str]:
    """Extract service name and version from spec."""
    info = spec.get("info", {})
    title = info.get("title", "unknown-service")
    version = info.get("version", "1.0.0")

    # Convert title to slug format
    service_name = title.lower().replace(" ", "-")

    return service_name, version


def extract_schemas(spec: dict[str, Any]) -> dict[str, Any]:
    """Extract component schemas from spec."""
    components = spec.get("components", {})
    return components.get("schemas", {})


def extract_tags(spec: dict[str, Any]) -> list[str]:
    """Extract tag names from spec."""
    tags = spec.get("tags", [])
    return [t.get("name", "") for t in tags if t.get("name")]


async def parse_openapi_to_card(
    base_url: str,
    openapi_path: str = "/openapi.json",
) -> APICard:
    """Parse OpenAPI spec into an APICard.

    Args:
        base_url: Base URL of the API (e.g., "http://localhost:8000")
        openapi_path: Path to OpenAPI JSON (default: /openapi.json)

    Returns:
        APICard populated from the OpenAPI specification
    """
    openapi_url = f"{base_url.rstrip('/')}{openapi_path}"
    spec = await fetch_openapi_spec(openapi_url)

    service_name, version = extract_service_info(spec)

    return APICard(
        service_name=service_name,
        service_version=version,
        base_url=base_url,  # type: ignore[arg-type]
        openapi_url=openapi_url,  # type: ignore[arg-type]
        endpoints=parse_endpoints(spec),
        schemas=extract_schemas(spec),
        tags=extract_tags(spec),
    )

src/apimesh/worker/proxy.py

"""HTTP proxy client for making requests to wrapped services."""

import re
import time
from typing import Any

import httpx

from apimesh.models.messages import APIRequest, APIResponse


def build_url(base_url: str, path: str, path_params: dict[str, Any]) -> str:
    """Build full URL with path parameters substituted."""
    # Replace {param} placeholders with actual values
    resolved_path = path
    for key, value in path_params.items():
        resolved_path = re.sub(
            rf"\{{{key}\}}",
            str(value),
            resolved_path,
        )

    return f"{base_url.rstrip('/')}{resolved_path}"


async def proxy_request(
    base_url: str,
    request: APIRequest,
    *,
    client: httpx.AsyncClient | None = None,
) -> APIResponse:
    """Proxy an API request to the target service.

    Args:
        base_url: Base URL of the target service
        request: The API request to proxy
        client: Optional httpx client (creates one if not provided)

    Returns:
        APIResponse with status, headers, and body
    """
    url = build_url(base_url, request.path, request.path_params)

    should_close = client is None
    client = client or httpx.AsyncClient()

    start_time = time.perf_counter()

    try:
        response = await client.request(
            method=request.method,
            url=url,
            params=request.query_params or None,
            json=request.body if request.body is not None else None,
            headers=request.headers or None,
            timeout=request.timeout,
        )

        elapsed_ms = (time.perf_counter() - start_time) * 1000

        # Parse response body
        try:
            body = response.json()
        except Exception:
            body = response.text or None

        return APIResponse(
            status_code=response.status_code,
            headers=dict(response.headers),
            body=body,
            elapsed_ms=elapsed_ms,
        )

    finally:
        if should_close:
            await client.aclose()


class ProxyClient:
    """Reusable proxy client with connection pooling."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> "ProxyClient":
        self._client = httpx.AsyncClient()
        return self

    async def __aexit__(self, *args: Any) -> None:
        if self._client:
            await self._client.aclose()
            self._client = None

    async def request(self, request: APIRequest) -> APIResponse:
        """Make a proxied request."""
        return await proxy_request(
            self.base_url,
            request,
            client=self._client,
        )

src/apimesh/worker/worker.py

"""API Worker - wraps a FastAPI service and handles mesh communication."""

import asyncio
from typing import Any
from uuid import UUID

import structlog
from websockets.client import connect as ws_connect
from websockets.exceptions import ConnectionClosed

from apimesh.config import WorkerSettings
from apimesh.models.card import APICard
from apimesh.models.messages import (
    APIRequest,
    APIResponse,
    ErrorPayload,
    MessageType,
    WSMessage,
)
from apimesh.worker.parser import parse_openapi_to_card
from apimesh.worker.proxy import ProxyClient

logger = structlog.get_logger()


class APIWorker:
    """Worker that wraps a FastAPI service and joins the mesh.

    Responsibilities:
    - Parse target API's OpenAPI spec
    - Connect to mesh network
    - Broadcast API Card for discovery
    - Handle incoming requests and proxy to target
    - Send responses back through mesh
    """

    def __init__(
        self,
        target_url: str,
        settings: WorkerSettings | None = None,
    ) -> None:
        self.target_url = target_url
        self.settings = settings or WorkerSettings()
        self.card: APICard | None = None
        self._proxy: ProxyClient | None = None
        self._running = False
        self._tasks: list[asyncio.Task[Any]] = []

    @property
    def worker_id(self) -> UUID | None:
        """Get worker ID from card."""
        return self.card.worker_id if self.card else None

    @property
    def worker_id_str(self) -> str:
        """Get worker ID as string."""
        return str(self.worker_id) if self.worker_id else "uninitialized"

    async def initialize(self) -> None:
        """Initialize worker by parsing target API."""
        logger.info("Initializing worker", target_url=self.target_url)

        self.card = await parse_openapi_to_card(
            self.target_url,
            self.settings.openapi_path,
        )

        logger.info(
            "Worker initialized",
            worker_id=self.worker_id_str,
            service_name=self.card.service_name,
            endpoint_count=len(self.card.endpoints),
        )

    async def start(self) -> None:
        """Start the worker and connect to mesh."""
        if not self.card:
            await self.initialize()

        self._running = True
        self._proxy = ProxyClient(self.target_url)
        await self._proxy.__aenter__()

        # Start background tasks
        self._tasks = [
            asyncio.create_task(self._mesh_loop()),
            asyncio.create_task(self._heartbeat_loop()),
        ]

        logger.info("Worker started", worker_id=self.worker_id_str)

    async def stop(self) -> None:
        """Stop the worker gracefully."""
        self._running = False

        # Cancel background tasks
        for task in self._tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Close proxy client
        if self._proxy:
            await self._proxy.__aexit__(None, None, None)
            self._proxy = None

        logger.info("Worker stopped", worker_id=self.worker_id_str)

    async def _mesh_loop(self) -> None:
        """Main loop for mesh WebSocket communication."""
        gateway_url = self.settings.gateway_ws_url

        while self._running:
            try:
                async with ws_connect(gateway_url) as websocket:
                    logger.info("Connected to gateway", url=gateway_url)

                    # Announce ourselves
                    await self._send_announce(websocket)

                    # Listen for messages
                    async for raw_message in websocket:
                        if not self._running:
                            break
                        await self._handle_message(websocket, raw_message)

            except ConnectionClosed:
                logger.warning("Gateway connection closed, reconnecting...")
            except Exception as e:
                logger.error("Mesh connection error", error=str(e))

            if self._running:
                await asyncio.sleep(self.settings.reconnect_delay)

    async def _heartbeat_loop(self) -> None:
        """Periodic heartbeat broadcast."""
        while self._running:
            await asyncio.sleep(self.settings.heartbeat_interval)
            if self.card:
                self.card.refresh_heartbeat()

    async def _send_announce(self, websocket: Any) -> None:
        """Send announcement with our API Card."""
        if not self.card:
            return

        message = WSMessage(
            type=MessageType.ANNOUNCE,
            source=self.worker_id_str,
            target="*",
            payload={"card": self.card.model_dump(mode="json")},
        )

        await websocket.send(message.model_dump_json())
        logger.debug("Sent announce", service=self.card.service_name)

    async def _handle_message(self, websocket: Any, raw: str | bytes) -> None:
        """Handle incoming WebSocket message."""
        try:
            data = raw if isinstance(raw, str) else raw.decode()
            message = WSMessage.model_validate_json(data)
        except Exception as e:
            logger.warning("Invalid message received", error=str(e))
            return

        # Check if message is for us
        if message.target not in (self.worker_id_str, self.card.service_name if self.card else "", "*"):
            return

        logger.debug(
            "Received message",
            type=message.type,
            source=message.source,
        )

        match message.type:
            case MessageType.REQUEST:
                await self._handle_request(websocket, message)
            case MessageType.DISCOVER:
                await self._send_announce(websocket)
            case MessageType.HEARTBEAT:
                pass  # Just acknowledge we're alive
            case _:
                logger.debug("Ignoring message type", type=message.type)

    async def _handle_request(self, websocket: Any, message: WSMessage) -> None:
        """Handle an API request message."""
        if not self._proxy:
            return

        try:
            # Parse request from payload
            request = APIRequest.model_validate(message.payload.get("request", {}))

            # Proxy to target API
            response = await self._proxy.request(request)

            # Send response
            response_message = message.create_response(
                source=self.worker_id_str,
                payload={"response": response.model_dump(mode="json")},
            )

        except Exception as e:
            logger.error("Request handling error", error=str(e))
            error = ErrorPayload(
                code="PROXY_ERROR",
                message=str(e),
            )
            response_message = message.create_response(
                source=self.worker_id_str,
                payload={"error": error.model_dump()},
                error=True,
            )

        await websocket.send(response_message.model_dump_json())

Discovery Implementation

src/apimesh/discovery/broadcast.py

"""UDP broadcast discovery for LAN worker discovery."""

import asyncio
import json
import socket
from typing import Any

import structlog

from apimesh.models.card import APICard

logger = structlog.get_logger()

BROADCAST_PORT = 5678
BROADCAST_ADDR = "255.255.255.255"
BUFFER_SIZE = 65535


class BroadcastDiscovery:
    """UDP broadcast-based discovery for LAN environments.

    Workers broadcast their API Cards periodically.
    Other workers listen and update their registries.
    """

    def __init__(
        self,
        port: int = BROADCAST_PORT,
        broadcast_addr: str = BROADCAST_ADDR,
    ) -> None:
        self.port = port
        self.broadcast_addr = broadcast_addr
        self._socket: socket.socket | None = None
        self._running = False

    def _create_socket(self) -> socket.socket:
        """Create and configure broadcast socket."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Allow receiving our own broadcasts (for testing)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except AttributeError:
            pass  # Not available on all platforms

        sock.setblocking(False)
        return sock

    async def start_listening(
        self,
        callback: Any,  # Callable[[APICard], Awaitable[None]]
    ) -> None:
        """Start listening for broadcast announcements.

        Args:
            callback: Async function called with each discovered APICard
        """
        self._socket = self._create_socket()
        self._socket.bind(("", self.port))
        self._running = True

        logger.info("Discovery listening", port=self.port)

        loop = asyncio.get_event_loop()

        while self._running:
            try:
                # Non-blocking receive
                data, addr = await loop.run_in_executor(
                    None,
                    lambda: self._socket.recvfrom(BUFFER_SIZE) if self._socket else (b"", ("", 0)),
                )

                if data:
                    await self._handle_broadcast(data, addr, callback)

            except BlockingIOError:
                await asyncio.sleep(0.1)
            except Exception as e:
                if self._running:
                    logger.error("Discovery receive error", error=str(e))
                    await asyncio.sleep(1.0)

    async def _handle_broadcast(
        self,
        data: bytes,
        addr: tuple[str, int],
        callback: Any,
    ) -> None:
        """Process received broadcast data."""
        try:
            payload = json.loads(data.decode())
            card = APICard.model_validate(payload)

            logger.debug(
                "Discovered service",
                service=card.service_name,
                from_addr=addr[0],
            )

            await callback(card)

        except Exception as e:
            logger.warning(
                "Invalid broadcast data",
                error=str(e),
                from_addr=addr[0],
            )

    async def broadcast(self, card: APICard) -> None:
        """Broadcast our API Card."""
        if not self._socket:
            self._socket = self._create_socket()

        data = card.model_dump_json().encode()

        try:
            self._socket.sendto(data, (self.broadcast_addr, self.port))
            logger.debug("Broadcast sent", service=card.service_name)
        except Exception as e:
            logger.error("Broadcast send error", error=str(e))

    def stop(self) -> None:
        """Stop discovery."""
        self._running = False
        if self._socket:
            self._socket.close()
            self._socket = None

Gateway Implementation

src/apimesh/gateway/server.py

"""FastAPI WebSocket Gateway for external access to the mesh."""

import asyncio
from contextlib import asynccontextmanager
from typing import Any
from uuid import uuid4

import structlog
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from apimesh.config import GatewaySettings
from apimesh.gateway.router import MessageRouter
from apimesh.models.card import APICard
from apimesh.models.messages import MessageType, WSMessage
from apimesh.models.registry import ServiceRegistry

logger = structlog.get_logger()


class ConnectionManager:
    """Manages WebSocket connections for workers and clients."""

    def __init__(self) -> None:
        self.workers: dict[str, WebSocket] = {}  # worker_id -> websocket
        self.clients: dict[str, WebSocket] = {}  # client_id -> websocket
        self.pending: dict[str, asyncio.Future[WSMessage]] = {}  # correlation_id -> future

    async def connect_worker(self, websocket: WebSocket, worker_id: str) -> None:
        """Register a worker connection."""
        await websocket.accept()
        self.workers[worker_id] = websocket
        logger.info("Worker connected", worker_id=worker_id)

    async def connect_client(self, websocket: WebSocket) -> str:
        """Register a client connection."""
        await websocket.accept()
        client_id = str(uuid4())
        self.clients[client_id] = websocket
        logger.info("Client connected", client_id=client_id)
        return client_id

    def disconnect_worker(self, worker_id: str) -> None:
        """Remove a worker connection."""
        self.workers.pop(worker_id, None)
        logger.info("Worker disconnected", worker_id=worker_id)

    def disconnect_client(self, client_id: str) -> None:
        """Remove a client connection."""
        self.clients.pop(client_id, None)
        logger.info("Client disconnected", client_id=client_id)

    async def send_to_worker(self, worker_id: str, message: WSMessage) -> bool:
        """Send message to specific worker."""
        websocket = self.workers.get(worker_id)
        if not websocket:
            return False

        try:
            await websocket.send_text(message.model_dump_json())
            return True
        except Exception as e:
            logger.error("Failed to send to worker", worker_id=worker_id, error=str(e))
            return False

    async def send_to_client(self, client_id: str, message: WSMessage) -> bool:
        """Send message to specific client."""
        websocket = self.clients.get(client_id)
        if not websocket:
            return False

        try:
            await websocket.send_text(message.model_dump_json())
            return True
        except Exception as e:
            logger.error("Failed to send to client", client_id=client_id, error=str(e))
            return False

    async def broadcast_to_workers(self, message: WSMessage) -> int:
        """Broadcast message to all workers."""
        sent = 0
        for worker_id in list(self.workers.keys()):
            if await self.send_to_worker(worker_id, message):
                sent += 1
        return sent


def create_gateway(settings: GatewaySettings | None = None) -> FastAPI:
    """Create and configure the gateway FastAPI application."""
    settings = settings or GatewaySettings()
    registry = ServiceRegistry(ttl_seconds=settings.registry_ttl)
    connections = ConnectionManager()
    router = MessageRouter(registry, connections)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> Any:
        # Start cleanup task
        cleanup_task = asyncio.create_task(cleanup_loop())
        logger.info("Gateway started", host=settings.host, port=settings.port)
        yield
        cleanup_task.cancel()
        logger.info("Gateway stopped")

    async def cleanup_loop() -> None:
        """Periodic cleanup of stale registry entries."""
        while True:
            await asyncio.sleep(30)
            stale = registry.cleanup_stale()
            if stale:
                logger.info("Cleaned stale workers", count=len(stale))

    app = FastAPI(
        title="API Mesh Gateway",
        description="WebSocket gateway for the API Mesh network",
        version="0.1.0",
        lifespan=lifespan,
    )

    @app.get("/health")
    async def health() -> dict[str, str]:
        """Health check endpoint."""
        return {"status": "healthy"}

    @app.get("/services")
    async def list_services() -> dict[str, Any]:
        """List all registered services."""
        return {
            "services": registry.list_services(),
            "workers": registry.list_workers(),
        }

    @app.get("/services/{service_name}")
    async def get_service(service_name: str) -> dict[str, Any]:
        """Get details for a specific service."""
        cards = registry.get_by_name(service_name)
        return {
            "service_name": service_name,
            "instances": [card.model_dump(mode="json") for card in cards],
        }

    @app.websocket("/ws/worker")
    async def worker_websocket(websocket: WebSocket) -> None:
        """WebSocket endpoint for mesh workers."""
        worker_id: str | None = None

        try:
            # Wait for initial announce to get worker_id
            await websocket.accept()
            raw = await websocket.receive_text()
            message = WSMessage.model_validate_json(raw)

            if message.type != MessageType.ANNOUNCE:
                await websocket.close(code=4000, reason="Expected ANNOUNCE")
                return

            # Extract card and register
            card = APICard.model_validate(message.payload.get("card", {}))
            worker_id = str(card.worker_id)
            registry.register(card)
            connections.workers[worker_id] = websocket

            logger.info(
                "Worker registered",
                worker_id=worker_id,
                service=card.service_name,
            )

            # Handle messages
            while True:
                raw = await websocket.receive_text()
                await router.route_worker_message(raw, worker_id)

        except WebSocketDisconnect:
            pass
        except Exception as e:
            logger.error("Worker WebSocket error", error=str(e))
        finally:
            if worker_id:
                connections.disconnect_worker(worker_id)
                registry.unregister(worker_id)

    @app.websocket("/ws/client")
    async def client_websocket(websocket: WebSocket) -> None:
        """WebSocket endpoint for external clients."""
        client_id = await connections.connect_client(websocket)

        try:
            while True:
                raw = await websocket.receive_text()
                await router.route_client_message(raw, client_id)

        except WebSocketDisconnect:
            pass
        except Exception as e:
            logger.error("Client WebSocket error", error=str(e))
        finally:
            connections.disconnect_client(client_id)

    # Store references for access
    app.state.registry = registry
    app.state.connections = connections
    app.state.router = router

    return app

src/apimesh/gateway/router.py

"""Message routing logic for the gateway."""

import asyncio
from typing import TYPE_CHECKING

import structlog

from apimesh.models.card import APICard
from apimesh.models.messages import ErrorPayload, MessageType, WSMessage
from apimesh.models.registry import ServiceRegistry

if TYPE_CHECKING:
    from apimesh.gateway.server import ConnectionManager

logger = structlog.get_logger()


class MessageRouter:
    """Routes messages between workers and clients."""

    def __init__(
        self,
        registry: ServiceRegistry,
        connections: "ConnectionManager",
    ) -> None:
        self.registry = registry
        self.connections = connections
        self._pending: dict[str, tuple[str, asyncio.Future[WSMessage]]] = {}

    async def route_worker_message(self, raw: str, source_worker_id: str) -> None:
        """Route a message from a worker."""
        try:
            message = WSMessage.model_validate_json(raw)
        except Exception as e:
            logger.warning("Invalid worker message", error=str(e))
            return

        match message.type:
            case MessageType.ANNOUNCE:
                card = APICard.model_validate(message.payload.get("card", {}))
                self.registry.register(card)
                logger.debug("Worker announced", service=card.service_name)

            case MessageType.RESPONSE | MessageType.ERROR:
                # Route response back to waiting client
                if message.correlation_id:
                    await self._handle_response(message)

            case MessageType.HEARTBEAT:
                # Update registry heartbeat
                card = self.registry.get_by_id(source_worker_id)
                if card:
                    card.refresh_heartbeat()

            case _:
                logger.debug("Unhandled worker message type", type=message.type)

    async def route_client_message(self, raw: str, client_id: str) -> None:
        """Route a message from a client."""
        try:
            message = WSMessage.model_validate_json(raw)
        except Exception as e:
            logger.warning("Invalid client message", error=str(e))
            return

        if message.type != MessageType.REQUEST:
            logger.debug("Ignoring non-request from client", type=message.type)
            return

        # Find target worker
        target = message.target
        card: APICard | None = None

        # Try as worker_id first, then as service name
        card = self.registry.get_by_id(target)
        if not card:
            card = self.registry.get_one_by_name(target)

        if not card:
            # Send error back to client
            error_response = message.create_response(
                source="gateway",
                payload={
                    "error": ErrorPayload(
                        code="SERVICE_NOT_FOUND",
                        message=f"No service found: {target}",
                    ).model_dump()
                },
                error=True,
            )
            await self.connections.send_to_client(client_id, error_response)
            return

        worker_id = str(card.worker_id)

        # Store pending request for response routing
        future: asyncio.Future[WSMessage] = asyncio.Future()
        self._pending[str(message.id)] = (client_id, future)

        # Forward to worker
        message.target = worker_id
        sent = await self.connections.send_to_worker(worker_id, message)

        if not sent:
            # Worker disconnected
            self._pending.pop(str(message.id), None)
            error_response = message.create_response(
                source="gateway",
                payload={
                    "error": ErrorPayload(
                        code="WORKER_UNAVAILABLE",
                        message=f"Worker unavailable: {worker_id}",
                    ).model_dump()
                },
                error=True,
            )
            await self.connections.send_to_client(client_id, error_response)

    async def _handle_response(self, message: WSMessage) -> None:
        """Handle a response message from a worker."""
        correlation_id = str(message.correlation_id) if message.correlation_id else None
        if not correlation_id:
            return

        pending = self._pending.pop(correlation_id, None)
        if not pending:
            logger.warning("No pending request for response", correlation_id=correlation_id)
            return

        client_id, _ = pending

        # Forward response to client
        await self.connections.send_to_client(client_id, message)

Configuration

src/apimesh/config.py

"""Configuration management for API Mesh components."""

from pydantic_settings import BaseSettings, SettingsConfigDict


class WorkerSettings(BaseSettings):
    """Configuration for API Workers."""

    model_config = SettingsConfigDict(
        env_prefix="APIMESH_WORKER_",
        env_file=".env",
    )

    # Target API
    openapi_path: str = "/openapi.json"

    # Gateway connection
    gateway_ws_url: str = "ws://localhost:8080/ws/worker"

    # Timing
    heartbeat_interval: float = 30.0
    reconnect_delay: float = 5.0


class GatewaySettings(BaseSettings):
    """Configuration for the Gateway server."""

    model_config = SettingsConfigDict(
        env_prefix="APIMESH_GATEWAY_",
        env_file=".env",
    )

    # Server
    host: str = "0.0.0.0"
    port: int = 8080

    # Registry
    registry_ttl: int = 90  # Seconds before stale workers removed


class DiscoverySettings(BaseSettings):
    """Configuration for UDP discovery."""

    model_config = SettingsConfigDict(
        env_prefix="APIMESH_DISCOVERY_",
        env_file=".env",
    )

    port: int = 5678
    broadcast_addr: str = "255.255.255.255"
    announce_interval: float = 30.0

CLI

src/apimesh/cli.py

"""Command-line interface for API Mesh."""

import asyncio

import structlog
import typer
import uvicorn

from apimesh.config import GatewaySettings, WorkerSettings
from apimesh.gateway.server import create_gateway
from apimesh.worker.worker import APIWorker

app = typer.Typer(
    name="apimesh",
    help="Distributed WebSocket proxy network for FastAPI services",
)

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.dev.ConsoleRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)


@app.command()
def gateway(
    host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind to"),
    port: int = typer.Option(8080, "--port", "-p", help="Port to bind to"),
) -> None:
    """Start the API Mesh Gateway server."""
    settings = GatewaySettings(host=host, port=port)
    gateway_app = create_gateway(settings)

    typer.echo(f"Starting Gateway on {host}:{port}")
    uvicorn.run(gateway_app, host=host, port=port)


@app.command()
def worker(
    target: str = typer.Argument(..., help="Target API URL (e.g., http://localhost:8000)"),
    gateway_url: str = typer.Option(
        "ws://localhost:8080/ws/worker",
        "--gateway",
        "-g",
        help="Gateway WebSocket URL",
    ),
) -> None:
    """Start an API Worker wrapping a target FastAPI service."""
    settings = WorkerSettings(gateway_ws_url=gateway_url)
    api_worker = APIWorker(target, settings)

    async def run() -> None:
        await api_worker.start()
        try:
            # Keep running until interrupted
            while True:
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            pass
        finally:
            await api_worker.stop()

    typer.echo(f"Starting Worker for {target}")
    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        typer.echo("\nWorker stopped")


@app.command()
def inspect(
    target: str = typer.Argument(..., help="Target API URL to inspect"),
) -> None:
    """Inspect a FastAPI service's OpenAPI spec."""
    from rich.console import Console
    from rich.table import Table

    from apimesh.worker.parser import parse_openapi_to_card

    console = Console()

    async def run() -> None:
        card = await parse_openapi_to_card(target)

        console.print(f"\n[bold]Service:[/bold] {card.service_name} v{card.service_version}")
        console.print(f"[bold]Base URL:[/bold] {card.base_url}")
        console.print(f"[bold]Tags:[/bold] {', '.join(card.tags) or 'None'}")

        table = Table(title="\nEndpoints")
        table.add_column("Method", style="cyan")
        table.add_column("Path", style="green")
        table.add_column("Operation ID")
        table.add_column("Summary")

        for endpoint in card.endpoints:
            table.add_row(
                endpoint.method,
                endpoint.path,
                endpoint.operation_id or "-",
                endpoint.summary or "-",
            )

        console.print(table)

    asyncio.run(run())


if __name__ == "__main__":
    app()

Package Init Files

src/apimesh/init.py

"""API Mesh - Distributed WebSocket proxy network for FastAPI services."""

from apimesh.config import GatewaySettings, WorkerSettings
from apimesh.gateway.server import create_gateway
from apimesh.models.card import APICard, EndpointSpec
from apimesh.models.messages import APIRequest, APIResponse, MessageType, WSMessage
from apimesh.models.registry import ServiceRegistry
from apimesh.worker.worker import APIWorker

__version__ = "0.1.0"

__all__ = [
    # Core classes
    "APIWorker",
    "create_gateway",
    # Models
    "APICard",
    "EndpointSpec",
    "WSMessage",
    "APIRequest",
    "APIResponse",
    "MessageType",
    "ServiceRegistry",
    # Config
    "WorkerSettings",
    "GatewaySettings",
]

src/apimesh/models/init.py

"""API Mesh data models."""

from apimesh.models.card import APICard, EndpointSpec, ParameterSpec
from apimesh.models.messages import (
    APIRequest,
    APIResponse,
    ErrorPayload,
    MessageType,
    WSMessage,
)
from apimesh.models.registry import ServiceRegistry

__all__ = [
    "APICard",
    "EndpointSpec",
    "ParameterSpec",
    "WSMessage",
    "APIRequest",
    "APIResponse",
    "ErrorPayload",
    "MessageType",
    "ServiceRegistry",
]

src/apimesh/worker/init.py

"""API Mesh worker components."""

from apimesh.worker.parser import parse_openapi_to_card
from apimesh.worker.proxy import ProxyClient, proxy_request
from apimesh.worker.worker import APIWorker

__all__ = [
    "APIWorker",
    "parse_openapi_to_card",
    "ProxyClient",
    "proxy_request",
]

src/apimesh/gateway/init.py

"""API Mesh gateway components."""

from apimesh.gateway.router import MessageRouter
from apimesh.gateway.server import ConnectionManager, create_gateway

__all__ = [
    "create_gateway",
    "ConnectionManager",
    "MessageRouter",
]

src/apimesh/discovery/init.py

"""API Mesh discovery components."""

from apimesh.discovery.broadcast import BroadcastDiscovery

__all__ = [
    "BroadcastDiscovery",
]

src/apimesh/network/init.py

"""API Mesh network components."""

Example Usage

examples/sample_api/main.py

"""Sample FastAPI application for testing API Mesh."""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(
    title="Sample Users API",
    description="A sample API for testing API Mesh",
    version="1.0.0",
)


class User(BaseModel):
    id: int
    name: str
    email: str


class UserCreate(BaseModel):
    name: str
    email: str


# In-memory storage
users_db: dict[int, User] = {
    1: User(id=1, name="Alice", email="alice@example.com"),
    2: User(id=2, name="Bob", email="bob@example.com"),
}
next_id = 3


@app.get("/users", tags=["users"])
async def list_users() -> list[User]:
    """List all users."""
    return list(users_db.values())


@app.get("/users/{user_id}", tags=["users"])
async def get_user(user_id: int) -> User:
    """Get a specific user by ID."""
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="User not found")
    return users_db[user_id]


@app.post("/users", tags=["users"], status_code=201)
async def create_user(user: UserCreate) -> User:
    """Create a new user."""
    global next_id
    new_user = User(id=next_id, name=user.name, email=user.email)
    users_db[next_id] = new_user
    next_id += 1
    return new_user


@app.delete("/users/{user_id}", tags=["users"])
async def delete_user(user_id: int) -> dict[str, str]:
    """Delete a user."""
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="User not found")
    del users_db[user_id]
    return {"status": "deleted"}


@app.get("/health")
async def health() -> dict[str, str]:
    """Health check endpoint."""
    return {"status": "healthy"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

examples/run_worker.py

"""Example script to run an API Worker."""

import asyncio

from apimesh import APIWorker, WorkerSettings


async def main() -> None:
    """Run a worker wrapping the sample API."""
    settings = WorkerSettings(
        gateway_ws_url="ws://localhost:8080/ws/worker",
    )

    worker = APIWorker(
        target_url="http://localhost:8000",
        settings=settings,
    )

    await worker.start()

    print(f"Worker started: {worker.worker_id}")
    print(f"Service: {worker.card.service_name if worker.card else 'unknown'}")

    try:
        # Keep running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        await worker.stop()


if __name__ == "__main__":
    asyncio.run(main())

examples/run_gateway.py

"""Example script to run the Gateway server."""

import uvicorn

from apimesh import GatewaySettings, create_gateway


def main() -> None:
    """Run the gateway server."""
    settings = GatewaySettings(
        host="0.0.0.0",
        port=8080,
    )

    app = create_gateway(settings)

    print(f"Starting Gateway on {settings.host}:{settings.port}")
    uvicorn.run(app, host=settings.host, port=settings.port)


if __name__ == "__main__":
    main()

examples/client_example.py

"""Example WebSocket client for API Mesh."""

import asyncio
import json

from websockets.client import connect as ws_connect

from apimesh.models.messages import APIRequest, MessageType, WSMessage


async def main() -> None:
    """Connect as a client and make requests through the mesh."""
    gateway_url = "ws://localhost:8080/ws/client"

    async with ws_connect(gateway_url) as websocket:
        print("Connected to gateway")

        # Create a request to list users
        request = APIRequest(
            method="GET",
            path="/users",
        )

        message = WSMessage(
            type=MessageType.REQUEST,
            source="example-client",
            target="sample-users-api",  # Service name from OpenAPI spec
            payload={"request": request.model_dump(mode="json")},
        )

        # Send request
        await websocket.send(message.model_dump_json())
        print(f"Sent request: {message.type}")

        # Wait for response
        raw = await websocket.recv()
        response = WSMessage.model_validate_json(raw)

        print(f"Response type: {response.type}")
        print(f"Response payload: {json.dumps(response.payload, indent=2)}")


if __name__ == "__main__":
    asyncio.run(main())

Tests

tests/conftest.py

"""Pytest configuration and fixtures."""

import pytest
from httpx import ASGITransport, AsyncClient

from apimesh.gateway.server import create_gateway


@pytest.fixture
def gateway_app():
    """Create a gateway app for testing."""
    return create_gateway()


@pytest.fixture
async def gateway_client(gateway_app):
    """Create an async test client for the gateway."""
    transport = ASGITransport(app=gateway_app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

tests/test_models/test_card.py

"""Tests for API Card models."""

from apimesh.models.card import APICard, EndpointSpec


def test_api_card_creation():
    """Test basic APICard creation."""
    card = APICard(
        service_name="test-api",
        base_url="http://localhost:8000",
        openapi_url="http://localhost:8000/openapi.json",
    )

    assert card.service_name == "test-api"
    assert card.worker_id is not None
    assert card.heartbeat is not None


def test_endpoint_lookup():
    """Test endpoint lookup methods."""
    endpoint = EndpointSpec(
        path="/users",
        method="GET",
        operation_id="list_users",
    )

    card = APICard(
        service_name="test-api",
        base_url="http://localhost:8000",
        openapi_url="http://localhost:8000/openapi.json",
        endpoints=[endpoint],
    )

    assert card.has_endpoint("GET", "/users")
    assert not card.has_endpoint("POST", "/users")

    found = card.get_endpoint("GET", "/users")
    assert found is not None
    assert found.operation_id == "list_users"

tests/test_models/test_messages.py

"""Tests for message protocol models."""

from apimesh.models.messages import APIRequest, MessageType, WSMessage


def test_message_creation():
    """Test WSMessage creation with defaults."""
    msg = WSMessage(
        type=MessageType.REQUEST,
        source="worker-1",
        target="worker-2",
    )

    assert msg.id is not None
    assert msg.timestamp is not None
    assert msg.payload == {}


def test_create_response():
    """Test creating a response to a message."""
    request = WSMessage(
        type=MessageType.REQUEST,
        source="client",
        target="worker",
        payload={"data": "test"},
    )

    response = request.create_response(
        source="worker",
        payload={"result": "success"},
    )

    assert response.type == MessageType.RESPONSE
    assert response.source == "worker"
    assert response.target == "client"
    assert response.correlation_id == request.id


def test_api_request_serialization():
    """Test APIRequest model."""
    request = APIRequest(
        method="POST",
        path="/users/{id}",
        path_params={"id": 123},
        body={"name": "Test"},
    )

    data = request.model_dump()
    assert data["method"] == "POST"
    assert data["path_params"]["id"] == 123

Quick Start

# Terminal 1: Start the sample API
cd examples/sample_api
uvicorn main:app --port 8000

# Terminal 2: Start the Gateway
apimesh gateway --port 8080

# Terminal 3: Start a Worker
apimesh worker http://localhost:8000 --gateway ws://localhost:8080/ws/worker

# Terminal 4: Inspect the wrapped API
apimesh inspect http://localhost:8000

# Terminal 5: Run client example
python examples/client_example.py

Architecture Diagram

                                    ┌─────────────────────────┐
                                    │    External Clients     │
                                    │  (Web/Mobile/Services)  │
                                    └───────────┬─────────────┘
                                                │ WebSocket
                                    ┌───────────▼─────────────┐
                                    │        GATEWAY          │
                                    │  ───────────────────    │
                                    │  • Client connections   │
                                    │  • Service registry     │
                                    │  • Message routing      │
                                    │  • REST API for status  │
                                    └───────────┬─────────────┘
                    ┌───────────────────────────┼───────────────────────────┐
                    │                           │                           │
        ┌───────────▼───────────┐   ┌───────────▼───────────┐   ┌───────────▼───────────┐
        │     API WORKER        │   │     API WORKER        │   │     API WORKER        │
        │  ─────────────────    │   │  ─────────────────    │   │  ─────────────────    │
        │  Wraps: Users API     │   │  Wraps: Orders API    │   │  Wraps: Auth API      │
        │  ID: worker-abc       │   │  ID: worker-def       │   │  ID: worker-ghi       │
        └───────────┬───────────┘   └───────────┬───────────┘   └───────────┬───────────┘
                    │ HTTP                      │ HTTP                      │ HTTP
        ┌───────────▼───────────┐   ┌───────────▼───────────┐   ┌───────────▼───────────┐
        │    FastAPI Service    │   │    FastAPI Service    │   │    FastAPI Service    │
        │    localhost:8001     │   │    localhost:8002     │   │    localhost:8003     │
        └───────────────────────┘   └───────────────────────┘   └───────────────────────┘

Next Steps / Roadmap

  1. Load Balancing: Add round-robin or least-connections routing when multiple workers serve the same API
  2. Authentication: Pass-through auth headers, or implement mesh-level auth
  3. Request Validation: Validate requests against OpenAPI schemas before proxying
  4. Metrics: Add Prometheus metrics for request counts, latencies, errors
  5. Distributed Discovery: Replace UDP broadcast with Redis pub/sub or etcd for multi-network support
  6. Circuit Breaker: Implement circuit breaker pattern for failing services
  7. Request Tracing: Add distributed tracing with OpenTelemetry