LibreChat/packages/api/src/mcp/UserConnectionManager.ts
Danny Avila d13037881a
🔐 fix: MCP OAuth Tool Discovery and Event Emission (#11599)
* fix: MCP OAuth tool discovery and event emission in event-driven mode

- Add discoverServerTools method to MCPManager for tool discovery when OAuth is required
- Fix OAuth event emission to send both ON_RUN_STEP and ON_RUN_STEP_DELTA events
- Fix hasSubscriber flag reset in GenerationJobManager for proper event buffering
- Add ToolDiscoveryOptions and ToolDiscoveryResult types
- Update reinitMCPServer to use new discovery method and propagate OAuth URLs

* refactor: Update ToolService and MCP modules for improved functionality

- Reintroduced Constants in ToolService for better reference management.
- Enhanced loadToolDefinitionsWrapper to handle both response and streamId scenarios.
- Updated MCP module to correct type definitions for oauthStart parameter.
- Improved MCPConnectionFactory to ensure proper disconnection handling during tool discovery.
- Adjusted tests to reflect changes in mock implementations and ensure accurate behavior during OAuth handling.

* fix: Refine OAuth handling in MCPConnectionFactory and related tests

- Updated the OAuth URL assignment logic in reinitMCPServer to prevent overwriting existing URLs.
- Enhanced error logging to provide clearer messages when tool discovery fails.
- Adjusted tests to reflect changes in OAuth handling, ensuring accurate detection of OAuth requirements without generating URLs in discovery mode.

* refactor: Clean up OAuth URL assignment in reinitMCPServer

- Removed redundant OAuth URL assignment logic in the reinitMCPServer function to streamline the tool discovery process.
- Enhanced error logging for tool discovery failures, improving clarity in debugging and monitoring.

* fix: Update response handling in ToolService for event-driven mode

- Changed the condition in loadToolDefinitionsWrapper to check for writableEnded instead of headersSent, ensuring proper event emission when the response is still writable.
- This adjustment enhances the reliability of event handling during tool execution, particularly in streaming scenarios.
2026-02-01 19:37:04 -05:00

239 lines
9 KiB
TypeScript

import { logger } from '@librechat/data-schemas';
import { ErrorCode, McpError } from '@modelcontextprotocol/sdk/types.js';
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
import { MCPConnection } from './connection';
import type * as t from './types';
import { ConnectionsRepository } from '~/mcp/ConnectionsRepository';
import { mcpConfig } from './mcpConfig';
/**
* Abstract base class for managing user-specific MCP connections with lifecycle management.
* Only meant to be extended by MCPManager.
* Much of the logic was move here from the old MCPManager to make it more manageable.
* User connections will soon be ephemeral and not cached anymore:
* https://github.com/danny-avila/LibreChat/discussions/8790
*/
export abstract class UserConnectionManager {
// Connections shared by all users.
public appConnections: ConnectionsRepository | null = null;
// Connections per userId -> serverName -> connection
protected userConnections: Map<string, Map<string, MCPConnection>> = new Map();
/** Last activity timestamp for users (not per server) */
protected userLastActivity: Map<string, number> = new Map();
/** Updates the last activity timestamp for a user */
protected updateUserLastActivity(userId: string): void {
const now = Date.now();
this.userLastActivity.set(userId, now);
logger.debug(
`[MCP][User: ${userId}] Updated last activity timestamp: ${new Date(now).toISOString()}`,
);
}
/** Gets or creates a connection for a specific user */
public async getUserConnection({
serverName,
forceNew,
user,
flowManager,
customUserVars,
requestBody,
tokenMethods,
oauthStart,
oauthEnd,
signal,
returnOnOAuth = false,
connectionTimeout,
}: {
serverName: string;
forceNew?: boolean;
} & Omit<t.OAuthConnectionOptions, 'useOAuth'>): Promise<MCPConnection> {
const userId = user?.id;
if (!userId) {
throw new McpError(ErrorCode.InvalidRequest, `[MCP] User object missing id property`);
}
if (await this.appConnections!.has(serverName)) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Trying to create user-specific connection for app-level server "${serverName}"`,
);
}
const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, userId);
const userServerMap = this.userConnections.get(userId);
let connection = forceNew ? undefined : userServerMap?.get(serverName);
const now = Date.now();
// Check if user is idle
const lastActivity = this.userLastActivity.get(userId);
if (lastActivity && now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections.`);
// Disconnect all user connections
try {
await this.disconnectUserConnections(userId);
} catch (err) {
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err);
}
connection = undefined; // Force creation of a new connection
} else if (connection) {
if (!config || (config.updatedAt && connection.isStale(config.updatedAt))) {
if (config) {
logger.info(
`[MCP][User: ${userId}][${serverName}] Config was updated, disconnecting stale connection`,
);
}
await this.disconnectUserConnection(userId, serverName);
connection = undefined;
} else if (await connection.isConnected()) {
logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`);
this.updateUserLastActivity(userId);
return connection;
} else {
// Connection exists but is not connected, attempt to remove potentially stale entry
logger.warn(
`[MCP][User: ${userId}][${serverName}] Found existing but disconnected connection object. Cleaning up.`,
);
this.removeUserConnection(userId, serverName); // Clean up maps
connection = undefined;
}
}
// Now check if config exists for new connection creation
if (!config) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Configuration for server "${serverName}" not found.`,
);
}
// If no valid connection exists, create a new one
logger.info(`[MCP][User: ${userId}][${serverName}] Establishing new connection`);
try {
connection = await MCPConnectionFactory.create(
{
serverName: serverName,
serverConfig: config,
},
{
useOAuth: true,
user: user,
customUserVars: customUserVars,
flowManager: flowManager,
tokenMethods: tokenMethods,
signal: signal,
oauthStart: oauthStart,
oauthEnd: oauthEnd,
returnOnOAuth: returnOnOAuth,
requestBody: requestBody,
connectionTimeout: connectionTimeout,
},
);
if (!(await connection?.isConnected())) {
throw new Error('Failed to establish connection after initialization attempt.');
}
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Map());
}
this.userConnections.get(userId)?.set(serverName, connection);
logger.info(`[MCP][User: ${userId}][${serverName}] Connection successfully established`);
// Update timestamp on creation
this.updateUserLastActivity(userId);
return connection;
} catch (error) {
logger.error(`[MCP][User: ${userId}][${serverName}] Failed to establish connection`, error);
// Ensure partial connection state is cleaned up if initialization fails
await connection?.disconnect().catch((disconnectError) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during cleanup after failed connection`,
disconnectError,
);
});
// Ensure cleanup even if connection attempt fails
this.removeUserConnection(userId, serverName);
throw error; // Re-throw the error to the caller
}
}
/** Returns all connections for a specific user */
public getUserConnections(userId: string) {
return this.userConnections.get(userId);
}
/** Removes a specific user connection entry */
protected removeUserConnection(userId: string, serverName: string): void {
const userMap = this.userConnections.get(userId);
if (userMap) {
userMap.delete(serverName);
if (userMap.size === 0) {
this.userConnections.delete(userId);
// Only remove user activity timestamp if all connections are gone
this.userLastActivity.delete(userId);
}
}
logger.debug(`[MCP][User: ${userId}][${serverName}] Removed connection entry.`);
}
/** Disconnects and removes a specific user connection */
public async disconnectUserConnection(userId: string, serverName: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const connection = userMap?.get(serverName);
if (connection) {
logger.info(`[MCP][User: ${userId}][${serverName}] Disconnecting...`);
await connection.disconnect();
this.removeUserConnection(userId, serverName);
}
}
/** Disconnects and removes all connections for a specific user */
public async disconnectUserConnections(userId: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const disconnectPromises: Promise<void>[] = [];
if (userMap) {
logger.info(`[MCP][User: ${userId}] Disconnecting all servers...`);
const userServers = Array.from(userMap.keys());
for (const serverName of userServers) {
disconnectPromises.push(
this.disconnectUserConnection(userId, serverName).catch((error) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during disconnection:`,
error,
);
}),
);
}
await Promise.allSettled(disconnectPromises);
// Ensure user activity timestamp is removed
this.userLastActivity.delete(userId);
logger.info(`[MCP][User: ${userId}] All connections processed for disconnection.`);
}
}
/** Check for and disconnect idle connections */
protected checkIdleConnections(currentUserId?: string): void {
const now = Date.now();
// Iterate through all users to check for idle ones
for (const [userId, lastActivity] of this.userLastActivity.entries()) {
if (currentUserId && currentUserId === userId) {
continue;
}
if (now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(
`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections...`,
);
// Disconnect all user connections asynchronously (fire and forget)
this.disconnectUserConnections(userId).catch((err) =>
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err),
);
}
}
}
}