🔌 fix: Reuse Undici Agents Per Transport and Close on Disconnect (#11935)

* fix: error handling for transient HTTP request failures in MCP connection

- Added specific handling for the "fetch failed" TypeError, indicating that the request was aborted likely due to a timeout, while the connection remains usable.
- Updated the error message to provide clearer context for users regarding the transient nature of the error.

* refactor: MCPConnection with Agent Lifecycle Management

- Introduced an array to manage undici Agents, ensuring they are reused across requests and properly closed during disconnection.
- Updated the custom fetch and SSE connection methods to utilize the new Agent management system.
- Implemented error handling for SSE 404 responses based on session presence, improving connection stability.
- Added integration tests to validate the Agent lifecycle, ensuring agents are reused and closed correctly.

* fix: enhance error handling and connection management in MCPConnection

- Updated SSE connection timeout handling to use nullish coalescing for better defaulting.
- Improved the connection closure process by ensuring agents are properly closed and errors are logged non-fatally.
- Added tests to validate handling of "fetch failed" errors, marking them as transient and providing clearer messaging for users.

* fix: update timeout handling in MCPConnection for improved defaulting

- Changed timeout handling in MCPConnection to use logical OR instead of nullish coalescing for better default value assignment.
- Ensured consistent timeout behavior for both standard and SSE connections, enhancing reliability in connection management.
This commit is contained in:
Danny Avila 2026-02-24 19:06:06 -05:00 committed by GitHub
parent 3d7e26382e
commit 8c3c326440
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 625 additions and 20 deletions

View file

@ -290,6 +290,16 @@ describe('extractSSEErrorMessage', () => {
};
}
if (rawMessage === 'fetch failed') {
return {
message:
'fetch failed (request aborted, likely after a timeout — connection may still be usable)',
code,
isProxyHint: false,
isTransient: true,
};
}
return {
message: rawMessage,
code,
@ -528,4 +538,24 @@ describe('extractSSEErrorMessage', () => {
expect(result.isTransient).toBe(false);
});
});
describe('fetch failed errors', () => {
it('should detect "fetch failed" as transient', () => {
const error = { message: 'fetch failed' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('fetch failed');
expect(result.message).toContain('request aborted');
expect(result.isProxyHint).toBe(false);
expect(result.isTransient).toBe(true);
});
it('should not match "fetch failed" as a substring in a longer message', () => {
const error = { message: 'Something fetch failed to do' };
const result = extractSSEErrorMessage(error);
expect(result.message).toBe('Something fetch failed to do');
expect(result.isTransient).toBe(false);
});
});
});

View file

@ -0,0 +1,530 @@
/**
* Integration tests for MCPConnection undici Agent lifecycle.
*
* These tests spin up real in-process MCP servers using the official SDK's
* StreamableHTTPServerTransport and SSEServerTransport, then connect via
* MCPConnection and assert that:
*
* 1. Agents are reused across requests one per transport, not one per request.
* 2. All Agents are closed when disconnect() is called.
* 3. Prior Agents are closed before a new transport is built during reconnection.
* 4. A second disconnect() does not double-close already-cleared Agents.
* 5. SSE 404 without an active session is silently ignored (backwards compat).
* 6. SSE 404 with an active session falls through so reconnection can fire.
* 7. Regression: the old per-request Agent pattern results in leaked agents that
* are never closed proving the fix is necessary.
*/
import * as http from 'http';
import * as net from 'net';
import { randomUUID } from 'crypto';
import { Agent, fetch as undiciFetch } from 'undici';
import { Server as McpServerCore } from '@modelcontextprotocol/sdk/server/index.js';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { logger } from '@librechat/data-schemas';
import { MCPConnection } from '~/mcp/connection';
import type { Socket } from 'net';
jest.mock('@librechat/data-schemas', () => ({
logger: {
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
debug: jest.fn(),
},
}));
jest.mock('~/auth', () => ({
createSSRFSafeUndiciConnect: jest.fn(() => undefined),
resolveHostnameSSRF: jest.fn(async () => false),
}));
jest.mock('~/mcp/mcpConfig', () => ({
mcpConfig: { CONNECTION_CHECK_TTL: 0 },
}));
const mockLogger = logger as jest.Mocked<typeof logger>;
/**
* Track every Agent created during the test run so we can forcibly tear down their
* internal connection pools in afterAll. The MCP SDK's Client / EventSource may hold
* references to undici internals that keep Node's event loop alive.
*/
const allAgentsCreated: Agent[] = [];
const OriginalAgent = Agent;
const PatchedAgent = new Proxy(OriginalAgent, {
construct(target, args) {
const instance = new target(...(args as [Agent.Options?]));
allAgentsCreated.push(instance);
return instance;
},
});
(global as Record<string, unknown>).__undiciAgent = PatchedAgent;
/** Cleanly disconnect an MCPConnection — suppress reconnection first so no timers linger. */
async function safeDisconnect(conn: MCPConnection | null): Promise<void> {
if (!conn) {
return;
}
(conn as unknown as { shouldStopReconnecting: boolean }).shouldStopReconnecting = true;
conn.removeAllListeners();
await conn.disconnect();
}
afterAll(async () => {
const destroying = allAgentsCreated.map((a) => {
if (!a.destroyed && !a.closed) {
return a.destroy().catch(() => undefined);
}
return Promise.resolve();
});
allAgentsCreated.length = 0;
await Promise.all(destroying);
});
interface TestServer {
url: string;
close: () => Promise<void>;
}
function getFreePort(): Promise<number> {
return new Promise((resolve, reject) => {
const srv = net.createServer();
srv.listen(0, '127.0.0.1', () => {
const addr = srv.address() as net.AddressInfo;
srv.close((err) => (err ? reject(err) : resolve(addr.port)));
});
});
}
/** Wraps an http.Server with socket tracking so close() kills all lingering connections. */
function trackSockets(httpServer: http.Server): () => Promise<void> {
const sockets = new Set<Socket>();
httpServer.on('connection', (socket: Socket) => {
sockets.add(socket);
socket.once('close', () => sockets.delete(socket));
});
return () =>
new Promise<void>((resolve) => {
for (const socket of sockets) {
socket.destroy();
}
sockets.clear();
httpServer.close(() => resolve());
});
}
async function createStreamableServer(): Promise<TestServer> {
const sessions = new Map<string, StreamableHTTPServerTransport>();
const httpServer = http.createServer(async (req, res) => {
const sid = req.headers['mcp-session-id'] as string | undefined;
let transport = sid ? sessions.get(sid) : undefined;
if (!transport) {
transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID() });
const mcp = new McpServer({ name: 'test-streamable', version: '0.0.1' });
await mcp.connect(transport);
}
await transport.handleRequest(req, res);
if (transport.sessionId && !sessions.has(transport.sessionId)) {
sessions.set(transport.sessionId, transport);
transport.onclose = () => sessions.delete(transport!.sessionId!);
}
});
const destroySockets = trackSockets(httpServer);
const port = await getFreePort();
await new Promise<void>((resolve) => httpServer.listen(port, '127.0.0.1', resolve));
return {
url: `http://127.0.0.1:${port}/`,
close: async () => {
const closing = [...sessions.values()].map((t) => t.close().catch(() => undefined));
sessions.clear();
await Promise.all(closing);
await destroySockets();
},
};
}
async function createSSEServer(): Promise<TestServer> {
const transports = new Map<string, SSEServerTransport>();
const mcpServer = new McpServerCore({ name: 'test-sse', version: '0.0.1' }, { capabilities: {} });
const httpServer = http.createServer(async (req, res) => {
if (req.method === 'GET' && req.url === '/sse') {
const t = new SSEServerTransport('/messages', res);
transports.set(t.sessionId, t);
t.onclose = () => transports.delete(t.sessionId);
await mcpServer.connect(t);
return;
}
if (req.method === 'POST' && req.url?.startsWith('/messages')) {
const sid = new URL(req.url, 'http://x').searchParams.get('sessionId') ?? '';
const t = transports.get(sid);
if (!t) {
res.writeHead(404).end();
return;
}
await t.handlePostMessage(req, res);
return;
}
res.writeHead(404).end();
});
const destroySockets = trackSockets(httpServer);
const port = await getFreePort();
await new Promise<void>((resolve) => httpServer.listen(port, '127.0.0.1', resolve));
return {
url: `http://127.0.0.1:${port}/sse`,
close: async () => {
const closing = [...transports.values()].map((t) => t.close().catch(() => undefined));
transports.clear();
await Promise.all(closing);
await destroySockets();
},
};
}
describe('MCPConnection Agent lifecycle streamable-http', () => {
let server: TestServer;
let conn: MCPConnection | null;
let closeSpy: jest.SpyInstance;
beforeEach(async () => {
server = await createStreamableServer();
conn = null;
closeSpy = jest.spyOn(Agent.prototype, 'close');
});
afterEach(async () => {
await safeDisconnect(conn);
conn = null;
jest.restoreAllMocks();
await server.close();
});
it('reuses the same Agent across multiple requests instead of creating one per request', async () => {
conn = new MCPConnection({
serverName: 'test',
serverConfig: { type: 'streamable-http', url: server.url },
useSSRFProtection: false,
});
await conn.connect();
await conn.fetchTools();
await conn.fetchTools();
await conn.fetchTools();
await safeDisconnect(conn);
/**
* streamable-http creates one Agent via createFetchFunction.
* If agents were per-request (old bug), they would not be stored and close
* would be called 0 times. With our fix, Agents are stored and closed on
* disconnect regardless of request count confirming reuse.
*/
const closeCount = closeSpy.mock.calls.length;
expect(closeCount).toBeGreaterThanOrEqual(1);
expect(closeCount).not.toBe(3);
conn = null;
});
it('calls Agent.close() on every registered Agent when disconnect() is called', async () => {
conn = new MCPConnection({
serverName: 'test',
serverConfig: { type: 'streamable-http', url: server.url },
useSSRFProtection: false,
});
await conn.connect();
expect(closeSpy).not.toHaveBeenCalled();
await safeDisconnect(conn);
expect(closeSpy).toHaveBeenCalled();
conn = null;
});
it('does not call Agent.close() before disconnect()', async () => {
conn = new MCPConnection({
serverName: 'test',
serverConfig: { type: 'streamable-http', url: server.url },
useSSRFProtection: false,
});
await conn.connect();
expect(closeSpy).not.toHaveBeenCalled();
});
it('closes prior Agents on the connectClient() teardown path', async () => {
conn = new MCPConnection({
serverName: 'test',
serverConfig: { type: 'streamable-http', url: server.url },
useSSRFProtection: false,
});
await conn.connect();
expect(closeSpy).not.toHaveBeenCalled();
(conn as unknown as { connectionState: string }).connectionState = 'disconnected';
await conn.connectClient();
expect(closeSpy.mock.calls.length).toBeGreaterThan(0);
});
it('does not double-close Agents when disconnect() is called twice', async () => {
conn = new MCPConnection({
serverName: 'test',
serverConfig: { type: 'streamable-http', url: server.url },
useSSRFProtection: false,
});
await conn.connect();
await safeDisconnect(conn);
const countAfterFirst = closeSpy.mock.calls.length;
expect(countAfterFirst).toBeGreaterThan(0);
await safeDisconnect(conn);
expect(closeSpy.mock.calls.length).toBe(countAfterFirst);
conn = null;
});
});
describe('MCPConnection Agent lifecycle SSE', () => {
let server: TestServer;
let conn: MCPConnection | null;
let closeSpy: jest.SpyInstance;
beforeEach(async () => {
server = await createSSEServer();
conn = null;
closeSpy = jest.spyOn(Agent.prototype, 'close');
});
afterEach(async () => {
await safeDisconnect(conn);
conn = null;
jest.restoreAllMocks();
await server.close();
});
it('reuses the same Agents across multiple requests instead of creating one per request', async () => {
conn = new MCPConnection({
serverName: 'test-sse',
serverConfig: { url: server.url },
useSSRFProtection: false,
});
await conn.connect();
await conn.fetchTools();
await conn.fetchTools();
await conn.fetchTools();
await safeDisconnect(conn);
/**
* SSE creates two Agents: sseAgent (eventSourceInit) + createFetchFunction agent.
* Close count must be at least 2 regardless of how many POST requests were made.
* If agents were per-request (old bug), they would not be stored and close
* would be called 0 times.
*/
expect(closeSpy.mock.calls.length).toBeGreaterThanOrEqual(2);
conn = null;
});
it('calls Agent.close() on every registered Agent when disconnect() is called', async () => {
conn = new MCPConnection({
serverName: 'test-sse',
serverConfig: { url: server.url },
useSSRFProtection: false,
});
await conn.connect();
expect(closeSpy).not.toHaveBeenCalled();
await safeDisconnect(conn);
expect(closeSpy).toHaveBeenCalled();
conn = null;
});
it('closes at least two Agents for SSE transport (eventSourceInit + fetch)', async () => {
conn = new MCPConnection({
serverName: 'test-sse',
serverConfig: { url: server.url },
useSSRFProtection: false,
});
await conn.connect();
await safeDisconnect(conn);
expect(closeSpy.mock.calls.length).toBeGreaterThanOrEqual(2);
conn = null;
});
it('does not double-close Agents when disconnect() is called twice', async () => {
conn = new MCPConnection({
serverName: 'test-sse',
serverConfig: { url: server.url },
useSSRFProtection: false,
});
await conn.connect();
await safeDisconnect(conn);
const countAfterFirst = closeSpy.mock.calls.length;
await safeDisconnect(conn);
expect(closeSpy.mock.calls.length).toBe(countAfterFirst);
conn = null;
});
});
describe('Regression: old per-request Agent pattern leaks agents', () => {
let server: TestServer;
let conn: MCPConnection | null;
beforeEach(async () => {
server = await createStreamableServer();
conn = null;
});
afterEach(async () => {
await safeDisconnect(conn);
conn = null;
jest.restoreAllMocks();
await server.close();
});
it('per-request Agent allocation prevents any agent from being closed on disconnect', async () => {
conn = new MCPConnection({
serverName: 'test-regression',
serverConfig: { type: 'streamable-http', url: server.url },
useSSRFProtection: false,
});
/**
* Monkey-patch createFetchFunction to replicate the old per-request Agent behavior.
* In the old code, `new Agent()` was inside the returned closure, so each call to
* the fetch function allocated a fresh Agent that was never stored or closed.
*/
const privateSelf = conn as unknown as Record<string, unknown> & { agents: Agent[] };
const originalMethod = (privateSelf.createFetchFunction as (...a: unknown[]) => unknown).bind(
conn,
);
privateSelf.createFetchFunction = (_getHeaders: unknown, timeout?: number) => {
const effectiveTimeout = timeout ?? 60000;
return (input: unknown, init?: unknown) => {
const agent = new Agent({
bodyTimeout: effectiveTimeout,
headersTimeout: effectiveTimeout,
});
return undiciFetch(input as string, {
...(init as Record<string, unknown>),
dispatcher: agent,
});
};
};
const closeSpy = jest.spyOn(Agent.prototype, 'close');
await conn.connect();
await conn.fetchTools();
await conn.fetchTools();
await conn.fetchTools();
/**
* The old pattern: agents is empty because none were stored.
* disconnecting closes nothing.
*/
expect(privateSelf.agents.length).toBe(0);
await safeDisconnect(conn);
expect(closeSpy).not.toHaveBeenCalled();
/** Restore the real method so afterEach teardown works cleanly. */
privateSelf.createFetchFunction = originalMethod;
conn = null;
});
});
describe('MCPConnection SSE 404 handling session-aware', () => {
function makeTransportStub(sessionId?: string) {
return {
...(sessionId != null ? { sessionId } : {}),
onerror: undefined as ((e: Error) => void) | undefined,
onclose: undefined as (() => void) | undefined,
onmessage: undefined as ((m: unknown) => void) | undefined,
start: jest.fn(),
close: jest.fn(),
send: jest.fn(),
};
}
function makeConn() {
return new MCPConnection({
serverName: 'test-404',
serverConfig: { url: 'http://127.0.0.1:1/sse' },
useSSRFProtection: false,
});
}
function fire404(conn: MCPConnection, transport: ReturnType<typeof makeTransportStub>) {
(
conn as unknown as { setupTransportErrorHandlers: (t: unknown) => void }
).setupTransportErrorHandlers(transport);
const sseError = Object.assign(new Error('Failed to open SSE stream'), { code: 404 });
transport.onerror?.(sseError);
}
beforeEach(() => {
mockLogger.warn.mockClear();
mockLogger.error.mockClear();
});
it('silently ignores a 404 when no session is established (backwards-compat probe)', () => {
const conn = makeConn();
const transport = makeTransportStub();
const emitSpy = jest.spyOn(conn, 'emit');
fire404(conn, transport);
expect(mockLogger.warn).toHaveBeenCalledWith(expect.stringContaining('no session'));
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
});
it('falls through on a 404 when a session already exists, triggering reconnection', () => {
const conn = makeConn();
const transport = makeTransportStub('existing-session-id');
const emitSpy = jest.spyOn(conn, 'emit');
fire404(conn, transport);
expect(mockLogger.warn).toHaveBeenCalledWith(expect.stringContaining('session lost'));
expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error');
});
it('treats an empty-string sessionId as no session (guards against falsy sessionId)', () => {
const conn = makeConn();
const transport = makeTransportStub('');
const emitSpy = jest.spyOn(conn, 'emit');
fire404(conn, transport);
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
});
});

View file

@ -201,6 +201,21 @@ function extractSSEErrorMessage(error: unknown): {
};
}
/**
* "fetch failed" is a generic undici TypeError that occurs when an in-flight HTTP request
* is aborted (e.g. after an MCP protocol-level timeout fires). The transport itself is still
* functional only the individual request was lost so treat this as transient.
*/
if (rawMessage === 'fetch failed') {
return {
message:
'fetch failed (request aborted, likely after a timeout — connection may still be usable)',
code,
isProxyHint: false,
isTransient: true,
};
}
return {
message: rawMessage,
code,
@ -229,6 +244,7 @@ export class MCPConnection extends EventEmitter {
private isReconnecting = false;
private isInitializing = false;
private reconnectAttempts = 0;
private agents: Agent[] = [];
private readonly userId?: string;
private lastPingTime: number;
private lastConnectionCheckAt: number = 0;
@ -306,17 +322,19 @@ export class MCPConnection extends EventEmitter {
timeout?: number,
): (input: UndiciRequestInfo, init?: UndiciRequestInit) => Promise<UndiciResponse> {
const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined;
const effectiveTimeout = timeout || DEFAULT_TIMEOUT;
const agent = new Agent({
bodyTimeout: effectiveTimeout,
headersTimeout: effectiveTimeout,
...(ssrfConnect != null ? { connect: ssrfConnect } : {}),
});
this.agents.push(agent);
return function customFetch(
input: UndiciRequestInfo,
init?: UndiciRequestInit,
): Promise<UndiciResponse> {
const requestHeaders = getHeaders();
const effectiveTimeout = timeout || DEFAULT_TIMEOUT;
const agent = new Agent({
bodyTimeout: effectiveTimeout,
headersTimeout: effectiveTimeout,
...(ssrfConnect != null ? { connect: ssrfConnect } : {}),
});
if (!requestHeaders) {
return undiciFetch(input, { ...init, dispatcher: agent });
}
@ -418,6 +436,14 @@ export class MCPConnection extends EventEmitter {
*/
const sseTimeout = this.timeout || SSE_CONNECT_TIMEOUT;
const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined;
const sseAgent = new Agent({
bodyTimeout: sseTimeout,
headersTimeout: sseTimeout,
keepAliveTimeout: sseTimeout,
keepAliveMaxTimeout: sseTimeout * 2,
...(ssrfConnect != null ? { connect: ssrfConnect } : {}),
});
this.agents.push(sseAgent);
const transport = new SSEClientTransport(url, {
requestInit: {
/** User/OAuth headers override SSE defaults */
@ -430,17 +456,9 @@ export class MCPConnection extends EventEmitter {
const fetchHeaders = new Headers(
Object.assign({}, SSE_REQUEST_HEADERS, init?.headers, headers),
);
const agent = new Agent({
bodyTimeout: sseTimeout,
headersTimeout: sseTimeout,
/** Extended keep-alive for long-lived SSE connections */
keepAliveTimeout: sseTimeout,
keepAliveMaxTimeout: sseTimeout * 2,
...(ssrfConnect != null ? { connect: ssrfConnect } : {}),
});
return undiciFetch(url, {
...init,
dispatcher: agent,
dispatcher: sseAgent,
headers: fetchHeaders,
});
},
@ -644,10 +662,11 @@ export class MCPConnection extends EventEmitter {
if (this.transport) {
try {
await this.client.close();
this.transport = null;
} catch (error) {
logger.warn(`${this.getLogPrefix()} Error closing connection:`, error);
}
this.transport = null;
await this.closeAgents();
}
this.transport = await this.constructTransport(this.options);
@ -818,10 +837,24 @@ export class MCPConnection extends EventEmitter {
isTransient,
} = extractSSEErrorMessage(error);
// Ignore SSE 404 errors for servers that don't support SSE
if (errorCode === 404 && errorMessage.toLowerCase().includes('failed to open sse stream')) {
logger.warn(`${this.getLogPrefix()} SSE stream not available (404). Ignoring.`);
return;
if (errorCode === 404) {
const hasSession =
'sessionId' in transport &&
(transport as { sessionId?: string }).sessionId != null &&
(transport as { sessionId?: string }).sessionId !== '';
if (!hasSession && errorMessage.toLowerCase().includes('failed to open sse stream')) {
logger.warn(
`${this.getLogPrefix()} SSE stream not available (404), no session. Ignoring.`,
);
return;
}
if (hasSession) {
logger.warn(
`${this.getLogPrefix()} 404 with active session — session lost, triggering reconnection.`,
);
}
}
// Check if it's an OAuth authentication error
@ -879,12 +912,24 @@ export class MCPConnection extends EventEmitter {
};
}
private async closeAgents(): Promise<void> {
const logPrefix = this.getLogPrefix();
const closing = this.agents.map((agent) =>
agent.close().catch((err: unknown) => {
logger.debug(`${logPrefix} Agent close error (non-fatal):`, err);
}),
);
this.agents = [];
await Promise.all(closing);
}
public async disconnect(): Promise<void> {
try {
if (this.transport) {
await this.client.close();
this.transport = null;
}
await this.closeAgents();
if (this.connectionState === 'disconnected') {
return;
}