import { EventEmitter } from 'events';
import { logger } from '@librechat/data-schemas';
import { fetch as undiciFetch, Agent } from 'undici';
import {
  StdioClientTransport,
  getDefaultEnvironment,
} from '@modelcontextprotocol/sdk/client/stdio.js';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
import { WebSocketClientTransport } from '@modelcontextprotocol/sdk/client/websocket.js';
import { ResourceListChangedNotificationSchema } from '@modelcontextprotocol/sdk/types.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import type {
  RequestInit as UndiciRequestInit,
  RequestInfo as UndiciRequestInfo,
  Response as UndiciResponse,
} from 'undici';
import type { MCPOAuthTokens } from './oauth/types';
import type * as t from './types';
import { createSSRFSafeUndiciConnect, resolveHostnameSSRF } from '~/auth';
import { runOutsideTracing } from '~/utils/tracing';
import { sanitizeUrlForLogging } from './utils';
import { withTimeout } from '~/utils/promise';
import { mcpConfig } from './mcpConfig';

/** Fetch signature expected by the MCP SDK transports (`fetch` option). */
type FetchLike = (url: string | URL, init?: RequestInit) => Promise<Response>;

/**
 * Returns true when `url` uses the `ws:` or `wss:` scheme.
 *
 * @throws TypeError when `url` cannot be parsed by the WHATWG `URL` constructor.
 */
function hasWebSocketProtocol(url: string): boolean {
  const { protocol } = new URL(url);
  return protocol === 'ws:' || protocol === 'wss:';
}

/** Stdio servers are identified by the presence of a `command` property. */
function isStdioOptions(options: t.MCPOptions): options is t.StdioOptions {
  return 'command' in options;
}

/** WebSocket servers are identified by a `url` whose scheme is `ws:` or `wss:`. */
function isWebSocketOptions(options: t.MCPOptions): options is t.WebSocketOptions {
  return 'url' in options && hasWebSocketProtocol(options.url);
}

/** SSE servers are identified by a `url` whose scheme is anything other than `ws:`/`wss:`. */
function isSSEOptions(options: t.MCPOptions): options is t.SSEOptions {
  return 'url' in options && !hasWebSocketProtocol(options.url);
}

/**
 * Checks if the provided options are for a Streamable HTTP transport.
 *
 * Streamable HTTP is an MCP transport that uses HTTP POST for sending messages
 * and supports streaming responses. It provides better performance than
 * SSE transport while maintaining compatibility with most network environments.
 *
 * @param options MCP connection options to check
 * @returns True if options are for a streamable HTTP transport
 */
function isStreamableHTTPOptions(options: t.MCPOptions): options is t.StreamableHTTPOptions {
  if ('url' in options && 'type' in options) {
    const optionType = options.type as string;
    // The URL is only parsed once the declared type matches, so an unrelated
    // config with a malformed URL does not throw from this guard.
    if (optionType === 'streamable-http' || optionType === 'http') {
      return !hasWebSocketProtocol(options.url);
    }
  }
  return false;
}

const FIVE_MINUTES = 5 * 60 * 1000;
const DEFAULT_TIMEOUT = 60000;
/** SSE connections through proxies may need longer initial handshake time */
const SSE_CONNECT_TIMEOUT = 120000;
const DEFAULT_INIT_TIMEOUT = 30000;

/**
 * Per-server circuit breaker bookkeeping.
 * All `*Start` / `*Until` fields are epoch-millisecond timestamps (`Date.now()`).
 */
interface CircuitBreakerState {
  /** Cycles counted in the current cycle window */
  cycleCount: number;
  cycleWindowStart: number;
  /** Connection attempts are blocked while `Date.now()` is below this value */
  cooldownUntil: number;
  /** Failed connection rounds counted in the current failure window */
  failedRounds: number;
  failedWindowStart: number;
  /** Connection attempts are blocked while `Date.now()` is below this value */
  failedBackoffUntil: number;
}

/** Default body timeout for Streamable HTTP GET SSE streams that idle between server pushes */
const DEFAULT_SSE_READ_TIMEOUT = FIVE_MINUTES;

/**
 * Error message prefixes emitted by the MCP SDK's StreamableHTTPClientTransport
 * (client/streamableHttp.ts → _handleSseStream / _scheduleReconnection).
 * These are SDK-internal strings, not part of a public API. If the SDK changes
 * them, suppression in setupTransportErrorHandlers will silently stop working.
 */
const SDK_SSE_STREAM_DISCONNECTED = 'SSE stream disconnected';
const SDK_SSE_RECONNECT_FAILED = 'Failed to reconnect SSE stream';

/**
 * Headers for SSE connections.
 *
 * Headers we intentionally DO NOT include:
 * - Accept: text/event-stream - Already set by eventsource library AND MCP SDK
 * - X-Accel-Buffering: This is a RESPONSE header for Nginx, not a request header.
*   The upstream MCP server must send this header for Nginx to respect it.
 * - Connection: keep-alive: Forbidden in HTTP/2 (RFC 7540 §8.1.2.2).
 *   HTTP/2 manages connection persistence differently.
 */
const SSE_REQUEST_HEADERS = {
  'Cache-Control': 'no-cache',
};

/**
 * Extracts a meaningful error message from SSE transport errors.
 * The MCP SDK's SSEClientTransport can produce "SSE error: undefined" when the
 * underlying eventsource library encounters connection issues without a specific message.
 *
 * Checks run in a fixed order (empty message, timeout, reset, refused, DNS, HTTP status,
 * "fetch failed"); the first match wins.
 *
 * @param error Value passed to a transport `onerror` handler; may be anything
 * @returns Object containing:
 *   - message: Human-readable error description
 *   - code: HTTP status code if available
 *   - isProxyHint: Whether this error suggests proxy misconfiguration
 *   - isTransient: Whether this is likely a transient error that will auto-reconnect
 */
function extractSSEErrorMessage(error: unknown): {
  message: string;
  code?: number;
  isProxyHint: boolean;
  isTransient: boolean;
} {
  if (!error || typeof error !== 'object') {
    return {
      message: 'Unknown SSE transport error',
      isProxyHint: true,
      isTransient: true,
    };
  }

  const errorObj = error as { message?: unknown; code?: number; event?: unknown };
  /**
   * `error` is untyped input, so `message` is not guaranteed to be a string.
   * A non-string message is treated as absent instead of letting the string
   * methods below throw from inside an error handler.
   */
  const rawMessage = typeof errorObj.message === 'string' ? errorObj.message : '';
  const code = errorObj.code;

  /**
   * Handle the common "SSE error: undefined" case.
   * This typically occurs when:
   * 1. A reverse proxy buffers the SSE stream (proxy issue)
   * 2. The server closes an idle connection (normal SSE behavior)
   * 3. Network interruption without specific error details
   *
   * In all cases, the eventsource library will attempt to reconnect automatically.
   */
  if (rawMessage === 'SSE error: undefined' || rawMessage === 'undefined' || !rawMessage) {
    return {
      message:
        'SSE connection closed. This can occur due to: (1) idle connection timeout (normal), ' +
        '(2) reverse proxy buffering (check proxy_buffering config), or (3) network interruption.',
      code,
      isProxyHint: true,
      isTransient: true,
    };
  }

  /**
   * Check for timeout patterns. Use case-insensitive matching for common timeout error codes:
   * - ETIMEDOUT: TCP connection timeout
   * - ESOCKETTIMEDOUT: Socket timeout
   * - "timed out" / "timeout": Generic timeout messages
   */
  const lowerMessage = rawMessage.toLowerCase();
  if (
    rawMessage.includes('ETIMEDOUT') ||
    rawMessage.includes('ESOCKETTIMEDOUT') ||
    lowerMessage.includes('timed out') ||
    lowerMessage.includes('timeout after') ||
    lowerMessage.includes('request timeout')
  ) {
    return {
      message: `SSE connection timed out: ${rawMessage}. If behind a reverse proxy, increase proxy_read_timeout.`,
      code,
      isProxyHint: true,
      isTransient: true,
    };
  }

  // Connection reset is often transient (server restart, proxy reload)
  if (rawMessage.includes('ECONNRESET')) {
    return {
      message: `SSE connection reset: ${rawMessage}. The server or proxy may have restarted.`,
      code,
      isProxyHint: false,
      isTransient: true,
    };
  }

  // Connection refused is more serious - server may be down
  if (rawMessage.includes('ECONNREFUSED')) {
    return {
      message: `SSE connection refused: ${rawMessage}. Verify the MCP server is running and accessible.`,
      code,
      isProxyHint: false,
      isTransient: false,
    };
  }

  // DNS failure is usually a configuration issue, not transient
  if (rawMessage.includes('ENOTFOUND') || rawMessage.includes('getaddrinfo')) {
    return {
      message: `SSE DNS resolution failed: ${rawMessage}. Check the server URL is correct.`,
      code,
      isProxyHint: false,
      isTransient: false,
    };
  }

  // Check for HTTP status codes in the message
  const statusMatch = rawMessage.match(/\b(4\d{2}|5\d{2})\b/);
  if (statusMatch) {
    const statusCode = parseInt(statusMatch[1], 10);
    // 5xx errors are often transient, 4xx are usually not
    const isServerError = statusCode >= 500 && statusCode < 600;
    return {
      message: rawMessage,
      code: statusCode,
      isProxyHint: statusCode === 502 || statusCode === 503 || statusCode === 504,
      isTransient: isServerError,
    };
  }

  /**
   * "fetch failed" is a generic undici TypeError that occurs when an in-flight HTTP request
   * is aborted (e.g. after an MCP protocol-level timeout fires). The transport itself is still
   * functional — only the individual request was lost — so treat this as transient.
   */
  if (rawMessage === 'fetch failed') {
    return {
      message:
        'fetch failed (request aborted, likely after a timeout — connection may still be usable)',
      code,
      isProxyHint: false,
      isTransient: true,
    };
  }

  return {
    message: rawMessage,
    code,
    isProxyHint: false,
    isTransient: false,
  };
}

/** Constructor arguments for {@link MCPConnection}. */
interface MCPConnectionParams {
  serverName: string;
  serverConfig: t.MCPOptions;
  userId?: string;
  oauthTokens?: MCPOAuthTokens | null;
  /** Only an explicit `true` enables the SSRF-safe connector */
  useSSRFProtection?: boolean;
}

/**
 * A single MCP client connection to one server.
 * Emits `connectionChange` with the new connection state, plus OAuth and
 * resource-change events from the methods below.
 */
export class MCPConnection extends EventEmitter {
  public client: Client;
  private options: t.MCPOptions;
  private transport: Transport | null = null; // Make this nullable
  private connectionState: t.ConnectionState = 'disconnected';
  /** In-flight `connectClient()` promise, shared by concurrent callers */
  private connectPromise: Promise<void> | null = null;
  private readonly MAX_RECONNECT_ATTEMPTS = 3;
  public readonly serverName: string;
  private shouldStopReconnecting = false;
  private isReconnecting = false;
  private isInitializing = false;
  private reconnectAttempts = 0;
  /** undici Agents created for this connection; closed together in `closeAgents()` */
  private agents: Agent[] = [];
  private readonly userId?: string;
  private lastPingTime: number;
  private lastConnectionCheckAt: number = 0;
  private oauthTokens?: MCPOAuthTokens | null;
  /** Extra per-request headers, stored with lower-cased keys */
  private requestHeaders?: Record<string, string> | null;
  private oauthRequired = false;
  private oauthRecovery = false;
  private readonly useSSRFProtection: boolean;
  iconPath?: string;
  timeout?: number;
  sseReadTimeout?: number;
  url?: string;
  /**
   * Timestamp when this connection was created.
   * Used to detect if connection is stale compared to updated config.
*/
  public readonly createdAt: number;

  /** Circuit breaker state shared by every connection instance, keyed by server name */
  private static circuitBreakers: Map<string, CircuitBreakerState> = new Map();

  /** Drops all circuit breaker state for `serverName`, lifting any cooldown or backoff. */
  public static clearCooldown(serverName: string): void {
    MCPConnection.circuitBreakers.delete(serverName);
    logger.debug(`[MCP][${serverName}] Circuit breaker state cleared`);
  }

  /** Returns this server's circuit breaker state, creating a zeroed entry on first use. */
  private getCircuitBreaker(): CircuitBreakerState {
    let cb = MCPConnection.circuitBreakers.get(this.serverName);
    if (!cb) {
      cb = {
        cycleCount: 0,
        cycleWindowStart: Date.now(),
        cooldownUntil: 0,
        failedRounds: 0,
        failedWindowStart: Date.now(),
        failedBackoffUntil: 0,
      };
      MCPConnection.circuitBreakers.set(this.serverName, cb);
    }
    return cb;
  }

  /** True while either the cycle cooldown or the failure backoff is still in effect. */
  private isCircuitOpen(): boolean {
    const cb = this.getCircuitBreaker();
    const now = Date.now();
    return now < cb.cooldownUntil || now < cb.failedBackoffUntil;
  }

  /**
   * Counts one cycle inside the rolling cycle window and starts a cooldown
   * once `CB_MAX_CYCLES` is reached. Reaching the limit also resets the window.
   */
  private recordCycle(): void {
    const cb = this.getCircuitBreaker();
    const now = Date.now();
    if (now - cb.cycleWindowStart > mcpConfig.CB_CYCLE_WINDOW_MS) {
      cb.cycleCount = 0;
      cb.cycleWindowStart = now;
    }
    cb.cycleCount++;
    if (cb.cycleCount >= mcpConfig.CB_MAX_CYCLES) {
      cb.cooldownUntil = now + mcpConfig.CB_CYCLE_COOLDOWN_MS;
      cb.cycleCount = 0;
      cb.cycleWindowStart = now;
      logger.warn(
        `${this.getLogPrefix()} Circuit breaker: too many cycles, cooling down for ${mcpConfig.CB_CYCLE_COOLDOWN_MS}ms`,
      );
    }
  }

  /**
   * Counts one failed connection round inside the rolling failure window.
   * From `CB_MAX_FAILED_ROUNDS` onward each further failure doubles the backoff,
   * capped at `CB_MAX_BACKOFF_MS`.
   */
  private recordFailedRound(): void {
    const cb = this.getCircuitBreaker();
    const now = Date.now();
    if (now - cb.failedWindowStart > mcpConfig.CB_FAILED_WINDOW_MS) {
      cb.failedRounds = 0;
      cb.failedWindowStart = now;
    }
    cb.failedRounds++;
    if (cb.failedRounds >= mcpConfig.CB_MAX_FAILED_ROUNDS) {
      const backoff = Math.min(
        mcpConfig.CB_BASE_BACKOFF_MS *
          Math.pow(2, cb.failedRounds - mcpConfig.CB_MAX_FAILED_ROUNDS),
        mcpConfig.CB_MAX_BACKOFF_MS,
      );
      cb.failedBackoffUntil = now + backoff;
      logger.warn(
        `${this.getLogPrefix()} Circuit breaker: too many failures, backing off for ${backoff}ms`,
      );
    }
  }

  /** Clears the failure counter and any failure backoff; cycle tracking is left untouched. */
  private resetFailedRounds(): void {
    const cb = this.getCircuitBreaker();
    cb.failedRounds = 0;
    cb.failedWindowStart = Date.now();
    cb.failedBackoffUntil = 0;
  }

  /** Removes one counted cycle for `serverName`; never goes below zero and never creates state. */
  public static decrementCycleCount(serverName: string): void {
    const cb = MCPConnection.circuitBreakers.get(serverName);
    if (cb && cb.cycleCount > 0) {
      cb.cycleCount--;
    }
  }

  /**
   * Stores extra request headers with lower-cased keys.
   * A `null` argument is ignored, so previously stored headers are kept.
   */
  setRequestHeaders(headers: Record<string, string> | null): void {
    if (!headers) {
      return;
    }
    const normalizedHeaders: Record<string, string> = {};
    for (const [key, value] of Object.entries(headers)) {
      normalizedHeaders[key.toLowerCase()] = value;
    }
    this.requestHeaders = normalizedHeaders;
  }

  /** Returns the headers stored by `setRequestHeaders`, or `undefined` if none were ever set. */
  getRequestHeaders(): Record<string, string> | null | undefined {
    return this.requestHeaders;
  }

  /**
   * Copies configuration from `params`, creates the MCP `Client`, and registers
   * the internal event listeners. No network activity happens here.
   */
  constructor(params: MCPConnectionParams) {
    super();
    this.options = params.serverConfig;
    this.serverName = params.serverName;
    this.userId = params.userId;
    this.useSSRFProtection = params.useSSRFProtection === true;
    this.iconPath = params.serverConfig.iconPath;
    this.timeout = params.serverConfig.timeout;
    this.sseReadTimeout = params.serverConfig.sseReadTimeout;
    this.lastPingTime = Date.now();
    this.createdAt = Date.now(); // Record creation timestamp for staleness detection
    if (params.oauthTokens) {
      this.oauthTokens = params.oauthTokens;
    }
    this.client = new Client(
      {
        name: '@librechat/api-client',
        version: '1.2.3',
      },
      {
        capabilities: {},
      },
    );

    this.setupEventListeners();
  }

  /** Helper to generate consistent log prefixes */
  private getLogPrefix(): string {
    const userPart = this.userId ? `[User: ${this.userId}]` : '';
    return `[MCP]${userPart}[${this.serverName}]`;
  }

  /**
   * Factory function to create fetch functions without capturing the entire `this` context.
   * This helps prevent memory leaks by only passing necessary dependencies.
   *
   * When `sseBodyTimeout` is provided, a second Agent is created with a much longer
   * body timeout for GET requests (the Streamable HTTP SSE stream). POST requests
   * continue using the normal timeout so they fail fast on real errors.
*/
  private createFetchFunction(
    getHeaders: () => Record<string, string> | null | undefined,
    timeout?: number,
    sseBodyTimeout?: number,
  ): (input: UndiciRequestInfo, init?: UndiciRequestInit) => Promise<UndiciResponse> {
    const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined;
    const connectOpts = ssrfConnect != null ? { connect: ssrfConnect } : {};
    // `||` (not `??`) so a timeout of 0 also falls back to the default
    const effectiveTimeout = timeout || DEFAULT_TIMEOUT;
    const postAgent = new Agent({
      bodyTimeout: effectiveTimeout,
      headersTimeout: effectiveTimeout,
      ...connectOpts,
    });
    // Agents are tracked on the instance so closeAgents() can release them
    this.agents.push(postAgent);

    let getAgent: Agent | undefined;
    if (sseBodyTimeout != null) {
      getAgent = new Agent({
        bodyTimeout: sseBodyTimeout,
        headersTimeout: effectiveTimeout,
        ...connectOpts,
      });
      this.agents.push(getAgent);
    }

    return function customFetch(
      input: UndiciRequestInfo,
      init?: UndiciRequestInit,
    ): Promise<UndiciResponse> {
      // A request without an explicit method is a GET
      const isGet = (init?.method ?? 'GET').toUpperCase() === 'GET';
      const dispatcher = isGet && getAgent ? getAgent : postAgent;

      // Headers are read on every call so later setRequestHeaders() updates apply
      const requestHeaders = getHeaders();
      if (!requestHeaders) {
        return undiciFetch(input, { ...init, redirect: 'manual', dispatcher });
      }

      /** Normalize the three accepted header shapes into a plain object */
      let initHeaders: Record<string, string> = {};
      if (init?.headers) {
        if (init.headers instanceof Headers) {
          initHeaders = Object.fromEntries(init.headers.entries());
        } else if (Array.isArray(init.headers)) {
          initHeaders = Object.fromEntries(init.headers);
        } else {
          initHeaders = init.headers as Record<string, string>;
        }
      }

      return undiciFetch(input, {
        ...init,
        redirect: 'manual',
        headers: {
          ...initHeaders,
          // Stored request headers win over headers supplied by the caller
          ...requestHeaders,
        },
        dispatcher,
      });
    };
  }

  /**
   * Logs `error` with the connection's log prefix and the given context.
   * Despite the name, no event is emitted.
   */
  private emitError(error: unknown, errorContext: string): void {
    const errorMessage = error instanceof Error ? error.message : String(error);
    logger.error(`${this.getLogPrefix()} ${errorContext}: ${errorMessage}`);
  }

  /**
   * Builds the SDK transport matching `options`.
   * The transport type is inferred in this order: stdio, websocket, streamable-http, sse.
   *
   * @throws Error when the type cannot be inferred, when the WebSocket host fails the
   *   SSRF pre-check, or when a transport constructor throws. Every failure is logged
   *   and rethrown.
   */
  private async constructTransport(options: t.MCPOptions): Promise<Transport> {
    try {
      let type: t.MCPOptions['type'];

      if (isStdioOptions(options)) {
        type = 'stdio';
      } else if (isWebSocketOptions(options)) {
        type = 'websocket';
      } else if (isStreamableHTTPOptions(options)) {
        // Could be either 'streamable-http' or 'http', normalize to 'streamable-http'
        type = 'streamable-http';
      } else if (isSSEOptions(options)) {
        type = 'sse';
      } else {
        throw new Error(
          'Cannot infer transport type: options.type is not provided and cannot be inferred from other properties.',
        );
      }

      switch (type) {
        case 'stdio':
          if (!isStdioOptions(options)) {
            throw new Error('Invalid options for stdio transport.');
          }
          return new StdioClientTransport({
            command: options.command,
            args: options.args,
            // workaround bug of mcp sdk that can't pass env:
            // https://github.com/modelcontextprotocol/typescript-sdk/issues/216
            env: { ...getDefaultEnvironment(), ...(options.env ?? {}) },
          });

        case 'websocket': {
          if (!isWebSocketOptions(options)) {
            throw new Error('Invalid options for websocket transport.');
          }
          this.url = options.url;

          /**
           * SSRF pre-check: always validate resolved IPs for WebSocket, regardless
           * of allowlist configuration. Allowlisting a domain grants trust to that
           * name, not to whatever IP it resolves to at runtime (DNS rebinding).
           *
           * Note: WebSocketClientTransport does its own DNS resolution, creating a
           * small TOCTOU window. This is an SDK limitation — the transport accepts
           * only a URL with no custom DNS lookup hook.
*/
          const wsHostname = new URL(options.url).hostname;
          const isSSRF = await resolveHostnameSSRF(wsHostname);
          if (isSSRF) {
            throw new Error(
              `SSRF protection: WebSocket host "${wsHostname}" resolved to a private/reserved IP address`,
            );
          }
          return new WebSocketClientTransport(new URL(options.url));
        }

        case 'sse': {
          if (!isSSEOptions(options)) {
            throw new Error('Invalid options for sse transport.');
          }
          this.url = options.url;
          const url = new URL(options.url);
          logger.info(
            `${this.getLogPrefix()} Creating SSE transport: ${sanitizeUrlForLogging(url)}`,
          );
          const abortController = new AbortController();

          /** Add OAuth token to headers if available */
          const headers = { ...options.headers };
          if (this.oauthTokens?.access_token) {
            headers['Authorization'] = `Bearer ${this.oauthTokens.access_token}`;
          }

          /**
           * SSE connections need longer timeouts for reliability.
           * The connect timeout is extended because proxies may delay initial response.
           */
          const sseTimeout = this.timeout || SSE_CONNECT_TIMEOUT;
          const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined;
          const sseAgent = new Agent({
            bodyTimeout: sseTimeout,
            headersTimeout: sseTimeout,
            keepAliveTimeout: sseTimeout,
            keepAliveMaxTimeout: sseTimeout * 2,
            ...(ssrfConnect != null ? { connect: ssrfConnect } : {}),
          });
          this.agents.push(sseAgent);

          const transport = new SSEClientTransport(url, {
            requestInit: {
              /** User/OAuth headers override SSE defaults */
              headers: { ...SSE_REQUEST_HEADERS, ...headers },
              signal: abortController.signal,
            },
            eventSourceInit: {
              fetch: (url, init) => {
                /** Merge headers: SSE defaults < init headers < user headers (user wins) */
                const fetchHeaders = new Headers(
                  Object.assign({}, SSE_REQUEST_HEADERS, init?.headers, headers),
                );
                return undiciFetch(url, {
                  ...init,
                  redirect: 'manual',
                  dispatcher: sseAgent,
                  headers: fetchHeaders,
                });
              },
            },
            fetch: this.createFetchFunction(
              this.getRequestHeaders.bind(this),
              sseTimeout,
            ) as unknown as FetchLike,
          });

          transport.onclose = () => {
            logger.info(`${this.getLogPrefix()} SSE transport closed`);
            this.emit('connectionChange', 'disconnected');
          };

          this.setupTransportErrorHandlers(transport);
          return transport;
        }

        case 'streamable-http': {
          if (!isStreamableHTTPOptions(options)) {
            throw new Error('Invalid options for streamable-http transport.');
          }
          this.url = options.url;
          const url = new URL(options.url);
          logger.info(
            `${this.getLogPrefix()} Creating streamable-http transport: ${sanitizeUrlForLogging(url)}`,
          );
          const abortController = new AbortController();

          /** Add OAuth token to headers if available */
          const headers = { ...options.headers };
          if (this.oauthTokens?.access_token) {
            headers['Authorization'] = `Bearer ${this.oauthTokens.access_token}`;
          }

          const transport = new StreamableHTTPClientTransport(url, {
            requestInit: {
              headers,
              signal: abortController.signal,
            },
            // GET (SSE stream) requests get the long read timeout; everything else uses this.timeout
            fetch: this.createFetchFunction(
              this.getRequestHeaders.bind(this),
              this.timeout,
              this.sseReadTimeout || DEFAULT_SSE_READ_TIMEOUT,
            ) as unknown as FetchLike,
          });

          transport.onclose = () => {
            logger.info(`${this.getLogPrefix()} Streamable-http transport closed`);
            this.emit('connectionChange', 'disconnected');
          };

          this.setupTransportErrorHandlers(transport);
          return transport;
        }

        default: {
          throw new Error(`Unsupported transport type: ${type}`);
        }
      }
    } catch (error) {
      this.emitError(error, 'Failed to construct transport');
      throw error;
    }
  }

  /**
   * Wires the `connectionChange` listener that mirrors the state onto the instance,
   * resets reconnect bookkeeping on `connected`, and starts reconnection on `error`.
   * `isInitializing` is set here and only cleared by the first `connected` event,
   * so errors before the first successful connect never trigger reconnection.
   */
  private setupEventListeners(): void {
    this.isInitializing = true;
    this.on('connectionChange', (state: t.ConnectionState) => {
      this.connectionState = state;
      if (state === 'connected') {
        this.isReconnecting = false;
        this.isInitializing = false;
        this.shouldStopReconnecting = false;
        this.reconnectAttempts = 0;
        /**
         * // FOR DEBUGGING
         * // this.client.setRequestHandler(PingRequestSchema, async (request, extra) => {
         * //    logger.info(`[MCP][${this.serverName}] PingRequest: ${JSON.stringify(request)}`);
         * //    if (getEventListeners && extra.signal) {
         * //      const listenerCount = getEventListeners(extra.signal, 'abort').length;
         * //      logger.debug(`Signal has ${listenerCount} abort listeners`);
         * //    }
         * //    return {};
         * //  });
         */
      } else if (state === 'error' && !this.isReconnecting && !this.isInitializing) {
        this.handleReconnection().catch((error) => {
          logger.error(`${this.getLogPrefix()} Reconnection handler failed:`, error);
        });
      }
    });

    this.subscribeToResources();
  }

  /**
   * Retries `connect()` up to `MAX_RECONNECT_ATTEMPTS` times with exponential
   * backoff plus jitter. Does nothing when a reconnection is already running,
   * reconnection has been stopped, the connection is still initializing, or
   * OAuth is pending. Never rejects on a failed attempt; failures are logged.
   */
  private async handleReconnection(): Promise<void> {
    if (
      this.isReconnecting ||
      this.shouldStopReconnecting ||
      this.isInitializing ||
      this.oauthRequired
    ) {
      if (this.oauthRequired) {
        logger.info(`${this.getLogPrefix()} OAuth required, skipping reconnection attempts`);
      }
      return;
    }

    this.isReconnecting = true;
    /** 2^attempt seconds capped at 30s, plus jitter */
    const backoffDelay = (attempt: number) => {
      const base = Math.min(1000 * Math.pow(2, attempt), 30000);
      const jitter = Math.floor(Math.random() * 1000); // up to 1s of random jitter
      return base + jitter;
    };

    try {
      while (
        this.reconnectAttempts < this.MAX_RECONNECT_ATTEMPTS &&
        !(this.shouldStopReconnecting as boolean)
      ) {
        this.reconnectAttempts++;
        const delay = backoffDelay(this.reconnectAttempts);

        logger.info(
          `${this.getLogPrefix()} Reconnecting ${this.reconnectAttempts}/${this.MAX_RECONNECT_ATTEMPTS} (delay: ${delay}ms)`,
        );

        await new Promise((resolve) => setTimeout(resolve, delay));

        try {
          await this.connect();
this.reconnectAttempts = 0;
          return;
        } catch (error) {
          logger.error(`${this.getLogPrefix()} Reconnection attempt failed:`, error);

          // Stop immediately if rate limited - retrying will only make it worse
          if (this.isRateLimitError(error)) {
            /**
             * Rate limiting sets shouldStopReconnecting to prevent hammering the server.
             * Silent return here (vs throw in connectClient) because we're already in
             * error recovery mode - throwing would just add noise. The connection
             * must be recreated to retry after rate limit lifts.
             */
            logger.warn(
              `${this.getLogPrefix()} Rate limited (429), stopping reconnection attempts`,
            );
            logger.debug(
              `${this.getLogPrefix()} Rate limit block is permanent for this connection instance`,
            );
            this.shouldStopReconnecting = true;
            return;
          }

          if (
            this.reconnectAttempts === this.MAX_RECONNECT_ATTEMPTS ||
            (this.shouldStopReconnecting as boolean)
          ) {
            logger.error(`${this.getLogPrefix()} Stopping reconnection attempts`);
            return;
          }
        }
      }
    } finally {
      this.isReconnecting = false;
    }
  }

  /** Re-emits the server's "resource list changed" notification as `resourcesChanged`. */
  private subscribeToResources(): void {
    this.client.setNotificationHandler(ResourceListChangedNotificationSchema, async () => {
      this.emit('resourcesChanged');
    });
  }

  /**
   * Establishes the client connection, sharing one in-flight attempt between
   * concurrent callers.
   *
   * Resolves without connecting when already connected, when reconnection has
   * been stopped, or when an OAuth flow completed (the caller is expected to retry).
   *
   * @throws Error when the circuit breaker is open, on rate limiting, when OAuth
   *   fails or times out (the original auth error is rethrown), or on any other
   *   connection failure.
   */
  async connectClient(): Promise<void> {
    if (this.connectionState === 'connected') {
      return;
    }

    if (this.connectPromise) {
      return this.connectPromise;
    }

    if (this.shouldStopReconnecting) {
      return;
    }

    if (this.isCircuitOpen()) {
      this.connectionState = 'error';
      this.emit('connectionChange', 'error');
      throw new Error(`${this.getLogPrefix()} Circuit breaker is open, connection attempt blocked`);
    }

    this.emit('connectionChange', 'connecting');

    this.connectPromise = (async () => {
      try {
        // Tear down any previous transport (and its agents) before building a new one
        if (this.transport) {
          try {
            await this.client.close();
          } catch (error) {
            logger.warn(`${this.getLogPrefix()} Error closing connection:`, error);
          }
          this.transport = null;
          await this.closeAgents();
        }

        this.transport = await runOutsideTracing(() => this.constructTransport(this.options));
        this.patchTransportSend();

        const connectTimeout = this.options.initTimeout ?? DEFAULT_INIT_TIMEOUT;
        await runOutsideTracing(() =>
          withTimeout(
            this.client.connect(this.transport!),
            connectTimeout,
            `Connection timeout after ${connectTimeout}ms`,
          ),
        );

        this.setupTransportOnMessageHandler();

        this.connectionState = 'connected';
        this.emit('connectionChange', 'connected');
        this.reconnectAttempts = 0;
        this.resetFailedRounds();
        if (this.oauthRecovery) {
          MCPConnection.decrementCycleCount(this.serverName);
          this.oauthRecovery = false;
          logger.debug(
            `${this.getLogPrefix()} OAuth recovery: decremented cycle count after successful reconnect`,
          );
        }
      } catch (error) {
        // Check if it's a rate limit error - stop immediately to avoid making it worse
        if (this.isRateLimitError(error)) {
          /**
           * Rate limiting sets shouldStopReconnecting to prevent hammering the server.
           * This is a permanent block for this connection instance - the connection
           * must be recreated (e.g., by user re-initiating) to retry after rate limit lifts.
           *
           * We throw here (unlike handleReconnection which returns silently) because:
           * - connectClient() is a public API - callers expect async errors to throw
           * - Other errors in this catch block also throw for consistency
           * - handleReconnection is private/internal error recovery, different context
           */
          logger.warn(`${this.getLogPrefix()} Rate limited (429), stopping connection attempts`);
          this.shouldStopReconnecting = true;
          this.connectionState = 'error';
          this.emit('connectionChange', 'error');
          throw error;
        }

        // Check if it's an OAuth authentication error
        if (this.isOAuthError(error)) {
          logger.warn(`${this.getLogPrefix()} OAuth authentication required`);
          this.oauthRequired = true;
          const serverUrl = this.url;
          logger.debug(
            `${this.getLogPrefix()} Server URL for OAuth: ${serverUrl ? sanitizeUrlForLogging(serverUrl) : 'undefined'}`,
          );

          const oauthTimeout = this.options.initTimeout ?? 60000 * 2;
          /** Promise that will resolve when OAuth is handled */
          const oauthHandledPromise = new Promise<void>((resolve, reject) => {
            let timeoutId: NodeJS.Timeout | null = null;
            let oauthHandledListener: (() => void) | null = null;
            let oauthFailedListener: ((error: Error) => void) | null = null;

            /** Cleanup function to remove listeners and clear timeout */
            const cleanup = () => {
              if (timeoutId) {
                clearTimeout(timeoutId);
              }
              if (oauthHandledListener) {
                this.off('oauthHandled', oauthHandledListener);
              }
              if (oauthFailedListener) {
                this.off('oauthFailed', oauthFailedListener);
              }
            };

            // Success handler
            oauthHandledListener = () => {
              cleanup();
              resolve();
            };

            // Failure handler
            oauthFailedListener = (error: Error) => {
              cleanup();
              reject(error);
            };

            // Timeout handler
            timeoutId = setTimeout(() => {
              cleanup();
              reject(new Error(`OAuth handling timeout after ${oauthTimeout}ms`));
            }, oauthTimeout);

            // Listen for both success and failure events
            this.once('oauthHandled', oauthHandledListener);
            this.once('oauthFailed', oauthFailedListener);
          });

          // Emit the event
          this.emit('oauthRequired', {
            serverName: this.serverName,
            error,
            serverUrl,
            userId: this.userId,
          });

          try {
            // Wait for OAuth to be handled
            await oauthHandledPromise;
            this.oauthRequired = false;
            this.oauthRecovery = true;
            logger.info(
              `${this.getLogPrefix()} OAuth handled successfully, connection will be retried`,
            );
            return;
          } catch (oauthError) {
            // OAuth failed or timed out
            this.oauthRequired = false;
            logger.error(`${this.getLogPrefix()} OAuth handling failed:`, oauthError);
            // Re-throw the original authentication error
            throw error;
          }
        }

        this.connectionState = 'error';
        this.emit('connectionChange', 'error');
        this.recordFailedRound();
        throw error;
      } finally {
        this.connectPromise = null;
      }
    })();

    return this.connectPromise;
  }

  /**
   * Wraps `transport.send` to log every outgoing message.
   * An outgoing response with an empty `result` is rejected with "Empty result"
   * when the previous one was sent less than five minutes ago; otherwise
   * `lastPingTime` is refreshed.
   */
  private patchTransportSend(): void {
    if (!this.transport) {
      return;
    }

    const originalSend = this.transport.send.bind(this.transport);
    this.transport.send = async (msg) => {
      if ('result' in msg && !('method' in msg) && Object.keys(msg.result ?? {}).length === 0) {
        if (Date.now() - this.lastPingTime < FIVE_MINUTES) {
          throw new Error('Empty result');
        }
        this.lastPingTime = Date.now();
      }
      const method = 'method' in msg ? msg.method : undefined;
      const id = 'id' in msg ? (msg as { id: string | number | null }).id : undefined;
      logger.debug(
        `${this.getLogPrefix()} Transport sending: method=${method ?? 'response'} id=${id ?? 'none'}`,
      );
      return originalSend(msg);
    };
  }

  /**
   * Wraps the transport's existing `onmessage` handler to log every incoming
   * message before delegating. No-op when no handler has been installed.
   */
  private setupTransportOnMessageHandler(): void {
    if (!this.transport?.onmessage) {
      return;
    }

    const sdkHandler = this.transport.onmessage;
    this.transport.onmessage = (msg) => {
      const method = 'method' in msg ? msg.method : undefined;
      const id = 'id' in msg ? (msg as { id: string | number | null }).id : undefined;
      logger.debug(
        `${this.getLogPrefix()} Transport received: method=${method ?? 'response'} id=${id ?? 'none'}`,
      );
      sdkHandler(msg);
    };
  }

  /**
   * Disconnects, reconnects, and verifies the connection with `isConnected()`.
   *
   * @throws Error when the connection attempt fails or cannot be verified.
   */
  async connect(): Promise<void> {
    try {
      // preserve cycle tracking across reconnects so the circuit breaker can detect rapid cycling
      await this.disconnect(false);
      await this.connectClient();
      if (!(await this.isConnected())) {
        throw new Error('Connection not established');
      }
    } catch (error) {
      logger.error(`${this.getLogPrefix()} Connection failed:`, error);
      throw error;
    }
  }

  /**
   * Installs the `onerror` handler for an HTTP-based transport. The handler
   * classifies the error, logs it with context, and emits
   * `connectionChange: 'error'` unless the error is one the SDK recovers from itself.
   */
  private setupTransportErrorHandlers(transport: Transport): void {
    transport.onerror = (error) => {
      // Guard against non-string `message` values so the prefix checks below cannot throw
      const candidateMessage: unknown =
        error && typeof error === 'object'
          ? (error as { message?: unknown }).message
          : undefined;
      const rawMessage = typeof candidateMessage === 'string' ? candidateMessage : '';

      /**
       * The MCP SDK's StreamableHTTPClientTransport fires onerror for SSE GET stream
       * disconnects but also handles reconnection internally via _scheduleReconnection.
       * Escalating these to a full transport rebuild creates a redundant reconnection
       * loop. Log at debug level and let the SDK recover the GET stream on its own.
       *
       * "Maximum reconnection attempts … exceeded" means the SDK gave up — that one
       * must fall through so our higher-level reconnection takes over.
*/
      if (
        rawMessage.startsWith(SDK_SSE_STREAM_DISCONNECTED) ||
        rawMessage.startsWith(SDK_SSE_RECONNECT_FAILED)
      ) {
        logger.debug(`${this.getLogPrefix()} SDK SSE stream recovery in progress: ${rawMessage}`);
        return;
      }

      const {
        message: errorMessage,
        code: errorCode,
        isProxyHint,
        isTransient,
      } = extractSSEErrorMessage(error);

      if (errorCode === 400 || errorCode === 404 || errorCode === 405) {
        const hasSession =
          'sessionId' in transport &&
          (transport as { sessionId?: string }).sessionId != null &&
          (transport as { sessionId?: string }).sessionId !== '';

        // Without a session, a failed SSE stream open is ignored rather than escalated
        if (!hasSession && errorMessage.toLowerCase().includes('failed to open sse stream')) {
          logger.warn(
            `${this.getLogPrefix()} SSE stream not available (${errorCode}), no session. Ignoring.`,
          );
          return;
        }

        if (hasSession) {
          logger.warn(
            `${this.getLogPrefix()} ${errorCode} with active session — session lost, triggering reconnection.`,
          );
        }
      }

      // Check if it's an OAuth authentication error
      if (this.isOAuthError(error)) {
        logger.warn(`${this.getLogPrefix()} OAuth authentication error detected`);
        this.emit('oauthError', error);
      }

      /**
       * Log with enhanced context for debugging.
       * All transport.onerror events are logged as errors to preserve stack traces.
       * isTransient indicates whether auto-reconnection is expected to succeed.
       *
       * The MCP SDK's SseError extends Error and includes:
       * - code: HTTP status code or eventsource error code
       * - event: The original eventsource ErrorEvent
       * - stack: Full stack trace
       */
      const errorContext: Record<string, unknown> = {
        code: errorCode,
        isTransient,
      };

      if (isProxyHint) {
        errorContext.hint = 'Check Nginx/proxy configuration for SSE endpoints';
      }

      // Extract additional debug info from SseError if available
      if (error && typeof error === 'object') {
        const sseError = error as { event?: unknown; stack?: string };

        // Include the original eventsource event for debugging
        if (sseError.event && typeof sseError.event === 'object') {
          const event = sseError.event as { code?: number; message?: string; type?: string };
          errorContext.eventDetails = {
            type: event.type,
            code: event.code,
            message: event.message,
          };
        }

        // Include stack trace if available
        if (sseError.stack) {
          errorContext.stack = sseError.stack;
        }
      }

      const errorLabel = isTransient
        ? 'Transport error (transient, will reconnect)'
        : 'Transport error (may require manual intervention)';

      logger.error(`${this.getLogPrefix()} ${errorLabel}: ${errorMessage}`, errorContext);

      this.emit('connectionChange', 'error');
    };
  }

  /**
   * Closes every tracked undici Agent in parallel and clears the list.
   * Individual close failures are logged at debug level and never rejected.
   */
  private async closeAgents(): Promise<void> {
    const logPrefix = this.getLogPrefix();
    const closing = this.agents.map((agent) =>
      agent.close().catch((err: unknown) => {
        logger.debug(`${logPrefix} Agent close error (non-fatal):`, err);
      }),
    );
    // Cleared before awaiting so agents created meanwhile are not dropped untracked
    this.agents = [];
    await Promise.all(closing);
  }

  /**
   * Closes the client and agents and moves to `disconnected`.
   *
   * @param resetCycleTracking When `false`, the disconnect is counted as a
   *   circuit-breaker cycle (used by `connect()` for reconnects). When `true`
   *   (default) no cycle is recorded.
   */
  public async disconnect(resetCycleTracking = true): Promise<void> {
    try {
      if (this.transport) {
        await this.client.close();
        this.transport = null;
      }
      await this.closeAgents();
      if (this.connectionState === 'disconnected') {
        return;
      }
      this.connectionState = 'disconnected';
      this.emit('connectionChange', 'disconnected');
    } finally {
      // Runs even when close() throws or the early return above is taken
      this.connectPromise = null;
      if (!resetCycleTracking) {
        this.recordCycle();
      }
    }
  }

  /**
   * Lists the server's resources.
   * Failures are logged and reported as an empty list rather than thrown.
   */
  async fetchResources(): Promise<Awaited<ReturnType<Client['listResources']>>['resources']> {
    try {
      const { resources } = await this.client.listResources();
      return resources;
    } catch (error) {
this.emitError(error, 'Failed to fetch resources'); return []; } } async fetchTools() { try { const { tools } = await this.client.listTools(); return tools; } catch (error) { this.emitError(error, 'Failed to fetch tools'); return []; } } async fetchPrompts(): Promise { try { const { prompts } = await this.client.listPrompts(); return prompts; } catch (error) { this.emitError(error, 'Failed to fetch prompts'); return []; } } public async isConnected(): Promise { // First check if we're in a connected state if (this.connectionState !== 'connected') { return false; } // If we recently checked, skip expensive verification const now = Date.now(); if (now - this.lastConnectionCheckAt < mcpConfig.CONNECTION_CHECK_TTL) { return true; } this.lastConnectionCheckAt = now; try { // Try ping first as it's the lightest check await this.client.ping(); return this.connectionState === 'connected'; } catch (error) { // Check if the error is because ping is not supported (method not found) const pingUnsupported = error instanceof Error && ((error as Error)?.message.includes('-32601') || (error as Error)?.message.includes('-32602') || (error as Error)?.message.includes('invalid method ping') || (error as Error)?.message.includes('Unsupported method: ping') || (error as Error)?.message.includes('method not found')); if (!pingUnsupported) { logger.error(`${this.getLogPrefix()} Ping failed:`, error); return false; } // Ping is not supported by this server, try an alternative verification logger.debug( `${this.getLogPrefix()} Server does not support ping method, verifying connection with capabilities`, ); try { // Get server capabilities to verify connection is truly active const capabilities = this.client.getServerCapabilities(); // If we have capabilities, try calling a supported method to verify connection if (capabilities?.tools) { await this.client.listTools(); return this.connectionState === 'connected'; } else if (capabilities?.resources) { await this.client.listResources(); return 
this.connectionState === 'connected'; } else if (capabilities?.prompts) { await this.client.listPrompts(); return this.connectionState === 'connected'; } else { // No capabilities to test, but we're in connected state and initialization succeeded logger.debug( `${this.getLogPrefix()} No capabilities to test, assuming connected based on state`, ); return this.connectionState === 'connected'; } } catch (capabilityError) { // If capability check fails, the connection is likely broken logger.error(`${this.getLogPrefix()} Connection verification failed:`, capabilityError); return false; } } } public setOAuthTokens(tokens: MCPOAuthTokens): void { this.oauthTokens = tokens; } /** * Check if this connection is stale compared to config update time. * A connection is stale if it was created before the config was updated. * * @param configUpdatedAt - Unix timestamp (ms) when config was last updated * @returns true if connection was created before config update, false otherwise */ public isStale(configUpdatedAt: number): boolean { return this.createdAt < configUpdatedAt; } private isOAuthError(error: unknown): boolean { if (!error || typeof error !== 'object') { return false; } // Check for error code if ('code' in error) { const code = (error as { code?: number }).code; if (code === 401 || code === 403) { return true; } } // Check message for various auth error indicators if ('message' in error && typeof error.message === 'string') { const message = error.message.toLowerCase(); // Check for 401 status if (message.includes('401') || message.includes('non-200 status code (401)')) { return true; } // Check for invalid_token (OAuth servers return this for expired/revoked tokens) if (message.includes('invalid_token')) { return true; } // Check for invalid_grant (OAuth servers return this for expired/revoked grants) if (message.includes('invalid_grant')) { return true; } // Check for authentication required if (message.includes('authentication required') || 
message.includes('unauthorized')) { return true; } } return false; } /** * Checks if an error indicates rate limiting (HTTP 429). * Rate limited requests should stop reconnection attempts to avoid making the situation worse. */ private isRateLimitError(error: unknown): boolean { if (!error || typeof error !== 'object') { return false; } // Check for error code if ('code' in error) { const code = (error as { code?: number }).code; if (code === 429) { return true; } } // Check message for rate limit indicators if ('message' in error && typeof error.message === 'string') { const message = error.message.toLowerCase(); if ( message.includes('429') || message.includes('rate limit') || message.includes('too many requests') ) { return true; } } return false; } }