
Four-Layer Design

Astromesh is organized into four distinct layers, each with a clear boundary and responsibility. This page walks through every layer in detail — the modules it contains, the interfaces it exposes, and how it connects to adjacent layers.

For a high-level overview, see Architecture Overview. For a request-level trace, see Agent Execution Pipeline.

Module: astromesh/api/

The API Layer is the entry point for all external communication. It accepts HTTP and WebSocket connections, validates input, and forwards requests to the Runtime Engine. It contains no business logic — its only job is protocol translation.

Module: astromesh/api/main.py + astromesh/api/routes/

The REST API is built with FastAPI and organized into route groups. Each route module exposes a router (FastAPI APIRouter) and a set_runtime(runtime) function that is called during bootstrap to inject the AgentRuntime instance.

| Route Group | Prefix | Key Endpoints | Description |
|---|---|---|---|
| Agents | /v1/agents | GET /, GET /{name}, POST /{name}/run | List, inspect, and execute agents |
| Memory | /v1/memory | GET /{agent}/history/{session}, DELETE /{agent}/history/{session}, GET /{agent}/semantic | Query and manage conversation history and semantic memory |
| Tools | /v1/tools | GET /, POST /execute | List registered tools and execute them directly |
| RAG | /v1/rag | POST /ingest, POST /query | Ingest documents and query knowledge bases |
| Health | /v1/health | GET / | Health check and version information |

The main entry point (main.py) creates the FastAPI application, runs AgentRuntime.bootstrap() on startup, and calls set_runtime() on each route module so they can access the runtime without global state.
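This wiring can be sketched in a few lines. The classes below are simplified stand-ins for the real `AgentRuntime` and route modules, not the actual implementation:

```python
# Sketch of the startup wiring: bootstrap once, then hand the runtime
# to each route module via set_runtime(). Names mirror the description
# above; the classes here are simplified stand-ins.

class AgentRuntime:
    def bootstrap(self):
        # Real version scans config/agents/*.agent.yaml and wires agents.
        self.agents = {"helpdesk": "<Agent instance>"}
        return self

    def run(self, agent_name, query, session_id):
        assert agent_name in self.agents, f"unknown agent: {agent_name}"
        return f"[{agent_name}] response to: {query}"

class agents_routes:
    """Stand-in for a module under astromesh/api/routes/."""
    _runtime = None

    @classmethod
    def set_runtime(cls, runtime):
        cls._runtime = runtime

    @classmethod
    def run_agent(cls, name, query):
        # Roughly what a POST /v1/agents/{name}/run handler would do.
        return cls._runtime.run(name, query, session_id="default")

runtime = AgentRuntime().bootstrap()   # happens once, at app startup
agents_routes.set_runtime(runtime)     # injection instead of a module global
print(agents_routes.run_agent("helpdesk", "reset my password"))
```

Injecting the runtime this way keeps route modules importable (and testable) without a bootstrapped application.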

Module: astromesh/api/ws.py

The WebSocket endpoint at /v1/ws/agent/{agent_name} provides real-time token streaming for agent responses. It uses a ConnectionManager that:

  • Tracks active WebSocket connections per agent
  • Accepts JSON messages with query and session_id fields
  • Streams partial responses as tokens are generated by the LLM provider
  • Handles disconnection and cleanup

Example client interaction:

// Client sends:
{"query": "Explain quantum computing", "session_id": "session-42"}
// Server streams back token-by-token:
{"token": "Quantum", "done": false}
{"token": " computing", "done": false}
{"token": " uses", "done": false}
// ... more tokens ...
{"token": "", "done": true, "response": "Quantum computing uses..."}
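The framing above can be reproduced with a small generator. The field names (`token`, `done`, `response`) come from the example; the real endpoint streams tokens from the LLM provider rather than from a list:

```python
import json

def stream_frames(tokens):
    """Yield the JSON frames the server sends for a token stream,
    following the protocol shown above (field names assumed from it)."""
    parts = []
    for tok in tokens:
        parts.append(tok)
        yield json.dumps({"token": tok, "done": False})
    # Final frame carries the assembled response.
    yield json.dumps({"token": "", "done": True, "response": "".join(parts)})

frames = list(stream_frames(["Quantum", " computing", " uses"]))
for frame in frames:
    print(frame)
```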

Module: astromesh/channels/

Channel adapters sit above the API Layer, bridging external messaging platforms to the Agent Runtime. They are configured in config/channels.yaml and each adapter handles:

  • Webhook verification — Platform-specific challenge/response (e.g., Meta’s hub.verify_token)
  • Message parsing — Extracting text from platform-specific webhook payloads
  • Signature validation — HMAC verification of incoming webhooks
  • Response formatting — Converting agent output to platform-specific API calls
  • Background execution — Running agent execution in background tasks so webhooks respond within platform timeouts (e.g., Meta requires a response within 5 seconds)

Each channel maps to a default_agent that handles all conversations on that channel.
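Signature validation typically looks like the following sketch, assuming a Meta-style `X-Hub-Signature-256: sha256=<hexdigest>` header; the exact header name and secret handling vary by platform:

```python
import hashlib
import hmac

def verify_signature(app_secret: str, payload: bytes, header: str) -> bool:
    """Validate an HMAC-SHA256 webhook signature.

    `header` is the X-Hub-Signature-256 value, e.g. "sha256=<hexdigest>".
    compare_digest avoids timing side-channels.
    """
    expected = hmac.new(app_secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(header, f"sha256={expected}")

secret = "app-secret"
body = b'{"entry": []}'
sig = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
print(verify_signature(secret, body, sig))   # True for a matching signature
```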


Module: astromesh/runtime/engine.py

The Runtime Engine is the heart of Astromesh — the control plane that turns declarative YAML configuration into running agent instances. It has two primary responsibilities: bootstrapping and execution.

When the application starts, AgentRuntime.bootstrap() performs the following sequence:

config/agents/*.agent.yaml
┌────────────────────────────────┐
│ 1. Scan config/agents/ dir     │
│ 2. Parse each .agent.yaml      │
│ 3. Validate against schema     │
│ 4. For each agent definition:  │
│    ├── Build ModelRouter       │──► Wire primary + fallback providers
│    ├── Build MemoryManager     │──► Configure backends + strategies
│    ├── Build ToolRegistry      │──► Register tools (internal, MCP, webhook, RAG)
│    ├── Select Pattern          │──► Instantiate orchestration pattern
│    ├── Build PromptEngine      │──► Register Jinja2 templates
│    ├── Build GuardrailsEngine  │──► Configure input/output rules
│    └── Create Agent instance   │──► Fully wired, ready to execute
└────────────────────────────────┘
agents: Dict[str, Agent] ← lookup table by agent name

The bootstrap process reads all YAML files, validates them, and assembles fully wired Agent objects. Each agent gets its own set of dependencies (router, memory, tools, etc.) based on its configuration.

When a request arrives for a specific agent, AgentRuntime.run(agent_name, query, session_id) looks up the agent by name and delegates to Agent.run(), which executes the full pipeline (covered in detail in Agent Execution Pipeline).

Each Agent instance holds references to:

| Component | Class | Purpose |
|---|---|---|
| Model Router | ModelRouter | Select and call LLM providers with fallback |
| Memory Manager | MemoryManager | Build context from conversation, semantic, and episodic memory |
| Tool Registry | ToolRegistry | Discover, validate, and execute tools |
| Orchestration Pattern | OrchestrationPattern subclass | Control the reasoning loop (ReAct, PlanAndExecute, etc.) |
| Prompt Engine | PromptEngine | Render Jinja2 system prompts with variable injection |
| Guardrails Engine | GuardrailsEngine | Apply input/output safety rules |

Core Services are the domain-specific building blocks that agents use during execution. Each service has a well-defined interface and delegates to Layer 4 for concrete implementations.

Module: astromesh/core/model_router.py

The Model Router manages multi-provider LLM inference. Given a completion request, it ranks available providers using the configured routing strategy, checks circuit breaker state, and tries providers in order until one succeeds.

                ┌─────────────┐
Request ──────► │ ModelRouter │
                │             │
                │ 1. Rank     │──► Strategy-based ordering
                │ 2. Try      │──► Circuit breaker check
                │ 3. Fallback │──► Next provider on failure
                └──────┬──────┘
           ┌───────────┼───────────┐
           ▼           ▼           ▼
       ┌───────┐   ┌────────┐  ┌───────┐
       │Ollama │   │ OpenAI │  │ vLLM  │ ...
       └───────┘   └────────┘  └───────┘
| Strategy | Behavior | Best For |
|---|---|---|
| cost_optimized | Cheapest provider first, based on estimated_cost() | Budget-sensitive workloads |
| latency_optimized | Fastest provider first, using exponential moving average of response times | Real-time applications |
| quality_first | Highest quality score first (configured per provider) | Tasks requiring maximum accuracy |
| round_robin | Rotate across providers evenly | Load distribution |
| capability_match | Filter by required capabilities (tool calling, vision), then apply secondary strategy | Tasks needing specific features |

The circuit breaker protects the system from repeatedly calling failing providers:

  • Closed (normal): Requests flow through. Failures are counted.
  • Open (tripped): After 3 consecutive failures, the circuit opens. All requests to that provider are immediately skipped for 60 seconds.
  • Half-open (testing): After the cooldown, one request is allowed through. If it succeeds, the circuit closes. If it fails, the circuit reopens.

When a provider’s circuit is open, the router automatically tries the next provider in the ranked list. If all providers are tripped, the request fails with a clear error.
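The state machine above can be sketched as follows. The thresholds match the description (3 consecutive failures, 60-second cooldown); the class name and API are illustrative, not the real implementation:

```python
import time

class CircuitBreaker:
    """Minimal sketch of the breaker described above: 3 consecutive
    failures open the circuit for 60 seconds, then one probe is allowed."""

    def __init__(self, failure_threshold=3, cooldown_s=60.0):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at = None  # None => closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True          # closed: requests flow through
        if time.monotonic() - self.opened_at >= self.cooldown_s:
            return True          # half-open: allow one probe
        return False             # open: skip this provider

    def record_success(self):
        self.failures = 0
        self.opened_at = None    # close (or re-close after a probe)

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()  # trip (or re-trip)

breaker = CircuitBreaker(cooldown_s=0.01)   # short cooldown for the demo
for _ in range(3):
    breaker.record_failure()
print(breaker.allow_request())   # False: circuit is open
```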

Module: astromesh/core/memory.py

The Memory Manager handles three distinct types of memory, each with pluggable backends. It provides two key operations: build_context() (assemble memory for prompt construction) and persist_turn() (store conversation turns after execution).

            ┌──────────────────┐
            │  MemoryManager   │
            │                  │
            │ build_context()  │──► Assemble context from all memory types
            │ persist_turn()   │──► Store conversation turns
            └──────────┬───────┘
       ┌───────────────┼─────────────┐
       ▼               ▼             ▼
┌──────────────┐ ┌───────────┐ ┌───────────┐
│Conversational│ │ Semantic  │ │ Episodic  │
│              │ │           │ │           │
│ Redis / PG / │ │ pgvector/ │ │PostgreSQL │
│ SQLite       │ │ Chroma/   │ │           │
│              │ │ Qdrant/   │ │           │
│              │ │ FAISS     │ │           │
└──────────────┘ └───────────┘ └───────────┘

| Type | Purpose | Backend Options |
|---|---|---|
| Conversational | Chat history — stores user and assistant messages for multi-turn conversations | Redis, PostgreSQL, SQLite |
| Semantic | Vector embeddings — stores and retrieves information by semantic similarity | pgvector, ChromaDB, Qdrant, FAISS |
| Episodic | Event logs — records significant events, tool calls, and outcomes for long-term learning | PostgreSQL |

Memory strategies control how conversational history is managed when it grows beyond what fits in the LLM’s context window:

| Strategy | Description | Use Case |
|---|---|---|
| sliding_window | Keep the last N turns, discard older ones | Simple chatbots with short context needs |
| summary | Compress older turns into summaries using the LLM, keep recent turns verbatim | Long-running conversations that need historical awareness |
| token_budget | Fit as many recent turns as possible within a configured token limit | Maximizing context usage without exceeding model limits |
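A minimal sketch of the `sliding_window` and `token_budget` strategies; the whitespace word count stands in for a real tokenizer:

```python
def sliding_window(turns, n):
    """Keep only the last n turns."""
    return turns[-n:]

def token_budget(turns, budget, count_tokens=lambda t: len(t.split())):
    """Fit as many recent turns as possible within `budget` tokens,
    walking backwards from the newest turn. Whitespace counting is a
    stand-in for the model's tokenizer."""
    kept, used = [], 0
    for turn in reversed(turns):
        cost = count_tokens(turn)
        if used + cost > budget:
            break
        kept.append(turn)
        used += cost
    return list(reversed(kept))   # restore chronological order

history = ["hi there", "hello, how can I help?", "what is RAG?",
           "retrieval augmented generation"]
print(sliding_window(history, 2))
print(token_budget(history, budget=7))
```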

Module: astromesh/core/tools.py

The Tool Registry is the central authority for all tools an agent can use. It handles tool discovery, registration, permission checking, rate limiting, and schema generation for LLM function calling.

| Type | Source | Description |
|---|---|---|
| internal | Python functions | Registered directly in code. Fastest execution path. |
| mcp | MCP servers | Discovered from external MCP servers via stdio, SSE, or HTTP transport. Tools are fetched at startup and cached. |
| webhook | HTTP endpoints | External REST APIs called via HTTP. Supports custom headers, authentication, and payload mapping. |
| rag | RAG pipeline | Exposes a RAG pipeline as a callable tool, allowing agents to search knowledge bases during reasoning. |

Beyond discovery and registration, the registry provides:

  • Schema generation — Automatically generates JSON Schema for each tool, compatible with the function calling format expected by LLM providers (OpenAI-style tools parameter).
  • Permission filtering — Each agent definition specifies which tools it has access to. The registry filters available tools per agent at execution time.
  • Rate limiting — Tools can have per-agent rate limits to prevent runaway tool calls during orchestration loops.
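Schema generation can be sketched from a function signature alone. The mapping below is a simplification of what the registry would emit for an OpenAI-style tools parameter; the helper and example tool are illustrative:

```python
import inspect

# Minimal Python-type → JSON Schema type mapping (illustrative).
PY_TO_JSON = {int: "integer", float: "number", str: "string", bool: "boolean"}

def tool_schema(fn):
    """Build an OpenAI-style function-calling schema from a function's
    signature. A sketch: the real registry likely handles more types,
    docstring parsing, and validation."""
    sig = inspect.signature(fn)
    props = {name: {"type": PY_TO_JSON.get(p.annotation, "string")}
             for name, p in sig.parameters.items()}
    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": (fn.__doc__ or "").strip(),
            "parameters": {
                "type": "object",
                "properties": props,
                # Parameters without defaults are required.
                "required": [n for n, p in sig.parameters.items()
                             if p.default is inspect.Parameter.empty],
            },
        },
    }

def get_weather(city: str, units: str = "metric"):
    """Look up current weather for a city."""

schema = tool_schema(get_weather)
print(schema["function"]["parameters"]["required"])   # ['city']
```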

Module: astromesh/core/prompt_engine.py

The Prompt Engine renders Jinja2 templates into system prompts for LLM calls. Key features:

  • SilentUndefined — Missing template variables render as empty strings instead of raising errors. This allows prompts to gracefully handle optional context (e.g., no semantic memory results).
  • Template registration — Templates are registered during agent bootstrap from the prompts.system field in agent YAML.
  • Variable injection — At render time, the engine receives variables from the Memory Manager (conversation history, semantic results, episodic events) and injects them into the template.

Example system prompt template:

You are {{ agent_name }}, a {{ description }}.
{% if conversation_history %}
Previous conversation:
{% for msg in conversation_history %}
{{ msg.role }}: {{ msg.content }}
{% endfor %}
{% endif %}
{% if semantic_context %}
Relevant knowledge:
{{ semantic_context }}
{% endif %}
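The `SilentUndefined` behavior can be approximated with stock Jinja2: the default `Undefined` already renders missing variables as empty strings and treats them as falsy in `{% if %}` blocks, and `ChainableUndefined` extends that to nested attribute access. The exact Astromesh class is not shown here, so this is an approximation:

```python
from jinja2 import ChainableUndefined, Environment

# ChainableUndefined lets chains like {{ user.profile.name }} render
# silently too; Astromesh's SilentUndefined is described as behaving
# this way, but its exact implementation is not shown in these docs.
env = Environment(undefined=ChainableUndefined)

template = env.from_string(
    "You are {{ agent_name }}."
    "{% if semantic_context %} Knowledge: {{ semantic_context }}{% endif %}"
)

# Missing variables render empty and stay falsy — no errors raised.
print(template.render(agent_name="Atlas"))
print(template.render(agent_name="Atlas", semantic_context="RAG docs"))
```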

Module: astromesh/core/guardrails.py

The Guardrails Engine applies safety checks on both input (before the agent processes a query) and output (before the response is returned to the caller).

| Guardrail | Direction | Description |
|---|---|---|
| pii_detection | Input & Output | Detects and redacts emails, phone numbers, SSNs, and credit card numbers using regex patterns |
| topic_filter | Input | Blocks messages matching forbidden topics defined in configuration |
| max_length | Input | Enforces maximum character limits on incoming queries |
| cost_limit | Output | Enforces token-per-turn limits to prevent runaway LLM usage |
| content_filter | Output | Blocks responses containing forbidden keywords or patterns |

Guardrails run in the order they are defined in the agent’s YAML configuration. Each guardrail can either pass (allow the message through), redact (modify the message and continue), or block (reject the message with an error).
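A redact-style guardrail can be sketched with regular expressions. The patterns below are illustrative, not the exact ones `pii_detection` uses:

```python
import re

# Illustrative patterns only; the real guardrail's regexes may differ.
PII_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "phone": re.compile(r"\b\d{3}[-.]\d{3}[-.]\d{4}\b"),
}

def redact_pii(text: str) -> str:
    """Redact outcome: modify the message and let it continue through
    the remaining guardrails."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[REDACTED:{label}]", text)
    return text

print(redact_pii("Mail me at jane@example.com, SSN 123-45-6789."))
```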


Infrastructure contains all concrete implementations of the interfaces defined in Layer 3. These are the adapters, drivers, and backends that do the actual work.

Module: astromesh/providers/

All providers implement ProviderProtocol, a runtime_checkable Python Protocol defined in astromesh/providers/base.py.

| Method | Return Type | Description |
|---|---|---|
| complete(messages, **kwargs) | CompletionResult | Synchronous (non-streaming) completion |
| stream(messages, **kwargs) | AsyncIterator[StreamChunk] | Streaming completion, yielding tokens |
| health_check() | bool | Check if the provider is reachable |
| supports_tools() | bool | Whether the provider supports function calling |
| supports_vision() | bool | Whether the provider supports image inputs |
| estimated_cost(tokens) | float | Estimated cost for a given token count |
| avg_latency_ms | float | Exponential moving average of response latency |
| Provider | Backend | Endpoint Style | Notes |
|---|---|---|---|
| OllamaProvider | Ollama | /api/chat | Native Ollama API format |
| OpenAICompatProvider | OpenAI API | /v1/chat/completions | Works with any OpenAI-compatible API |
| VLLMProvider | vLLM | OpenAI-compatible | High-throughput serving with PagedAttention |
| LlamaCppProvider | llama.cpp | OpenAI-compatible | CPU/GPU inference with GGUF models |
| HFTGIProvider | HuggingFace TGI | OpenAI-compatible | Text Generation Inference server |
| ONNXProvider | ONNX Runtime | Local inference | In-process inference, no network calls |

Each provider tracks its own latency history and cost estimates, which the Model Router uses for routing decisions.
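Because `ProviderProtocol` is a `runtime_checkable` `Protocol`, any object with the right methods satisfies it structurally — no inheritance needed. A sketch using a subset of the methods from the table above (bodies are illustrative):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class ProviderProtocol(Protocol):
    """Sketch of the interface from astromesh/providers/base.py,
    reduced to four of the methods listed above."""
    def complete(self, messages, **kwargs): ...
    def health_check(self) -> bool: ...
    def supports_tools(self) -> bool: ...
    def estimated_cost(self, tokens: int) -> float: ...

class FakeProvider:
    """No inheritance from ProviderProtocol: matching method names are
    enough for isinstance() to succeed (structural typing)."""
    def complete(self, messages, **kwargs):
        return {"content": "ok"}
    def health_check(self) -> bool:
        return True
    def supports_tools(self) -> bool:
        return False
    def estimated_cost(self, tokens: int) -> float:
        return 0.0

print(isinstance(FakeProvider(), ProviderProtocol))   # True
```

This is also why providers are easy to mock in tests: a stub class with the right method names passes the protocol check.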

Module: astromesh/orchestration/

Orchestration patterns control how agents reason and use tools. Each pattern implements the OrchestrationPattern abstract base class with an execute() method that runs the reasoning loop.

| Pattern | Module | Description | Typical Use Case |
|---|---|---|---|
| ReAct | patterns.py | Think, Act, Observe loop. The LLM decides when to call tools and when to produce a final answer. Iterates up to max_iterations. | General-purpose agents that need tools |
| PlanAndExecute | patterns.py | First, the LLM creates a plan (list of steps). Then, each step is executed sequentially. | Multi-step tasks with clear decomposition |
| ParallelFanOut | patterns.py | Send the same query to multiple sub-models simultaneously, merge results. | Ensemble approaches, multi-perspective analysis |
| Pipeline | patterns.py | Chain multiple processing steps sequentially, where each step’s output feeds the next. | Data transformation, multi-stage processing |
| Supervisor | supervisor.py | A supervisor agent delegates sub-tasks to worker agents and combines their results. | Complex tasks requiring specialized sub-agents |
| Swarm | swarm.py | Agents hand off conversations to each other based on context. No central coordinator. | Customer service routing, multi-domain conversations |
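The ReAct loop can be sketched as follows, with a scripted fake LLM standing in for real model-output parsing; the function names and tuple protocol are illustrative:

```python
def react_loop(llm, tools, query, max_iterations=5):
    """Minimal Think → Act → Observe loop. `llm` returns either
    ("final", answer) or ("tool", name, args); a real implementation
    would parse tool calls out of model output instead."""
    observations = []
    for _ in range(max_iterations):
        step = llm(query, observations)           # Think
        if step[0] == "final":
            return step[1]                        # model chose to answer
        _, name, args = step
        observations.append(tools[name](**args))  # Act + Observe
    return "Stopped: max_iterations reached"

# Scripted fake LLM: one tool call, then a final answer.
def fake_llm(query, observations):
    if not observations:
        return ("tool", "search", {"q": query})
    return ("final", f"Answer based on: {observations[0]}")

tools = {"search": lambda q: f"results for '{q}'"}
print(react_loop(fake_llm, tools, "quantum computing"))
```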

Module: astromesh/rag/

The RAG (Retrieval-Augmented Generation) pipeline connects document ingestion to knowledge-augmented agent responses.

Ingestion Flow:
Documents → Chunking → Embedding → Vector Store
Query Flow:
Query → Embedding → Vector Search → Reranking → Top-K results

Each stage is pluggable:

| Stage | Options | Module |
|---|---|---|
| Chunking | Fixed-size, Recursive (split on separators), Sentence-aware, Semantic (embedding-based boundaries) | astromesh/rag/chunking/ |
| Embeddings | HuggingFace Inference API, SentenceTransformers (local), Ollama | astromesh/rag/embeddings/ |
| Vector Store | pgvector, ChromaDB, Qdrant, FAISS | astromesh/rag/stores/ |
| Reranking | Cross-encoder (local), Cohere Rerank API | astromesh/rag/reranking/ |

RAG pipelines are configured in config/rag/*.rag.yaml with apiVersion: astromesh/v1, kind: RAGPipeline.
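A hypothetical pipeline definition: only `apiVersion` and `kind` are documented above, so every other field name here is illustrative, not the actual schema:

```yaml
# Hypothetical example — field names below apiVersion/kind are guesses.
apiVersion: astromesh/v1
kind: RAGPipeline
metadata:
  name: docs-kb
spec:
  chunking:
    strategy: recursive
    chunk_size: 512
  embeddings:
    provider: sentence-transformers
    model: all-MiniLM-L6-v2
  store:
    backend: pgvector
  reranking:
    type: cross-encoder
```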

Module: astromesh/mcp/

Astromesh supports the Model Context Protocol (MCP) in both directions:

As a client, Astromesh connects to external MCP servers to discover and invoke remote tools. It supports three transport mechanisms:

| Transport | Protocol | Use Case |
|---|---|---|
| stdio | stdin/stdout JSON-RPC | Local MCP servers, CLI tools |
| SSE | Server-Sent Events over HTTP | Remote MCP servers with streaming |
| HTTP | Standard HTTP JSON-RPC | Remote MCP servers, simple request/response |

At startup, the MCP client connects to configured servers, fetches their tool manifests, and registers discovered tools in the Tool Registry. Tools are then available to agents just like internal tools.

As a server, Astromesh exposes its agents as MCP tools via a JSON-RPC endpoint at /mcp, allowing other MCP-compatible systems to call your Astromesh agents as if they were tools.

Module: astromesh/ml/

The ML Model Registry manages non-LLM machine learning models for tasks like classification, embedding generation, and custom inference:

  • Registry (model_registry.py) — Register, version, load, and serve ML models
  • Serving (serving/) — ONNX Runtime and PyTorch model servers for inference
  • Training (training/) — Classifier and embedding fine-tuning pipelines

Module: astromesh/observability/

The observability stack provides three pillars of monitoring:

Agent Execution ──► TelemetryManager ──► OpenTelemetry Collector ──► Jaeger/Zipkin
                    MetricsCollector ──► Prometheus ──► Grafana
                    CostTracker      ──► Usage records + budget alerts

| Component | Module | Description |
|---|---|---|
| TelemetryManager | telemetry.py | Distributed tracing with OpenTelemetry. Creates spans for each pipeline step (guardrails, memory, routing, tool calls). Falls back to _NoOpSpan when OpenTelemetry is not installed. |
| MetricsCollector | metrics.py | Prometheus metrics: request counts (by agent, status), latency histograms (by agent, provider), active agents gauge. |
| CostTracker | cost_tracker.py | Per-provider usage records tracking token counts and estimated costs. Supports budget enforcement (alert or block when budget is exceeded) and grouped cost reports. |
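The CostTracker's budget logic can be sketched with a per-provider ledger; the class shape and field names here are illustrative:

```python
from collections import defaultdict

class CostTracker:
    """Sketch of per-provider usage tracking with a budget check.
    Method and field names are illustrative, not the real API."""

    def __init__(self, budget_usd: float):
        self.budget_usd = budget_usd
        self.spend = defaultdict(float)   # provider -> dollars spent

    def record(self, provider: str, tokens: int, usd_per_1k: float):
        """Record one completion's estimated cost."""
        self.spend[provider] += tokens / 1000 * usd_per_1k

    def total(self) -> float:
        return sum(self.spend.values())

    def over_budget(self) -> bool:
        # Real tracker supports "alert" vs "block" enforcement modes.
        return self.total() > self.budget_usd

tracker = CostTracker(budget_usd=1.0)
tracker.record("openai", tokens=50_000, usd_per_1k=0.01)   # $0.50
tracker.record("openai", tokens=80_000, usd_per_1k=0.01)   # $0.80
print(tracker.over_budget())   # True: $1.30 > $1.00
```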

The following rules govern how layers interact:

  1. Layer 1 (API) calls only Layer 2 (Runtime). API routes call runtime.run() and nothing else.
  2. Layer 2 (Runtime) calls only Layer 3 (Core Services). The Agent orchestrates calls to ModelRouter, MemoryManager, ToolRegistry, PromptEngine, and GuardrailsEngine.
  3. Layer 3 (Core Services) calls only Layer 4 (Infrastructure). The ModelRouter calls providers. The MemoryManager calls memory backends. The ToolRegistry calls tool implementations.
  4. Layer 4 (Infrastructure) calls only external systems (LLM endpoints, databases, vector stores, APIs).

No layer ever skips a level. The API Layer never directly calls a provider. The Runtime Engine never directly accesses a database. This strict layering makes the system testable (each layer can be tested in isolation with mocked dependencies) and maintainable (changes to infrastructure never ripple up to the API).