Merge pull request #339 from cliffhall/fix-streamable-endpoint

Fix support for streamable-http connections
This commit is contained in:
Cliff Hall
2025-04-25 11:26:46 -04:00
committed by GitHub
5 changed files with 216 additions and 34 deletions

View File

@@ -1,6 +1,6 @@
module.exports = {
preset: "ts-jest",
testEnvironment: "jsdom",
testEnvironment: "jest-fixed-jsdom",
moduleNameMapper: {
"^@/(.*)$": "<rootDir>/src/$1",
"\\.css$": "<rootDir>/src/__mocks__/styleMock.js",

View File

@@ -3,6 +3,7 @@ import {
SSEClientTransport,
SseError,
} from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import {
ClientNotification,
ClientRequest,
@@ -278,15 +279,29 @@ export function useConnection({
setConnectionStatus("error-connecting-to-proxy");
return;
}
const mcpProxyServerUrl = new URL(`${getMCPProxyAddress(config)}/sse`);
mcpProxyServerUrl.searchParams.append("transportType", transportType);
if (transportType === "stdio") {
mcpProxyServerUrl.searchParams.append("command", command);
mcpProxyServerUrl.searchParams.append("args", args);
mcpProxyServerUrl.searchParams.append("env", JSON.stringify(env));
} else {
mcpProxyServerUrl.searchParams.append("url", sseUrl);
let mcpProxyServerUrl;
switch (transportType) {
case "stdio":
mcpProxyServerUrl = new URL(`${getMCPProxyAddress(config)}/stdio`);
mcpProxyServerUrl.searchParams.append("command", command);
mcpProxyServerUrl.searchParams.append("args", args);
mcpProxyServerUrl.searchParams.append("env", JSON.stringify(env));
break;
case "sse":
mcpProxyServerUrl = new URL(`${getMCPProxyAddress(config)}/sse`);
mcpProxyServerUrl.searchParams.append("url", sseUrl);
break;
case "streamable-http":
mcpProxyServerUrl = new URL(`${getMCPProxyAddress(config)}/mcp`);
mcpProxyServerUrl.searchParams.append("url", sseUrl);
break;
}
(mcpProxyServerUrl as URL).searchParams.append(
"transportType",
transportType,
);
try {
// Inject auth manually instead of using SSEClientTransport, because we're
@@ -304,14 +319,24 @@ export function useConnection({
headers[authHeaderName] = `Bearer ${token}`;
}
const clientTransport = new SSEClientTransport(mcpProxyServerUrl, {
// Create appropriate transport
const transportOptions = {
eventSourceInit: {
fetch: (url, init) => fetch(url, { ...init, headers }),
fetch: (
url: string | URL | globalThis.Request,
init: RequestInit | undefined,
) => fetch(url, { ...init, headers }),
},
requestInit: {
headers,
},
});
};
const clientTransport =
transportType === "streamable-http"
? new StreamableHTTPClientTransport(mcpProxyServerUrl as URL, {
sessionId: undefined,
})
: new SSEClientTransport(mcpProxyServerUrl as URL, transportOptions);
if (onNotification) {
[

28
package-lock.json generated
View File

@@ -17,7 +17,7 @@
"@modelcontextprotocol/inspector-cli": "^0.10.2",
"@modelcontextprotocol/inspector-client": "^0.10.2",
"@modelcontextprotocol/inspector-server": "^0.10.2",
"@modelcontextprotocol/sdk": "^1.10.0",
"@modelcontextprotocol/sdk": "^1.10.2",
"concurrently": "^9.0.1",
"shell-quote": "^1.8.2",
"spawn-rx": "^5.1.2",
@@ -31,13 +31,14 @@
"@types/jest": "^29.5.14",
"@types/node": "^22.7.5",
"@types/shell-quote": "^1.7.5",
"jest-fixed-jsdom": "^0.0.9",
"prettier": "3.3.3",
"typescript": "^5.4.2"
}
},
"cli": {
"name": "@modelcontextprotocol/inspector-cli",
"version": "0.10.1",
"version": "0.10.2",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.10.0",
@@ -58,7 +59,7 @@
},
"client": {
"name": "@modelcontextprotocol/inspector-client",
"version": "0.10.1",
"version": "0.10.2",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.10.0",
@@ -1399,9 +1400,9 @@
"link": true
},
"node_modules/@modelcontextprotocol/sdk": {
"version": "1.10.0",
"resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.10.0.tgz",
"integrity": "sha512-wijOavYZfSOADbVM0LA7mrQ17N4IKNdFcfezknCCsZ1Y1KstVWlkDZ5ebcxuQJmqTTxsNjBHLc7it1SV0TBiPg==",
"version": "1.10.2",
"resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.10.2.tgz",
"integrity": "sha512-rb6AMp2DR4SN+kc6L1ta2NCpApyA9WYNx3CrTSZvGxq9wH71bRur+zRqPfg0vQ9mjywR7qZdX2RGHOPq3ss+tA==",
"license": "MIT",
"dependencies": {
"content-type": "^1.0.5",
@@ -5358,6 +5359,19 @@
"node": "^14.15.0 || ^16.10.0 || >=18.0.0"
}
},
"node_modules/jest-fixed-jsdom": {
"version": "0.0.9",
"resolved": "https://registry.npmjs.org/jest-fixed-jsdom/-/jest-fixed-jsdom-0.0.9.tgz",
"integrity": "sha512-KPfqh2+sn5q2B+7LZktwDcwhCpOpUSue8a1I+BcixWLOQoEVyAjAGfH+IYZGoxZsziNojoHGRTC8xRbB1wDD4g==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=18.0.0"
},
"peerDependencies": {
"jest-environment-jsdom": ">=28.0.0"
}
},
"node_modules/jest-get-type": {
"version": "29.6.3",
"dev": true,
@@ -8550,7 +8564,7 @@
},
"server": {
"name": "@modelcontextprotocol/inspector-server",
"version": "0.10.1",
"version": "0.10.2",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.10.0",

View File

@@ -41,7 +41,7 @@
"@modelcontextprotocol/inspector-cli": "^0.10.2",
"@modelcontextprotocol/inspector-client": "^0.10.2",
"@modelcontextprotocol/inspector-server": "^0.10.2",
"@modelcontextprotocol/sdk": "^1.10.0",
"@modelcontextprotocol/sdk": "^1.10.2",
"concurrently": "^9.0.1",
"shell-quote": "^1.8.2",
"spawn-rx": "^5.1.2",
@@ -52,6 +52,7 @@
"@types/jest": "^29.5.14",
"@types/node": "^22.7.5",
"@types/shell-quote": "^1.7.5",
"jest-fixed-jsdom": "^0.0.9",
"prettier": "3.3.3",
"typescript": "^5.4.2"
}

View File

@@ -12,15 +12,21 @@ import {
StdioClientTransport,
getDefaultEnvironment,
} from "@modelcontextprotocol/sdk/client/stdio.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import express from "express";
import { findActualExecutable } from "spawn-rx";
import mcpProxy from "./mcpProxy.js";
import { randomUUID } from "node:crypto";
const SSE_HEADERS_PASSTHROUGH = ["authorization"];
const STREAMABLE_HTTP_HEADERS_PASSTHROUGH = ["authorization"];
const STREAMABLE_HTTP_HEADERS_PASSTHROUGH = [
"authorization",
"mcp-session-id",
"last-event-id",
];
const defaultEnvironment = {
...getDefaultEnvironment(),
@@ -37,8 +43,12 @@ const { values } = parseArgs({
const app = express();
app.use(cors());
app.use((req, res, next) => {
res.header("Access-Control-Expose-Headers", "mcp-session-id");
next();
});
let webAppTransports: SSEServerTransport[] = [];
const webAppTransports: Map<string, Transport> = new Map<string, Transport>(); // Transports by sessionId
const createTransport = async (req: express.Request): Promise<Transport> => {
const query = req.query;
@@ -97,7 +107,9 @@ const createTransport = async (req: express.Request): Promise<Transport> => {
console.log("Connected to SSE transport");
return transport;
} else if (transportType === "streamable-http") {
const headers: HeadersInit = {};
const headers: HeadersInit = {
Accept: "text/event-stream, application/json",
};
for (const key of STREAMABLE_HTTP_HEADERS_PASSTHROUGH) {
if (req.headers[key] === undefined) {
@@ -127,9 +139,96 @@ const createTransport = async (req: express.Request): Promise<Transport> => {
let backingServerTransport: Transport | undefined;
app.get("/sse", async (req, res) => {
app.get("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string;
console.log(`Received GET message for sessionId ${sessionId}`);
try {
console.log("New SSE connection");
const transport = webAppTransports.get(
sessionId,
) as StreamableHTTPServerTransport;
if (!transport) {
res.status(404).end("Session not found");
return;
} else {
await transport.handleRequest(req, res);
}
} catch (error) {
console.error("Error in /mcp route:", error);
res.status(500).json(error);
}
});
app.post("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
console.log(`Received POST message for sessionId ${sessionId}`);
if (!sessionId) {
try {
console.log("New streamable-http connection");
try {
await backingServerTransport?.close();
backingServerTransport = await createTransport(req);
} catch (error) {
if (error instanceof SseError && error.code === 401) {
console.error(
"Received 401 Unauthorized from MCP server:",
error.message,
);
res.status(401).json(error);
return;
}
throw error;
}
console.log("Connected MCP client to backing server transport");
const webAppTransport = new StreamableHTTPServerTransport({
sessionIdGenerator: randomUUID,
onsessioninitialized: (sessionId) => {
webAppTransports.set(sessionId, webAppTransport);
console.log("Created streamable web app transport " + sessionId);
},
});
await webAppTransport.start();
mcpProxy({
transportToClient: webAppTransport,
transportToServer: backingServerTransport,
});
await (webAppTransport as StreamableHTTPServerTransport).handleRequest(
req,
res,
req.body,
);
} catch (error) {
console.error("Error in /mcp POST route:", error);
res.status(500).json(error);
}
} else {
try {
const transport = webAppTransports.get(
sessionId,
) as StreamableHTTPServerTransport;
if (!transport) {
res.status(404).end("Transport not found for sessionId " + sessionId);
} else {
await (transport as StreamableHTTPServerTransport).handleRequest(
req,
res,
);
}
} catch (error) {
console.error("Error in /mcp route:", error);
res.status(500).json(error);
}
}
});
app.get("/stdio", async (req, res) => {
try {
console.log("New connection");
try {
await backingServerTransport?.close();
@@ -150,15 +249,14 @@ app.get("/sse", async (req, res) => {
console.log("Connected MCP client to backing server transport");
const webAppTransport = new SSEServerTransport("/message", res);
console.log("Created web app transport");
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
webAppTransports.push(webAppTransport);
console.log("Created web app transport");
await webAppTransport.start();
if (backingServerTransport instanceof StdioClientTransport) {
backingServerTransport.stderr!.on("data", (chunk) => {
(backingServerTransport as StdioClientTransport).stderr!.on(
"data",
(chunk) => {
webAppTransport.send({
jsonrpc: "2.0",
method: "notifications/stderr",
@@ -166,9 +264,51 @@ app.get("/sse", async (req, res) => {
content: chunk.toString(),
},
});
});
},
);
mcpProxy({
transportToClient: webAppTransport,
transportToServer: backingServerTransport,
});
console.log("Set up MCP proxy");
} catch (error) {
console.error("Error in /stdio route:", error);
res.status(500).json(error);
}
});
app.get("/sse", async (req, res) => {
try {
console.log(
"New SSE connection. NOTE: The sse transport is deprecated and has been replaced by streamable-http",
);
try {
await backingServerTransport?.close();
backingServerTransport = await createTransport(req);
} catch (error) {
if (error instanceof SseError && error.code === 401) {
console.error(
"Received 401 Unauthorized from MCP server:",
error.message,
);
res.status(401).json(error);
return;
}
throw error;
}
console.log("Connected MCP client to backing server transport");
const webAppTransport = new SSEServerTransport("/message", res);
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
console.log("Created web app transport");
await webAppTransport.start();
mcpProxy({
transportToClient: webAppTransport,
transportToServer: backingServerTransport,
@@ -186,7 +326,9 @@ app.post("/message", async (req, res) => {
const sessionId = req.query.sessionId;
console.log(`Received message for sessionId ${sessionId}`);
const transport = webAppTransports.find((t) => t.sessionId === sessionId);
const transport = webAppTransports.get(
sessionId as string,
) as SSEServerTransport;
if (!transport) {
res.status(404).end("Session not found");
return;