wip: migrate MCP server to streamable HTTP

This commit is contained in:
Clelia (Astra) Bertelli
2025-12-19 12:05:08 +01:00
parent be84c14af1
commit 9f1e389aa2
7 changed files with 268 additions and 163 deletions
+11
View File
@@ -0,0 +1,11 @@
You are an expert programmer whose task is to assist the user implement their requests within the current working directory.
In order to perform file system operations, you MUST NOT USE the built-in tools you have (Read, Write, Glob, Edit): instead, you MUST USE the `filesystem` MCP server, which provides the following tools:
- `read_file`: read a file, providing its path
- `write_file`: write a file, providing its path and content
- `edit_file`: edit a file, providing the old string and the new string to replace the old one with
- `list_files`: list all the available files
- `file_exists`: check whether or not a file exists, providing its path
Using these tools, you should be able to provide the user with the assistance that they need.
+7 -6
View File
@@ -5,11 +5,12 @@ model_provider = "openai-chat-completions"
name = "OpenAI using Chat Completions"
base_url = "https://api.openai.com/v1"
env_key = "OPENAI_API_KEY"
wire_api = "chat"
wire_api = "responses"
[features]
rmcp_client = true
[mcp_servers.filesystem]
command = "pnpm"
args = ["run", "mcp-start"]
[mcp_servers.filesystem.env]
cwd = "/Users/<user>/agentfs-claude/"
url = "http://localhost:3000/mcp"
startup_timeout_sec = 30
tool_timeout_sec = 30
+2
View File
@@ -22,6 +22,7 @@
"devDependencies": {
"@anthropic-ai/sdk": "^0.71.2",
"@eslint/js": "^9.39.1",
"@types/express": "^5.0.6",
"@types/figlet": "^1.7.0",
"@types/mime-types": "^3.0.1",
"@types/node": "^24.10.1",
@@ -41,6 +42,7 @@
"@openai/codex-sdk": "^0.73.0",
"@visulima/colorize": "^1.4.29",
"agentfs-sdk": "^0.2.1",
"express": "^5.2.1",
"figlet": "^1.9.4",
"llama-cloud-services": "^0.4.3",
"mime-types": "^3.0.2",
+1 -1
View File
@@ -8,7 +8,7 @@ export async function consoleInput(question: string): Promise<string> {
output: process.stdout,
});
const answer = await rl.question(question);
const answer = await rl.question(bold(question));
rl.close();
return answer;
}
+6 -3
View File
@@ -29,6 +29,9 @@ export async function runCodex(
for await (const event of events) {
switch (event.type) {
case "thread.started":
console.log(`Started session with ID: ${bold(event.thread_id)}`);
break;
case "item.started":
await handleItemStart(event);
break;
@@ -118,7 +121,7 @@ async function handleItemStart(event: ThreadEvent) {
}
async function handleItemUpdated(event: ThreadEvent) {
if (event.type == "item.started") {
if (event.type == "item.updated") {
if (event.item.type == "agent_message") {
console.log(bold(magentaBright("Assistant updated its response...")));
console.log(event.item.text);
@@ -185,10 +188,10 @@ async function handleItemUpdated(event: ThreadEvent) {
async function handleItemCompleted(event: ThreadEvent) {
if (event.type == "item.completed") {
if (event.item.type == "agent_message") {
console.log(bold(magentaBright("Assistant completed its response...")));
console.log(bold(magentaBright("Assistant completed its response:")));
console.log(event.item.text);
} else if (event.item.type == "reasoning") {
console.log(bold(magentaBright("Assistant completed its thoughts...")));
console.log(bold(magentaBright("Assistant completed its thoughts:")));
console.log(event.item.text);
} else if (event.item.type == "mcp_tool_call") {
console.log(
+11 -4
View File
@@ -4,7 +4,7 @@ import { recordFiles } from "./filesystem";
import { getAgentFS } from "./mcp";
import { Agent } from "./claude";
import { queryOptions } from "./options";
import { bold } from "@visulima/colorize";
import { bold, green, red } from "@visulima/colorize";
import { consoleInput, renderLogo } from "./cli";
import * as fs from "fs";
import { runCodex } from "./codex";
@@ -53,7 +53,7 @@ async function main() {
workflow.handle([filesRegisteredEvent], async (_context, _event) => {
console.log(
bold(
"All the files have been uploaded to the AgentFS filesystem, what would you like to do now?",
green("All the files have been uploaded to the AgentFS filesystem, what would you like to do now?"),
),
);
return requestPromptEvent.with();
@@ -87,7 +87,7 @@ async function main() {
const snapshotData = await snapshot();
let agentOfChoice = "claude";
const chosenAgent = await consoleInput(
"What agent would you like to use? [codex/claude]",
"What agent would you like to use? [codex/claude] ",
);
if (chosenAgent.trim() != "") {
agentOfChoice = chosenAgent;
@@ -118,7 +118,14 @@ async function main() {
plan: planMode,
}),
);
await resumedContext.stream.until(stopEvent).toArray();
const finalEvent = (await resumedContext.stream.until(stopEvent).toArray()).at(-1);
if (typeof finalEvent != "undefined") {
if ("error" in finalEvent.data) {
if (typeof finalEvent.data.error == "string") {
console.log(`${bold(red("An error occurred during the workflow execution:"))} ${finalEvent.data.error}`)
}
}
}
}
await main().catch(console.error);
+230 -149
View File
@@ -1,6 +1,8 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { type CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import { Request, Response } from 'express';
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { type CallToolResult, isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { randomUUID } from 'node:crypto';
import {
readSchemaShape,
fileExistsSchemaShape,
@@ -16,159 +18,238 @@ import {
fileExists,
listFiles,
} from "./filesystem";
import { createMcpExpressApp } from "@modelcontextprotocol/sdk/server/express.js";
const mcpServer = new McpServer({
name: "filesystem-mcp",
version: "1.0.0",
});
const getServer = () => {
const mcpServer = new McpServer({
name: "filesystem-mcp",
version: "1.0.0",
});
mcpServer.registerTool(
"read_file",
{
description: "Read a file by passing its path.",
inputSchema: readSchemaShape,
},
async ({ filePath }) => {
const agentfs = await getAgentFS({});
const content = await readFile(filePath, agentfs);
if (typeof content == "string") {
return { content: [{ type: "text", text: content }] };
} else {
return {
content: [
{
type: "text",
text: `Could not read ${filePath}. Please check that the file exists and submit the request again.`,
},
],
isError: true,
};
}
},
);
mcpServer.registerTool(
"read_file",
{
description: "Read a file by passing its path.",
inputSchema: readSchemaShape,
},
async ({ filePath }) => {
const agentfs = await getAgentFS({});
const content = await readFile(filePath, agentfs);
if (typeof content == "string") {
return { content: [{ type: "text", text: content }] };
} else {
return {
content: [
{
type: "text",
text: `Could not read ${filePath}. Please check that the file exists and submit the request again.`,
},
],
isError: true,
};
}
},
);
mcpServer.registerTool(
"file_exists",
{
description: "Check whether a file exists or not by passing its path.",
inputSchema: fileExistsSchemaShape,
},
async ({ filePath }) => {
const agentfs = await getAgentFS({});
const exists = await fileExists(filePath, agentfs);
if (exists) {
return {
content: [{ type: "text", text: `File ${filePath} exists` }],
};
} else {
return {
content: [{ type: "text", text: `File ${filePath} does not exist.` }],
};
}
},
);
mcpServer.registerTool(
"file_exists",
{
description: "Check whether a file exists or not by passing its path.",
inputSchema: fileExistsSchemaShape,
},
async ({ filePath }) => {
const agentfs = await getAgentFS({});
const exists = await fileExists(filePath, agentfs);
if (exists) {
return {
content: [{ type: "text", text: `File ${filePath} exists` }],
};
} else {
return {
content: [{ type: "text", text: `File ${filePath} does not exist.` }],
};
}
},
);
mcpServer.registerTool(
"write_file",
{
description: "Write a file by passing its path and content.",
inputSchema: writeSchemaShape,
},
async ({ filePath, fileContent }) => {
const agentfs = await getAgentFS({});
const success = await writeFile(filePath, fileContent, agentfs);
if (success) {
return {
content: [
{
type: "text",
text: `File ${filePath} successfully written with content:\n\n'''\n${fileContent}\n'''`,
},
],
};
} else {
return {
content: [
{
type: "text",
text: `There was an error while writing file ${filePath}`,
},
],
};
}
},
);
mcpServer.registerTool(
"write_file",
{
description: "Write a file by passing its path and content.",
inputSchema: writeSchemaShape,
},
async ({ filePath, fileContent }) => {
const agentfs = await getAgentFS({});
const success = await writeFile(filePath, fileContent, agentfs);
if (success) {
return {
content: [
{
type: "text",
text: `File ${filePath} successfully written with content:\n\n'''\n${fileContent}\n'''`,
},
],
};
} else {
return {
content: [
{
type: "text",
text: `There was an error while writing file ${filePath}`,
},
],
};
}
},
);
mcpServer.registerTool(
"edit_file",
{
description:
"Edit a file by passing its path, the old string and the new string.",
inputSchema: editSchemaShape,
},
async ({ filePath, oldString, newString }) => {
const agentfs = await getAgentFS({});
const editedContent = await editFile(
filePath,
oldString,
newString,
agentfs,
);
if (typeof editedContent == "string") {
return {
content: [
{
type: "text",
text: `Successfully edited ${filePath}. New content:\n\n'''\n${editedContent}\n'''`,
},
],
} as CallToolResult;
} else {
return {
content: [
{
type: "text",
text: `Could not edit ${filePath}. Please check that the file exists and submit the request again.`,
},
],
isError: true,
};
}
},
);
mcpServer.registerTool(
"edit_file",
{
description:
"Edit a file by passing its path, the old string and the new string.",
inputSchema: editSchemaShape,
},
async ({ filePath, oldString, newString }) => {
const agentfs = await getAgentFS({});
const editedContent = await editFile(
filePath,
oldString,
newString,
agentfs,
);
if (typeof editedContent == "string") {
return {
content: [
{
type: "text",
text: `Successfully edited ${filePath}. New content:\n\n'''\n${editedContent}\n'''`,
},
],
} as CallToolResult;
} else {
return {
content: [
{
type: "text",
text: `Could not edit ${filePath}. Please check that the file exists and submit the request again.`,
},
],
isError: true,
};
}
},
);
mcpServer.registerTool(
"list_files",
{
description: "List all the available files",
inputSchema: listFilesSchemaShape,
},
async () => {
const agentfs = await getAgentFS({});
const files = await listFiles(agentfs);
if (files != "") {
return { content: [{ type: "text", text: files }] };
} else {
return {
content: [
{
type: "text",
text: `Could not list files. Please report this failure to the user`,
},
],
isError: true,
};
}
},
);
async function main() {
const transport = new StdioServerTransport();
await mcpServer.connect(transport);
console.log("MCP server is running...");
mcpServer.registerTool(
"list_files",
{
description: "List all the available files",
inputSchema: listFilesSchemaShape,
},
async () => {
const agentfs = await getAgentFS({});
const files = await listFiles(agentfs);
console.log("Sending result: ", files)
if (files != "") {
return { content: [{ type: "text", text: files }] };
} else {
return {
content: [
{
type: "text",
text: `Could not list files. Please report this failure to the user`,
},
],
isError: true,
};
}
},
);
return mcpServer
}
main().catch((error) => {
console.error("Server error:", error);
process.exit(1);
const app = createMcpExpressApp();
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
app.post('/mcp', async (req: Request, res: Response) => {
console.log('Received MCP request:', req.body);
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
// New initialization request - use JSON response mode
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: true, // Enable JSON response mode
onsessioninitialized: sessionId => {
// Store the transport by session ID when session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});
// Connect the transport to the MCP server BEFORE handling the request
const server = getServer();
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
return; // Already handled
} else {
// Invalid request - no session ID or not initialization request
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided'
},
id: null
});
return;
}
// Handle the request with existing transport - no need to reconnect
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error('Error handling MCP request:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error'
},
id: null
});
}
}
});
// Handle GET requests for SSE streams according to spec
app.get('/mcp', async (req: Request, res: Response) => {
// Since this is a very simple example, we don't support GET requests for this server
// The spec requires returning 405 Method Not Allowed in this case
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
});
// Start the server
const PORT = 3000;
app.listen(PORT, error => {
if (error) {
console.error('Failed to start server:', error);
process.exit(1);
}
console.log(`MCP Streamable HTTP Server listening on port ${PORT}`);
});
// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');
process.exit(0);
});