diff --git a/packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts b/packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts index f3b3a4c0f2..14e0694558 100644 --- a/packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts +++ b/packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts @@ -229,7 +229,8 @@ describe('MCPConnection Agent lifecycle – streamable-http', () => { await safeDisconnect(conn); /** - * streamable-http creates one Agent via createFetchFunction. + * streamable-http creates two Agents via createFetchFunction: one for POST + * (normal timeout) and one for GET SSE (long body timeout). * If agents were per-request (old bug), they would not be stored and close * would be called 0 times. With our fix, Agents are stored and closed on * disconnect regardless of request count — confirming reuse. @@ -300,6 +301,57 @@ describe('MCPConnection Agent lifecycle – streamable-http', () => { expect(closeSpy.mock.calls.length).toBe(countAfterFirst); conn = null; }); + + it('creates separate Agents for POST (normal timeout) and GET SSE (default sseReadTimeout)', async () => { + conn = new MCPConnection({ + serverName: 'test', + serverConfig: { type: 'streamable-http', url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + + const agents = (conn as unknown as { agents: Agent[] }).agents; + expect(agents.length).toBeGreaterThanOrEqual(2); + + const optionsSym = Object.getOwnPropertySymbols(agents[0]).find( + (s) => s.toString() === 'Symbol(options)', + ); + expect(optionsSym).toBeDefined(); + + const bodyTimeouts = agents.map( + (a) => (a as unknown as Record)[optionsSym!].bodyTimeout, + ); + + const hasShortTimeout = bodyTimeouts.some((t) => t <= 120_000); + const hasLongTimeout = bodyTimeouts.some((t) => t === 5 * 60 * 1000); + + expect(hasShortTimeout).toBe(true); + expect(hasLongTimeout).toBe(true); + }); + + it('respects a custom sseReadTimeout from server config', async () => { + const customTimeout = 10 * 60 * 1000; + conn = new MCPConnection({ + serverName: 'test', + serverConfig: { type: 'streamable-http', url: server.url, sseReadTimeout: customTimeout }, + useSSRFProtection: false, + }); + + await conn.connect(); + + const agents = (conn as unknown as { agents: Agent[] }).agents; + const optionsSym = Object.getOwnPropertySymbols(agents[0]).find( + (s) => s.toString() === 'Symbol(options)', + ); + expect(optionsSym).toBeDefined(); + + const bodyTimeouts = agents.map( + (a) => (a as unknown as Record)[optionsSym!].bodyTimeout, + ); + + expect(bodyTimeouts).toContain(customTimeout); + }); }); describe('MCPConnection Agent lifecycle – SSE', () => { @@ -528,3 +580,144 @@ describe('MCPConnection SSE 404 handling – session-aware', () => { expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error'); }); }); + +describe('MCPConnection SSE stream disconnect handling', () => { + function makeTransportStub() { + return { + onerror: undefined as ((e: Error) => void) | undefined, + onclose: undefined as (() => void) | undefined, + onmessage: undefined as ((m: unknown) => void) | undefined, + start: jest.fn(), + close: jest.fn(), + send: jest.fn(), + }; + } + + function makeConn() { + return new MCPConnection({ + serverName: 'test-sse-disconnect', + serverConfig: { url: 'http://127.0.0.1:1/sse' }, + useSSRFProtection: false, + }); + } + + function bindErrorHandler(conn: MCPConnection, transport: ReturnType) { + ( + conn as unknown as { setupTransportErrorHandlers: (t: unknown) => void } + ).setupTransportErrorHandlers(transport); + } + + beforeEach(() => { + mockLogger.debug.mockClear(); + mockLogger.error.mockClear(); + }); + + it('suppresses "SSE stream disconnected" errors from escalating to full reconnection', () => { + const conn = makeConn(); + const transport = makeTransportStub(); + const emitSpy = jest.spyOn(conn, 'emit'); + bindErrorHandler(conn, transport); + + transport.onerror?.( + new Error('SSE stream disconnected: AbortError: The operation was aborted'), + ); + + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining('SDK SSE stream recovery in progress'), + ); + expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error'); + }); + + it('suppresses "Failed to reconnect SSE stream" errors (SDK still has retries left)', () => { + const conn = makeConn(); + const transport = makeTransportStub(); + const emitSpy = jest.spyOn(conn, 'emit'); + bindErrorHandler(conn, transport); + + transport.onerror?.(new Error('Failed to reconnect SSE stream: connection refused')); + + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining('SDK SSE stream recovery in progress'), + ); + expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error'); + }); + + it('escalates "Maximum reconnection attempts exceeded" (SDK gave up)', () => { + const conn = makeConn(); + const transport = makeTransportStub(); + const emitSpy = jest.spyOn(conn, 'emit'); + bindErrorHandler(conn, transport); + + transport.onerror?.(new Error('Maximum reconnection attempts (2) exceeded.')); + + expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error'); + }); + + it('still escalates non-SSE-stream errors (e.g. POST failures)', () => { + const conn = makeConn(); + const transport = makeTransportStub(); + const emitSpy = jest.spyOn(conn, 'emit'); + bindErrorHandler(conn, transport); + + transport.onerror?.(new Error('Streamable HTTP error: Error POSTing to endpoint: 500')); + + expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error'); + }); +}); + +describe('MCPConnection SSE GET stream recovery – integration', () => { + let server: TestServer; + let conn: MCPConnection | null; + + beforeEach(async () => { + server = await createStreamableServer(); + conn = null; + }); + + afterEach(async () => { + await safeDisconnect(conn); + conn = null; + jest.restoreAllMocks(); + await server.close(); + }); + + it('survives a GET SSE body timeout without triggering a full transport rebuild', async () => { + const SHORT_SSE_TIMEOUT = 1500; + + conn = new MCPConnection({ + serverName: 'test-sse-recovery', + serverConfig: { + type: 'streamable-http', + url: server.url, + sseReadTimeout: SHORT_SSE_TIMEOUT, + }, + useSSRFProtection: false, + }); + + await conn.connect(); + + await conn.fetchTools(); + + /** + * Wait for the GET SSE body timeout to fire. The SDK will see a stream + * error and call onerror("SSE stream disconnected: …"), then internally + * schedule a reconnection. Our handler should suppress the escalation. + */ + await new Promise((resolve) => setTimeout(resolve, SHORT_SSE_TIMEOUT + 1000)); + + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining('SDK SSE stream recovery in progress'), + ); + expect(mockLogger.error).not.toHaveBeenCalledWith( + expect.stringContaining('Reconnection handler failed'), + expect.anything(), + ); + + /** + * The connection should still be functional — POST requests use a + * separate Agent with the normal timeout and are unaffected. + */ + const tools = await conn.fetchTools(); + expect(tools).toBeDefined(); + }, 10_000); +}); diff --git a/packages/api/src/mcp/connection.ts b/packages/api/src/mcp/connection.ts index 534607234e..5744059708 100644 --- a/packages/api/src/mcp/connection.ts +++ b/packages/api/src/mcp/connection.ts @@ -71,6 +71,17 @@ const FIVE_MINUTES = 5 * 60 * 1000; const DEFAULT_TIMEOUT = 60000; /** SSE connections through proxies may need longer initial handshake time */ const SSE_CONNECT_TIMEOUT = 120000; +/** Default body timeout for Streamable HTTP GET SSE streams that idle between server pushes */ +const DEFAULT_SSE_READ_TIMEOUT = FIVE_MINUTES; + +/** + * Error message prefixes emitted by the MCP SDK's StreamableHTTPClientTransport + * (client/streamableHttp.ts → _handleSseStream / _scheduleReconnection). + * These are SDK-internal strings, not part of a public API. If the SDK changes + * them, suppression in setupTransportErrorHandlers will silently stop working. + */ +const SDK_SSE_STREAM_DISCONNECTED = 'SSE stream disconnected'; +const SDK_SSE_RECONNECT_FAILED = 'Failed to reconnect SSE stream'; /** * Headers for SSE connections. @@ -254,6 +265,7 @@ export class MCPConnection extends EventEmitter { private readonly useSSRFProtection: boolean; iconPath?: string; timeout?: number; + sseReadTimeout?: number; url?: string; /** @@ -285,6 +297,7 @@ export class MCPConnection extends EventEmitter { this.useSSRFProtection = params.useSSRFProtection === true; this.iconPath = params.serverConfig.iconPath; this.timeout = params.serverConfig.timeout; + this.sseReadTimeout = params.serverConfig.sseReadTimeout; this.lastPingTime = Date.now(); this.createdAt = Date.now(); // Record creation timestamp for staleness detection if (params.oauthTokens) { @@ -313,30 +326,45 @@ export class MCPConnection extends EventEmitter { * Factory function to create fetch functions without capturing the entire `this` context. * This helps prevent memory leaks by only passing necessary dependencies. * - * @param getHeaders Function to retrieve request headers - * @param timeout Timeout value for the agent (in milliseconds) - * @returns A fetch function that merges headers appropriately + * When `sseBodyTimeout` is provided, a second Agent is created with a much longer + * body timeout for GET requests (the Streamable HTTP SSE stream). POST requests + * continue using the normal timeout so they fail fast on real errors. */ private createFetchFunction( getHeaders: () => Record | null | undefined, timeout?: number, + sseBodyTimeout?: number, ): (input: UndiciRequestInfo, init?: UndiciRequestInit) => Promise { const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined; + const connectOpts = ssrfConnect != null ? { connect: ssrfConnect } : {}; const effectiveTimeout = timeout || DEFAULT_TIMEOUT; - const agent = new Agent({ + const postAgent = new Agent({ bodyTimeout: effectiveTimeout, headersTimeout: effectiveTimeout, - ...(ssrfConnect != null ? { connect: ssrfConnect } : {}), + ...connectOpts, }); - this.agents.push(agent); + this.agents.push(postAgent); + + let getAgent: Agent | undefined; + if (sseBodyTimeout != null) { + getAgent = new Agent({ + bodyTimeout: sseBodyTimeout, + headersTimeout: effectiveTimeout, + ...connectOpts, + }); + this.agents.push(getAgent); + } return function customFetch( input: UndiciRequestInfo, init?: UndiciRequestInit, ): Promise { + const isGet = (init?.method ?? 'GET').toUpperCase() === 'GET'; + const dispatcher = isGet && getAgent ? getAgent : postAgent; + const requestHeaders = getHeaders(); if (!requestHeaders) { - return undiciFetch(input, { ...init, dispatcher: agent }); + return undiciFetch(input, { ...init, dispatcher }); } let initHeaders: Record = {}; @@ -356,7 +384,7 @@ export class MCPConnection extends EventEmitter { ...initHeaders, ...requestHeaders, }, - dispatcher: agent, + dispatcher, }); }; } @@ -507,6 +535,7 @@ export class MCPConnection extends EventEmitter { fetch: this.createFetchFunction( this.getRequestHeaders.bind(this), this.timeout, + this.sseReadTimeout || DEFAULT_SSE_READ_TIMEOUT, ) as unknown as FetchLike, }); @@ -829,7 +858,26 @@ export class MCPConnection extends EventEmitter { private setupTransportErrorHandlers(transport: Transport): void { transport.onerror = (error) => { - // Extract meaningful error information (handles "SSE error: undefined" cases) + const rawMessage = + error && typeof error === 'object' ? ((error as { message?: string }).message ?? '') : ''; + + /** + * The MCP SDK's StreamableHTTPClientTransport fires onerror for SSE GET stream + * disconnects but also handles reconnection internally via _scheduleReconnection. + * Escalating these to a full transport rebuild creates a redundant reconnection + * loop. Log at debug level and let the SDK recover the GET stream on its own. + * + * "Maximum reconnection attempts … exceeded" means the SDK gave up — that one + * must fall through so our higher-level reconnection takes over. + */ + if ( + rawMessage.startsWith(SDK_SSE_STREAM_DISCONNECTED) || + rawMessage.startsWith(SDK_SSE_RECONNECT_FAILED) + ) { + logger.debug(`${this.getLogPrefix()} SDK SSE stream recovery in progress: ${rawMessage}`); + return; + } + const { message: errorMessage, code: errorCode, diff --git a/packages/data-provider/src/mcp.ts b/packages/data-provider/src/mcp.ts index 50a9746357..3911e91ed0 100644 --- a/packages/data-provider/src/mcp.ts +++ b/packages/data-provider/src/mcp.ts @@ -19,6 +19,8 @@ const BaseOptionsSchema = z.object({ startup: z.boolean().optional(), iconPath: z.string().optional(), timeout: z.number().optional(), + /** Timeout (ms) for the long-lived SSE GET stream body before undici aborts it. Default: 300_000 (5 min). */ + sseReadTimeout: z.number().positive().optional(), initTimeout: z.number().optional(), /** Controls visibility in chat dropdown menu (MCPSelect) */ chatMenu: z.boolean().optional(), @@ -212,6 +214,7 @@ const omitServerManagedFields = >(schema: T schema.omit({ startup: true, timeout: true, + sseReadTimeout: true, initTimeout: true, chatMenu: true, serverInstructions: true,