LibreChat/packages/api/src/mcp/UserConnectionManager.ts
Atef Bellaaj ad6ba4b6d1
🧬 refactor: Wire Database Methods into MCP Package via Registry Pattern (#10715)
* Refactor: MCPServersRegistry Singleton Pattern with Dependency Injection for DB methods consumption

* refactor: error handling in MCP initialization and improve logging for MCPServersRegistry instance creation.

- Added checks for mongoose instance in ServerConfigsDB constructor and refined error messages for clarity.
- Reorder and use type imports

---------

Co-authored-by: Atef Bellaaj <slalom.bellaaj@external.daimlertruck.com>
Co-authored-by: Danny Avila <danny@librechat.ai>
2025-12-11 16:37:12 -05:00

239 lines
9 KiB
TypeScript

import { logger } from '@librechat/data-schemas';
import { ErrorCode, McpError } from '@modelcontextprotocol/sdk/types.js';
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
import { MCPConnection } from './connection';
import type * as t from './types';
import { ConnectionsRepository } from '~/mcp/ConnectionsRepository';
import { mcpConfig } from './mcpConfig';
/**
* Abstract base class for managing user-specific MCP connections with lifecycle management.
* Only meant to be extended by MCPManager.
* Much of the logic was move here from the old MCPManager to make it more manageable.
* User connections will soon be ephemeral and not cached anymore:
* https://github.com/danny-avila/LibreChat/discussions/8790
*/
export abstract class UserConnectionManager {
// Connections shared by all users.
public appConnections: ConnectionsRepository | null = null;
// Connections per userId -> serverName -> connection
protected userConnections: Map<string, Map<string, MCPConnection>> = new Map();
/** Last activity timestamp for users (not per server) */
protected userLastActivity: Map<string, number> = new Map();
/** Updates the last activity timestamp for a user */
protected updateUserLastActivity(userId: string): void {
const now = Date.now();
this.userLastActivity.set(userId, now);
logger.debug(
`[MCP][User: ${userId}] Updated last activity timestamp: ${new Date(now).toISOString()}`,
);
}
/** Gets or creates a connection for a specific user */
public async getUserConnection({
serverName,
forceNew,
user,
flowManager,
customUserVars,
requestBody,
tokenMethods,
oauthStart,
oauthEnd,
signal,
returnOnOAuth = false,
connectionTimeout,
}: {
serverName: string;
forceNew?: boolean;
} & Omit<t.OAuthConnectionOptions, 'useOAuth'>): Promise<MCPConnection> {
const userId = user.id;
if (!userId) {
throw new McpError(ErrorCode.InvalidRequest, `[MCP] User object missing id property`);
}
if (await this.appConnections!.has(serverName)) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Trying to create user-specific connection for app-level server "${serverName}"`,
);
}
const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, userId);
const userServerMap = this.userConnections.get(userId);
let connection = forceNew ? undefined : userServerMap?.get(serverName);
const now = Date.now();
// Check if user is idle
const lastActivity = this.userLastActivity.get(userId);
if (lastActivity && now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections.`);
// Disconnect all user connections
try {
await this.disconnectUserConnections(userId);
} catch (err) {
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err);
}
connection = undefined; // Force creation of a new connection
} else if (connection) {
if (!config || (config.lastUpdatedAt && connection.isStale(config.lastUpdatedAt))) {
if (config) {
logger.info(
`[MCP][User: ${userId}][${serverName}] Config was updated, disconnecting stale connection`,
);
}
await this.disconnectUserConnection(userId, serverName);
connection = undefined;
} else if (await connection.isConnected()) {
logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`);
this.updateUserLastActivity(userId);
return connection;
} else {
// Connection exists but is not connected, attempt to remove potentially stale entry
logger.warn(
`[MCP][User: ${userId}][${serverName}] Found existing but disconnected connection object. Cleaning up.`,
);
this.removeUserConnection(userId, serverName); // Clean up maps
connection = undefined;
}
}
// Now check if config exists for new connection creation
if (!config) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Configuration for server "${serverName}" not found.`,
);
}
// If no valid connection exists, create a new one
logger.info(`[MCP][User: ${userId}][${serverName}] Establishing new connection`);
try {
connection = await MCPConnectionFactory.create(
{
serverName: serverName,
serverConfig: config,
},
{
useOAuth: true,
user: user,
customUserVars: customUserVars,
flowManager: flowManager,
tokenMethods: tokenMethods,
signal: signal,
oauthStart: oauthStart,
oauthEnd: oauthEnd,
returnOnOAuth: returnOnOAuth,
requestBody: requestBody,
connectionTimeout: connectionTimeout,
},
);
if (!(await connection?.isConnected())) {
throw new Error('Failed to establish connection after initialization attempt.');
}
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Map());
}
this.userConnections.get(userId)?.set(serverName, connection);
logger.info(`[MCP][User: ${userId}][${serverName}] Connection successfully established`);
// Update timestamp on creation
this.updateUserLastActivity(userId);
return connection;
} catch (error) {
logger.error(`[MCP][User: ${userId}][${serverName}] Failed to establish connection`, error);
// Ensure partial connection state is cleaned up if initialization fails
await connection?.disconnect().catch((disconnectError) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during cleanup after failed connection`,
disconnectError,
);
});
// Ensure cleanup even if connection attempt fails
this.removeUserConnection(userId, serverName);
throw error; // Re-throw the error to the caller
}
}
/** Returns all connections for a specific user */
public getUserConnections(userId: string) {
return this.userConnections.get(userId);
}
/** Removes a specific user connection entry */
protected removeUserConnection(userId: string, serverName: string): void {
const userMap = this.userConnections.get(userId);
if (userMap) {
userMap.delete(serverName);
if (userMap.size === 0) {
this.userConnections.delete(userId);
// Only remove user activity timestamp if all connections are gone
this.userLastActivity.delete(userId);
}
}
logger.debug(`[MCP][User: ${userId}][${serverName}] Removed connection entry.`);
}
/** Disconnects and removes a specific user connection */
public async disconnectUserConnection(userId: string, serverName: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const connection = userMap?.get(serverName);
if (connection) {
logger.info(`[MCP][User: ${userId}][${serverName}] Disconnecting...`);
await connection.disconnect();
this.removeUserConnection(userId, serverName);
}
}
/** Disconnects and removes all connections for a specific user */
public async disconnectUserConnections(userId: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const disconnectPromises: Promise<void>[] = [];
if (userMap) {
logger.info(`[MCP][User: ${userId}] Disconnecting all servers...`);
const userServers = Array.from(userMap.keys());
for (const serverName of userServers) {
disconnectPromises.push(
this.disconnectUserConnection(userId, serverName).catch((error) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during disconnection:`,
error,
);
}),
);
}
await Promise.allSettled(disconnectPromises);
// Ensure user activity timestamp is removed
this.userLastActivity.delete(userId);
logger.info(`[MCP][User: ${userId}] All connections processed for disconnection.`);
}
}
/** Check for and disconnect idle connections */
protected checkIdleConnections(currentUserId?: string): void {
const now = Date.now();
// Iterate through all users to check for idle ones
for (const [userId, lastActivity] of this.userLastActivity.entries()) {
if (currentUserId && currentUserId === userId) {
continue;
}
if (now - lastActivity > mcpConfig.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(
`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections...`,
);
// Disconnect all user connections asynchronously (fire and forget)
this.disconnectUserConnections(userId).catch((err) =>
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err),
);
}
}
}
}