feat: support StreamableHTTPClientTransport (#64)

Co-authored-by: Abhilash Panigrahi <pabhila@amazon.com>
Co-authored-by: Ben Burns <803016+benjamincburns@users.noreply.github.com>
This commit is contained in:
Abhilash Panigrahi
2025-05-13 13:00:39 -07:00
committed by GitHub
parent 03172e3fcd
commit 68f130e1d0
13 changed files with 1451 additions and 414 deletions
+2 -1
View File
@@ -5,4 +5,5 @@ index.d.cts
node_modules
dist
.yarn
.env
.env
.eslintcache
+18 -12
View File
@@ -9,7 +9,8 @@ This library provides a lightweight wrapper that makes [Anthropic Model Context
- 🔌 **Transport Options**
- Connect to MCP servers via stdio (local) or SSE (remote)
- Connect to MCP servers via stdio (local) or Streamable HTTP (remote)
- Streamable HTTP automatically falls back to SSE for compatibility with legacy MCP server implementations
- Support for custom headers in SSE connections for authentication
- Configurable reconnection strategies for both transport types
@@ -38,13 +39,13 @@ npm install @langchain/mcp-adapters
### Optional Dependencies
For SSE connections with custom headers in Node.js:
For SSE connections with custom headers in Node.js (does not apply to Streamable HTTP):
```bash
npm install eventsource
```
For enhanced SSE header support:
For enhanced SSE header support (does not apply to Streamable HTTP):
```bash
npm install extended-eventsource
@@ -155,14 +156,19 @@ const client = new MultiServerMCPClient({
args: ["-y", "@modelcontextprotocol/server-filesystem"],
},
// SSE transport example with reconnection configuration
// Sreamable HTTP transport example, with auth headers and automatic SSE fallback disabled (defaults to enabled)
weather: {
transport: "sse",
url: "https://example.com/mcp-weather",
url: "https://example.com/weather/mcp",
headers: {
Authorization: "Bearer token123",
},
useNodeEventSource: true,
}
automaticSSEFallback: false
},
// how to force SSE, for old servers that are known to only support SSE (streamable HTTP falls back automatically if unsure)
github: {
transport: "sse", // also works with "type" field instead of "transport"
url: "https://example.com/mcp",
reconnect: {
enabled: true,
maxAttempts: 5,
@@ -212,8 +218,8 @@ When loading MCP tools either directly through `loadMcpTools` or via `MultiServe
| Option | Type | Default | Description |
| ------------------------------ | ------- | ------- | ------------------------------------------------------------------------------------ |
| `throwOnLoadError` | boolean | `true` | Whether to throw an error if a tool fails to load |
| `prefixToolNameWithServerName` | boolean | `false` | If true, prefixes all tool names with the server name (e.g., `serverName__toolName`) |
| `additionalToolNamePrefix` | string | `""` | Additional prefix to add to tool names (e.g., `prefix__serverName__toolName`) |
| `prefixToolNameWithServerName` | boolean | `true` | If true, prefixes all tool names with the server name (e.g., `serverName__toolName`) |
| `additionalToolNamePrefix` | string | `mcp` | Additional prefix to add to tool names (e.g., `prefix__serverName__toolName`) |
## Response Handling
@@ -361,8 +367,8 @@ Example Zod error for an invalid SSE URL:
When using in browsers:
- Native EventSource API doesn't support custom headers
- Consider using a proxy or pass authentication via query parameters
- EventSource API doesn't support custom headers for SSE
- Consider using a proxy or pass authentication via query parameters to avoid leaking credentials to client
- May require CORS configuration on the server side
## Troubleshooting
+139
View File
@@ -21,6 +21,9 @@ const { StdioClientTransport } = await import(
const { SSEClientTransport } = await import(
"@modelcontextprotocol/sdk/client/sse.js"
);
const { StreamableHTTPClientTransport } = await import(
"@modelcontextprotocol/sdk/client/streamableHttp.js"
);
describe("MultiServerMCPClient", () => {
// Setup and teardown
@@ -63,6 +66,17 @@ describe("MultiServerMCPClient", () => {
// Additional assertions to verify the connection was processed correctly
});
test("should process valid streamable HTTP connection config", () => {
const client = new MultiServerMCPClient({
"test-server": {
transport: "http",
url: "http://localhost:8000/mcp",
},
});
expect(client).toBeDefined();
// Additional assertions to verify the connection was processed correctly
});
test("should have a compile time error and a runtime error when the config is invalid", () => {
expect(() => {
// eslint-disable-next-line no-new
@@ -93,6 +107,7 @@ describe("MultiServerMCPClient", () => {
command: "python",
args: ["./script.py"],
env: undefined,
stderr: "inherit",
});
expect(Client).toHaveBeenCalled();
@@ -116,6 +131,24 @@ describe("MultiServerMCPClient", () => {
expect(Client.prototype.listTools).toHaveBeenCalled();
});
test("should initialize streamable HTTP connections correctly", async () => {
const client = new MultiServerMCPClient({
"test-server": {
transport: "http",
url: "http://localhost:8000/mcp",
},
});
await client.initializeConnections();
expect(StreamableHTTPClientTransport).toHaveBeenCalledWith(
new URL("http://localhost:8000/mcp")
);
expect(Client).toHaveBeenCalled();
expect(Client.prototype.connect).toHaveBeenCalled();
expect(Client.prototype.listTools).toHaveBeenCalled();
});
test("should throw on connection failure", async () => {
(Client as Mock).mockImplementationOnce(() => ({
connect: vi
@@ -307,6 +340,10 @@ describe("MultiServerMCPClient", () => {
transport: "sse",
url: "http://localhost:8000/sse",
},
server3: {
transport: "http",
url: "http://localhost:8000/mcp",
},
});
await client.initializeConnections();
@@ -315,6 +352,7 @@ describe("MultiServerMCPClient", () => {
// Verify that all transports were closed using the mock functions directly
expect(StdioClientTransport.prototype.close).toHaveBeenCalled();
expect(SSEClientTransport.prototype.close).toHaveBeenCalled();
expect(StreamableHTTPClientTransport.prototype.close).toHaveBeenCalled();
});
test("should handle errors during cleanup gracefully", async () => {
@@ -341,4 +379,105 @@ describe("MultiServerMCPClient", () => {
expect(closeMock).toHaveBeenCalledOnce();
});
});
// Streamable HTTP specific tests
describe("streamable HTTP transport", () => {
test("should throw when streamable HTTP config is missing required fields", () => {
expect(() => {
// eslint-disable-next-line no-new
new MultiServerMCPClient({
// @ts-expect-error missing url field
"test-server": {
transport: "http",
// Missing url field
},
});
}).toThrow(ZodError);
});
test("should throw when streamable HTTP URL is invalid", () => {
expect(() => {
// eslint-disable-next-line no-new
new MultiServerMCPClient({
"test-server": {
transport: "http",
url: "invalid-url", // Invalid URL format
},
});
}).toThrow(ZodError);
});
test("should handle mixed transport types including streamable HTTP", async () => {
const client = new MultiServerMCPClient({
"stdio-server": {
transport: "stdio",
command: "python",
args: ["./script.py"],
},
"sse-server": {
transport: "sse",
url: "http://localhost:8000/sse",
},
"streamable-server": {
transport: "http",
url: "http://localhost:8000/mcp",
},
});
await client.initializeConnections();
// Verify all transports were initialized
expect(StreamableHTTPClientTransport).toHaveBeenCalled();
expect(SSEClientTransport).toHaveBeenCalled();
expect(StdioClientTransport).toHaveBeenCalled();
// Get tools from all servers
const tools = await client.getTools();
expect(tools.length).toBeGreaterThan(0);
});
test("should throw on streamable HTTP connection failure", async () => {
(Client as Mock).mockImplementationOnce(() => ({
connect: vi
.fn()
.mockReturnValue(Promise.reject(new Error("Connection failed"))),
listTools: vi.fn().mockReturnValue(Promise.resolve({ tools: [] })),
}));
const client = new MultiServerMCPClient({
"test-server": {
transport: "http",
url: "http://localhost:8000/mcp",
},
});
await expect(() => client.initializeConnections()).rejects.toThrow(
MCPClientError
);
});
test("should handle errors during streamable HTTP cleanup gracefully", async () => {
const closeMock = vi
.fn()
.mockReturnValue(Promise.reject(new Error("Close failed")));
// Mock close to throw an error
(StreamableHTTPClientTransport as Mock).mockImplementationOnce(() => ({
close: closeMock,
connect: vi.fn().mockReturnValue(Promise.resolve()),
}));
const client = new MultiServerMCPClient({
"test-server": {
transport: "http",
url: "http://localhost:8000/mcp",
},
});
await client.initializeConnections();
await client.close();
expect(closeMock).toHaveBeenCalledOnce();
});
});
});
+54 -1
View File
@@ -1,6 +1,10 @@
import { vi, describe, test, expect, beforeEach, type Mock } from "vitest";
import { ZodError } from "zod";
import { Connection } from "../src/client.js";
import type {
ClientConfig,
Connection,
StdioConnection,
} from "../src/client.js";
import "./mocks.js";
@@ -11,6 +15,9 @@ const { StdioClientTransport } = await import(
const { SSEClientTransport } = await import(
"@modelcontextprotocol/sdk/client/sse.js"
);
const { StreamableHTTPClientTransport } = await import(
"@modelcontextprotocol/sdk/client/streamableHttp.js"
);
const { MultiServerMCPClient, MCPClientError } = await import(
"../src/client.js"
);
@@ -44,6 +51,23 @@ describe("MultiServerMCPClient", () => {
expect(Client).toHaveBeenCalled();
});
test("should process valid streamable HTTP connection config", async () => {
const config = {
"test-server": {
transport: "http" as const,
url: "http://localhost:8000/mcp",
},
};
const client = new MultiServerMCPClient(config);
expect(client).toBeDefined();
// Initialize connections and verify
await client.initializeConnections();
expect(StreamableHTTPClientTransport).toHaveBeenCalled();
expect(Client).toHaveBeenCalled();
});
test("should process valid SSE connection config", async () => {
const config = {
"test-server": {
@@ -324,6 +348,10 @@ describe("MultiServerMCPClient", () => {
},
});
const conf = client.config;
expect(conf.additionalToolNamePrefix).toBe("mcp");
expect(conf.prefixToolNameWithServerName).toBe(true);
await client.initializeConnections();
const tools = await client.getTools();
@@ -522,5 +550,30 @@ describe("MultiServerMCPClient", () => {
// Should not have created a client
expect(Client).not.toHaveBeenCalled();
});
test("should throw on streamable HTTP transport creation errors", async () => {
// Force an error when creating transport
(StreamableHTTPClientTransport as Mock).mockImplementationOnce(() => {
throw new Error("Streamable HTTP transport creation failed");
});
const client = new MultiServerMCPClient({
"test-server": {
transport: "http" as const,
url: "http://localhost:8000/mcp",
},
});
// Should throw error when connecting
await expect(
async () => await client.initializeConnections()
).rejects.toThrow();
// Should have attempted to create transport
expect(StreamableHTTPClientTransport).toHaveBeenCalled();
// Should not have created a client
expect(Client).not.toHaveBeenCalled();
});
});
});
+18
View File
@@ -70,3 +70,21 @@ vi.mock("@modelcontextprotocol/sdk/client/sse.js", () => {
SSEClientTransport,
};
});
vi.mock("@modelcontextprotocol/sdk/client/streamableHttp.js", () => {
const streamableHTTPClientTransportPrototype = {
connect: vi.fn().mockReturnValue(Promise.resolve()),
send: vi.fn().mockReturnValue(Promise.resolve()),
close: vi.fn().mockReturnValue(Promise.resolve()),
};
const StreamableHTTPClientTransport = vi.fn().mockImplementation((config) => {
return {
...streamableHTTPClientTransportPrototype,
config,
};
});
StreamableHTTPClientTransport.prototype = streamableHTTPClientTransportPrototype;
return {
StreamableHTTPClientTransport,
};
});
+173
View File
@@ -0,0 +1,173 @@
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { randomUUID } from "node:crypto";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
export async function main() {
const server = new McpServer({
name: "backwards-compatible-server",
version: "1.0.0",
});
const calcSchema = { a: z.number(), b: z.number() };
server.tool(
"add",
"Adds two numbers together",
calcSchema,
async ({ a, b }: { a: number; b: number }, extra) => {
return {
content: [{ type: "text", text: `${a + b}` }],
};
}
);
server.tool(
"subtract",
"Subtracts two numbers",
calcSchema,
async ({ a, b }: { a: number; b: number }, extra) => {
return { content: [{ type: "text", text: `${a - b}` }] };
}
);
server.tool(
"multiply",
"Multiplies two numbers",
calcSchema,
async ({ a, b }: { a: number; b: number }, extra) => {
return { content: [{ type: "text", text: `${a * b}` }] };
}
);
server.tool(
"divide",
"Divides two numbers",
calcSchema,
async ({ a, b }: { a: number; b: number }, extra) => {
return { content: [{ type: "text", text: `${a / b}` }] };
}
);
const app = express();
app.use(express.json());
// Store transports for each session type
const transports = {
streamable: {} as Record<string, StreamableHTTPServerTransport>,
sse: {} as Record<string, SSEServerTransport>,
};
// Modern Streamable HTTP endpoint
app.post("/mcp", async (req, res) => {
// Check for existing session ID
const sessionId = req.headers["mcp-session-id"] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports.streamable[sessionId]) {
// Reuse existing transport
transport = transports.streamable[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
// New initialization request
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
// Store the transport by session ID
transports.streamable[sessionId] = transport;
},
});
// Clean up transport when closed
transport.onclose = () => {
if (transport.sessionId) {
delete transports.streamable[transport.sessionId];
}
};
// Connect to the MCP server
await server.connect(transport);
} else {
// Invalid request
console.error(
"Invalid Streamable HTTP request: ",
JSON.stringify(req.body, null, 2)
);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: No valid session ID provided",
},
id: null,
});
return;
}
// Handle the request
await transport.handleRequest(req, res, req.body);
});
// Reusable handler for GET and DELETE requests
const handleSessionRequest = async (
req: express.Request,
res: express.Response
) => {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
if (!sessionId || !transports.streamable[sessionId]) {
console.error(
"Invalid Streamable HTTP request (invalid/missing session ID): ",
JSON.stringify(req.body, null, 2)
);
res.status(400).send("Invalid or missing session ID");
return;
}
const transport = transports.streamable[sessionId];
await transport.handleRequest(req, res);
};
app.get("/mcp", handleSessionRequest);
app.delete("/mcp", handleSessionRequest);
// Legacy SSE endpoint for older clients
app.get("/sse", async (req, res) => {
// Create SSE transport for legacy clients
const transport = new SSEServerTransport("/messages", res);
transports.sse[transport.sessionId] = transport;
res.on("close", () => {
delete transports.sse[transport.sessionId];
});
await server.connect(transport);
});
// Legacy message endpoint for older clients
app.post("/messages", async (req, res) => {
const sessionId = req.query.sessionId as string;
const transport = transports.sse[sessionId];
if (transport) {
await transport.handlePostMessage(req, res, req.body);
} else {
console.error("No transport found for sessionId", sessionId);
res.status(400).send("No transport found for sessionId");
}
});
app.listen(3000);
}
if (typeof require !== "undefined" && require.main === module) {
main().catch(console.error);
}
if (
import.meta.url === process.argv[1] ||
import.meta.url === `file://${process.argv[1]}`
) {
main().catch(console.error);
}
+204
View File
@@ -0,0 +1,204 @@
/**
* Calculator MCP Server with LangGraph Example
*
* This example demonstrates how to use the Calculator MCP server with LangGraph
* to create a structured workflow for simple calculations.
*
* The graph-based approach allows:
* 1. Clear separation of responsibilities (reasoning vs execution)
* 2. Conditional routing based on tool calls
* 3. Structured handling of complex multi-tool operations
*/
/* eslint-disable no-console */
import { ChatOpenAI } from "@langchain/openai";
import {
StateGraph,
END,
START,
MessagesAnnotation,
} from "@langchain/langgraph";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import {
HumanMessage,
AIMessage,
SystemMessage,
isHumanMessage,
} from "@langchain/core/messages";
import dotenv from "dotenv";
import { main as calculatorServerMain } from "./calculator_server_shttp_sse.js";
// MCP client imports
import { MultiServerMCPClient } from "../src/index.js";
// Load environment variables from .env file
dotenv.config();
const transportType = process.env.MCP_TRANSPORT_TYPE === "sse" ? "sse" : "http";
export async function runExample(client?: MultiServerMCPClient) {
try {
console.log("Initializing MCP client...");
void calculatorServerMain();
// Wait for the server to start
await new Promise((resolve) => {
setTimeout(resolve, 100);
});
// Create a client with configurations for the calculator server
// eslint-disable-next-line no-param-reassign
client =
client ??
new MultiServerMCPClient({
calculator: {
url: `http://localhost:3000/${
transportType === "sse" ? "sse" : "mcp"
}`,
},
});
console.log("Connected to server");
// Get all tools (flattened array is the default now)
const mcpTools = await client.getTools();
if (mcpTools.length === 0) {
throw new Error("No tools found");
}
console.log(
`Loaded ${mcpTools.length} MCP tools: ${mcpTools
.map((tool) => tool.name)
.join(", ")}`
);
// Create an OpenAI model with tools attached
const systemMessage = `You are an assistant that helps users with calculations.
You have access to tools that can add, subtract, multiply, and divide numbers. Use
these tools to answer the user's questions.`;
const model = new ChatOpenAI({
modelName: process.env.OPENAI_MODEL_NAME || "gpt-4o-mini",
temperature: 0.7,
}).bindTools(mcpTools);
// Create a tool node for the LangGraph
const toolNode = new ToolNode(mcpTools);
// ================================================
// Create a LangGraph agent flow
// ================================================
console.log("\n=== CREATING LANGGRAPH AGENT FLOW ===");
// Define the function that calls the model
const llmNode = async (state: typeof MessagesAnnotation.State) => {
console.log(`Calling LLM with ${state.messages.length} messages`);
// Add system message if it's the first call
let { messages } = state;
if (messages.length === 1 && isHumanMessage(messages[0])) {
messages = [new SystemMessage(systemMessage), ...messages];
}
const response = await model.invoke(messages);
return { messages: [response] };
};
// Create a new graph with MessagesAnnotation
const workflow = new StateGraph(MessagesAnnotation)
// Add the nodes to the graph
.addNode("llm", llmNode)
.addNode("tools", toolNode)
// Add edges - these define how nodes are connected
.addEdge(START, "llm")
.addEdge("tools", "llm")
// Conditional routing to end or continue the tool loop
.addConditionalEdges("llm", (state) => {
const lastMessage = state.messages[state.messages.length - 1];
// Cast to AIMessage to access tool_calls property
const aiMessage = lastMessage as AIMessage;
if (aiMessage.tool_calls && aiMessage.tool_calls.length > 0) {
console.log("Tool calls detected, routing to tools node");
// Log what tools are being called
const toolNames = aiMessage.tool_calls
.map((tc) => tc.name)
.join(", ");
console.log(`Tools being called: ${toolNames}`);
return "tools";
}
// If there are no tool calls, we're done
console.log("No tool calls, ending the workflow");
return END;
});
// Compile the graph
const app = workflow.compile();
// Define examples to run
const examples = [
{
name: "Add 1 and 2",
query: "What is 1 + 2?",
},
{
name: "Subtract 1 from 2",
query: "What is 2 - 1?",
},
{
name: "Multiply 1 and 2",
query: "What is 1 * 2?",
},
{
name: "Divide 1 by 2",
query: "What is 1 / 2?",
},
];
// Run the examples
console.log("\n=== RUNNING LANGGRAPH AGENT ===");
for (const example of examples) {
console.log(`\n--- Example: ${example.name} ---`);
console.log(`Query: ${example.query}`);
// Run the LangGraph agent
const result = await app.invoke({
messages: [new HumanMessage(example.query)],
});
// Display the final answer
const finalMessage = result.messages[result.messages.length - 1];
console.log(`\nResult: ${finalMessage.content}`);
}
} catch (error) {
console.error("Error:", error);
process.exit(1); // Exit with error code
} finally {
if (client) {
await client.close();
console.log("Closed all MCP connections");
}
// Exit process after a short delay to allow for cleanup
setTimeout(() => {
console.log("Example completed, exiting process.");
process.exit(0);
}, 500);
}
}
const isMainModule = import.meta.url === `file://${process.argv[1]}`;
if (isMainModule) {
runExample().catch((error) => console.error("Setup error:", error));
}
+1
View File
@@ -18,5 +18,6 @@ export const config = {
tsConfigPath: resolve("./tsconfig.json"),
cjsSource: "./dist-cjs",
cjsDestination: "./dist",
additionalGitignorePaths: [".env", ".eslintcache"],
abs,
};
+3 -1
View File
@@ -46,7 +46,7 @@
"author": "Ravi Kiran Vemula",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.7.0",
"@modelcontextprotocol/sdk": "^1.11.2",
"debug": "^4.4.0",
"zod": "^3.24.2"
},
@@ -64,6 +64,7 @@
"@langchain/scripts": "^0.1.3",
"@tsconfig/recommended": "^1.0.8",
"@types/debug": "^4.1.12",
"@types/express": "^5",
"@types/node": "^22.13.10",
"@typescript-eslint/eslint-plugin": "^6.12.0",
"@typescript-eslint/parser": "^6.12.0",
@@ -78,6 +79,7 @@
"eslint-plugin-prettier": "^4.2.1",
"eslint-plugin-vitest": "^0.5.4",
"eventsource": "^3.0.6",
"express": "^5.1.0",
"husky": "^9.0.11",
"lint-staged": "^15.2.2",
"npm-run-all": "^4.1.5",
+388 -67
View File
@@ -1,6 +1,14 @@
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import {
SSEClientTransport,
type SseError,
} from "@modelcontextprotocol/sdk/client/sse.js";
import {
StreamableHTTPClientTransport,
StreamableHTTPError,
type StreamableHTTPReconnectionOptions,
} from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import type { StructuredToolInterface } from "@langchain/core/tools";
import debug from "debug";
import { z } from "zod";
@@ -17,7 +25,7 @@ function getDebugLog() {
}
/**
* Create schema for stdio transport restart configuration
* Stdio transport restart configuration
*/
export function createStdioRestartSchema() {
return z
@@ -48,25 +56,52 @@ export function createStdioRestartSchema() {
}
/**
* Create schema for stdio transport connection
* Stdio transport connection
*/
export function createStdioConnectionSchema() {
return z
.object({
/**
* Optional transport type, inferred from the structure of the config if not provided. Included
* for compatibility with common MCP client config file formats.
*/
transport: z.literal("stdio").optional(),
/**
* Optional transport type, inferred from the structure of the config if not provided. Included
* for compatibility with common MCP client config file formats.
*/
type: z.literal("stdio").optional(),
/**
* The executable to run the server (e.g. `node`, `npx`, etc)
*/
command: z.string().describe("The executable to run the server"),
/**
* Array of command line arguments to pass to the executable
*/
args: z
.array(z.string())
.describe("Command line arguments to pass to the executable"),
/**
* Environment variables to set when spawning the process.
*/
env: z
.record(z.string())
.describe("The environment to use when spawning the process")
.optional(),
/**
* The encoding to use when reading from the process
*/
encoding: z
.string()
.describe("The encoding to use when reading from the process")
.optional(),
/**
* How to handle stderr of the child process. This matches the semantics of Node's `child_process.spawn`
*
* The default is "inherit", meaning messages to stderr will be printed to the parent process's stderr.
*
* @default "inherit"
*/
stderr: z
.union([
z.literal("overlapped"),
@@ -78,7 +113,11 @@ export function createStdioConnectionSchema() {
.describe(
"How to handle stderr of the child process. This matches the semantics of Node's `child_process.spawn`"
)
.optional(),
.optional()
.default("inherit"),
/**
* The working directory to use when spawning the process.
*/
cwd: z
.string()
.describe("The working directory to use when spawning the process")
@@ -94,9 +133,9 @@ export function createStdioConnectionSchema() {
}
/**
* Create schema for SSE transport reconnection configuration
* Streamable HTTP transport reconnection configuration
*/
export function createSseReconnectSchema() {
export function createStreamableReconnectSchema() {
return z
.object({
/**
@@ -123,30 +162,51 @@ export function createSseReconnectSchema() {
.describe("The delay in milliseconds between reconnection attempts")
.optional(),
})
.describe("Configuration for SSE transport reconnection");
.describe("Configuration for streamable HTTP transport reconnection");
}
/**
* Create schema for SSE transport connection
* Streamable HTTP transport connection
*/
export function createSseConnectionSchema() {
return z.intersection(
z
.object({
url: z.string().url(),
headers: z.record(z.string()).optional(),
useNodeEventSource: z.boolean().optional(),
/**
* Additional reconnection settings
*/
reconnect: createSseReconnectSchema().optional(),
})
.describe("Configuration for SSE transport connection"),
z.union([
z.object({ transport: z.literal("sse") }),
z.object({ type: z.literal("sse") }),
])
);
export function createStreamableHTTPConnectionSchema() {
return z
.object({
/**
* Optional transport type, inferred from the structure of the config. If "sse", will not attempt
* to connect using streamable HTTP.
*/
transport: z.union([z.literal("http"), z.literal("sse")]).optional(),
/**
* Optional transport type, inferred from the structure of the config. If "sse", will not attempt
* to connect using streamable HTTP.
*/
type: z.union([z.literal("http"), z.literal("sse")]).optional(),
/**
* The URL to connect to
*/
url: z.string().url(),
/**
* Additional headers to send with the request, useful for authentication
*/
headers: z.record(z.string()).optional(),
/**
* Whether to use Node's EventSource for SSE connections (not applicable to streamable HTTP)
*
* @default false
*/
useNodeEventSource: z.boolean().optional().default(false),
/**
* Additional reconnection settings.
*/
reconnect: createStreamableReconnectSchema().optional(),
/**
* Whether to automatically fallback to SSE if Streamable HTTP is not available or not supported
*
* @default true
*/
automaticSSEFallback: z.boolean().optional().default(true),
})
.describe("Configuration for streamable HTTP transport connection");
}
/**
@@ -154,29 +214,52 @@ export function createSseConnectionSchema() {
*/
export function createConnectionSchema() {
return z
.union([createStdioConnectionSchema(), createSseConnectionSchema()])
.union([
createStdioConnectionSchema(),
createStreamableHTTPConnectionSchema(),
])
.describe("Configuration for a single MCP server");
}
/**
* Create schema for {@link MultiServerMCPClient} configuration
* {@link MultiServerMCPClient} configuration
*/
export function createClientConfigSchema() {
return z
.object({
/**
* A map of server names to their configuration
*/
mcpServers: z
.record(createConnectionSchema())
.describe("A map of server names to their configuration"),
/**
* Whether to throw an error if a tool fails to load
*
* @default true
*/
throwOnLoadError: z
.boolean()
.describe("Whether to throw an error if a tool fails to load")
.optional()
.default(true),
/**
* Whether to prefix tool names with the server name. Prefixes are separated by double
* underscores (example: `calculator_server_1__add`).
*
* @default true
*/
prefixToolNameWithServerName: z
.boolean()
.describe("Whether to prefix tool names with the server name")
.optional()
.default(true),
/**
* An additional prefix to add to the tool name Prefixes are separated by double underscores
* (example: `mcp__add`).
*
* @default "mcp"
*/
additionalToolNamePrefix: z
.string()
.describe("An additional prefix to add to the tool name")
@@ -189,26 +272,54 @@ export function createClientConfigSchema() {
/**
* Configuration for stdio transport connection
*/
export type StdioConnection = z.infer<
export type StdioConnection = z.input<
ReturnType<typeof createStdioConnectionSchema>
>;
/**
* Configuration for SSE transport connection
* Type for {@link StdioConnection} with default values applied.
*/
export type SSEConnection = z.infer<
ReturnType<typeof createSseConnectionSchema>
export type ResolvedStdioConnection = z.infer<
ReturnType<typeof createStdioConnectionSchema>
>;
/**
* Configuration for streamable HTTP transport connection
*/
export type StreamableHTTPConnection = z.input<
ReturnType<typeof createStreamableHTTPConnectionSchema>
>;
/**
* Type for {@link StreamableHTTPConnection} with default values applied.
*/
export type ResolvedStreamableHTTPConnection = z.infer<
ReturnType<typeof createStreamableHTTPConnectionSchema>
>;
/**
* Union type for all transport connection types
*/
export type Connection = z.infer<ReturnType<typeof createConnectionSchema>>;
export type Connection = z.input<ReturnType<typeof createConnectionSchema>>;
/**
* Type for {@link MultiServerMCPClient} configuration
*/
export type ClientConfig = z.infer<ReturnType<typeof createClientConfigSchema>>;
export type ClientConfig = z.input<ReturnType<typeof createClientConfigSchema>>;
/**
* Type for {@link Connection} with default values applied.
*/
export type ResolvedConnection = z.infer<
ReturnType<typeof createConnectionSchema>
>;
/**
* Type for {@link MultiServerMCPClient} configuration, with default values applied.
*/
export type ResolvedClientConfig = z.infer<
ReturnType<typeof createClientConfigSchema>
>;
/**
* Error class for MCP client operations
@@ -220,9 +331,17 @@ export class MCPClientError extends Error {
}
}
function isStdioConnection(
connection: Connection
): connection is StdioConnection {
function isResolvedStdioConnection(
connection: unknown
): connection is ResolvedStdioConnection {
if (
typeof connection !== "object" ||
connection === null ||
Array.isArray(connection)
) {
return false;
}
if ("transport" in connection && connection.transport === "stdio") {
return true;
}
@@ -238,17 +357,36 @@ function isStdioConnection(
return false;
}
function isSSEConnection(connection: Connection): connection is SSEConnection {
if ("transport" in connection && connection.transport === "sse") {
return true;
function isResolvedStreamableHTTPConnection(
connection: unknown
): connection is ResolvedStreamableHTTPConnection {
if (
typeof connection !== "object" ||
connection === null ||
Array.isArray(connection)
) {
return false;
}
if ("type" in connection && connection.type === "sse") {
if (
("transport" in connection &&
typeof connection.transport === "string" &&
["http", "sse"].includes(connection.transport)) ||
("type" in connection &&
typeof connection.type === "string" &&
["http", "sse"].includes(connection.type))
) {
return true;
}
if ("url" in connection && typeof connection.url === "string") {
return true;
try {
// eslint-disable-next-line no-new
new URL(connection.url);
return true;
} catch (error) {
return false;
}
}
return false;
@@ -262,7 +400,7 @@ export class MultiServerMCPClient {
private _serverNameToTools: Record<string, StructuredToolInterface[]> = {};
private _connections?: Record<string, Connection>;
private _connections?: Record<string, ResolvedConnection>;
private _loadToolsOptions: LoadMcpToolsOptions;
@@ -270,18 +408,31 @@ export class MultiServerMCPClient {
private _transportInstances: Record<
string,
StdioClientTransport | SSEClientTransport
StdioClientTransport | SSEClientTransport | StreamableHTTPClientTransport
> = {};
private _config: ResolvedClientConfig;
/**
* Returns clone of server config for inspection purposes.
*
* Client does not support config modifications.
*/
get config(): ClientConfig {
// clone config so it can't be mutated
return JSON.parse(JSON.stringify(this._config));
}
/**
* Create a new MultiServerMCPClient.
*
* @param connections - Optional connections to initialize
*/
constructor(config: ClientConfig | Record<string, Connection>) {
let parsedServerConfig: ClientConfig;
let parsedServerConfig: ResolvedClientConfig;
const configSchema = createClientConfigSchema();
if ("mcpServers" in config) {
parsedServerConfig = configSchema.parse(config);
} else {
@@ -302,6 +453,7 @@ export class MultiServerMCPClient {
additionalToolNamePrefix: parsedServerConfig.additionalToolNamePrefix,
};
this._config = parsedServerConfig;
this._connections = parsedServerConfig.mcpServers;
}
@@ -320,7 +472,7 @@ export class MultiServerMCPClient {
throw new MCPClientError("No connections to initialize");
}
const connectionsToInit: [string, Connection][] = Array.from(
const connectionsToInit: [string, ResolvedConnection][] = Array.from(
Object.entries(this._connections).filter(
([serverName]) => this._clients[serverName] === undefined
)
@@ -331,10 +483,17 @@ export class MultiServerMCPClient {
`INFO: Initializing connection to server "${serverName}"...`
);
if (isStdioConnection(connection)) {
if (isResolvedStdioConnection(connection)) {
await this._initializeStdioConnection(serverName, connection);
} else if (isSSEConnection(connection)) {
await this._initializeSSEConnection(serverName, connection);
} else if (isResolvedStreamableHTTPConnection(connection)) {
if (connection.type === "sse" || connection.transport === "sse") {
await this._initializeSSEConnection(serverName, connection);
} else {
await this._initializeStreamableHTTPConnection(
serverName,
connection
);
}
} else {
// This should never happen due to the validation in the constructor
throw new MCPClientError(
@@ -400,9 +559,9 @@ export class MultiServerMCPClient {
*/
private async _initializeStdioConnection(
serverName: string,
connection: StdioConnection
connection: ResolvedStdioConnection
): Promise<void> {
const { command, args, env, restart } = connection;
const { command, args, env, restart, stderr } = connection;
getDebugLog()(
`DEBUG: Creating stdio transport for server "${serverName}" with command: ${command} ${args.join(
@@ -414,6 +573,7 @@ export class MultiServerMCPClient {
command,
args,
env,
stderr,
});
this._transportInstances[serverName] = transport;
@@ -458,8 +618,8 @@ export class MultiServerMCPClient {
private _setupStdioRestart(
serverName: string,
transport: StdioClientTransport,
connection: StdioConnection,
restart: NonNullable<StdioConnection["restart"]>
connection: ResolvedStdioConnection,
restart: NonNullable<ResolvedStdioConnection["restart"]>
): void {
const originalOnClose = transport.onclose;
// eslint-disable-next-line no-param-reassign, @typescript-eslint/no-misused-promises
@@ -483,19 +643,135 @@ export class MultiServerMCPClient {
};
}
private _getHttpErrorCode(error: unknown): number | undefined {
const streamableError = error as StreamableHTTPError | SseError;
let { code } = streamableError;
// try parsing from error message if code is not set
if (code == null) {
const m = streamableError.message.match(/\(HTTP (\d\d\d)\)/);
if (m && m.length > 1) {
code = parseInt(m[1], 10);
}
}
return code;
}
private _toSSEConnectionURL(url: string): string {
const urlObj = new URL(url);
const pathnameParts = urlObj.pathname.split("/");
const lastPart = pathnameParts.at(-1);
if (lastPart && lastPart === "mcp") {
pathnameParts[pathnameParts.length - 1] = "sse";
}
urlObj.pathname = pathnameParts.join("/");
return urlObj.toString();
}
/**
* Initialize an SSE connection
* Initialize a streamable HTTP connection
*/
private async _initializeSSEConnection(
private async _initializeStreamableHTTPConnection(
serverName: string,
connection: SSEConnection
connection: ResolvedStreamableHTTPConnection
): Promise<void> {
const { url, headers, useNodeEventSource, reconnect } = connection;
const {
url,
headers,
reconnect,
type: typeField,
transport: transportField,
} = connection;
const automaticSSEFallback = connection.automaticSSEFallback ?? true;
const transportType = typeField || transportField;
getDebugLog()(
`DEBUG: Creating SSE transport for server "${serverName}" with URL: ${url}`
);
if (transportType === "http" || transportType == null) {
const transport = await this._createStreamableHTTPTransport(
serverName,
url,
headers,
reconnect
);
this._transportInstances[serverName] = transport;
const client = new Client({
name: "langchain-mcp-adapter",
version: "0.1.0",
});
try {
await client.connect(transport);
this._clients[serverName] = client;
const cleanup = async () => {
getDebugLog()(
`DEBUG: Closing streamable HTTP transport for server "${serverName}"`
);
await transport.close();
};
this._cleanupFunctions.push(cleanup);
// Load tools for this server
await this._loadToolsForServer(serverName, client);
} catch (error) {
const code = this._getHttpErrorCode(error);
if (automaticSSEFallback && code != null && code >= 400 && code < 500) {
// Streamable HTTP error is a 4xx, so fall back to SSE
try {
await this._initializeSSEConnection(serverName, connection);
} catch (firstSSEError) {
// try one more time, but modify the URL to end with `/sse`
const sseUrl = this._toSSEConnectionURL(url);
if (sseUrl !== url) {
try {
await this._initializeSSEConnection(serverName, {
...connection,
url: sseUrl,
});
} catch (secondSSEError) {
throw new MCPClientError(
`Failed to connect to streamable HTTP server "${serverName}, url: ${url}": ${error}. Additionally, tried falling back to SSE at ${url} and ${sseUrl}, but this also failed: ${secondSSEError}`,
serverName
);
}
} else {
throw new MCPClientError(
`Failed to connect to streamable HTTP server after trying to fall back to SSE: "${serverName}, url: ${url}": ${error} (SSE fallback failed with error ${firstSSEError})`,
serverName
);
}
}
} else {
throw new MCPClientError(
`Failed to connect to streamable HTTP server "${serverName}, url: ${url}": ${error}`,
serverName
);
}
}
}
}
/**
* Initialize an SSE connection
*
* Don't call this directly unless SSE transport is explicitly requested. Otherwise,
* use _initializeStreamableHTTPConnection and it'll fall back to SSE if needed for
* backwards compatibility.
*/
private async _initializeSSEConnection(
serverName: string,
connection: ResolvedStreamableHTTPConnection // used for both SSE and streamable HTTP
): Promise<void> {
const { url, headers, useNodeEventSource, reconnect } = connection;
try {
const transport = await this._createSSETransport(
serverName,
@@ -539,7 +815,7 @@ export class MultiServerMCPClient {
await this._loadToolsForServer(serverName, client);
} catch (error) {
throw new MCPClientError(
`Failed to create SSE transport for server "${serverName}": ${error}`,
`Failed to create SSE transport for server "${serverName}, url: ${url}": ${error}`,
serverName
);
}
@@ -585,6 +861,44 @@ export class MultiServerMCPClient {
});
}
private async _createStreamableHTTPTransport(
serverName: string,
url: string,
headers?: Record<string, string>,
reconnect?: ResolvedStreamableHTTPConnection["reconnect"]
): Promise<StreamableHTTPClientTransport> {
if (!headers) {
// Simple case - no headers, use default transport
return new StreamableHTTPClientTransport(new URL(url));
} else {
getDebugLog()(
`DEBUG: Using custom headers for SSE transport to server "${serverName}"`
);
// partial options object for setting up reconnections
const r: {
reconnectionOptions: StreamableHTTPReconnectionOptions;
} = {
reconnectionOptions: {
initialReconnectionDelay: reconnect?.delayMs ?? 1000, // MCP default
maxReconnectionDelay: reconnect?.delayMs ?? 30000, // MCP default
maxRetries: reconnect?.maxAttempts ?? 2, // MCP default
reconnectionDelayGrowFactor: 1.5, // MCP default
},
};
if (reconnect != null && reconnect.enabled === false) {
r.reconnectionOptions.maxRetries = 0;
}
return new StreamableHTTPClientTransport(new URL(url), {
requestInit: { headers },
// don't set if reconnect is null so we rely on SDK defaults
...(reconnect == null ? {} : r),
});
}
}
/**
* Create an EventSource transport for Node.js environments
*/
@@ -661,13 +975,13 @@ export class MultiServerMCPClient {
}
/**
* Set up SSE reconnect handling
* Set up reconnect handling for SSE (Streamable HTTP reconnects are more complex and are handled internally by the SDK)
*/
private _setupSSEReconnect(
serverName: string,
transport: SSEClientTransport,
connection: SSEConnection,
reconnect: NonNullable<SSEConnection["reconnect"]>
transport: SSEClientTransport | StreamableHTTPClientTransport,
connection: ResolvedStreamableHTTPConnection,
reconnect: NonNullable<ResolvedStreamableHTTPConnection["reconnect"]>
): void {
const originalOnClose = transport.onclose;
// eslint-disable-next-line @typescript-eslint/no-misused-promises, no-param-reassign
@@ -679,7 +993,7 @@ export class MultiServerMCPClient {
// Only attempt reconnect if we haven't cleaned up
if (this._clients[serverName]) {
getDebugLog()(
`INFO: SSE connection for server "${serverName}" closed, attempting to reconnect...`
`INFO: HTTP connection for server "${serverName}" closed, attempting to reconnect...`
);
await this._attemptReconnect(
serverName,
@@ -727,7 +1041,7 @@ export class MultiServerMCPClient {
*/
private async _attemptReconnect(
serverName: string,
connection: Connection,
connection: ResolvedConnection,
maxAttempts = 3,
delayMs = 1000
): Promise<void> {
@@ -757,10 +1071,17 @@ export class MultiServerMCPClient {
}
// Initialize just this connection based on its type
if (isStdioConnection(connection)) {
if (isResolvedStdioConnection(connection)) {
await this._initializeStdioConnection(serverName, connection);
} else if (isSSEConnection(connection)) {
await this._initializeSSEConnection(serverName, connection);
} else if (isResolvedStreamableHTTPConnection(connection)) {
if (connection.type === "sse" || connection.transport === "sse") {
await this._initializeSSEConnection(serverName, connection);
} else {
await this._initializeStreamableHTTPConnection(
serverName,
connection
);
}
}
// Check if connected
+8 -1
View File
@@ -1,8 +1,15 @@
import { type StreamableHTTPConnection } from "./client.js";
export {
MultiServerMCPClient,
type Connection,
type StdioConnection,
type SSEConnection,
type StreamableHTTPConnection,
} from "./client.js";
/**
* Type alias for backward compatibility with previous versions of the package.
*/
export type SSEConnection = StreamableHTTPConnection;
export { loadMcpTools } from "./tools.js";
+3 -4
View File
@@ -95,15 +95,14 @@ async function _convertCallToolResult(
if (result.isError) {
throw new ToolException(
`MCP tool '${toolName}' on server '${serverName}' returned an error: ${result.content
.map((content: CallToolResultContent) => content.text)
.map((content) => content.text)
.join("\n")}`
);
}
const mcpTextAndImageContent: MessageContentComplex[] = (
result.content.filter(
(content: CallToolResultContent) =>
content.type === "text" || content.type === "image"
(content) => content.type === "text" || content.type === "image"
) as (TextContent | ImageContent)[]
).map((content: TextContent | ImageContent) => {
switch (content.type) {
@@ -134,7 +133,7 @@ async function _convertCallToolResult(
await Promise.all(
(
result.content.filter(
(content: CallToolResultContent) => content.type === "resource"
(content) => content.type === "resource"
) as EmbeddedResource[]
).map((content: EmbeddedResource) =>
_embeddedResourceToArtifact(content, client)
+440 -327
View File
File diff suppressed because it is too large Load Diff