⏱️ fix: Separate MCP GET SSE Stream Timeout from POST and Suppress SDK-Internal Recovery Errors (#11936)

* fix: Separate MCP GET SSE body timeout from POST and suppress SDK-internal stream recovery

- Add a dedicated GET Agent with a configurable `sseReadTimeout` (default 5 min,
  matching the Python MCP SDK) so idle SSE streams time out independently of POST
  requests, preventing the reconnect-loop log flood described in Discussion #11230.
- Suppress "SSE stream disconnected" and "Failed to reconnect SSE stream" errors
  in setupTransportErrorHandlers — these are SDK-internal recovery events, not
  transport failures. "Maximum reconnection attempts exceeded" still escalates.
- Add optional `sseReadTimeout` to BaseOptionsSchema for per-server configuration.
- Add 6 tests: agent timeout separation, custom sseReadTimeout, SSE disconnect
  suppression (3 unit), and a real-server integration test proving the GET stream
  recovers without a full transport rebuild.

* fix: Refactor MCP connection timeouts and error handling

- Updated the `DEFAULT_SSE_READ_TIMEOUT` to use a constant for better readability.
- Introduced internal error message constants for SSE stream disconnection and reconnection failures to improve maintainability.
- Enhanced type safety in tests by ensuring the options symbol is defined before usage.
- Updated the `sseReadTimeout` in `BaseOptionsSchema` to enforce positive values, ensuring valid configurations.

* chore: Update SSE read timeout documentation format in BaseOptionsSchema

- Changed the default timeout value comment in BaseOptionsSchema to use an underscore for better readability, aligning with common formatting practices.
This commit is contained in:
Danny Avila 2026-02-24 21:05:58 -05:00 committed by GitHub
parent 44dbbd5328
commit 9a8a5d66d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 254 additions and 10 deletions

View file

@ -229,7 +229,8 @@ describe('MCPConnection Agent lifecycle streamable-http', () => {
await safeDisconnect(conn);
/**
* streamable-http creates one Agent via createFetchFunction.
* streamable-http creates two Agents via createFetchFunction: one for POST
* (normal timeout) and one for GET SSE (long body timeout).
* If agents were per-request (old bug), they would not be stored and close
* would be called 0 times. With our fix, Agents are stored and closed on
* disconnect regardless of request count confirming reuse.
@ -300,6 +301,57 @@ describe('MCPConnection Agent lifecycle streamable-http', () => {
expect(closeSpy.mock.calls.length).toBe(countAfterFirst);
conn = null;
});
it('creates separate Agents for POST (normal timeout) and GET SSE (default sseReadTimeout)', async () => {
conn = new MCPConnection({
serverName: 'test',
serverConfig: { type: 'streamable-http', url: server.url },
useSSRFProtection: false,
});
await conn.connect();
const agents = (conn as unknown as { agents: Agent[] }).agents;
expect(agents.length).toBeGreaterThanOrEqual(2);
const optionsSym = Object.getOwnPropertySymbols(agents[0]).find(
(s) => s.toString() === 'Symbol(options)',
);
expect(optionsSym).toBeDefined();
const bodyTimeouts = agents.map(
(a) => (a as unknown as Record<symbol, { bodyTimeout: number }>)[optionsSym!].bodyTimeout,
);
const hasShortTimeout = bodyTimeouts.some((t) => t <= 120_000);
const hasLongTimeout = bodyTimeouts.some((t) => t === 5 * 60 * 1000);
expect(hasShortTimeout).toBe(true);
expect(hasLongTimeout).toBe(true);
});
it('respects a custom sseReadTimeout from server config', async () => {
const customTimeout = 10 * 60 * 1000;
conn = new MCPConnection({
serverName: 'test',
serverConfig: { type: 'streamable-http', url: server.url, sseReadTimeout: customTimeout },
useSSRFProtection: false,
});
await conn.connect();
const agents = (conn as unknown as { agents: Agent[] }).agents;
const optionsSym = Object.getOwnPropertySymbols(agents[0]).find(
(s) => s.toString() === 'Symbol(options)',
);
expect(optionsSym).toBeDefined();
const bodyTimeouts = agents.map(
(a) => (a as unknown as Record<symbol, { bodyTimeout: number }>)[optionsSym!].bodyTimeout,
);
expect(bodyTimeouts).toContain(customTimeout);
});
});
describe('MCPConnection Agent lifecycle SSE', () => {
@ -528,3 +580,144 @@ describe('MCPConnection SSE 404 handling session-aware', () => {
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
});
});
describe('MCPConnection SSE stream disconnect handling', () => {
function makeTransportStub() {
return {
onerror: undefined as ((e: Error) => void) | undefined,
onclose: undefined as (() => void) | undefined,
onmessage: undefined as ((m: unknown) => void) | undefined,
start: jest.fn(),
close: jest.fn(),
send: jest.fn(),
};
}
function makeConn() {
return new MCPConnection({
serverName: 'test-sse-disconnect',
serverConfig: { url: 'http://127.0.0.1:1/sse' },
useSSRFProtection: false,
});
}
function bindErrorHandler(conn: MCPConnection, transport: ReturnType<typeof makeTransportStub>) {
(
conn as unknown as { setupTransportErrorHandlers: (t: unknown) => void }
).setupTransportErrorHandlers(transport);
}
beforeEach(() => {
mockLogger.debug.mockClear();
mockLogger.error.mockClear();
});
it('suppresses "SSE stream disconnected" errors from escalating to full reconnection', () => {
const conn = makeConn();
const transport = makeTransportStub();
const emitSpy = jest.spyOn(conn, 'emit');
bindErrorHandler(conn, transport);
transport.onerror?.(
new Error('SSE stream disconnected: AbortError: The operation was aborted'),
);
expect(mockLogger.debug).toHaveBeenCalledWith(
expect.stringContaining('SDK SSE stream recovery in progress'),
);
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
});
it('suppresses "Failed to reconnect SSE stream" errors (SDK still has retries left)', () => {
const conn = makeConn();
const transport = makeTransportStub();
const emitSpy = jest.spyOn(conn, 'emit');
bindErrorHandler(conn, transport);
transport.onerror?.(new Error('Failed to reconnect SSE stream: connection refused'));
expect(mockLogger.debug).toHaveBeenCalledWith(
expect.stringContaining('SDK SSE stream recovery in progress'),
);
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
});
it('escalates "Maximum reconnection attempts exceeded" (SDK gave up)', () => {
const conn = makeConn();
const transport = makeTransportStub();
const emitSpy = jest.spyOn(conn, 'emit');
bindErrorHandler(conn, transport);
transport.onerror?.(new Error('Maximum reconnection attempts (2) exceeded.'));
expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error');
});
it('still escalates non-SSE-stream errors (e.g. POST failures)', () => {
const conn = makeConn();
const transport = makeTransportStub();
const emitSpy = jest.spyOn(conn, 'emit');
bindErrorHandler(conn, transport);
transport.onerror?.(new Error('Streamable HTTP error: Error POSTing to endpoint: 500'));
expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error');
});
});
describe('MCPConnection SSE GET stream recovery integration', () => {
let server: TestServer;
let conn: MCPConnection | null;
beforeEach(async () => {
server = await createStreamableServer();
conn = null;
});
afterEach(async () => {
await safeDisconnect(conn);
conn = null;
jest.restoreAllMocks();
await server.close();
});
it('survives a GET SSE body timeout without triggering a full transport rebuild', async () => {
const SHORT_SSE_TIMEOUT = 1500;
conn = new MCPConnection({
serverName: 'test-sse-recovery',
serverConfig: {
type: 'streamable-http',
url: server.url,
sseReadTimeout: SHORT_SSE_TIMEOUT,
},
useSSRFProtection: false,
});
await conn.connect();
await conn.fetchTools();
/**
* Wait for the GET SSE body timeout to fire. The SDK will see a stream
* error and call onerror("SSE stream disconnected: …"), then internally
* schedule a reconnection. Our handler should suppress the escalation.
*/
await new Promise((resolve) => setTimeout(resolve, SHORT_SSE_TIMEOUT + 1000));
expect(mockLogger.debug).toHaveBeenCalledWith(
expect.stringContaining('SDK SSE stream recovery in progress'),
);
expect(mockLogger.error).not.toHaveBeenCalledWith(
expect.stringContaining('Reconnection handler failed'),
expect.anything(),
);
/**
* The connection should still be functional POST requests use a
* separate Agent with the normal timeout and are unaffected.
*/
const tools = await conn.fetchTools();
expect(tools).toBeDefined();
}, 10_000);
});

View file

@ -71,6 +71,17 @@ const FIVE_MINUTES = 5 * 60 * 1000;
const DEFAULT_TIMEOUT = 60000;
/** SSE connections through proxies may need longer initial handshake time */
const SSE_CONNECT_TIMEOUT = 120000;
/** Default body timeout for Streamable HTTP GET SSE streams that idle between server pushes */
const DEFAULT_SSE_READ_TIMEOUT = FIVE_MINUTES;
/**
* Error message prefixes emitted by the MCP SDK's StreamableHTTPClientTransport
* (client/streamableHttp.ts _handleSseStream / _scheduleReconnection).
* These are SDK-internal strings, not part of a public API. If the SDK changes
* them, suppression in setupTransportErrorHandlers will silently stop working.
*/
const SDK_SSE_STREAM_DISCONNECTED = 'SSE stream disconnected';
const SDK_SSE_RECONNECT_FAILED = 'Failed to reconnect SSE stream';
/**
* Headers for SSE connections.
@ -254,6 +265,7 @@ export class MCPConnection extends EventEmitter {
private readonly useSSRFProtection: boolean;
iconPath?: string;
timeout?: number;
sseReadTimeout?: number;
url?: string;
/**
@ -285,6 +297,7 @@ export class MCPConnection extends EventEmitter {
this.useSSRFProtection = params.useSSRFProtection === true;
this.iconPath = params.serverConfig.iconPath;
this.timeout = params.serverConfig.timeout;
this.sseReadTimeout = params.serverConfig.sseReadTimeout;
this.lastPingTime = Date.now();
this.createdAt = Date.now(); // Record creation timestamp for staleness detection
if (params.oauthTokens) {
@ -313,30 +326,45 @@ export class MCPConnection extends EventEmitter {
* Factory function to create fetch functions without capturing the entire `this` context.
* This helps prevent memory leaks by only passing necessary dependencies.
*
* @param getHeaders Function to retrieve request headers
* @param timeout Timeout value for the agent (in milliseconds)
* @returns A fetch function that merges headers appropriately
* When `sseBodyTimeout` is provided, a second Agent is created with a much longer
* body timeout for GET requests (the Streamable HTTP SSE stream). POST requests
* continue using the normal timeout so they fail fast on real errors.
*/
private createFetchFunction(
getHeaders: () => Record<string, string> | null | undefined,
timeout?: number,
sseBodyTimeout?: number,
): (input: UndiciRequestInfo, init?: UndiciRequestInit) => Promise<UndiciResponse> {
const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined;
const connectOpts = ssrfConnect != null ? { connect: ssrfConnect } : {};
const effectiveTimeout = timeout || DEFAULT_TIMEOUT;
const agent = new Agent({
const postAgent = new Agent({
bodyTimeout: effectiveTimeout,
headersTimeout: effectiveTimeout,
...(ssrfConnect != null ? { connect: ssrfConnect } : {}),
...connectOpts,
});
this.agents.push(agent);
this.agents.push(postAgent);
let getAgent: Agent | undefined;
if (sseBodyTimeout != null) {
getAgent = new Agent({
bodyTimeout: sseBodyTimeout,
headersTimeout: effectiveTimeout,
...connectOpts,
});
this.agents.push(getAgent);
}
return function customFetch(
input: UndiciRequestInfo,
init?: UndiciRequestInit,
): Promise<UndiciResponse> {
const isGet = (init?.method ?? 'GET').toUpperCase() === 'GET';
const dispatcher = isGet && getAgent ? getAgent : postAgent;
const requestHeaders = getHeaders();
if (!requestHeaders) {
return undiciFetch(input, { ...init, dispatcher: agent });
return undiciFetch(input, { ...init, dispatcher });
}
let initHeaders: Record<string, string> = {};
@ -356,7 +384,7 @@ export class MCPConnection extends EventEmitter {
...initHeaders,
...requestHeaders,
},
dispatcher: agent,
dispatcher,
});
};
}
@ -507,6 +535,7 @@ export class MCPConnection extends EventEmitter {
fetch: this.createFetchFunction(
this.getRequestHeaders.bind(this),
this.timeout,
this.sseReadTimeout || DEFAULT_SSE_READ_TIMEOUT,
) as unknown as FetchLike,
});
@ -829,7 +858,26 @@ export class MCPConnection extends EventEmitter {
private setupTransportErrorHandlers(transport: Transport): void {
transport.onerror = (error) => {
// Extract meaningful error information (handles "SSE error: undefined" cases)
const rawMessage =
error && typeof error === 'object' ? ((error as { message?: string }).message ?? '') : '';
/**
* The MCP SDK's StreamableHTTPClientTransport fires onerror for SSE GET stream
* disconnects but also handles reconnection internally via _scheduleReconnection.
* Escalating these to a full transport rebuild creates a redundant reconnection
* loop. Log at debug level and let the SDK recover the GET stream on its own.
*
* "Maximum reconnection attempts … exceeded" means the SDK gave up that one
* must fall through so our higher-level reconnection takes over.
*/
if (
rawMessage.startsWith(SDK_SSE_STREAM_DISCONNECTED) ||
rawMessage.startsWith(SDK_SSE_RECONNECT_FAILED)
) {
logger.debug(`${this.getLogPrefix()} SDK SSE stream recovery in progress: ${rawMessage}`);
return;
}
const {
message: errorMessage,
code: errorCode,

View file

@ -19,6 +19,8 @@ const BaseOptionsSchema = z.object({
startup: z.boolean().optional(),
iconPath: z.string().optional(),
timeout: z.number().optional(),
/** Timeout (ms) for the long-lived SSE GET stream body before undici aborts it. Default: 300_000 (5 min). */
sseReadTimeout: z.number().positive().optional(),
initTimeout: z.number().optional(),
/** Controls visibility in chat dropdown menu (MCPSelect) */
chatMenu: z.boolean().optional(),
@ -212,6 +214,7 @@ const omitServerManagedFields = <T extends z.ZodObject<z.ZodRawShape>>(schema: T
schema.omit({
startup: true,
timeout: true,
sseReadTimeout: true,
initTimeout: true,
chatMenu: true,
serverInstructions: true,