From 39adeac86ef149ec6a1f1cbd2079fdcffa8ec29f Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Thu, 18 Dec 2025 19:26:00 -0500 Subject: [PATCH] fix(mcp): clean up OAuth flows on abort and simplify flow handling - Add abort handler in reconnectServer to clean up mcp_oauth and mcp_get_tokens flows - Update createAbortHandler to clean up both flow types on tool call abort - Pass abort signal to createFlow in returnOnOAuth path - Simplify handleOAuthRequired to always cancel existing flows and start fresh - This ensures user always gets a new OAuth URL instead of waiting for stale flows --- api/server/services/MCP.js | 88 +++++++++++++------- packages/api/src/mcp/MCPConnectionFactory.ts | 67 +++++---------- 2 files changed, 75 insertions(+), 80 deletions(-) diff --git a/api/server/services/MCP.js b/api/server/services/MCP.js index 3d5c91866d..81d7107de4 100644 --- a/api/server/services/MCP.js +++ b/api/server/services/MCP.js @@ -156,7 +156,9 @@ function createAbortHandler({ userId, serverName, toolName, flowManager }) { return function () { logger.info(`[MCP][User: ${userId}][${serverName}][${toolName}] Tool call aborted`); const flowId = MCPOAuthHandler.generateFlowId(userId, serverName); + // Clean up both mcp_oauth and mcp_get_tokens flows flowManager.failFlow(flowId, 'mcp_oauth', new Error('Tool call aborted')); + flowManager.failFlow(flowId, 'mcp_get_tokens', new Error('Tool call aborted')); }; } @@ -204,38 +206,60 @@ async function reconnectServer({ type: 'tool_call_chunk', }; - const runStepEmitter = createRunStepEmitter({ - res, - index, - runId, - stepId, - toolCall, - streamId, - }); - const runStepDeltaEmitter = createRunStepDeltaEmitter({ - res, - stepId, - toolCall, - streamId, - }); - const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }); - const oauthStart = createOAuthStart({ - res, - flowId, - callback, - flowManager, - }); - return await reinitMCPServer({ - user, - signal, - serverName, - oauthStart, - flowManager, - userMCPAuthMap, - forceNew: true, - returnOnOAuth: false, - connectionTimeout: Time.TWO_MINUTES, - }); + // Set up abort handler to clean up OAuth flows if request is aborted + const oauthFlowId = MCPOAuthHandler.generateFlowId(user.id, serverName); + const abortHandler = () => { + logger.info( + `[MCP][User: ${user.id}][${serverName}] Tool loading aborted, cleaning up OAuth flows`, + ); + // Clean up both mcp_oauth and mcp_get_tokens flows + flowManager.failFlow(oauthFlowId, 'mcp_oauth', new Error('Tool loading aborted')); + flowManager.failFlow(oauthFlowId, 'mcp_get_tokens', new Error('Tool loading aborted')); + }; + + if (signal) { + signal.addEventListener('abort', abortHandler, { once: true }); + } + + try { + const runStepEmitter = createRunStepEmitter({ + res, + index, + runId, + stepId, + toolCall, + streamId, + }); + const runStepDeltaEmitter = createRunStepDeltaEmitter({ + res, + stepId, + toolCall, + streamId, + }); + const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }); + const oauthStart = createOAuthStart({ + res, + flowId, + callback, + flowManager, + }); + return await reinitMCPServer({ + user, + signal, + serverName, + oauthStart, + flowManager, + userMCPAuthMap, + forceNew: true, + returnOnOAuth: false, + connectionTimeout: Time.TWO_MINUTES, + }); + } finally { + // Clean up abort handler to prevent memory leaks + if (signal) { + signal.removeEventListener('abort', abortHandler); + } + } } /** diff --git a/packages/api/src/mcp/MCPConnectionFactory.ts b/packages/api/src/mcp/MCPConnectionFactory.ts index c2f3a114bd..1a97755ec3 100644 --- a/packages/api/src/mcp/MCPConnectionFactory.ts +++ b/packages/api/src/mcp/MCPConnectionFactory.ts @@ -1,7 +1,7 @@ import { logger } from '@librechat/data-schemas'; import type { OAuthClientInformation } from '@modelcontextprotocol/sdk/shared/auth.js'; import type { TokenMethods } from '@librechat/data-schemas'; -import type { MCPOAuthTokens, MCPOAuthFlowMetadata, OAuthMetadata } from '~/mcp/oauth'; +import type { MCPOAuthTokens, OAuthMetadata } from '~/mcp/oauth'; import type { FlowStateManager } from '~/flow/manager'; import type { FlowMetadata } from '~/flow/types'; import type * as t from './types'; @@ -173,9 +173,10 @@ export class MCPConnectionFactory { // Create the flow state so the OAuth callback can find it // We spawn this in the background without waiting for it - this.flowManager!.createFlow(flowId, 'mcp_oauth', flowMetadata).catch(() => { + // Pass signal so the flow can be aborted if the request is cancelled + this.flowManager!.createFlow(flowId, 'mcp_oauth', flowMetadata, this.signal).catch(() => { // The OAuth callback will resolve this flow, so we expect it to timeout here - // which is fine - we just need the flow state to exist + // or it will be aborted if the request is cancelled - both are fine }); if (this.oauthStart) { @@ -354,56 +355,26 @@ export class MCPConnectionFactory { /** Check if there's already an ongoing OAuth flow for this flowId */ const existingFlow = await this.flowManager.getFlowState(flowId, 'mcp_oauth'); - if (existingFlow && existingFlow.status === 'PENDING') { + // If any flow exists (PENDING, COMPLETED, FAILED), cancel it and start fresh + // This ensures the user always gets a new OAuth URL instead of waiting for stale flows + if (existingFlow) { logger.debug( - `${this.logPrefix} OAuth flow already exists for ${flowId}, waiting for completion`, + `${this.logPrefix} Found existing OAuth flow (status: ${existingFlow.status}), cancelling to start fresh`, ); - /** Tokens from existing flow to complete */ - const tokens = await this.flowManager.createFlow(flowId, 'mcp_oauth'); - if (typeof this.oauthEnd === 'function') { - await this.oauthEnd(); - } - logger.info( - `${this.logPrefix} OAuth flow completed, tokens received for ${this.serverName}`, - ); - - /** Client information from the existing flow metadata */ - const existingMetadata = existingFlow.metadata as unknown as MCPOAuthFlowMetadata; - const clientInfo = existingMetadata?.clientInfo; - - return { tokens, clientInfo }; - } - - // Clean up old completed/failed flows, but only if they're actually stale - // This prevents race conditions where we delete a flow that's still being processed - if (existingFlow && existingFlow.status !== 'PENDING') { - const STALE_FLOW_THRESHOLD = 2 * 60 * 1000; // 2 minutes - const { isStale, age, status } = await this.flowManager.isFlowStale( - flowId, - 'mcp_oauth', - STALE_FLOW_THRESHOLD, - ); - - if (isStale) { - try { + try { + if (existingFlow.status === 'PENDING') { + await this.flowManager.failFlow( + flowId, + 'mcp_oauth', + new Error('Cancelled for new OAuth request'), + ); + } else { await this.flowManager.deleteFlow(flowId, 'mcp_oauth'); - logger.debug( - `${this.logPrefix} Cleared stale ${status} OAuth flow (age: ${Math.round(age / 1000)}s)`, - ); - } catch (error) { - logger.warn(`${this.logPrefix} Failed to clear stale OAuth flow`, error); - } - } else { - logger.debug( - `${this.logPrefix} Skipping cleanup of recent ${status} flow (age: ${Math.round(age / 1000)}s, threshold: ${STALE_FLOW_THRESHOLD / 1000}s)`, - ); - // If flow is recent but not pending, something might be wrong - if (status === 'FAILED') { - logger.warn( - `${this.logPrefix} Recent OAuth flow failed, will retry after ${Math.round((STALE_FLOW_THRESHOLD - age) / 1000)}s`, - ); } + } catch (error) { + logger.warn(`${this.logPrefix} Failed to cancel existing OAuth flow`, error); } + // Continue to start a new flow below } logger.debug(`${this.logPrefix} Initiating new OAuth flow for ${this.serverName}...`);