mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-18 01:10:14 +01:00
♻️ refactor: MCPManager for Scalability, Fix App-Level Detection, Add Lazy Connections (#8930)
* feat: MCP Connection management overhaul - Making MCPManager manageable Refactor the monolithic MCPManager into focused, single-responsibility classes: • MCPServersRegistry: Server configuration discovery and metadata management • UserConnectionManager: Manages user-level connections • ConnectionsRepository: Low-level connection pool with lazy loading • MCPConnectionFactory: Handles MCP connection creation with OAuth support New Features: • Lazy loading of app-level connections for horizontal scaling • Automatic reconnection for app-level connections • Enhanced OAuth detection with explicit requiresOAuth flag • Centralized MCP configuration management Bug Fixes: • App-level connection detection in MCPManager.callTool • MCP Connection Reinitialization route behavior Optimizations: • MCPConnection.isConnected() caching to reduce overhead • Concurrent server metadata retrieval instead of sequential This refactoring addresses scalability bottlenecks and improves reliability while maintaining backward compatibility with existing configurations. * feat: Enabled import order in eslint. * # Moved tests to __tests__ folder # added tests for MCPServersRegistry.ts * # Add unit tests for ConnectionsRepository functionality * # Add unit tests for MCPConnectionFactory functionality * # Reorganize MCP connection tests and improve error handling * # reordering imports * # Update testPathIgnorePatterns in jest.config.mjs to exclude development TypeScript files * # removed mcp/manager.ts
This commit is contained in:
parent
9dbf153489
commit
8780a78165
32 changed files with 2571 additions and 1468 deletions
384
packages/api/src/mcp/MCPConnectionFactory.ts
Normal file
384
packages/api/src/mcp/MCPConnectionFactory.ts
Normal file
|
|
@ -0,0 +1,384 @@
|
|||
import { logger } from '@librechat/data-schemas';
|
||||
import type { OAuthClientInformation } from '@modelcontextprotocol/sdk/shared/auth.js';
|
||||
import type { TokenMethods } from '@librechat/data-schemas';
|
||||
import type { TUser } from 'librechat-data-provider';
|
||||
import type { MCPOAuthTokens, MCPOAuthFlowMetadata } from '~/mcp/oauth';
|
||||
import type { FlowStateManager } from '~/flow/manager';
|
||||
import type { FlowMetadata } from '~/flow/types';
|
||||
import { MCPTokenStorage, MCPOAuthHandler } from '~/mcp/oauth';
|
||||
import { MCPConnection } from './connection';
|
||||
import { processMCPEnv } from '~/utils';
|
||||
import type * as t from './types';
|
||||
|
||||
export interface BasicConnectionOptions {
|
||||
serverName: string;
|
||||
serverConfig: t.MCPOptions;
|
||||
}
|
||||
|
||||
export interface OAuthConnectionOptions {
|
||||
useOAuth: true;
|
||||
user: TUser;
|
||||
customUserVars?: Record<string, string>;
|
||||
flowManager: FlowStateManager<MCPOAuthTokens | null>;
|
||||
tokenMethods?: TokenMethods;
|
||||
signal?: AbortSignal;
|
||||
oauthStart?: (authURL: string) => Promise<void>;
|
||||
oauthEnd?: () => Promise<void>;
|
||||
returnOnOAuth?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory for creating MCP connections with optional OAuth authentication.
|
||||
* Handles OAuth flows, token management, and connection retry logic.
|
||||
* NOTE: Much of the OAuth logic was extracted from the old MCPManager class as is.
|
||||
*/
|
||||
export class MCPConnectionFactory {
|
||||
protected readonly serverName: string;
|
||||
protected readonly serverConfig: t.MCPOptions;
|
||||
protected readonly logPrefix: string;
|
||||
protected readonly useOAuth: boolean;
|
||||
|
||||
// OAuth-related properties (only set when useOAuth is true)
|
||||
protected readonly userId?: string;
|
||||
protected readonly flowManager?: FlowStateManager<MCPOAuthTokens | null>;
|
||||
protected readonly tokenMethods?: TokenMethods;
|
||||
protected readonly signal?: AbortSignal;
|
||||
protected readonly oauthStart?: (authURL: string) => Promise<void>;
|
||||
protected readonly oauthEnd?: () => Promise<void>;
|
||||
protected readonly returnOnOAuth?: boolean;
|
||||
|
||||
/** Creates a new MCP connection with optional OAuth support */
|
||||
static async create(
|
||||
basic: BasicConnectionOptions,
|
||||
oauth?: OAuthConnectionOptions,
|
||||
): Promise<MCPConnection> {
|
||||
const factory = new this(basic, oauth);
|
||||
return factory.createConnection();
|
||||
}
|
||||
|
||||
protected constructor(basic: BasicConnectionOptions, oauth?: OAuthConnectionOptions) {
|
||||
this.serverConfig = processMCPEnv(basic.serverConfig, oauth?.user, oauth?.customUserVars);
|
||||
this.serverName = basic.serverName;
|
||||
this.useOAuth = !!oauth?.useOAuth;
|
||||
this.logPrefix = oauth?.user
|
||||
? `[MCP][${basic.serverName}][${oauth.user.id}]`
|
||||
: `[MCP][${basic.serverName}]`;
|
||||
|
||||
if (oauth?.useOAuth) {
|
||||
this.userId = oauth.user.id;
|
||||
this.flowManager = oauth.flowManager;
|
||||
this.tokenMethods = oauth.tokenMethods;
|
||||
this.signal = oauth.signal;
|
||||
this.oauthStart = oauth.oauthStart;
|
||||
this.oauthEnd = oauth.oauthEnd;
|
||||
this.returnOnOAuth = oauth.returnOnOAuth;
|
||||
}
|
||||
}
|
||||
|
||||
/** Creates the base MCP connection with OAuth tokens */
|
||||
protected async createConnection(): Promise<MCPConnection> {
|
||||
const oauthTokens = this.useOAuth ? await this.getOAuthTokens() : null;
|
||||
const connection = new MCPConnection({
|
||||
serverName: this.serverName,
|
||||
serverConfig: this.serverConfig,
|
||||
userId: this.userId,
|
||||
oauthTokens,
|
||||
});
|
||||
|
||||
if (this.useOAuth) this.handleOAuthEvents(connection);
|
||||
await this.attemptToConnect(connection);
|
||||
return connection;
|
||||
}
|
||||
|
||||
/** Retrieves existing OAuth tokens from storage or returns null */
|
||||
protected async getOAuthTokens(): Promise<MCPOAuthTokens | null> {
|
||||
if (!this.tokenMethods?.findToken) return null;
|
||||
|
||||
try {
|
||||
const tokens = await this.flowManager!.createFlowWithHandler(
|
||||
`tokens:${this.userId}:${this.serverName}`,
|
||||
'mcp_get_tokens',
|
||||
async () => {
|
||||
return await MCPTokenStorage.getTokens({
|
||||
userId: this.userId!,
|
||||
serverName: this.serverName,
|
||||
findToken: this.tokenMethods!.findToken!,
|
||||
createToken: this.tokenMethods!.createToken,
|
||||
updateToken: this.tokenMethods!.updateToken,
|
||||
refreshTokens: this.createRefreshTokensFunction(),
|
||||
});
|
||||
},
|
||||
this.signal,
|
||||
);
|
||||
|
||||
if (tokens) logger.info(`${this.logPrefix} Loaded OAuth tokens`);
|
||||
return tokens;
|
||||
} catch (error) {
|
||||
logger.debug(`${this.logPrefix} No existing tokens found or error loading tokens`, error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/** Creates a function to refresh OAuth tokens when they expire */
|
||||
protected createRefreshTokensFunction(): (
|
||||
refreshToken: string,
|
||||
metadata: {
|
||||
userId: string;
|
||||
serverName: string;
|
||||
identifier: string;
|
||||
clientInfo?: OAuthClientInformation;
|
||||
},
|
||||
) => Promise<MCPOAuthTokens> {
|
||||
return async (refreshToken, metadata) => {
|
||||
return await MCPOAuthHandler.refreshOAuthTokens(
|
||||
refreshToken,
|
||||
{
|
||||
serverUrl: (this.serverConfig as t.SSEOptions | t.StreamableHTTPOptions).url,
|
||||
serverName: metadata.serverName,
|
||||
clientInfo: metadata.clientInfo,
|
||||
},
|
||||
this.serverConfig.oauth,
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
/** Sets up OAuth event handlers for the connection */
|
||||
protected handleOAuthEvents(connection: MCPConnection): void {
|
||||
connection.on('oauthRequired', async (data) => {
|
||||
logger.info(`${this.logPrefix} oauthRequired event received`);
|
||||
|
||||
// If we just want to initiate OAuth and return, handle it differently
|
||||
if (this.returnOnOAuth) {
|
||||
try {
|
||||
const config = this.serverConfig;
|
||||
const { authorizationUrl, flowId, flowMetadata } =
|
||||
await MCPOAuthHandler.initiateOAuthFlow(
|
||||
this.serverName,
|
||||
data.serverUrl || '',
|
||||
this.userId!,
|
||||
config?.oauth,
|
||||
);
|
||||
|
||||
// Create the flow state so the OAuth callback can find it
|
||||
// We spawn this in the background without waiting for it
|
||||
this.flowManager!.createFlow(flowId, 'mcp_oauth', flowMetadata).catch(() => {
|
||||
// The OAuth callback will resolve this flow, so we expect it to timeout here
|
||||
// which is fine - we just need the flow state to exist
|
||||
});
|
||||
|
||||
if (this.oauthStart) {
|
||||
logger.info(`${this.logPrefix} OAuth flow started, issuing authorization URL`);
|
||||
await this.oauthStart(authorizationUrl);
|
||||
}
|
||||
|
||||
// Emit oauthFailed to signal that connection should not proceed
|
||||
// but OAuth was successfully initiated
|
||||
connection.emit('oauthFailed', new Error('OAuth flow initiated - return early'));
|
||||
return;
|
||||
} catch (error) {
|
||||
logger.error(`${this.logPrefix} Failed to initiate OAuth flow`, error);
|
||||
connection.emit('oauthFailed', new Error('OAuth initiation failed'));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Normal OAuth handling - wait for completion
|
||||
const result = await this.handleOAuthRequired();
|
||||
|
||||
if (result?.tokens && this.tokenMethods?.createToken) {
|
||||
try {
|
||||
connection.setOAuthTokens(result.tokens);
|
||||
await MCPTokenStorage.storeTokens({
|
||||
userId: this.userId!,
|
||||
serverName: this.serverName,
|
||||
tokens: result.tokens,
|
||||
createToken: this.tokenMethods.createToken,
|
||||
updateToken: this.tokenMethods.updateToken,
|
||||
findToken: this.tokenMethods.findToken,
|
||||
clientInfo: result.clientInfo,
|
||||
});
|
||||
logger.info(`${this.logPrefix} OAuth tokens saved to storage`);
|
||||
} catch (error) {
|
||||
logger.error(`${this.logPrefix} Failed to save OAuth tokens to storage`, error);
|
||||
}
|
||||
}
|
||||
|
||||
// Only emit oauthHandled if we actually got tokens (OAuth succeeded)
|
||||
if (result?.tokens) {
|
||||
connection.emit('oauthHandled');
|
||||
} else {
|
||||
// OAuth failed, emit oauthFailed to properly reject the promise
|
||||
logger.warn(`${this.logPrefix} OAuth failed, emitting oauthFailed event`);
|
||||
connection.emit('oauthFailed', new Error('OAuth authentication failed'));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** Attempts to establish connection with timeout handling */
|
||||
protected async attemptToConnect(connection: MCPConnection): Promise<void> {
|
||||
const connectTimeout = this.serverConfig.initTimeout ?? 30000;
|
||||
const connectionTimeout = new Promise<void>((_, reject) =>
|
||||
setTimeout(
|
||||
() => reject(new Error(`Connection timeout after ${connectTimeout}ms`)),
|
||||
connectTimeout,
|
||||
),
|
||||
);
|
||||
const connectionAttempt = this.connectTo(connection);
|
||||
await Promise.race([connectionAttempt, connectionTimeout]);
|
||||
|
||||
if (await connection.isConnected()) return;
|
||||
logger.error(`${this.logPrefix} Failed to establish connection.`);
|
||||
}
|
||||
|
||||
// Handles connection attempts with retry logic and OAuth error handling
|
||||
private async connectTo(connection: MCPConnection): Promise<void> {
|
||||
const maxAttempts = 3;
|
||||
let attempts = 0;
|
||||
let oauthHandled = false;
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
try {
|
||||
await connection.connect();
|
||||
if (await connection.isConnected()) {
|
||||
return;
|
||||
}
|
||||
throw new Error('Connection attempt succeeded but status is not connected');
|
||||
} catch (error) {
|
||||
attempts++;
|
||||
|
||||
if (this.useOAuth && this.isOAuthError(error)) {
|
||||
// Only handle OAuth if this is a user connection (has oauthStart handler)
|
||||
if (this.oauthStart && !oauthHandled) {
|
||||
const errorWithFlag = error as (Error & { isOAuthError?: boolean }) | undefined;
|
||||
if (errorWithFlag?.isOAuthError) {
|
||||
oauthHandled = true;
|
||||
logger.info(`${this.logPrefix} Handling OAuth`);
|
||||
await this.handleOAuthRequired();
|
||||
}
|
||||
}
|
||||
// Don't retry on OAuth errors - just throw
|
||||
logger.info(`${this.logPrefix} OAuth required, stopping connection attempts`);
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (attempts === maxAttempts) {
|
||||
logger.error(`${this.logPrefix} Failed to connect after ${maxAttempts} attempts`, error);
|
||||
throw error;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000 * attempts));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Determines if an error indicates OAuth authentication is required
|
||||
private isOAuthError(error: unknown): boolean {
|
||||
if (!error || typeof error !== 'object') {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check for SSE error with 401 status
|
||||
if ('message' in error && typeof error.message === 'string') {
|
||||
return error.message.includes('401') || error.message.includes('Non-200 status code (401)');
|
||||
}
|
||||
|
||||
// Check for error code
|
||||
if ('code' in error) {
|
||||
const code = (error as { code?: number }).code;
|
||||
return code === 401 || code === 403;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Manages OAuth flow initiation and completion */
|
||||
protected async handleOAuthRequired(): Promise<{
|
||||
tokens: MCPOAuthTokens | null;
|
||||
clientInfo?: OAuthClientInformation;
|
||||
} | null> {
|
||||
const serverUrl = (this.serverConfig as t.SSEOptions | t.StreamableHTTPOptions).url;
|
||||
logger.debug(`${this.logPrefix} \`handleOAuthRequired\` called with serverUrl: ${serverUrl}`);
|
||||
|
||||
if (!this.flowManager || !serverUrl) {
|
||||
logger.error(
|
||||
`${this.logPrefix} OAuth required but flow manager not available or server URL missing for ${this.serverName}`,
|
||||
);
|
||||
logger.warn(`${this.logPrefix} Please configure OAuth credentials for ${this.serverName}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
logger.debug(`${this.logPrefix} Checking for existing OAuth flow for ${this.serverName}...`);
|
||||
|
||||
/** Flow ID to check if a flow already exists */
|
||||
const flowId = MCPOAuthHandler.generateFlowId(this.userId!, this.serverName);
|
||||
|
||||
/** Check if there's already an ongoing OAuth flow for this flowId */
|
||||
const existingFlow = await this.flowManager.getFlowState(flowId, 'mcp_oauth');
|
||||
if (existingFlow && existingFlow.status === 'PENDING') {
|
||||
logger.debug(
|
||||
`${this.logPrefix} OAuth flow already exists for ${flowId}, waiting for completion`,
|
||||
);
|
||||
/** Tokens from existing flow to complete */
|
||||
const tokens = await this.flowManager.createFlow(flowId, 'mcp_oauth');
|
||||
if (typeof this.oauthEnd === 'function') {
|
||||
await this.oauthEnd();
|
||||
}
|
||||
logger.info(
|
||||
`${this.logPrefix} OAuth flow completed, tokens received for ${this.serverName}`,
|
||||
);
|
||||
|
||||
/** Client information from the existing flow metadata */
|
||||
const existingMetadata = existingFlow.metadata as unknown as MCPOAuthFlowMetadata;
|
||||
const clientInfo = existingMetadata?.clientInfo;
|
||||
|
||||
return { tokens, clientInfo };
|
||||
}
|
||||
|
||||
logger.debug(`${this.logPrefix} Initiating new OAuth flow for ${this.serverName}...`);
|
||||
const {
|
||||
authorizationUrl,
|
||||
flowId: newFlowId,
|
||||
flowMetadata,
|
||||
} = await MCPOAuthHandler.initiateOAuthFlow(
|
||||
this.serverName,
|
||||
serverUrl,
|
||||
this.userId!,
|
||||
this.serverConfig.oauth,
|
||||
);
|
||||
|
||||
if (typeof this.oauthStart === 'function') {
|
||||
logger.info(`${this.logPrefix} OAuth flow started, issued authorization URL to user`);
|
||||
await this.oauthStart(authorizationUrl);
|
||||
} else {
|
||||
logger.info(`
|
||||
═══════════════════════════════════════════════════════════════════════
|
||||
Please visit the following URL to authenticate:
|
||||
|
||||
${authorizationUrl}
|
||||
|
||||
${this.logPrefix} Flow ID: ${newFlowId}
|
||||
═══════════════════════════════════════════════════════════════════════
|
||||
`);
|
||||
}
|
||||
|
||||
/** Tokens from the new flow */
|
||||
const tokens = await this.flowManager.createFlow(
|
||||
newFlowId,
|
||||
'mcp_oauth',
|
||||
flowMetadata as FlowMetadata,
|
||||
);
|
||||
if (typeof this.oauthEnd === 'function') {
|
||||
await this.oauthEnd();
|
||||
}
|
||||
logger.info(`${this.logPrefix} OAuth flow completed, tokens received for ${this.serverName}`);
|
||||
|
||||
/** Client information from the flow metadata */
|
||||
const clientInfo = flowMetadata?.clientInfo;
|
||||
|
||||
return { tokens, clientInfo };
|
||||
} catch (error) {
|
||||
logger.error(`${this.logPrefix} Failed to complete OAuth flow for ${this.serverName}`, error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue