📡 refactor: SSE Connection Settings and Error Handling (#11129)

* 🛜 refactor: SSE Connection Handling in MCP

- Introduced a longer initial handshake timeout for SSE connections to improve reliability through proxies.
- Added standardized request headers for SSE connections to ensure proper handling.
- Implemented a function to extract meaningful error messages from SSE transport errors, addressing common issues like timeouts, connection resets, and DNS failures.
- Enhanced error logging with detailed context to aid in debugging and provide insights into connection issues.

This update aims to improve the robustness and user experience of SSE connections in the MCP.

* chore: Update SSE Connection Headers Documentation

- Clarified the documentation for SSE connection headers by removing unnecessary details and emphasizing the headers that are intentionally excluded.
- Improved readability and conciseness of the comments regarding HTTP/2 connection management.

This change aims to enhance the clarity of the code documentation for better understanding and maintenance.

* refactor: Improved the handling of SSE transport errors by implementing a function to extract meaningful error messages, addressing various scenarios such as timeouts, connection resets, and DNS failures.
- Added comprehensive unit tests for the new error handling function, ensuring robust detection and reporting of common SSE error cases.
- Updated comments in the connection handling code to clarify the merging of headers, emphasizing user-defined overrides.

This update aims to improve the reliability and clarity of error reporting in SSE connections, enhancing the overall user experience.

* refactor: Enhance SSE error message extraction for improved timeout detection

- Updated the `extractSSEErrorMessage` function to include case-insensitive matching for various timeout patterns, including 'ESOCKETTIMEDOUT', 'timed out', 'timeout after', and 'request timeout'.
- Added unit tests to ensure accurate detection of these timeout messages and prevent false positives in unrelated contexts.
- Improved comments for clarity on the timeout detection logic.

This change aims to enhance the reliability of error handling in SSE connections, providing clearer feedback for timeout scenarios.
This commit is contained in:
Danny Avila 2025-12-28 12:19:27 -05:00 committed by GitHub
parent eeb5522464
commit 0b8e0fcede
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 555 additions and 20 deletions

View file

@ -188,3 +188,344 @@ describe('MCPConnection Error Detection', () => {
});
});
});
/**
* Tests for extractSSEErrorMessage function.
* This function extracts meaningful error messages from SSE transport errors,
* particularly handling the "SSE error: undefined" case from the MCP SDK.
*/
describe('extractSSEErrorMessage', () => {
/**
* Standalone implementation of extractSSEErrorMessage for testing.
* This mirrors the function in connection.ts.
* Keep in sync with the actual implementation.
*/
function extractSSEErrorMessage(error: unknown): {
message: string;
code?: number;
isProxyHint: boolean;
isTransient: boolean;
} {
if (!error || typeof error !== 'object') {
return {
message: 'Unknown SSE transport error',
isProxyHint: true,
isTransient: true,
};
}
const errorObj = error as { message?: string; code?: number; event?: unknown };
const rawMessage = errorObj.message ?? '';
const code = errorObj.code;
// Handle the common "SSE error: undefined" case
if (rawMessage === 'SSE error: undefined' || rawMessage === 'undefined' || !rawMessage) {
return {
message:
'SSE connection closed. This can occur due to: (1) idle connection timeout (normal), ' +
'(2) reverse proxy buffering (check proxy_buffering config), or (3) network interruption.',
code,
isProxyHint: true,
isTransient: true,
};
}
// Check for timeout patterns with case-insensitive matching
const lowerMessage = rawMessage.toLowerCase();
if (
rawMessage.includes('ETIMEDOUT') ||
rawMessage.includes('ESOCKETTIMEDOUT') ||
lowerMessage.includes('timed out') ||
lowerMessage.includes('timeout after') ||
lowerMessage.includes('request timeout')
) {
return {
message: `SSE connection timed out: ${rawMessage}. If behind a reverse proxy, increase proxy_read_timeout.`,
code,
isProxyHint: true,
isTransient: true,
};
}
// Connection reset is often transient
if (rawMessage.includes('ECONNRESET')) {
return {
message: `SSE connection reset: ${rawMessage}. The server or proxy may have restarted.`,
code,
isProxyHint: false,
isTransient: true,
};
}
// Connection refused is more serious
if (rawMessage.includes('ECONNREFUSED')) {
return {
message: `SSE connection refused: ${rawMessage}. Verify the MCP server is running and accessible.`,
code,
isProxyHint: false,
isTransient: false,
};
}
// DNS failure
if (rawMessage.includes('ENOTFOUND') || rawMessage.includes('getaddrinfo')) {
return {
message: `SSE DNS resolution failed: ${rawMessage}. Check the server URL is correct.`,
code,
isProxyHint: false,
isTransient: false,
};
}
// Check for HTTP status codes
const statusMatch = rawMessage.match(/\b(4\d{2}|5\d{2})\b/);
if (statusMatch) {
const statusCode = parseInt(statusMatch[1], 10);
const isServerError = statusCode >= 500 && statusCode < 600;
return {
message: rawMessage,
code: statusCode,
isProxyHint: statusCode === 502 || statusCode === 503 || statusCode === 504,
isTransient: isServerError,
};
}
return {
message: rawMessage,
code,
isProxyHint: false,
isTransient: false,
};
}
describe('undefined/empty error handling', () => {
it('should handle "SSE error: undefined" from MCP SDK', () => {
const error = { message: 'SSE error: undefined', code: undefined };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection closed');
expect(result.isProxyHint).toBe(true);
expect(result.isTransient).toBe(true);
});
it('should handle empty message', () => {
const error = { message: '' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection closed');
expect(result.isTransient).toBe(true);
});
it('should handle message "undefined"', () => {
const error = { message: 'undefined' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection closed');
expect(result.isTransient).toBe(true);
});
it('should handle null error', () => {
const result = extractSSEErrorMessage(null);
expect(result.message).toBe('Unknown SSE transport error');
expect(result.isTransient).toBe(true);
});
it('should handle undefined error', () => {
const result = extractSSEErrorMessage(undefined);
expect(result.message).toBe('Unknown SSE transport error');
expect(result.isTransient).toBe(true);
});
it('should handle non-object error', () => {
const result = extractSSEErrorMessage('string error');
expect(result.message).toBe('Unknown SSE transport error');
expect(result.isTransient).toBe(true);
});
});
describe('timeout errors', () => {
it('should detect ETIMEDOUT', () => {
const error = { message: 'connect ETIMEDOUT 1.2.3.4:443' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection timed out');
expect(result.message).toContain('proxy_read_timeout');
expect(result.isProxyHint).toBe(true);
expect(result.isTransient).toBe(true);
});
it('should detect ESOCKETTIMEDOUT', () => {
const error = { message: 'ESOCKETTIMEDOUT' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection timed out');
expect(result.isTransient).toBe(true);
});
it('should detect "timed out" (case insensitive)', () => {
const error = { message: 'Connection Timed Out' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection timed out');
expect(result.isTransient).toBe(true);
});
it('should detect "timeout after"', () => {
const error = { message: 'Request timeout after 60000ms' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection timed out');
expect(result.isTransient).toBe(true);
});
it('should detect "request timeout"', () => {
const error = { message: 'Request Timeout' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection timed out');
expect(result.isTransient).toBe(true);
});
it('should NOT match "timeout" in unrelated context', () => {
// URL containing "timeout" should not trigger timeout detection
const error = { message: 'Failed to connect to https://api.example.com/timeout-settings' };
const result = extractSSEErrorMessage(error);
expect(result.message).not.toContain('SSE connection timed out');
expect(result.message).toBe('Failed to connect to https://api.example.com/timeout-settings');
});
});
describe('connection errors', () => {
it('should detect ECONNRESET as transient', () => {
const error = { message: 'read ECONNRESET' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection reset');
expect(result.isProxyHint).toBe(false);
expect(result.isTransient).toBe(true);
});
it('should detect ECONNREFUSED as non-transient', () => {
const error = { message: 'connect ECONNREFUSED 127.0.0.1:8080' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection refused');
expect(result.message).toContain('Verify the MCP server is running');
expect(result.isTransient).toBe(false);
});
});
describe('DNS errors', () => {
it('should detect ENOTFOUND', () => {
const error = { message: 'getaddrinfo ENOTFOUND unknown.host.com' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE DNS resolution failed');
expect(result.message).toContain('Check the server URL');
expect(result.isTransient).toBe(false);
});
it('should detect getaddrinfo errors', () => {
const error = { message: 'getaddrinfo EAI_AGAIN example.com' };
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE DNS resolution failed');
expect(result.isTransient).toBe(false);
});
});
describe('HTTP status code errors', () => {
it('should detect 502 as proxy hint and transient', () => {
const error = { message: 'Non-200 status code (502): Bad Gateway' };
const result = extractSSEErrorMessage(error);
expect(result.code).toBe(502);
expect(result.isProxyHint).toBe(true);
expect(result.isTransient).toBe(true);
});
it('should detect 503 as proxy hint and transient', () => {
const error = { message: 'Error: Service Unavailable (503)' };
const result = extractSSEErrorMessage(error);
expect(result.code).toBe(503);
expect(result.isProxyHint).toBe(true);
expect(result.isTransient).toBe(true);
});
it('should detect 504 as proxy hint and transient', () => {
const error = { message: 'Gateway Timeout 504' };
const result = extractSSEErrorMessage(error);
expect(result.code).toBe(504);
expect(result.isProxyHint).toBe(true);
expect(result.isTransient).toBe(true);
});
it('should detect 500 as transient but not proxy hint', () => {
const error = { message: 'Internal Server Error (500)' };
const result = extractSSEErrorMessage(error);
expect(result.code).toBe(500);
expect(result.isProxyHint).toBe(false);
expect(result.isTransient).toBe(true);
});
it('should detect 404 as non-transient', () => {
const error = { message: 'Not Found (404)' };
const result = extractSSEErrorMessage(error);
expect(result.code).toBe(404);
expect(result.isProxyHint).toBe(false);
expect(result.isTransient).toBe(false);
});
it('should detect 401 as non-transient', () => {
const error = { message: 'Unauthorized (401)' };
const result = extractSSEErrorMessage(error);
expect(result.code).toBe(401);
expect(result.isTransient).toBe(false);
});
});
describe('SseError from MCP SDK', () => {
it('should handle SseError with event property', () => {
const error = {
message: 'SSE error: undefined',
code: undefined,
event: { type: 'error', code: undefined, message: undefined },
};
const result = extractSSEErrorMessage(error);
expect(result.message).toContain('SSE connection closed');
expect(result.isTransient).toBe(true);
});
it('should preserve code from SseError', () => {
const error = {
message: 'SSE error: Server sent HTTP 204, not reconnecting',
code: 204,
};
const result = extractSSEErrorMessage(error);
expect(result.code).toBe(204);
});
});
describe('regular error messages', () => {
it('should pass through regular error messages', () => {
const error = { message: 'Some specific error message', code: 42 };
const result = extractSSEErrorMessage(error);
expect(result.message).toBe('Some specific error message');
expect(result.code).toBe(42);
expect(result.isProxyHint).toBe(false);
expect(result.isTransient).toBe(false);
});
});
});

View file

@ -68,6 +68,145 @@ function isStreamableHTTPOptions(options: t.MCPOptions): options is t.Streamable
const FIVE_MINUTES = 5 * 60 * 1000;
const DEFAULT_TIMEOUT = 60000;
/** SSE connections through proxies may need longer initial handshake time */
const SSE_CONNECT_TIMEOUT = 120000;
/**
* Headers for SSE connections.
*
* Headers we intentionally DO NOT include:
* - Accept: text/event-stream - Already set by eventsource library AND MCP SDK
* - X-Accel-Buffering: This is a RESPONSE header for Nginx, not a request header.
* The upstream MCP server must send this header for Nginx to respect it.
* - Connection: keep-alive: Forbidden in HTTP/2 (RFC 7540 §8.1.2.2).
* HTTP/2 manages connection persistence differently.
*/
const SSE_REQUEST_HEADERS = {
'Cache-Control': 'no-cache',
};
/**
* Extracts a meaningful error message from SSE transport errors.
* The MCP SDK's SSEClientTransport can produce "SSE error: undefined" when the
* underlying eventsource library encounters connection issues without a specific message.
*
* @returns Object containing:
* - message: Human-readable error description
* - code: HTTP status code if available
* - isProxyHint: Whether this error suggests proxy misconfiguration
* - isTransient: Whether this is likely a transient error that will auto-reconnect
*/
function extractSSEErrorMessage(error: unknown): {
message: string;
code?: number;
isProxyHint: boolean;
isTransient: boolean;
} {
if (!error || typeof error !== 'object') {
return {
message: 'Unknown SSE transport error',
isProxyHint: true,
isTransient: true,
};
}
const errorObj = error as { message?: string; code?: number; event?: unknown };
const rawMessage = errorObj.message ?? '';
const code = errorObj.code;
/**
* Handle the common "SSE error: undefined" case.
* This typically occurs when:
* 1. A reverse proxy buffers the SSE stream (proxy issue)
* 2. The server closes an idle connection (normal SSE behavior)
* 3. Network interruption without specific error details
*
* In all cases, the eventsource library will attempt to reconnect automatically.
*/
if (rawMessage === 'SSE error: undefined' || rawMessage === 'undefined' || !rawMessage) {
return {
message:
'SSE connection closed. This can occur due to: (1) idle connection timeout (normal), ' +
'(2) reverse proxy buffering (check proxy_buffering config), or (3) network interruption.',
code,
isProxyHint: true,
isTransient: true,
};
}
/**
* Check for timeout patterns. Use case-insensitive matching for common timeout error codes:
* - ETIMEDOUT: TCP connection timeout
* - ESOCKETTIMEDOUT: Socket timeout
* - "timed out" / "timeout": Generic timeout messages
*/
const lowerMessage = rawMessage.toLowerCase();
if (
rawMessage.includes('ETIMEDOUT') ||
rawMessage.includes('ESOCKETTIMEDOUT') ||
lowerMessage.includes('timed out') ||
lowerMessage.includes('timeout after') ||
lowerMessage.includes('request timeout')
) {
return {
message: `SSE connection timed out: ${rawMessage}. If behind a reverse proxy, increase proxy_read_timeout.`,
code,
isProxyHint: true,
isTransient: true,
};
}
// Connection reset is often transient (server restart, proxy reload)
if (rawMessage.includes('ECONNRESET')) {
return {
message: `SSE connection reset: ${rawMessage}. The server or proxy may have restarted.`,
code,
isProxyHint: false,
isTransient: true,
};
}
// Connection refused is more serious - server may be down
if (rawMessage.includes('ECONNREFUSED')) {
return {
message: `SSE connection refused: ${rawMessage}. Verify the MCP server is running and accessible.`,
code,
isProxyHint: false,
isTransient: false,
};
}
// DNS failure is usually a configuration issue, not transient
if (rawMessage.includes('ENOTFOUND') || rawMessage.includes('getaddrinfo')) {
return {
message: `SSE DNS resolution failed: ${rawMessage}. Check the server URL is correct.`,
code,
isProxyHint: false,
isTransient: false,
};
}
// Check for HTTP status codes in the message
const statusMatch = rawMessage.match(/\b(4\d{2}|5\d{2})\b/);
if (statusMatch) {
const statusCode = parseInt(statusMatch[1], 10);
// 5xx errors are often transient, 4xx are usually not
const isServerError = statusCode >= 500 && statusCode < 600;
return {
message: rawMessage,
code: statusCode,
isProxyHint: statusCode === 502 || statusCode === 503 || statusCode === 504,
isTransient: isServerError,
};
}
return {
message: rawMessage,
code,
isProxyHint: false,
isTransient: false,
};
}
interface MCPConnectionParams {
serverName: string;
@ -258,18 +397,29 @@ export class MCPConnection extends EventEmitter {
headers['Authorization'] = `Bearer ${this.oauthTokens.access_token}`;
}
const timeoutValue = this.timeout || DEFAULT_TIMEOUT;
/**
* SSE connections need longer timeouts for reliability.
* The connect timeout is extended because proxies may delay initial response.
*/
const sseTimeout = this.timeout || SSE_CONNECT_TIMEOUT;
const transport = new SSEClientTransport(url, {
requestInit: {
headers,
/** User/OAuth headers override SSE defaults */
headers: { ...SSE_REQUEST_HEADERS, ...headers },
signal: abortController.signal,
},
eventSourceInit: {
fetch: (url, init) => {
const fetchHeaders = new Headers(Object.assign({}, init?.headers, headers));
/** Merge headers: SSE defaults < init headers < user headers (user wins) */
const fetchHeaders = new Headers(
Object.assign({}, SSE_REQUEST_HEADERS, init?.headers, headers),
);
const agent = new Agent({
bodyTimeout: timeoutValue,
headersTimeout: timeoutValue,
bodyTimeout: sseTimeout,
headersTimeout: sseTimeout,
/** Extended keep-alive for long-lived SSE connections */
keepAliveTimeout: sseTimeout,
keepAliveMaxTimeout: sseTimeout * 2,
});
return undiciFetch(url, {
...init,
@ -280,7 +430,7 @@ export class MCPConnection extends EventEmitter {
},
fetch: this.createFetchFunction(
this.getRequestHeaders.bind(this),
this.timeout,
sseTimeout,
) as unknown as FetchLike,
});
@ -639,26 +789,70 @@ export class MCPConnection extends EventEmitter {
private setupTransportErrorHandlers(transport: Transport): void {
transport.onerror = (error) => {
if (error && typeof error === 'object' && 'code' in error) {
const errorCode = (error as unknown as { code?: number }).code;
// Extract meaningful error information (handles "SSE error: undefined" cases)
const {
message: errorMessage,
code: errorCode,
isProxyHint,
isTransient,
} = extractSSEErrorMessage(error);
// Ignore SSE 404 errors for servers that don't support SSE
if (
errorCode === 404 &&
String(error?.message).toLowerCase().includes('failed to open sse stream')
) {
logger.warn(`${this.getLogPrefix()} SSE stream not available (404). Ignoring.`);
return;
// Ignore SSE 404 errors for servers that don't support SSE
if (errorCode === 404 && errorMessage.toLowerCase().includes('failed to open sse stream')) {
logger.warn(`${this.getLogPrefix()} SSE stream not available (404). Ignoring.`);
return;
}
// Check if it's an OAuth authentication error
if (this.isOAuthError(error)) {
logger.warn(`${this.getLogPrefix()} OAuth authentication error detected`);
this.emit('oauthError', error);
}
/**
* Log with enhanced context for debugging.
* All transport.onerror events are logged as errors to preserve stack traces.
* isTransient indicates whether auto-reconnection is expected to succeed.
*
* The MCP SDK's SseError extends Error and includes:
* - code: HTTP status code or eventsource error code
* - event: The original eventsource ErrorEvent
* - stack: Full stack trace
*/
const errorContext: Record<string, unknown> = {
code: errorCode,
isTransient,
};
if (isProxyHint) {
errorContext.hint = 'Check Nginx/proxy configuration for SSE endpoints';
}
// Extract additional debug info from SseError if available
if (error && typeof error === 'object') {
const sseError = error as { event?: unknown; stack?: string };
// Include the original eventsource event for debugging
if (sseError.event && typeof sseError.event === 'object') {
const event = sseError.event as { code?: number; message?: string; type?: string };
errorContext.eventDetails = {
type: event.type,
code: event.code,
message: event.message,
};
}
// Check if it's an OAuth authentication error
if (this.isOAuthError(error)) {
logger.warn(`${this.getLogPrefix()} OAuth authentication error detected`);
this.emit('oauthError', error);
// Include stack trace if available
if (sseError.stack) {
errorContext.stack = sseError.stack;
}
}
logger.error(`${this.getLogPrefix()} Transport error:`, error);
const errorLabel = isTransient
? 'Transport error (transient, will reconnect)'
: 'Transport error (may require manual intervention)';
logger.error(`${this.getLogPrefix()} ${errorLabel}: ${errorMessage}`, errorContext);
this.emit('connectionChange', 'error');
};