mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-01-06 02:28:51 +01:00
🔧 fix: Fix rampant pings and rate limiting
- Skips idle connection checks in `getMCPManager` to avoid unnecessary pings. - Introduces a skipOAuthTimeout flag during initial connection to prevent timeouts during server discovery. - Uses a lightweight connection state check instead of ping to avoid rate limits. - Prevents refetch spam and rate limit errors when checking connection status. - Fixes an issue where the server connection was not being disconnected.
This commit is contained in:
parent
1da5365397
commit
10e06f2221
9 changed files with 90 additions and 39 deletions
|
|
@ -57,7 +57,7 @@ export class MCPConnection extends EventEmitter {
|
|||
private static instance: MCPConnection | null = null;
|
||||
public client: Client;
|
||||
private transport: Transport | null = null; // Make this nullable
|
||||
private connectionState: t.ConnectionState = 'disconnected';
|
||||
public connectionState: t.ConnectionState = 'disconnected';
|
||||
private connectPromise: Promise<void> | null = null;
|
||||
private readonly MAX_RECONNECT_ATTEMPTS = 3;
|
||||
public readonly serverName: string;
|
||||
|
|
@ -70,6 +70,7 @@ export class MCPConnection extends EventEmitter {
|
|||
private oauthTokens?: MCPOAuthTokens | null;
|
||||
private oauthRequired = false;
|
||||
private oauthTimeoutId: NodeJS.Timeout | null = null;
|
||||
private skipOAuthTimeout = false; // Skip OAuth timeouts during discovery
|
||||
iconPath?: string;
|
||||
timeout?: number;
|
||||
url?: string;
|
||||
|
|
@ -79,6 +80,7 @@ export class MCPConnection extends EventEmitter {
|
|||
private readonly options: t.MCPOptions,
|
||||
userId?: string,
|
||||
oauthTokens?: MCPOAuthTokens | null,
|
||||
skipOAuthTimeout = false, // Skip OAuth timeouts during discovery
|
||||
) {
|
||||
super();
|
||||
this.serverName = serverName;
|
||||
|
|
@ -86,6 +88,7 @@ export class MCPConnection extends EventEmitter {
|
|||
this.iconPath = options.iconPath;
|
||||
this.timeout = options.timeout;
|
||||
this.lastPingTime = Date.now();
|
||||
this.skipOAuthTimeout = skipOAuthTimeout;
|
||||
if (oauthTokens) {
|
||||
this.oauthTokens = oauthTokens;
|
||||
}
|
||||
|
|
@ -208,10 +211,7 @@ export class MCPConnection extends EventEmitter {
|
|||
this.emit('connectionChange', 'disconnected');
|
||||
};
|
||||
|
||||
transport.onerror = (error) => {
|
||||
logger.error(`${this.getLogPrefix()} SSE transport error:`, error);
|
||||
this.emitError(error, 'SSE transport error:');
|
||||
};
|
||||
// Error handling is done by setupTransportErrorHandlers
|
||||
|
||||
transport.onmessage = (message) => {
|
||||
logger.info(`${this.getLogPrefix()} Message received: ${JSON.stringify(message)}`);
|
||||
|
|
@ -250,10 +250,7 @@ export class MCPConnection extends EventEmitter {
|
|||
this.emit('connectionChange', 'disconnected');
|
||||
};
|
||||
|
||||
transport.onerror = (error: Error | unknown) => {
|
||||
logger.error(`${this.getLogPrefix()} Streamable-http transport error:`, error);
|
||||
this.emitError(error, 'Streamable-http transport error:');
|
||||
};
|
||||
// Error handling is done by setupTransportErrorHandlers
|
||||
|
||||
transport.onmessage = (message: JSONRPCMessage) => {
|
||||
logger.info(`${this.getLogPrefix()} Message received: ${JSON.stringify(message)}`);
|
||||
|
|
@ -411,6 +408,24 @@ export class MCPConnection extends EventEmitter {
|
|||
const serverUrl = this.url;
|
||||
logger.debug(`${this.getLogPrefix()} Server URL for OAuth: ${serverUrl}`);
|
||||
|
||||
// In startup mode, immediately emit oauthRequired and fail without timeout
|
||||
if (this.skipOAuthTimeout) {
|
||||
logger.info(
|
||||
`${this.getLogPrefix()} Skip OAuth timeout: OAuth required, failing immediately without timeout`,
|
||||
);
|
||||
|
||||
// Emit the event for discovery purposes
|
||||
this.emit('oauthRequired', {
|
||||
serverName: this.serverName,
|
||||
error,
|
||||
serverUrl,
|
||||
userId: this.userId,
|
||||
});
|
||||
|
||||
// Immediately throw to avoid timeout
|
||||
throw error;
|
||||
}
|
||||
|
||||
const oauthTimeout = this.options.initTimeout ?? 60000;
|
||||
/** Promise that will resolve when OAuth is handled */
|
||||
const oauthHandledPromise = new Promise<void>((resolve, reject) => {
|
||||
|
|
@ -545,6 +560,12 @@ export class MCPConnection extends EventEmitter {
|
|||
|
||||
private setupTransportErrorHandlers(transport: Transport): void {
|
||||
transport.onerror = (error) => {
|
||||
// Suppress error logging if we're already disconnected or disconnecting
|
||||
if (this.connectionState === 'disconnected' || this.shouldStopReconnecting) {
|
||||
logger.debug(`${this.getLogPrefix()} Transport error during disconnect (expected):`, error);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.error(`${this.getLogPrefix()} Transport error:`, error);
|
||||
|
||||
// Check if it's an OAuth authentication error
|
||||
|
|
@ -556,6 +577,15 @@ export class MCPConnection extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
// Check if it's a rate limit error (429) - don't trigger reconnection
|
||||
const errorMessage = error?.toString() || '';
|
||||
if (errorMessage.includes('429') || errorMessage.includes('too many requests')) {
|
||||
logger.warn(
|
||||
`${this.getLogPrefix()} Rate limit error detected, not triggering reconnection`,
|
||||
);
|
||||
return; // Don't emit error state for rate limits
|
||||
}
|
||||
|
||||
this.emit('connectionChange', 'error');
|
||||
};
|
||||
}
|
||||
|
|
@ -593,14 +623,14 @@ export class MCPConnection extends EventEmitter {
|
|||
this.oauthTimeoutId = null;
|
||||
}
|
||||
|
||||
// Set disconnected state early to suppress error logging during cleanup
|
||||
this.connectionState = 'disconnected';
|
||||
|
||||
if (this.transport) {
|
||||
await this.client.close();
|
||||
this.transport = null;
|
||||
}
|
||||
if (this.connectionState === 'disconnected') {
|
||||
return;
|
||||
}
|
||||
this.connectionState = 'disconnected';
|
||||
|
||||
this.emit('connectionChange', 'disconnected');
|
||||
} finally {
|
||||
this.connectPromise = null;
|
||||
|
|
@ -643,6 +673,11 @@ export class MCPConnection extends EventEmitter {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Don't ping if we're disconnecting or should stop reconnecting
|
||||
if (this.shouldStopReconnecting) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// Try ping first as it's the lightest check
|
||||
await this.client.ping();
|
||||
|
|
@ -656,6 +691,13 @@ export class MCPConnection extends EventEmitter {
|
|||
(error as Error)?.message.includes('method not found'));
|
||||
|
||||
if (!pingUnsupported) {
|
||||
// Check if it's a rate limit error - don't log as error
|
||||
const errorMessage = (error as Error)?.message || '';
|
||||
if (errorMessage.includes('429') || errorMessage.includes('too many requests')) {
|
||||
logger.warn(`${this.getLogPrefix()} Ping rate limited, assuming connected`);
|
||||
return true; // Assume still connected during rate limiting
|
||||
}
|
||||
|
||||
logger.error(`${this.getLogPrefix()} Ping failed:`, error);
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -160,6 +160,7 @@ export class MCPManager extends EventEmitter {
|
|||
}): Promise<void> {
|
||||
const processedConfig = processMCPEnv(config);
|
||||
let tokens: MCPOAuthTokens | null = null;
|
||||
|
||||
if (tokenMethods?.findToken) {
|
||||
try {
|
||||
/** Refresh function for app-level connections */
|
||||
|
|
@ -204,10 +205,13 @@ export class MCPManager extends EventEmitter {
|
|||
logger.debug(`[MCP][${serverName}] No existing tokens found`);
|
||||
}
|
||||
}
|
||||
|
||||
if (tokens) {
|
||||
logger.info(`[MCP][${serverName}] Loaded OAuth tokens`);
|
||||
}
|
||||
const connection = new MCPConnection(serverName, processedConfig, undefined, tokens);
|
||||
|
||||
// Create connection in startup mode to prevent OAuth timeouts
|
||||
const connection = new MCPConnection(serverName, processedConfig, undefined, tokens, true);
|
||||
|
||||
// Track OAuth skipped state explicitly
|
||||
let oauthSkipped = false;
|
||||
|
|
@ -347,7 +351,8 @@ export class MCPManager extends EventEmitter {
|
|||
while (attempts < maxAttempts) {
|
||||
try {
|
||||
await connection.connect();
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
return;
|
||||
}
|
||||
throw new Error('Connection attempt succeeded but status is not connected');
|
||||
|
|
@ -493,7 +498,8 @@ export class MCPManager extends EventEmitter {
|
|||
}
|
||||
connection = undefined; // Force creation of a new connection
|
||||
} else if (connection) {
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`);
|
||||
this.updateUserLastActivity(userId);
|
||||
return connection;
|
||||
|
|
@ -636,7 +642,8 @@ export class MCPManager extends EventEmitter {
|
|||
});
|
||||
await Promise.race([connectionAttempt, connectionTimeout]);
|
||||
|
||||
if (!(await connection?.isConnected())) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection?.connectionState !== 'connected') {
|
||||
throw new Error('Failed to establish connection after initialization attempt.');
|
||||
}
|
||||
|
||||
|
|
@ -747,7 +754,8 @@ export class MCPManager extends EventEmitter {
|
|||
flowManager: FlowStateManager<MCPOAuthTokens | null>;
|
||||
skipReconnect?: boolean;
|
||||
}): Promise<boolean> {
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -775,7 +783,8 @@ export class MCPManager extends EventEmitter {
|
|||
flowManager,
|
||||
});
|
||||
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
logger.info(`[MCP][${serverName}] App-level connection successfully reconnected`);
|
||||
return true;
|
||||
} else {
|
||||
|
|
@ -959,7 +968,8 @@ export class MCPManager extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
if (!(await connection.isConnected())) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState !== 'connected') {
|
||||
/** May happen if getUserConnection failed silently or app connection dropped */
|
||||
throw new McpError(
|
||||
ErrorCode.InternalError, // Use InternalError for connection issues
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue