🚉 feat: MCP Registry Individual Server Init (2) (#9940)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run

* initialize servers sequentially

* adjust for exported properties that are not nullable anymore

* use underscore separator

* mock with set

* customize init timeout via env var

* refactor for readability, use loaded conns for tool functions

* address PR comments

* clean up fire-and-forget

* fix tests
This commit is contained in:
Federico Ruggi 2025-10-03 22:01:34 +02:00 committed by GitHub
parent 0e5bb6f98c
commit c0ed738aed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 341 additions and 80 deletions

View file

@ -450,7 +450,7 @@ async function getMCPSetupData(userId) {
logger.error(`[MCP][User: ${userId}] Error getting app connections:`, error);
}
const userConnections = mcpManager.getUserConnections(userId) || new Map();
const oauthServers = mcpManager.getOAuthServers() || new Set();
const oauthServers = mcpManager.getOAuthServers();
return {
mcpConfig,

View file

@ -170,7 +170,7 @@ describe('tests for the new helper functions used by the MCP connection status e
const mockMCPManager = {
appConnections: { getAll: jest.fn(() => null) },
getUserConnections: jest.fn(() => null),
getOAuthServers: jest.fn(() => null),
getOAuthServers: jest.fn(() => new Set()),
};
mockGetMCPManager.mockReturnValue(mockMCPManager);

View file

@ -38,7 +38,7 @@ export class MCPManager extends UserConnectionManager {
/** Initializes the MCPManager by setting up server registry and app connections */
public async initialize() {
await this.serversRegistry.initialize();
this.appConnections = new ConnectionsRepository(this.serversRegistry.appServerConfigs!);
this.appConnections = new ConnectionsRepository(this.serversRegistry.appServerConfigs);
}
/** Retrieves an app-level or user-specific connection based on provided arguments */
@ -63,22 +63,23 @@ export class MCPManager extends UserConnectionManager {
}
/** Get servers that require OAuth */
public getOAuthServers(): Set<string> | null {
return this.serversRegistry.oauthServers!;
public getOAuthServers(): Set<string> {
return this.serversRegistry.oauthServers;
}
/** Get all servers */
public getAllServers(): t.MCPServers | null {
return this.serversRegistry.rawConfigs!;
public getAllServers(): t.MCPServers {
return this.serversRegistry.rawConfigs;
}
/** Returns all available tool functions from app-level connections */
public getAppToolFunctions(): t.LCAvailableTools | null {
return this.serversRegistry.toolFunctions!;
public getAppToolFunctions(): t.LCAvailableTools {
return this.serversRegistry.toolFunctions;
}
/** Returns all available tool functions from all connections available to user */
public async getAllToolFunctions(userId: string): Promise<t.LCAvailableTools | null> {
const allToolFunctions: t.LCAvailableTools = this.getAppToolFunctions() ?? {};
const allToolFunctions: t.LCAvailableTools = this.getAppToolFunctions();
const userConnections = this.getUserConnections(userId);
if (!userConnections || userConnections.size === 0) {
return allToolFunctions;
@ -128,7 +129,7 @@ export class MCPManager extends UserConnectionManager {
* @returns Object mapping server names to their instructions
*/
public getInstructions(serverNames?: string[]): Record<string, string> {
const instructions = this.serversRegistry.serverInstructions!;
const instructions = this.serversRegistry.serverInstructions;
if (!serverNames) return instructions;
return pick(instructions, serverNames);
}

View file

@ -1,5 +1,3 @@
import pick from 'lodash/pick';
import pickBy from 'lodash/pickBy';
import mapValues from 'lodash/mapValues';
import { logger } from '@librechat/data-schemas';
import { Constants } from 'librechat-data-provider';
@ -11,6 +9,14 @@ import { detectOAuthRequirement } from '~/mcp/oauth';
import { sanitizeUrlForLogging } from '~/mcp/utils';
import { processMCPEnv, isEnabled } from '~/utils';
const DEFAULT_MCP_INIT_TIMEOUT_MS = 30_000;
function getMCPInitTimeout(): number {
return process.env.MCP_INIT_TIMEOUT_MS != null
? parseInt(process.env.MCP_INIT_TIMEOUT_MS)
: DEFAULT_MCP_INIT_TIMEOUT_MS;
}
/**
* Manages MCP server configurations and metadata discovery.
* Fetches server capabilities, OAuth requirements, and tool definitions for registry.
@ -20,19 +26,21 @@ import { processMCPEnv, isEnabled } from '~/utils';
export class MCPServersRegistry {
private initialized: boolean = false;
private connections: ConnectionsRepository;
private initTimeoutMs: number;
public readonly rawConfigs: t.MCPServers;
public readonly parsedConfigs: Record<string, t.ParsedServerConfig>;
public oauthServers: Set<string> | null = null;
public serverInstructions: Record<string, string> | null = null;
public toolFunctions: t.LCAvailableTools | null = null;
public appServerConfigs: t.MCPServers | null = null;
public oauthServers: Set<string> = new Set();
public serverInstructions: Record<string, string> = {};
public toolFunctions: t.LCAvailableTools = {};
public appServerConfigs: t.MCPServers = {};
constructor(configs: t.MCPServers) {
this.rawConfigs = configs;
this.parsedConfigs = mapValues(configs, (con) => processMCPEnv({ options: con }));
this.connections = new ConnectionsRepository(configs);
this.initTimeoutMs = getMCPInitTimeout();
}
/** Initializes all startup-enabled servers by gathering their metadata asynchronously */
@ -42,21 +50,43 @@ export class MCPServersRegistry {
const serverNames = Object.keys(this.parsedConfigs);
await Promise.allSettled(serverNames.map((serverName) => this.gatherServerInfo(serverName)));
this.setOAuthServers();
this.setServerInstructions();
this.setAppServerConfigs();
await this.setAppToolFunctions();
this.connections.disconnectAll();
await Promise.allSettled(
serverNames.map((serverName) => this.initializeServerWithTimeout(serverName)),
);
}
/** Fetches all metadata for a single server in parallel */
private async gatherServerInfo(serverName: string): Promise<void> {
/** Wraps server initialization with a timeout to prevent hanging */
private async initializeServerWithTimeout(serverName: string): Promise<void> {
let timeoutId: NodeJS.Timeout | null = null;
try {
await Promise.race([
this.initializeServer(serverName),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
reject(new Error('Server initialization timed out'));
}, this.initTimeoutMs);
}),
]);
} catch (error) {
logger.warn(`${this.prefix(serverName)} Server initialization failed:`, error);
throw error;
} finally {
if (timeoutId != null) {
clearTimeout(timeoutId);
}
}
}
/** Initializes a single server with all its metadata and adds it to appropriate collections */
private async initializeServer(serverName: string): Promise<void> {
const start = Date.now();
const config = this.parsedConfigs[serverName];
// 1. Detect OAuth requirements if not already specified
try {
await this.fetchOAuthRequirement(serverName);
const config = this.parsedConfigs[serverName];
if (config.startup !== false && !config.requiresOAuth) {
await Promise.allSettled([
@ -68,54 +98,49 @@ export class MCPServersRegistry {
),
]);
}
this.logUpdatedConfig(serverName);
} catch (error) {
logger.warn(`${this.prefix(serverName)} Failed to initialize server:`, error);
}
}
/** Sets app-level server configs (startup enabled, non-OAuth servers) */
private setAppServerConfigs(): void {
const appServers = Object.keys(
pickBy(
this.parsedConfigs,
(config) => config.startup !== false && config.requiresOAuth === false,
),
);
this.appServerConfigs = pick(this.rawConfigs, appServers);
}
/** Creates set of server names that require OAuth authentication */
private setOAuthServers(): Set<string> {
if (this.oauthServers) return this.oauthServers;
this.oauthServers = new Set(
Object.keys(pickBy(this.parsedConfigs, (config) => config.requiresOAuth)),
);
return this.oauthServers;
}
/** Collects server instructions from all configured servers */
private setServerInstructions(): void {
this.serverInstructions = mapValues(
pickBy(this.parsedConfigs, (config) => config.serverInstructions),
(config) => config.serverInstructions as string,
);
}
/** Builds registry of all available tool functions from loaded connections */
private async setAppToolFunctions(): Promise<void> {
const connections = (await this.connections.getLoaded()).entries();
const allToolFunctions: t.LCAvailableTools = {};
for (const [serverName, conn] of connections) {
// 2. Fetch tool functions for this server if a connection was established
const getToolFunctions = async (): Promise<t.LCAvailableTools | null> => {
try {
const toolFunctions = await this.getToolFunctions(serverName, conn);
Object.assign(allToolFunctions, toolFunctions);
const loadedConns = await this.connections.getLoaded();
const conn = loadedConns.get(serverName);
if (conn == null) {
return null;
}
return this.getToolFunctions(serverName, conn);
} catch (error) {
logger.warn(`${this.prefix(serverName)} Error fetching tool functions:`, error);
return null;
}
};
const toolFunctions = await getToolFunctions();
// 3. Disconnect this server's connection if it was established (fire-and-forget)
void this.connections.disconnect(serverName);
// 4. Side effects
// 4.1 Add to OAuth servers if needed
if (config.requiresOAuth) {
this.oauthServers.add(serverName);
}
this.toolFunctions = allToolFunctions;
// 4.2 Add server instructions if available
if (config.serverInstructions != null) {
this.serverInstructions[serverName] = config.serverInstructions as string;
}
// 4.3 Add to app server configs if eligible (startup enabled, non-OAuth servers)
if (config.startup !== false && config.requiresOAuth === false) {
this.appServerConfigs[serverName] = this.rawConfigs[serverName];
}
// 4.4 Add tool functions if available
if (toolFunctions != null) {
Object.assign(this.toolFunctions, toolFunctions);
}
const duration = Date.now() - start;
this.logUpdatedConfig(serverName, duration);
}
/** Converts server tools to LibreChat-compatible tool functions format */
@ -185,7 +210,7 @@ export class MCPServersRegistry {
}
// Logs server configuration summary after initialization
private logUpdatedConfig(serverName: string): void {
private logUpdatedConfig(serverName: string, initDuration: number): void {
const prefix = this.prefix(serverName);
const config = this.parsedConfigs[serverName];
logger.info(`${prefix} -------------------------------------------------┐`);
@ -194,6 +219,7 @@ export class MCPServersRegistry {
logger.info(`${prefix} Capabilities: ${config.capabilities}`);
logger.info(`${prefix} Tools: ${config.tools}`);
logger.info(`${prefix} Server Instructions: ${config.serverInstructions}`);
logger.info(`${prefix} Initialized in: ${initDuration}ms`);
logger.info(`${prefix} -------------------------------------------------┘`);
}

View file

@ -113,6 +113,7 @@ describe('MCPServersRegistry - Initialize Function', () => {
get: jest.fn(),
getLoaded: jest.fn(),
disconnectAll: jest.fn(),
disconnect: jest.fn().mockResolvedValue(undefined),
} as unknown as jest.Mocked<ConnectionsRepository>;
mockConnectionsRepo.get.mockImplementation((serverName: string) => {
@ -160,6 +161,7 @@ describe('MCPServersRegistry - Initialize Function', () => {
});
afterEach(() => {
delete process.env.MCP_INIT_TIMEOUT_MS;
jest.clearAllMocks();
});
@ -179,15 +181,14 @@ describe('MCPServersRegistry - Initialize Function', () => {
const registry = new MCPServersRegistry(rawConfigs);
// Verify initial state
expect(registry.oauthServers).toBeNull();
expect(registry.serverInstructions).toBeNull();
expect(registry.toolFunctions).toBeNull();
expect(registry.appServerConfigs).toBeNull();
expect(registry.oauthServers.size).toBe(0);
expect(registry.serverInstructions).toEqual({});
expect(registry.toolFunctions).toEqual({});
expect(registry.appServerConfigs).toEqual({});
await registry.initialize();
// Test oauthServers Set
expect(registry.oauthServers).toBeInstanceOf(Set);
expect(registry.oauthServers).toEqual(
new Set(['oauth_server', 'oauth_predefined', 'oauth_startup_enabled']),
);
@ -228,18 +229,49 @@ describe('MCPServersRegistry - Initialize Function', () => {
expect(registry.toolFunctions).toEqual(expectedToolFunctions);
});
it('should handle errors gracefully and continue initialization', async () => {
it('should handle errors gracefully and continue initialization of other servers', async () => {
const registry = new MCPServersRegistry(rawConfigs);
// Make one server throw an error
mockDetectOAuthRequirement.mockRejectedValueOnce(new Error('OAuth detection failed'));
// Make one specific server throw an error during OAuth detection
mockDetectOAuthRequirement.mockImplementation((url: string) => {
if (url === 'https://api.github.com/mcp') {
return Promise.reject(new Error('OAuth detection failed'));
}
// Return normal responses for other servers
const oauthResults: Record<string, OAuthDetectionResult> = {
'https://api.disabled.com/mcp': {
requiresOAuth: false,
method: 'no-metadata-found',
metadata: null,
},
'https://api.public.com/mcp': {
requiresOAuth: false,
method: 'no-metadata-found',
metadata: null,
},
};
return Promise.resolve(
oauthResults[url] ?? {
requiresOAuth: false,
method: 'no-metadata-found',
metadata: null,
},
);
});
await registry.initialize();
// Should still initialize successfully
// Should still initialize successfully for other servers
expect(registry.oauthServers).toBeInstanceOf(Set);
expect(registry.toolFunctions).toBeDefined();
// The failed server should not be in oauthServers (since it failed OAuth detection)
expect(registry.oauthServers.has('oauth_server')).toBe(false);
// But other servers should still be processed successfully
expect(registry.appServerConfigs).toHaveProperty('stdio_server');
expect(registry.appServerConfigs).toHaveProperty('non_oauth_server');
// Error should be logged as a warning at the higher level
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('[MCP][oauth_server] Failed to initialize server:'),
@ -247,12 +279,15 @@ describe('MCPServersRegistry - Initialize Function', () => {
);
});
it('should disconnect all connections after initialization', async () => {
it('should disconnect individual connections after each server initialization', async () => {
const registry = new MCPServersRegistry(rawConfigs);
await registry.initialize();
expect(mockConnectionsRepo.disconnectAll).toHaveBeenCalledTimes(1);
// Verify disconnect was called for each server during initialization
// All servers attempt to connect during initialization for metadata gathering
const serverNames = Object.keys(rawConfigs);
expect(mockConnectionsRepo.disconnect).toHaveBeenCalledTimes(serverNames.length);
});
it('should log configuration updates for each startup-enabled server', async () => {
@ -357,5 +392,204 @@ describe('MCPServersRegistry - Initialize Function', () => {
// Verify getInstructions was called for both "true" cases
expect(mockClient.getInstructions).toHaveBeenCalledTimes(2);
});
it('should use Promise.allSettled for individual server initialization', async () => {
const registry = new MCPServersRegistry(rawConfigs);
// Spy on Promise.allSettled to verify it's being used
const allSettledSpy = jest.spyOn(Promise, 'allSettled');
await registry.initialize();
// Verify Promise.allSettled was called with an array of server initialization promises
expect(allSettledSpy).toHaveBeenCalledWith(expect.arrayContaining([expect.any(Promise)]));
// Verify it was called with the correct number of server promises
const serverNames = Object.keys(rawConfigs);
expect(allSettledSpy).toHaveBeenCalledWith(
expect.arrayContaining(new Array(serverNames.length).fill(expect.any(Promise))),
);
allSettledSpy.mockRestore();
});
it('should isolate server failures and not affect other servers', async () => {
const registry = new MCPServersRegistry(rawConfigs);
// Make multiple servers fail in different ways
mockConnectionsRepo.get.mockImplementation((serverName: string) => {
if (serverName === 'stdio_server') {
// First server fails
throw new Error('Connection failed for stdio_server');
}
if (serverName === 'websocket_server') {
// Second server fails
throw new Error('Connection failed for websocket_server');
}
// Other servers succeed
const connection = mockConnections.get(serverName);
if (!connection) {
throw new Error(`Connection not found for server: ${serverName}`);
}
return Promise.resolve(connection);
});
await registry.initialize();
// Despite failures, initialization should complete
expect(registry.oauthServers).toBeInstanceOf(Set);
expect(registry.toolFunctions).toBeDefined();
// Successful servers should still be processed
expect(registry.appServerConfigs).toHaveProperty('non_oauth_server');
// Failed servers should not crash the whole initialization
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('[MCP][stdio_server] Failed to fetch server capabilities:'),
expect.any(Error),
);
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('[MCP][websocket_server] Failed to fetch server capabilities:'),
expect.any(Error),
);
});
it('should properly clean up connections even when some servers fail', async () => {
const registry = new MCPServersRegistry(rawConfigs);
// Track disconnect failures but suppress unhandled rejections
const disconnectErrors: Error[] = [];
mockConnectionsRepo.disconnect.mockImplementation((serverName: string) => {
if (serverName === 'stdio_server') {
const error = new Error('Disconnect failed');
disconnectErrors.push(error);
return Promise.reject(error).catch(() => {}); // Suppress unhandled rejection
}
return Promise.resolve();
});
await registry.initialize();
// Should still attempt to disconnect all servers during initialization
const serverNames = Object.keys(rawConfigs);
expect(mockConnectionsRepo.disconnect).toHaveBeenCalledTimes(serverNames.length);
expect(disconnectErrors).toHaveLength(1);
});
it('should timeout individual server initialization after configured timeout', async () => {
const timeout = 2000;
// Create registry with a short timeout for testing
process.env.MCP_INIT_TIMEOUT_MS = `${timeout}`;
const registry = new MCPServersRegistry(rawConfigs);
// Make one server hang indefinitely during OAuth detection
mockDetectOAuthRequirement.mockImplementation((url: string) => {
if (url === 'https://api.github.com/mcp') {
// Slow init
return new Promise((res) => setTimeout(res, timeout * 2));
}
// Return normal responses for other servers
return Promise.resolve({
requiresOAuth: false,
method: 'no-metadata-found',
metadata: null,
});
});
const start = Date.now();
await registry.initialize();
const duration = Date.now() - start;
// Should complete within reasonable time despite one server hanging
// Allow some buffer for test execution overhead
expect(duration).toBeLessThan(timeout * 1.5);
// The timeout should prevent the hanging server from blocking initialization
// Other servers should still be processed successfully
expect(registry.appServerConfigs).toHaveProperty('stdio_server');
expect(registry.appServerConfigs).toHaveProperty('non_oauth_server');
}, 10_000); // 10 second Jest timeout
it('should skip tool function fetching if connection was not established', async () => {
const testConfig: t.MCPServers = {
server_with_connection: {
type: 'stdio',
args: [],
command: 'test-command',
},
server_without_connection: {
type: 'stdio',
args: [],
command: 'failing-command',
},
};
const registry = new MCPServersRegistry(testConfig);
const mockClient = {
listTools: jest.fn().mockResolvedValue({
tools: [
{
name: 'test_tool',
description: 'Test tool',
inputSchema: { type: 'object', properties: {} },
},
],
}),
getInstructions: jest.fn().mockReturnValue(undefined),
getServerCapabilities: jest.fn().mockReturnValue({ tools: {} }),
};
const mockConnection = {
client: mockClient,
} as unknown as jest.Mocked<MCPConnection>;
mockConnectionsRepo.get.mockImplementation((serverName: string) => {
if (serverName === 'server_with_connection') {
return Promise.resolve(mockConnection);
}
throw new Error('Connection failed');
});
// Mock getLoaded to return connections map - the real implementation returns all loaded connections at once
mockConnectionsRepo.getLoaded.mockResolvedValue(
new Map([['server_with_connection', mockConnection]]),
);
mockDetectOAuthRequirement.mockResolvedValue({
requiresOAuth: false,
method: 'no-metadata-found',
metadata: null,
});
await registry.initialize();
expect(registry.toolFunctions).toHaveProperty('test_tool_mcp_server_with_connection');
expect(Object.keys(registry.toolFunctions)).toHaveLength(1);
});
it('should handle getLoaded returning empty map gracefully', async () => {
const testConfig: t.MCPServers = {
test_server: {
type: 'stdio',
args: [],
command: 'test-command',
},
};
const registry = new MCPServersRegistry(testConfig);
mockConnectionsRepo.get.mockRejectedValue(new Error('All connections failed'));
mockConnectionsRepo.getLoaded.mockResolvedValue(new Map());
mockDetectOAuthRequirement.mockResolvedValue({
requiresOAuth: false,
method: 'no-metadata-found',
metadata: null,
});
await registry.initialize();
expect(registry.toolFunctions).toEqual({});
});
});
});

View file

@ -72,7 +72,7 @@ export class OAuthReconnectionManager {
// 1. derive the servers to reconnect
const serversToReconnect = [];
for (const serverName of this.mcpManager.getOAuthServers() ?? []) {
for (const serverName of this.mcpManager.getOAuthServers()) {
const canReconnect = await this.canReconnect(userId, serverName);
if (canReconnect) {
serversToReconnect.push(serverName);