WIP: Attempting to proxy streamable-http connections. Inspector still works fine with STDIO and SSE servers.

* In index.ts,
  - refactor transport webAppTransports to be a map with the session id as key and transport as value.

* Implement /mcp GET and POST endpoints using StreamableHTTPServerTransport and doing the new session in the POST (opposite from SSE) handler.

* In package.json
  - update the SDK to 1.10.2

* In useConnection.ts
  - import StreamableHTTPClientTransport
    - NOTE: while we NEED to do this, it causes useConnection.test.ts to fail with " ReferenceError: TransformStream is not defined"
  - in connect method
    - instantiate the appropriate transport
This commit is contained in:
cliffhall
2025-04-22 18:25:47 -04:00
parent 3a2e248527
commit 6e4dcd6120
4 changed files with 107 additions and 74 deletions

View File

@@ -3,6 +3,7 @@ import {
SSEClientTransport,
SseError,
} from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import {
ClientNotification,
ClientRequest,
@@ -286,6 +287,7 @@ export function useConnection({
mcpProxyServerUrl.searchParams.append("args", args);
mcpProxyServerUrl.searchParams.append("env", JSON.stringify(env));
break;
case "sse":
mcpProxyServerUrl = new URL(`${getMCPProxyAddress(config)}/sse`);
mcpProxyServerUrl.searchParams.append("url", sseUrl);
@@ -317,14 +319,24 @@ export function useConnection({
headers[authHeaderName] = `Bearer ${token}`;
}
const clientTransport = new SSEClientTransport(mcpProxyServerUrl as URL, {
// Create appropriate transport
const transportOptions = {
eventSourceInit: {
fetch: (url, init) => fetch(url, { ...init, headers }),
fetch: (
url: string | URL | globalThis.Request,
init: RequestInit | undefined,
) => fetch(url, { ...init, headers }),
},
requestInit: {
headers,
},
});
};
const clientTransport =
transportType === "streamable-http"
? new StreamableHTTPClientTransport(mcpProxyServerUrl as URL, {
sessionId: undefined,
})
: new SSEClientTransport(mcpProxyServerUrl as URL, transportOptions);
if (onNotification) {
[

14
package-lock.json generated
View File

@@ -17,7 +17,7 @@
"@modelcontextprotocol/inspector-cli": "^0.10.2",
"@modelcontextprotocol/inspector-client": "^0.10.2",
"@modelcontextprotocol/inspector-server": "^0.10.2",
"@modelcontextprotocol/sdk": "^1.10.0",
"@modelcontextprotocol/sdk": "^1.10.2",
"concurrently": "^9.0.1",
"shell-quote": "^1.8.2",
"spawn-rx": "^5.1.2",
@@ -37,7 +37,7 @@
},
"cli": {
"name": "@modelcontextprotocol/inspector-cli",
"version": "0.10.1",
"version": "0.10.2",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.10.0",
@@ -58,7 +58,7 @@
},
"client": {
"name": "@modelcontextprotocol/inspector-client",
"version": "0.10.1",
"version": "0.10.2",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.10.0",
@@ -1399,9 +1399,9 @@
"link": true
},
"node_modules/@modelcontextprotocol/sdk": {
"version": "1.10.0",
"resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.10.0.tgz",
"integrity": "sha512-wijOavYZfSOADbVM0LA7mrQ17N4IKNdFcfezknCCsZ1Y1KstVWlkDZ5ebcxuQJmqTTxsNjBHLc7it1SV0TBiPg==",
"version": "1.10.2",
"resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.10.2.tgz",
"integrity": "sha512-rb6AMp2DR4SN+kc6L1ta2NCpApyA9WYNx3CrTSZvGxq9wH71bRur+zRqPfg0vQ9mjywR7qZdX2RGHOPq3ss+tA==",
"license": "MIT",
"dependencies": {
"content-type": "^1.0.5",
@@ -8550,7 +8550,7 @@
},
"server": {
"name": "@modelcontextprotocol/inspector-server",
"version": "0.10.1",
"version": "0.10.2",
"license": "MIT",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.10.0",

View File

@@ -41,7 +41,7 @@
"@modelcontextprotocol/inspector-cli": "^0.10.2",
"@modelcontextprotocol/inspector-client": "^0.10.2",
"@modelcontextprotocol/inspector-server": "^0.10.2",
"@modelcontextprotocol/sdk": "^1.10.0",
"@modelcontextprotocol/sdk": "^1.10.2",
"concurrently": "^9.0.1",
"shell-quote": "^1.8.2",
"spawn-rx": "^5.1.2",

View File

@@ -12,15 +12,17 @@ import {
StdioClientTransport,
getDefaultEnvironment,
} from "@modelcontextprotocol/sdk/client/stdio.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import express from "express";
import { findActualExecutable } from "spawn-rx";
import mcpProxy from "./mcpProxy.js";
import { randomUUID } from "node:crypto";
const SSE_HEADERS_PASSTHROUGH = ["authorization"];
const STREAMABLE_HTTP_HEADERS_PASSTHROUGH = ["authorization"];
const STREAMABLE_HTTP_HEADERS_PASSTHROUGH = ["authorization", "mcp-session-id"];
const defaultEnvironment = {
...getDefaultEnvironment(),
@@ -38,7 +40,7 @@ const { values } = parseArgs({
const app = express();
app.use(cors());
let webAppTransports: SSEServerTransport[] = [];
const webAppTransports: Map<string, Transport> = new Map<string, Transport>(); // Transports by sessionId
const createTransport = async (req: express.Request): Promise<Transport> => {
const query = req.query;
@@ -130,71 +132,89 @@ const createTransport = async (req: express.Request): Promise<Transport> => {
let backingServerTransport: Transport | undefined;
app.get("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string;
console.log(`Received GET message for sessionId ${sessionId}`);
try {
console.log("New streamable-http connection");
try {
await backingServerTransport?.close();
backingServerTransport = await createTransport(req);
} catch (error) {
if (error instanceof SseError && error.code === 401) {
console.error(
"Received 401 Unauthorized from MCP server:",
error.message,
);
res.status(401).json(error);
return;
}
throw error;
const transport = webAppTransports.get(
sessionId,
) as StreamableHTTPServerTransport;
if (!transport) {
res.status(404).end("Session not found");
return;
} else {
await transport.handleRequest(req, res);
}
console.log("Connected MCP client to backing server transport");
const webAppTransport = new SSEServerTransport("/mcp", res);
webAppTransports.push(webAppTransport);
console.log("Created web app transport");
await webAppTransport.start();
if (backingServerTransport instanceof StdioClientTransport) {
backingServerTransport.stderr!.on("data", (chunk) => {
webAppTransport.send({
jsonrpc: "2.0",
method: "notifications/stderr",
params: {
content: chunk.toString(),
},
});
});
}
mcpProxy({
transportToClient: webAppTransport,
transportToServer: backingServerTransport,
});
console.log("Set up MCP proxy");
} catch (error) {
console.error("Error in /sse route:", error);
console.error("Error in /mcp route:", error);
res.status(500).json(error);
}
});
app.post("/mcp", async (req, res) => {
try {
const sessionId = req.query.sessionId;
console.log(`Received message for sessionId ${sessionId}`);
const sessionId = req.headers["mcp-session-id"] as string | undefined;
console.log(`Received POST message for sessionId ${sessionId}`);
if (!sessionId) {
try {
console.log("New streamable-http connection");
try {
await backingServerTransport?.close();
backingServerTransport = await createTransport(req);
} catch (error) {
if (error instanceof SseError && error.code === 401) {
console.error(
"Received 401 Unauthorized from MCP server:",
error.message,
);
res.status(401).json(error);
return;
}
const transport = webAppTransports.find((t) => t.sessionId === sessionId);
if (!transport) {
res.status(404).end("Session not found");
return;
throw error;
}
console.log("Connected MCP client to backing server transport");
const webAppTransport = new StreamableHTTPServerTransport({
sessionIdGenerator: randomUUID,
onsessioninitialized: (sessionId) => {
webAppTransports.set(sessionId, webAppTransport);
console.log("Created streamable web app transport " + sessionId);
},
});
await webAppTransport.start();
mcpProxy({
transportToClient: webAppTransport,
transportToServer: backingServerTransport,
});
await (webAppTransport as StreamableHTTPServerTransport).handleRequest(
req,
res,
req.body,
);
} catch (error) {
console.error("Error in /mcp POST route:", error);
res.status(500).json(error);
}
} else {
try {
const transport = webAppTransports.get(
sessionId,
) as StreamableHTTPServerTransport;
if (!transport) {
res.status(404).end("Transport not found for sessionId " + sessionId);
} else {
await (transport as StreamableHTTPServerTransport).handleRequest(
req,
res,
);
}
} catch (error) {
console.error("Error in /mcp route:", error);
res.status(500).json(error);
}
await transport.handlePostMessage(req, res);
} catch (error) {
console.error("Error in /mcp route:", error);
res.status(500).json(error);
}
});
@@ -221,7 +241,7 @@ app.get("/stdio", async (req, res) => {
console.log("Connected MCP client to backing server transport");
const webAppTransport = new SSEServerTransport("/message", res);
webAppTransports.push(webAppTransport);
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
console.log("Created web app transport");
@@ -276,8 +296,7 @@ app.get("/sse", async (req, res) => {
console.log("Connected MCP client to backing server transport");
const webAppTransport = new SSEServerTransport("/message", res);
webAppTransports.push(webAppTransport);
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
console.log("Created web app transport");
await webAppTransport.start();
@@ -299,7 +318,9 @@ app.post("/message", async (req, res) => {
const sessionId = req.query.sessionId;
console.log(`Received message for sessionId ${sessionId}`);
const transport = webAppTransports.find((t) => t.sessionId === sessionId);
const transport = webAppTransports.get(
sessionId as string,
) as SSEServerTransport;
if (!transport) {
res.status(404).end("Session not found");
return;