LibreChat/packages/api/src/mcp/UserConnectionManager.ts
Theo N. Truong ce7e6edad8
Some checks failed
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Has been cancelled
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Has been cancelled
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Has been cancelled
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Has been cancelled
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Has been cancelled
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Has been cancelled
🔄 refactor: MCP Registry System with Distributed Caching (#10191)
* refactor: Restructure MCP registry system with caching

- Split MCPServersRegistry into modular components:
  - MCPServerInspector: handles server inspection and health checks
  - MCPServersInitializer: manages server initialization logic
  - MCPServersRegistry: simplified registry coordination
- Add distributed caching layer:
  - ServerConfigsCacheRedis: Redis-backed configuration cache
  - ServerConfigsCacheInMemory: in-memory fallback cache
  - RegistryStatusCache: distributed leader election state
- Add promise utilities (withTimeout) replacing Promise.race patterns
- Add comprehensive cache integration tests for all cache implementations
- Remove unused MCPManager.getAllToolFunctions method

* fix: Update OAuth flow to include user-specific headers

* chore: Update Jest configuration to ignore additional test files

- Added patterns to ignore files ending with .helper.ts and .helper.d.ts in testPathIgnorePatterns for cleaner test runs.

* fix: oauth headers in callback

* chore: Update Jest testPathIgnorePatterns to exclude helper files

- Modified testPathIgnorePatterns in package.json to ignore files ending with .helper.ts and .helper.d.ts for cleaner test execution.

* ci: update test mocks

---------

Co-authored-by: Danny Avila <danny@librechat.ai>
2025-10-31 15:00:21 -04:00

231 lines
8.6 KiB
TypeScript

import { logger } from '@librechat/data-schemas';
import { ErrorCode, McpError } from '@modelcontextprotocol/sdk/types.js';
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
import { mcpServersRegistry as serversRegistry } from '~/mcp/registry/MCPServersRegistry';
import { MCPConnection } from './connection';
import type * as t from './types';
import { ConnectionsRepository } from '~/mcp/ConnectionsRepository';
/**
* Abstract base class for managing user-specific MCP connections with lifecycle management.
* Only meant to be extended by MCPManager.
* Much of the logic was move here from the old MCPManager to make it more manageable.
* User connections will soon be ephemeral and not cached anymore:
* https://github.com/danny-avila/LibreChat/discussions/8790
*/
export abstract class UserConnectionManager {
// Connections shared by all users.
public appConnections: ConnectionsRepository | null = null;
// Connections per userId -> serverName -> connection
protected userConnections: Map<string, Map<string, MCPConnection>> = new Map();
/** Last activity timestamp for users (not per server) */
protected userLastActivity: Map<string, number> = new Map();
protected readonly USER_CONNECTION_IDLE_TIMEOUT = 15 * 60 * 1000; // 15 minutes (TODO: make configurable)
/** Updates the last activity timestamp for a user */
protected updateUserLastActivity(userId: string): void {
const now = Date.now();
this.userLastActivity.set(userId, now);
logger.debug(
`[MCP][User: ${userId}] Updated last activity timestamp: ${new Date(now).toISOString()}`,
);
}
/** Gets or creates a connection for a specific user */
public async getUserConnection({
serverName,
forceNew,
user,
flowManager,
customUserVars,
requestBody,
tokenMethods,
oauthStart,
oauthEnd,
signal,
returnOnOAuth = false,
connectionTimeout,
}: {
serverName: string;
forceNew?: boolean;
} & Omit<t.OAuthConnectionOptions, 'useOAuth'>): Promise<MCPConnection> {
const userId = user.id;
if (!userId) {
throw new McpError(ErrorCode.InvalidRequest, `[MCP] User object missing id property`);
}
if (this.appConnections!.has(serverName)) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Trying to create user-specific connection for app-level server "${serverName}"`,
);
}
const userServerMap = this.userConnections.get(userId);
let connection = forceNew ? undefined : userServerMap?.get(serverName);
const now = Date.now();
// Check if user is idle
const lastActivity = this.userLastActivity.get(userId);
if (lastActivity && now - lastActivity > this.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections.`);
// Disconnect all user connections
try {
await this.disconnectUserConnections(userId);
} catch (err) {
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err);
}
connection = undefined; // Force creation of a new connection
} else if (connection) {
if (await connection.isConnected()) {
logger.debug(`[MCP][User: ${userId}][${serverName}] Reusing active connection`);
this.updateUserLastActivity(userId);
return connection;
} else {
// Connection exists but is not connected, attempt to remove potentially stale entry
logger.warn(
`[MCP][User: ${userId}][${serverName}] Found existing but disconnected connection object. Cleaning up.`,
);
this.removeUserConnection(userId, serverName); // Clean up maps
connection = undefined;
}
}
// If no valid connection exists, create a new one
if (!connection) {
logger.info(`[MCP][User: ${userId}][${serverName}] Establishing new connection`);
}
const config = await serversRegistry.getServerConfig(serverName, userId);
if (!config) {
throw new McpError(
ErrorCode.InvalidRequest,
`[MCP][User: ${userId}] Configuration for server "${serverName}" not found.`,
);
}
try {
connection = await MCPConnectionFactory.create(
{
serverName: serverName,
serverConfig: config,
},
{
useOAuth: true,
user: user,
customUserVars: customUserVars,
flowManager: flowManager,
tokenMethods: tokenMethods,
signal: signal,
oauthStart: oauthStart,
oauthEnd: oauthEnd,
returnOnOAuth: returnOnOAuth,
requestBody: requestBody,
connectionTimeout: connectionTimeout,
},
);
if (!(await connection?.isConnected())) {
throw new Error('Failed to establish connection after initialization attempt.');
}
if (!this.userConnections.has(userId)) {
this.userConnections.set(userId, new Map());
}
this.userConnections.get(userId)?.set(serverName, connection);
logger.info(`[MCP][User: ${userId}][${serverName}] Connection successfully established`);
// Update timestamp on creation
this.updateUserLastActivity(userId);
return connection;
} catch (error) {
logger.error(`[MCP][User: ${userId}][${serverName}] Failed to establish connection`, error);
// Ensure partial connection state is cleaned up if initialization fails
await connection?.disconnect().catch((disconnectError) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during cleanup after failed connection`,
disconnectError,
);
});
// Ensure cleanup even if connection attempt fails
this.removeUserConnection(userId, serverName);
throw error; // Re-throw the error to the caller
}
}
/** Returns all connections for a specific user */
public getUserConnections(userId: string) {
return this.userConnections.get(userId);
}
/** Removes a specific user connection entry */
protected removeUserConnection(userId: string, serverName: string): void {
const userMap = this.userConnections.get(userId);
if (userMap) {
userMap.delete(serverName);
if (userMap.size === 0) {
this.userConnections.delete(userId);
// Only remove user activity timestamp if all connections are gone
this.userLastActivity.delete(userId);
}
}
logger.debug(`[MCP][User: ${userId}][${serverName}] Removed connection entry.`);
}
/** Disconnects and removes a specific user connection */
public async disconnectUserConnection(userId: string, serverName: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const connection = userMap?.get(serverName);
if (connection) {
logger.info(`[MCP][User: ${userId}][${serverName}] Disconnecting...`);
await connection.disconnect();
this.removeUserConnection(userId, serverName);
}
}
/** Disconnects and removes all connections for a specific user */
public async disconnectUserConnections(userId: string): Promise<void> {
const userMap = this.userConnections.get(userId);
const disconnectPromises: Promise<void>[] = [];
if (userMap) {
logger.info(`[MCP][User: ${userId}] Disconnecting all servers...`);
const userServers = Array.from(userMap.keys());
for (const serverName of userServers) {
disconnectPromises.push(
this.disconnectUserConnection(userId, serverName).catch((error) => {
logger.error(
`[MCP][User: ${userId}][${serverName}] Error during disconnection:`,
error,
);
}),
);
}
await Promise.allSettled(disconnectPromises);
// Ensure user activity timestamp is removed
this.userLastActivity.delete(userId);
logger.info(`[MCP][User: ${userId}] All connections processed for disconnection.`);
}
}
/** Check for and disconnect idle connections */
protected checkIdleConnections(currentUserId?: string): void {
const now = Date.now();
// Iterate through all users to check for idle ones
for (const [userId, lastActivity] of this.userLastActivity.entries()) {
if (currentUserId && currentUserId === userId) {
continue;
}
if (now - lastActivity > this.USER_CONNECTION_IDLE_TIMEOUT) {
logger.info(
`[MCP][User: ${userId}] User idle for too long. Disconnecting all connections...`,
);
// Disconnect all user connections asynchronously (fire and forget)
this.disconnectUserConnections(userId).catch((err) =>
logger.error(`[MCP][User: ${userId}] Error disconnecting idle connections:`, err),
);
}
}
}
}