mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-16 08:20:14 +01:00
🔧 fix: Fix rampant pings and rate limiting
- Skips idle connection checks in `getMCPManager` to avoid unnecessary pings. - Introduces a skipOAuthTimeout flag during initial connection to prevent timeouts during server discovery. - Uses a lightweight connection state check instead of ping to avoid rate limits. - Prevents refetch spam and rate limit errors when checking connection status. - Fixes an issue where the server connection was not being disconnected.
This commit is contained in:
parent
4b4741b1aa
commit
3ca736bc53
9 changed files with 90 additions and 39 deletions
|
|
@ -11,12 +11,13 @@ let flowManager = null;
|
|||
|
||||
/**
|
||||
* @param {string} [userId] - Optional user ID, to avoid disconnecting the current user.
|
||||
* @param {boolean} [skipIdleCheck] - Skip idle connection checking to avoid unnecessary pings.
|
||||
* @returns {MCPManager}
|
||||
*/
|
||||
function getMCPManager(userId) {
|
||||
function getMCPManager(userId, skipIdleCheck = false) {
|
||||
if (!mcpManager) {
|
||||
mcpManager = MCPManager.getInstance();
|
||||
} else {
|
||||
} else if (!skipIdleCheck) {
|
||||
mcpManager.checkIdleConnections(userId);
|
||||
}
|
||||
return mcpManager;
|
||||
|
|
|
|||
|
|
@ -97,7 +97,6 @@ function createServerToolsCallback() {
|
|||
return;
|
||||
}
|
||||
await mcpToolsCache.set(serverName, serverTools);
|
||||
logger.warn(`MCP tools for ${serverName} added to cache.`);
|
||||
} catch (error) {
|
||||
logger.error('Error retrieving MCP tools from cache:', error);
|
||||
}
|
||||
|
|
@ -240,7 +239,6 @@ const getAvailableTools = async (req, res) => {
|
|||
// Check if the app-level connection is active
|
||||
try {
|
||||
const mcpManager = getMCPManager();
|
||||
const allConnections = mcpManager.getAllConnections();
|
||||
|
||||
const appConnection = mcpManager.getConnection(serverName);
|
||||
|
||||
|
|
|
|||
|
|
@ -182,7 +182,7 @@ const updateUserPluginsController = async (req, res) => {
|
|||
`[updateUserPluginsController] Disconnecting MCP connection for user ${user.id} and server ${serverName} after plugin auth update for ${pluginKey}.`,
|
||||
);
|
||||
// Don't kill the server connection on revoke anymore, user can just reinitialize the server if thats what they want
|
||||
// await mcpManager.disconnectUserConnection(user.id, serverName);
|
||||
await mcpManager.disconnectUserConnection(user.id, serverName);
|
||||
}
|
||||
} catch (disconnectError) {
|
||||
logger.error(
|
||||
|
|
|
|||
|
|
@ -219,7 +219,7 @@ router.get('/connection/status', requireJwtAuth, async (req, res) => {
|
|||
return res.status(401).json({ error: 'User not authenticated' });
|
||||
}
|
||||
|
||||
const mcpManager = getMCPManager();
|
||||
const mcpManager = getMCPManager(null, true); // Skip idle checks to avoid ping spam
|
||||
const connectionStatus = {};
|
||||
|
||||
// Get all MCP server names from custom config
|
||||
|
|
@ -237,12 +237,14 @@ router.get('/connection/status', requireJwtAuth, async (req, res) => {
|
|||
const userConnection = mcpManager.getUserConnectionIfExists(user.id, serverName);
|
||||
const hasUserConnection = !!userConnection;
|
||||
|
||||
// Determine if connected based on actual connection state
|
||||
// Use lightweight connection state check instead of ping-based isConnected()
|
||||
let connected = false;
|
||||
if (hasAppConnection) {
|
||||
connected = await appConnection.isConnected();
|
||||
// Check connection state without ping to avoid rate limits
|
||||
connected = appConnection.connectionState === 'connected';
|
||||
} else if (hasUserConnection) {
|
||||
connected = await userConnection.isConnected();
|
||||
// Check connection state without ping to avoid rate limits
|
||||
connected = userConnection.connectionState === 'connected';
|
||||
}
|
||||
|
||||
// Determine if this server requires user authentication
|
||||
|
|
|
|||
|
|
@ -59,6 +59,8 @@ export default function MCPPanel() {
|
|||
await Promise.all([
|
||||
queryClient.invalidateQueries([QueryKeys.tools]),
|
||||
queryClient.refetchQueries([QueryKeys.tools]),
|
||||
queryClient.invalidateQueries([QueryKeys.mcpAuthValues]),
|
||||
queryClient.refetchQueries([QueryKeys.mcpAuthValues]),
|
||||
queryClient.invalidateQueries([QueryKeys.mcpConnectionStatus]),
|
||||
queryClient.refetchQueries([QueryKeys.mcpConnectionStatus]),
|
||||
]);
|
||||
|
|
|
|||
|
|
@ -45,13 +45,8 @@ export default function MCPConfigDialog({
|
|||
}: MCPConfigDialogProps) {
|
||||
const localize = useLocalize();
|
||||
|
||||
// Get connection status to determine OAuth requirements with aggressive refresh
|
||||
const { data: statusQuery } = useMCPConnectionStatusQuery({
|
||||
refetchOnMount: true,
|
||||
refetchOnWindowFocus: true,
|
||||
staleTime: 0,
|
||||
cacheTime: 0,
|
||||
});
|
||||
// Get connection status to determine OAuth requirements
|
||||
const { data: statusQuery } = useMCPConnectionStatusQuery();
|
||||
const mcpServerStatuses = statusQuery?.connectionStatus || {};
|
||||
|
||||
// Derive real-time connection status and OAuth requirements
|
||||
|
|
|
|||
|
|
@ -51,9 +51,10 @@ export const useMCPConnectionStatusQuery = <TData = t.TMCPConnectionStatusRespon
|
|||
[QueryKeys.mcpConnectionStatus],
|
||||
() => dataService.getMCPConnectionStatus(),
|
||||
{
|
||||
// refetchOnWindowFocus: false,
|
||||
// refetchOnReconnect: false,
|
||||
// refetchOnMount: true,
|
||||
refetchOnWindowFocus: false, // Stop window focus spam
|
||||
refetchOnReconnect: false, // Stop reconnect spam
|
||||
refetchOnMount: false, // Only fetch when explicitly needed
|
||||
staleTime: 2000, // 2 second cache to prevent excessive calls
|
||||
...config,
|
||||
},
|
||||
);
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ export class MCPConnection extends EventEmitter {
|
|||
private static instance: MCPConnection | null = null;
|
||||
public client: Client;
|
||||
private transport: Transport | null = null; // Make this nullable
|
||||
private connectionState: t.ConnectionState = 'disconnected';
|
||||
public connectionState: t.ConnectionState = 'disconnected';
|
||||
private connectPromise: Promise<void> | null = null;
|
||||
private readonly MAX_RECONNECT_ATTEMPTS = 3;
|
||||
public readonly serverName: string;
|
||||
|
|
@ -70,6 +70,7 @@ export class MCPConnection extends EventEmitter {
|
|||
private oauthTokens?: MCPOAuthTokens | null;
|
||||
private oauthRequired = false;
|
||||
private oauthTimeoutId: NodeJS.Timeout | null = null;
|
||||
private skipOAuthTimeout = false; // Skip OAuth timeouts during discovery
|
||||
iconPath?: string;
|
||||
timeout?: number;
|
||||
url?: string;
|
||||
|
|
@ -79,6 +80,7 @@ export class MCPConnection extends EventEmitter {
|
|||
private readonly options: t.MCPOptions,
|
||||
userId?: string,
|
||||
oauthTokens?: MCPOAuthTokens | null,
|
||||
skipOAuthTimeout = false, // Skip OAuth timeouts during discovery
|
||||
) {
|
||||
super();
|
||||
this.serverName = serverName;
|
||||
|
|
@ -86,6 +88,7 @@ export class MCPConnection extends EventEmitter {
|
|||
this.iconPath = options.iconPath;
|
||||
this.timeout = options.timeout;
|
||||
this.lastPingTime = Date.now();
|
||||
this.skipOAuthTimeout = skipOAuthTimeout;
|
||||
if (oauthTokens) {
|
||||
this.oauthTokens = oauthTokens;
|
||||
}
|
||||
|
|
@ -208,10 +211,7 @@ export class MCPConnection extends EventEmitter {
|
|||
this.emit('connectionChange', 'disconnected');
|
||||
};
|
||||
|
||||
transport.onerror = (error) => {
|
||||
logger.error(`${this.getLogPrefix()} SSE transport error:`, error);
|
||||
this.emitError(error, 'SSE transport error:');
|
||||
};
|
||||
// Error handling is done by setupTransportErrorHandlers
|
||||
|
||||
transport.onmessage = (message) => {
|
||||
logger.info(`${this.getLogPrefix()} Message received: ${JSON.stringify(message)}`);
|
||||
|
|
@ -250,10 +250,7 @@ export class MCPConnection extends EventEmitter {
|
|||
this.emit('connectionChange', 'disconnected');
|
||||
};
|
||||
|
||||
transport.onerror = (error: Error | unknown) => {
|
||||
logger.error(`${this.getLogPrefix()} Streamable-http transport error:`, error);
|
||||
this.emitError(error, 'Streamable-http transport error:');
|
||||
};
|
||||
// Error handling is done by setupTransportErrorHandlers
|
||||
|
||||
transport.onmessage = (message: JSONRPCMessage) => {
|
||||
logger.info(`${this.getLogPrefix()} Message received: ${JSON.stringify(message)}`);
|
||||
|
|
@ -411,6 +408,24 @@ export class MCPConnection extends EventEmitter {
|
|||
const serverUrl = this.url;
|
||||
logger.debug(`${this.getLogPrefix()} Server URL for OAuth: ${serverUrl}`);
|
||||
|
||||
// In startup mode, immediately emit oauthRequired and fail without timeout
|
||||
if (this.skipOAuthTimeout) {
|
||||
logger.info(
|
||||
`${this.getLogPrefix()} Skip OAuth timeout: OAuth required, failing immediately without timeout`,
|
||||
);
|
||||
|
||||
// Emit the event for discovery purposes
|
||||
this.emit('oauthRequired', {
|
||||
serverName: this.serverName,
|
||||
error,
|
||||
serverUrl,
|
||||
userId: this.userId,
|
||||
});
|
||||
|
||||
// Immediately throw to avoid timeout
|
||||
throw error;
|
||||
}
|
||||
|
||||
const oauthTimeout = this.options.initTimeout ?? 60000;
|
||||
/** Promise that will resolve when OAuth is handled */
|
||||
const oauthHandledPromise = new Promise<void>((resolve, reject) => {
|
||||
|
|
@ -545,6 +560,12 @@ export class MCPConnection extends EventEmitter {
|
|||
|
||||
private setupTransportErrorHandlers(transport: Transport): void {
|
||||
transport.onerror = (error) => {
|
||||
// Suppress error logging if we're already disconnected or disconnecting
|
||||
if (this.connectionState === 'disconnected' || this.shouldStopReconnecting) {
|
||||
logger.debug(`${this.getLogPrefix()} Transport error during disconnect (expected):`, error);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.error(`${this.getLogPrefix()} Transport error:`, error);
|
||||
|
||||
// Check if it's an OAuth authentication error
|
||||
|
|
@ -556,6 +577,15 @@ export class MCPConnection extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
// Check if it's a rate limit error (429) - don't trigger reconnection
|
||||
const errorMessage = error?.toString() || '';
|
||||
if (errorMessage.includes('429') || errorMessage.includes('too many requests')) {
|
||||
logger.warn(
|
||||
`${this.getLogPrefix()} Rate limit error detected, not triggering reconnection`,
|
||||
);
|
||||
return; // Don't emit error state for rate limits
|
||||
}
|
||||
|
||||
this.emit('connectionChange', 'error');
|
||||
};
|
||||
}
|
||||
|
|
@ -593,14 +623,14 @@ export class MCPConnection extends EventEmitter {
|
|||
this.oauthTimeoutId = null;
|
||||
}
|
||||
|
||||
// Set disconnected state early to suppress error logging during cleanup
|
||||
this.connectionState = 'disconnected';
|
||||
|
||||
if (this.transport) {
|
||||
await this.client.close();
|
||||
this.transport = null;
|
||||
}
|
||||
if (this.connectionState === 'disconnected') {
|
||||
return;
|
||||
}
|
||||
this.connectionState = 'disconnected';
|
||||
|
||||
this.emit('connectionChange', 'disconnected');
|
||||
} finally {
|
||||
this.connectPromise = null;
|
||||
|
|
@ -643,6 +673,11 @@ export class MCPConnection extends EventEmitter {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Don't ping if we're disconnecting or should stop reconnecting
|
||||
if (this.shouldStopReconnecting) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// Try ping first as it's the lightest check
|
||||
await this.client.ping();
|
||||
|
|
@ -656,6 +691,13 @@ export class MCPConnection extends EventEmitter {
|
|||
(error as Error)?.message.includes('method not found'));
|
||||
|
||||
if (!pingUnsupported) {
|
||||
// Check if it's a rate limit error - don't log as error
|
||||
const errorMessage = (error as Error)?.message || '';
|
||||
if (errorMessage.includes('429') || errorMessage.includes('too many requests')) {
|
||||
logger.warn(`${this.getLogPrefix()} Ping rate limited, assuming connected`);
|
||||
return true; // Assume still connected during rate limiting
|
||||
}
|
||||
|
||||
logger.error(`${this.getLogPrefix()} Ping failed:`, error);
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -160,6 +160,7 @@ export class MCPManager extends EventEmitter {
|
|||
}): Promise<void> {
|
||||
const processedConfig = processMCPEnv(config);
|
||||
let tokens: MCPOAuthTokens | null = null;
|
||||
|
||||
if (tokenMethods?.findToken) {
|
||||
try {
|
||||
/** Refresh function for app-level connections */
|
||||
|
|
@ -204,10 +205,13 @@ export class MCPManager extends EventEmitter {
|
|||
logger.debug(`[MCP][${serverName}] No existing tokens found`);
|
||||
}
|
||||
}
|
||||
|
||||
if (tokens) {
|
||||
logger.info(`[MCP][${serverName}] Loaded OAuth tokens`);
|
||||
}
|
||||
const connection = new MCPConnection(serverName, processedConfig, undefined, tokens);
|
||||
|
||||
// Create connection in startup mode to prevent OAuth timeouts
|
||||
const connection = new MCPConnection(serverName, processedConfig, undefined, tokens, true);
|
||||
|
||||
// Track OAuth skipped state explicitly
|
||||
let oauthSkipped = false;
|
||||
|
|
@ -347,7 +351,8 @@ export class MCPManager extends EventEmitter {
|
|||
while (attempts < maxAttempts) {
|
||||
try {
|
||||
await connection.connect();
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
return;
|
||||
}
|
||||
throw new Error('Connection attempt succeeded but status is not connected');
|
||||
|
|
@ -493,7 +498,8 @@ export class MCPManager extends EventEmitter {
|
|||
}
|
||||
connection = undefined; // Force creation of a new connection
|
||||
} else if (connection) {
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`);
|
||||
this.updateUserLastActivity(userId);
|
||||
return connection;
|
||||
|
|
@ -636,7 +642,8 @@ export class MCPManager extends EventEmitter {
|
|||
});
|
||||
await Promise.race([connectionAttempt, connectionTimeout]);
|
||||
|
||||
if (!(await connection?.isConnected())) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection?.connectionState !== 'connected') {
|
||||
throw new Error('Failed to establish connection after initialization attempt.');
|
||||
}
|
||||
|
||||
|
|
@ -747,7 +754,8 @@ export class MCPManager extends EventEmitter {
|
|||
flowManager: FlowStateManager<MCPOAuthTokens | null>;
|
||||
skipReconnect?: boolean;
|
||||
}): Promise<boolean> {
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -775,7 +783,8 @@ export class MCPManager extends EventEmitter {
|
|||
flowManager,
|
||||
});
|
||||
|
||||
if (await connection.isConnected()) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState === 'connected') {
|
||||
logger.info(`[MCP][${serverName}] App-level connection successfully reconnected`);
|
||||
return true;
|
||||
} else {
|
||||
|
|
@ -959,7 +968,8 @@ export class MCPManager extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
if (!(await connection.isConnected())) {
|
||||
// Use lightweight connection state check instead of ping
|
||||
if (connection.connectionState !== 'connected') {
|
||||
/** May happen if getUserConnection failed silently or app connection dropped */
|
||||
throw new McpError(
|
||||
ErrorCode.InternalError, // Use InternalError for connection issues
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue