From 8c3c326440e1540c2e66ce573ed37bb9480aceb3 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Tue, 24 Feb 2026 19:06:06 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=8C=20fix:=20Reuse=20Undici=20Agents?= =?UTF-8?q?=20Per=20Transport=20and=20Close=20on=20Disconnect=20(#11935)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: error handling for transient HTTP request failures in MCP connection - Added specific handling for the "fetch failed" TypeError, indicating that the request was aborted likely due to a timeout, while the connection remains usable. - Updated the error message to provide clearer context for users regarding the transient nature of the error. * refactor: MCPConnection with Agent Lifecycle Management - Introduced an array to manage undici Agents, ensuring they are reused across requests and properly closed during disconnection. - Updated the custom fetch and SSE connection methods to utilize the new Agent management system. - Implemented error handling for SSE 404 responses based on session presence, improving connection stability. - Added integration tests to validate the Agent lifecycle, ensuring agents are reused and closed correctly. * fix: enhance error handling and connection management in MCPConnection - Updated SSE connection timeout handling to use nullish coalescing for better defaulting. - Improved the connection closure process by ensuring agents are properly closed and errors are logged non-fatally. - Added tests to validate handling of "fetch failed" errors, marking them as transient and providing clearer messaging for users. * fix: update timeout handling in MCPConnection for improved defaulting - Changed timeout handling in MCPConnection to use logical OR instead of nullish coalescing for better default value assignment. - Ensured consistent timeout behavior for both standard and SSE connections, enhancing reliability in connection management. --- .../src/mcp/__tests__/MCPConnection.test.ts | 30 + .../MCPConnectionAgentLifecycle.test.ts | 530 ++++++++++++++++++ packages/api/src/mcp/connection.ts | 85 ++- 3 files changed, 625 insertions(+), 20 deletions(-) create mode 100644 packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts diff --git a/packages/api/src/mcp/__tests__/MCPConnection.test.ts b/packages/api/src/mcp/__tests__/MCPConnection.test.ts index 9f3a2dbf5d..4cca1b3316 100644 --- a/packages/api/src/mcp/__tests__/MCPConnection.test.ts +++ b/packages/api/src/mcp/__tests__/MCPConnection.test.ts @@ -290,6 +290,16 @@ describe('extractSSEErrorMessage', () => { }; } + if (rawMessage === 'fetch failed') { + return { + message: + 'fetch failed (request aborted, likely after a timeout — connection may still be usable)', + code, + isProxyHint: false, + isTransient: true, + }; + } + return { message: rawMessage, code, @@ -528,4 +538,24 @@ describe('extractSSEErrorMessage', () => { expect(result.isTransient).toBe(false); }); }); + + describe('fetch failed errors', () => { + it('should detect "fetch failed" as transient', () => { + const error = { message: 'fetch failed' }; + const result = extractSSEErrorMessage(error); + + expect(result.message).toContain('fetch failed'); + expect(result.message).toContain('request aborted'); + expect(result.isProxyHint).toBe(false); + expect(result.isTransient).toBe(true); + }); + + it('should not match "fetch failed" as a substring in a longer message', () => { + const error = { message: 'Something fetch failed to do' }; + const result = extractSSEErrorMessage(error); + + expect(result.message).toBe('Something fetch failed to do'); + expect(result.isTransient).toBe(false); + }); + }); }); diff --git a/packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts b/packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts new file mode 100644 index 0000000000..f3b3a4c0f2 --- /dev/null +++ b/packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts @@ -0,0 +1,530 @@ +/** + * Integration tests for MCPConnection undici Agent lifecycle. + * + * These tests spin up real in-process MCP servers using the official SDK's + * StreamableHTTPServerTransport and SSEServerTransport, then connect via + * MCPConnection and assert that: + * + * 1. Agents are reused across requests — one per transport, not one per request. + * 2. All Agents are closed when disconnect() is called. + * 3. Prior Agents are closed before a new transport is built during reconnection. + * 4. A second disconnect() does not double-close already-cleared Agents. + * 5. SSE 404 without an active session is silently ignored (backwards compat). + * 6. SSE 404 with an active session falls through so reconnection can fire. + * 7. Regression: the old per-request Agent pattern results in leaked agents that + * are never closed — proving the fix is necessary. + */ + +import * as http from 'http'; +import * as net from 'net'; +import { randomUUID } from 'crypto'; +import { Agent, fetch as undiciFetch } from 'undici'; +import { Server as McpServerCore } from '@modelcontextprotocol/sdk/server/index.js'; +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import { logger } from '@librechat/data-schemas'; +import { MCPConnection } from '~/mcp/connection'; + +import type { Socket } from 'net'; + +jest.mock('@librechat/data-schemas', () => ({ + logger: { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), + }, +})); + +jest.mock('~/auth', () => ({ + createSSRFSafeUndiciConnect: jest.fn(() => undefined), + resolveHostnameSSRF: jest.fn(async () => false), +})); + +jest.mock('~/mcp/mcpConfig', () => ({ + mcpConfig: { CONNECTION_CHECK_TTL: 0 }, +})); + +const mockLogger = logger as jest.Mocked; + +/** + * Track every Agent created during the test run so we can forcibly tear down their + * internal connection pools in afterAll. The MCP SDK's Client / EventSource may hold + * references to undici internals that keep Node's event loop alive. + */ +const allAgentsCreated: Agent[] = []; +const OriginalAgent = Agent; +const PatchedAgent = new Proxy(OriginalAgent, { + construct(target, args) { + const instance = new target(...(args as [Agent.Options?])); + allAgentsCreated.push(instance); + return instance; + }, +}); +(global as Record).__undiciAgent = PatchedAgent; + +/** Cleanly disconnect an MCPConnection — suppress reconnection first so no timers linger. */ +async function safeDisconnect(conn: MCPConnection | null): Promise { + if (!conn) { + return; + } + (conn as unknown as { shouldStopReconnecting: boolean }).shouldStopReconnecting = true; + conn.removeAllListeners(); + await conn.disconnect(); +} + +afterAll(async () => { + const destroying = allAgentsCreated.map((a) => { + if (!a.destroyed && !a.closed) { + return a.destroy().catch(() => undefined); + } + return Promise.resolve(); + }); + allAgentsCreated.length = 0; + await Promise.all(destroying); +}); + +interface TestServer { + url: string; + close: () => Promise; +} + +function getFreePort(): Promise { + return new Promise((resolve, reject) => { + const srv = net.createServer(); + srv.listen(0, '127.0.0.1', () => { + const addr = srv.address() as net.AddressInfo; + srv.close((err) => (err ? reject(err) : resolve(addr.port))); + }); + }); +} + +/** Wraps an http.Server with socket tracking so close() kills all lingering connections. */ +function trackSockets(httpServer: http.Server): () => Promise { + const sockets = new Set(); + httpServer.on('connection', (socket: Socket) => { + sockets.add(socket); + socket.once('close', () => sockets.delete(socket)); + }); + return () => + new Promise((resolve) => { + for (const socket of sockets) { + socket.destroy(); + } + sockets.clear(); + httpServer.close(() => resolve()); + }); +} + +async function createStreamableServer(): Promise { + const sessions = new Map(); + + const httpServer = http.createServer(async (req, res) => { + const sid = req.headers['mcp-session-id'] as string | undefined; + let transport = sid ? sessions.get(sid) : undefined; + + if (!transport) { + transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID() }); + const mcp = new McpServer({ name: 'test-streamable', version: '0.0.1' }); + await mcp.connect(transport); + } + + await transport.handleRequest(req, res); + + if (transport.sessionId && !sessions.has(transport.sessionId)) { + sessions.set(transport.sessionId, transport); + transport.onclose = () => sessions.delete(transport!.sessionId!); + } + }); + + const destroySockets = trackSockets(httpServer); + const port = await getFreePort(); + await new Promise((resolve) => httpServer.listen(port, '127.0.0.1', resolve)); + + return { + url: `http://127.0.0.1:${port}/`, + close: async () => { + const closing = [...sessions.values()].map((t) => t.close().catch(() => undefined)); + sessions.clear(); + await Promise.all(closing); + await destroySockets(); + }, + }; +} + +async function createSSEServer(): Promise { + const transports = new Map(); + const mcpServer = new McpServerCore({ name: 'test-sse', version: '0.0.1' }, { capabilities: {} }); + + const httpServer = http.createServer(async (req, res) => { + if (req.method === 'GET' && req.url === '/sse') { + const t = new SSEServerTransport('/messages', res); + transports.set(t.sessionId, t); + t.onclose = () => transports.delete(t.sessionId); + await mcpServer.connect(t); + return; + } + + if (req.method === 'POST' && req.url?.startsWith('/messages')) { + const sid = new URL(req.url, 'http://x').searchParams.get('sessionId') ?? ''; + const t = transports.get(sid); + if (!t) { + res.writeHead(404).end(); + return; + } + await t.handlePostMessage(req, res); + return; + } + + res.writeHead(404).end(); + }); + + const destroySockets = trackSockets(httpServer); + const port = await getFreePort(); + await new Promise((resolve) => httpServer.listen(port, '127.0.0.1', resolve)); + + return { + url: `http://127.0.0.1:${port}/sse`, + close: async () => { + const closing = [...transports.values()].map((t) => t.close().catch(() => undefined)); + transports.clear(); + await Promise.all(closing); + await destroySockets(); + }, + }; +} + +describe('MCPConnection Agent lifecycle – streamable-http', () => { + let server: TestServer; + let conn: MCPConnection | null; + let closeSpy: jest.SpyInstance; + + beforeEach(async () => { + server = await createStreamableServer(); + conn = null; + closeSpy = jest.spyOn(Agent.prototype, 'close'); + }); + + afterEach(async () => { + await safeDisconnect(conn); + conn = null; + jest.restoreAllMocks(); + await server.close(); + }); + + it('reuses the same Agent across multiple requests instead of creating one per request', async () => { + conn = new MCPConnection({ + serverName: 'test', + serverConfig: { type: 'streamable-http', url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + + await conn.fetchTools(); + await conn.fetchTools(); + await conn.fetchTools(); + + await safeDisconnect(conn); + + /** + * streamable-http creates one Agent via createFetchFunction. + * If agents were per-request (old bug), they would not be stored and close + * would be called 0 times. With our fix, Agents are stored and closed on + * disconnect regardless of request count — confirming reuse. + */ + const closeCount = closeSpy.mock.calls.length; + expect(closeCount).toBeGreaterThanOrEqual(1); + expect(closeCount).not.toBe(3); + + conn = null; + }); + + it('calls Agent.close() on every registered Agent when disconnect() is called', async () => { + conn = new MCPConnection({ + serverName: 'test', + serverConfig: { type: 'streamable-http', url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + expect(closeSpy).not.toHaveBeenCalled(); + + await safeDisconnect(conn); + expect(closeSpy).toHaveBeenCalled(); + conn = null; + }); + + it('does not call Agent.close() before disconnect()', async () => { + conn = new MCPConnection({ + serverName: 'test', + serverConfig: { type: 'streamable-http', url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + expect(closeSpy).not.toHaveBeenCalled(); + }); + + it('closes prior Agents on the connectClient() teardown path', async () => { + conn = new MCPConnection({ + serverName: 'test', + serverConfig: { type: 'streamable-http', url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + expect(closeSpy).not.toHaveBeenCalled(); + + (conn as unknown as { connectionState: string }).connectionState = 'disconnected'; + await conn.connectClient(); + + expect(closeSpy.mock.calls.length).toBeGreaterThan(0); + }); + + it('does not double-close Agents when disconnect() is called twice', async () => { + conn = new MCPConnection({ + serverName: 'test', + serverConfig: { type: 'streamable-http', url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + await safeDisconnect(conn); + + const countAfterFirst = closeSpy.mock.calls.length; + expect(countAfterFirst).toBeGreaterThan(0); + + await safeDisconnect(conn); + expect(closeSpy.mock.calls.length).toBe(countAfterFirst); + conn = null; + }); +}); + +describe('MCPConnection Agent lifecycle – SSE', () => { + let server: TestServer; + let conn: MCPConnection | null; + let closeSpy: jest.SpyInstance; + + beforeEach(async () => { + server = await createSSEServer(); + conn = null; + closeSpy = jest.spyOn(Agent.prototype, 'close'); + }); + + afterEach(async () => { + await safeDisconnect(conn); + conn = null; + jest.restoreAllMocks(); + await server.close(); + }); + + it('reuses the same Agents across multiple requests instead of creating one per request', async () => { + conn = new MCPConnection({ + serverName: 'test-sse', + serverConfig: { url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + + await conn.fetchTools(); + await conn.fetchTools(); + await conn.fetchTools(); + + await safeDisconnect(conn); + + /** + * SSE creates two Agents: sseAgent (eventSourceInit) + createFetchFunction agent. + * Close count must be at least 2 regardless of how many POST requests were made. + * If agents were per-request (old bug), they would not be stored and close + * would be called 0 times. + */ + expect(closeSpy.mock.calls.length).toBeGreaterThanOrEqual(2); + conn = null; + }); + + it('calls Agent.close() on every registered Agent when disconnect() is called', async () => { + conn = new MCPConnection({ + serverName: 'test-sse', + serverConfig: { url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + expect(closeSpy).not.toHaveBeenCalled(); + + await safeDisconnect(conn); + expect(closeSpy).toHaveBeenCalled(); + conn = null; + }); + + it('closes at least two Agents for SSE transport (eventSourceInit + fetch)', async () => { + conn = new MCPConnection({ + serverName: 'test-sse', + serverConfig: { url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + await safeDisconnect(conn); + + expect(closeSpy.mock.calls.length).toBeGreaterThanOrEqual(2); + conn = null; + }); + + it('does not double-close Agents when disconnect() is called twice', async () => { + conn = new MCPConnection({ + serverName: 'test-sse', + serverConfig: { url: server.url }, + useSSRFProtection: false, + }); + + await conn.connect(); + await safeDisconnect(conn); + + const countAfterFirst = closeSpy.mock.calls.length; + await safeDisconnect(conn); + expect(closeSpy.mock.calls.length).toBe(countAfterFirst); + conn = null; + }); +}); + +describe('Regression: old per-request Agent pattern leaks agents', () => { + let server: TestServer; + let conn: MCPConnection | null; + + beforeEach(async () => { + server = await createStreamableServer(); + conn = null; + }); + + afterEach(async () => { + await safeDisconnect(conn); + conn = null; + jest.restoreAllMocks(); + await server.close(); + }); + + it('per-request Agent allocation prevents any agent from being closed on disconnect', async () => { + conn = new MCPConnection({ + serverName: 'test-regression', + serverConfig: { type: 'streamable-http', url: server.url }, + useSSRFProtection: false, + }); + + /** + * Monkey-patch createFetchFunction to replicate the old per-request Agent behavior. + * In the old code, `new Agent()` was inside the returned closure, so each call to + * the fetch function allocated a fresh Agent that was never stored or closed. + */ + const privateSelf = conn as unknown as Record & { agents: Agent[] }; + + const originalMethod = (privateSelf.createFetchFunction as (...a: unknown[]) => unknown).bind( + conn, + ); + + privateSelf.createFetchFunction = (_getHeaders: unknown, timeout?: number) => { + const effectiveTimeout = timeout ?? 60000; + return (input: unknown, init?: unknown) => { + const agent = new Agent({ + bodyTimeout: effectiveTimeout, + headersTimeout: effectiveTimeout, + }); + return undiciFetch(input as string, { + ...(init as Record), + dispatcher: agent, + }); + }; + }; + + const closeSpy = jest.spyOn(Agent.prototype, 'close'); + + await conn.connect(); + await conn.fetchTools(); + await conn.fetchTools(); + await conn.fetchTools(); + + /** + * The old pattern: agents is empty because none were stored. + * disconnecting closes nothing. + */ + expect(privateSelf.agents.length).toBe(0); + + await safeDisconnect(conn); + + expect(closeSpy).not.toHaveBeenCalled(); + + /** Restore the real method so afterEach teardown works cleanly. */ + privateSelf.createFetchFunction = originalMethod; + conn = null; + }); +}); + +describe('MCPConnection SSE 404 handling – session-aware', () => { + function makeTransportStub(sessionId?: string) { + return { + ...(sessionId != null ? { sessionId } : {}), + onerror: undefined as ((e: Error) => void) | undefined, + onclose: undefined as (() => void) | undefined, + onmessage: undefined as ((m: unknown) => void) | undefined, + start: jest.fn(), + close: jest.fn(), + send: jest.fn(), + }; + } + + function makeConn() { + return new MCPConnection({ + serverName: 'test-404', + serverConfig: { url: 'http://127.0.0.1:1/sse' }, + useSSRFProtection: false, + }); + } + + function fire404(conn: MCPConnection, transport: ReturnType) { + ( + conn as unknown as { setupTransportErrorHandlers: (t: unknown) => void } + ).setupTransportErrorHandlers(transport); + const sseError = Object.assign(new Error('Failed to open SSE stream'), { code: 404 }); + transport.onerror?.(sseError); + } + + beforeEach(() => { + mockLogger.warn.mockClear(); + mockLogger.error.mockClear(); + }); + + it('silently ignores a 404 when no session is established (backwards-compat probe)', () => { + const conn = makeConn(); + const transport = makeTransportStub(); + const emitSpy = jest.spyOn(conn, 'emit'); + + fire404(conn, transport); + + expect(mockLogger.warn).toHaveBeenCalledWith(expect.stringContaining('no session')); + expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error'); + }); + + it('falls through on a 404 when a session already exists, triggering reconnection', () => { + const conn = makeConn(); + const transport = makeTransportStub('existing-session-id'); + const emitSpy = jest.spyOn(conn, 'emit'); + + fire404(conn, transport); + + expect(mockLogger.warn).toHaveBeenCalledWith(expect.stringContaining('session lost')); + expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error'); + }); + + it('treats an empty-string sessionId as no session (guards against falsy sessionId)', () => { + const conn = makeConn(); + const transport = makeTransportStub(''); + const emitSpy = jest.spyOn(conn, 'emit'); + + fire404(conn, transport); + + expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error'); + }); +}); diff --git a/packages/api/src/mcp/connection.ts b/packages/api/src/mcp/connection.ts index 88dbb19b6f..534607234e 100644 --- a/packages/api/src/mcp/connection.ts +++ b/packages/api/src/mcp/connection.ts @@ -201,6 +201,21 @@ function extractSSEErrorMessage(error: unknown): { }; } + /** + * "fetch failed" is a generic undici TypeError that occurs when an in-flight HTTP request + * is aborted (e.g. after an MCP protocol-level timeout fires). The transport itself is still + * functional — only the individual request was lost — so treat this as transient. + */ + if (rawMessage === 'fetch failed') { + return { + message: + 'fetch failed (request aborted, likely after a timeout — connection may still be usable)', + code, + isProxyHint: false, + isTransient: true, + }; + } + return { message: rawMessage, code, @@ -229,6 +244,7 @@ export class MCPConnection extends EventEmitter { private isReconnecting = false; private isInitializing = false; private reconnectAttempts = 0; + private agents: Agent[] = []; private readonly userId?: string; private lastPingTime: number; private lastConnectionCheckAt: number = 0; @@ -306,17 +322,19 @@ export class MCPConnection extends EventEmitter { timeout?: number, ): (input: UndiciRequestInfo, init?: UndiciRequestInit) => Promise { const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined; + const effectiveTimeout = timeout || DEFAULT_TIMEOUT; + const agent = new Agent({ + bodyTimeout: effectiveTimeout, + headersTimeout: effectiveTimeout, + ...(ssrfConnect != null ? { connect: ssrfConnect } : {}), + }); + this.agents.push(agent); + return function customFetch( input: UndiciRequestInfo, init?: UndiciRequestInit, ): Promise { const requestHeaders = getHeaders(); - const effectiveTimeout = timeout || DEFAULT_TIMEOUT; - const agent = new Agent({ - bodyTimeout: effectiveTimeout, - headersTimeout: effectiveTimeout, - ...(ssrfConnect != null ? { connect: ssrfConnect } : {}), - }); if (!requestHeaders) { return undiciFetch(input, { ...init, dispatcher: agent }); } @@ -418,6 +436,14 @@ export class MCPConnection extends EventEmitter { */ const sseTimeout = this.timeout || SSE_CONNECT_TIMEOUT; const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined; + const sseAgent = new Agent({ + bodyTimeout: sseTimeout, + headersTimeout: sseTimeout, + keepAliveTimeout: sseTimeout, + keepAliveMaxTimeout: sseTimeout * 2, + ...(ssrfConnect != null ? { connect: ssrfConnect } : {}), + }); + this.agents.push(sseAgent); const transport = new SSEClientTransport(url, { requestInit: { /** User/OAuth headers override SSE defaults */ @@ -430,17 +456,9 @@ export class MCPConnection extends EventEmitter { const fetchHeaders = new Headers( Object.assign({}, SSE_REQUEST_HEADERS, init?.headers, headers), ); - const agent = new Agent({ - bodyTimeout: sseTimeout, - headersTimeout: sseTimeout, - /** Extended keep-alive for long-lived SSE connections */ - keepAliveTimeout: sseTimeout, - keepAliveMaxTimeout: sseTimeout * 2, - ...(ssrfConnect != null ? { connect: ssrfConnect } : {}), - }); return undiciFetch(url, { ...init, - dispatcher: agent, + dispatcher: sseAgent, headers: fetchHeaders, }); }, @@ -644,10 +662,11 @@ export class MCPConnection extends EventEmitter { if (this.transport) { try { await this.client.close(); - this.transport = null; } catch (error) { logger.warn(`${this.getLogPrefix()} Error closing connection:`, error); } + this.transport = null; + await this.closeAgents(); } this.transport = await this.constructTransport(this.options); @@ -818,10 +837,24 @@ export class MCPConnection extends EventEmitter { isTransient, } = extractSSEErrorMessage(error); - // Ignore SSE 404 errors for servers that don't support SSE - if (errorCode === 404 && errorMessage.toLowerCase().includes('failed to open sse stream')) { - logger.warn(`${this.getLogPrefix()} SSE stream not available (404). Ignoring.`); - return; + if (errorCode === 404) { + const hasSession = + 'sessionId' in transport && + (transport as { sessionId?: string }).sessionId != null && + (transport as { sessionId?: string }).sessionId !== ''; + + if (!hasSession && errorMessage.toLowerCase().includes('failed to open sse stream')) { + logger.warn( + `${this.getLogPrefix()} SSE stream not available (404), no session. Ignoring.`, + ); + return; + } + + if (hasSession) { + logger.warn( + `${this.getLogPrefix()} 404 with active session — session lost, triggering reconnection.`, + ); + } } // Check if it's an OAuth authentication error @@ -879,12 +912,24 @@ export class MCPConnection extends EventEmitter { }; } + private async closeAgents(): Promise { + const logPrefix = this.getLogPrefix(); + const closing = this.agents.map((agent) => + agent.close().catch((err: unknown) => { + logger.debug(`${logPrefix} Agent close error (non-fatal):`, err); + }), + ); + this.agents = []; + await Promise.all(closing); + } + public async disconnect(): Promise { try { if (this.transport) { await this.client.close(); this.transport = null; } + await this.closeAgents(); if (this.connectionState === 'disconnected') { return; }