mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-03-10 18:12:35 +01:00
🩹 fix: MCP Server Recovery from Startup Inspection Failures (#12145)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
* feat: MCP server reinitialization recovery mechanism - Added functionality to store a stub configuration for MCP servers that fail inspection at startup, allowing for recovery via reinitialization. - Introduced `reinspectServer` method in `MCPServersRegistry` to handle reinspection of previously failed servers. - Enhanced `MCPServersInitializer` to log and manage server initialization failures, ensuring proper handling of inspection failures. - Added integration tests to verify the recovery process for unreachable MCP servers, ensuring that stub configurations are stored and can be reinitialized successfully. - Updated type definitions to include `inspectionFailed` flag in server configurations for better state management. * fix: MCP server handling for inspection failures - Updated `reinitMCPServer` to return a structured response when the server is unreachable, providing clearer feedback on the failure. - Modified `ConnectionsRepository` to prevent connections to servers marked as inspection failed, improving error handling. - Adjusted `MCPServersRegistry` methods to ensure proper management of server states, including throwing errors for non-failed servers during reinspection. - Enhanced integration tests to validate the behavior of the system when dealing with unreachable MCP servers and inspection failures, ensuring robust recovery mechanisms. * fix: Clear all cached server configurations in MCPServersRegistry - Added a comment to clarify the necessity of clearing all cached server configurations when updating a server's configuration, as the cache is keyed by userId without a reverse index for enumeration. * fix: Update integration test for file_tools_server inspection handling - Modified the test to verify that the `file_tools_server` is stored as a stub when inspection fails, ensuring it can be reinitialized correctly. - Adjusted expectations to confirm that the `inspectionFailed` flag is set to true for the stub configuration, enhancing the robustness of the recovery mechanism. * test: Add unit tests for reinspecting servers in MCPServersRegistry - Introduced tests for the `reinspectServer` method to validate error handling when called on a healthy server and when the server does not exist. - Ensured that appropriate exceptions are thrown for both scenarios, enhancing the robustness of server state management. * test: Add integration test for concurrent reinspectServer calls - Introduced a new test to validate that multiple concurrent calls to reinspectServer do not crash or corrupt the server state. - Ensured that at least one call succeeds and any failures are due to the server not being in a failed state, enhancing the reliability of the reinitialization process. * test: Enhance integration test for concurrent MCP server reinitialization - Added a new test to validate that concurrent calls to reinitialize the MCP server do not crash or corrupt the server state. - Ensured that at least one call succeeds and that failures are handled gracefully, improving the reliability of the reinitialization process. - Reset MCPManager instance after each test to maintain a clean state for subsequent tests.
This commit is contained in:
parent
8b18a16446
commit
32cadb1cc5
9 changed files with 627 additions and 15 deletions
|
|
@ -1,8 +1,8 @@
|
||||||
const { logger } = require('@librechat/data-schemas');
|
const { logger } = require('@librechat/data-schemas');
|
||||||
const { CacheKeys, Constants } = require('librechat-data-provider');
|
const { CacheKeys, Constants } = require('librechat-data-provider');
|
||||||
|
const { getMCPManager, getMCPServersRegistry, getFlowStateManager } = require('~/config');
|
||||||
const { findToken, createToken, updateToken, deleteTokens } = require('~/models');
|
const { findToken, createToken, updateToken, deleteTokens } = require('~/models');
|
||||||
const { updateMCPServerTools } = require('~/server/services/Config');
|
const { updateMCPServerTools } = require('~/server/services/Config');
|
||||||
const { getMCPManager, getFlowStateManager } = require('~/config');
|
|
||||||
const { getLogStores } = require('~/cache');
|
const { getLogStores } = require('~/cache');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -41,6 +41,33 @@ async function reinitMCPServer({
|
||||||
let oauthUrl = null;
|
let oauthUrl = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
const registry = getMCPServersRegistry();
|
||||||
|
const serverConfig = await registry.getServerConfig(serverName, user?.id);
|
||||||
|
if (serverConfig?.inspectionFailed) {
|
||||||
|
logger.info(
|
||||||
|
`[MCP Reinitialize] Server ${serverName} had failed inspection, attempting reinspection`,
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
const storageLocation = serverConfig.dbId ? 'DB' : 'CACHE';
|
||||||
|
await registry.reinspectServer(serverName, storageLocation, user?.id);
|
||||||
|
logger.info(`[MCP Reinitialize] Reinspection succeeded for server: ${serverName}`);
|
||||||
|
} catch (reinspectError) {
|
||||||
|
logger.error(
|
||||||
|
`[MCP Reinitialize] Reinspection failed for server ${serverName}:`,
|
||||||
|
reinspectError,
|
||||||
|
);
|
||||||
|
return {
|
||||||
|
availableTools: null,
|
||||||
|
success: false,
|
||||||
|
message: `MCP server '${serverName}' is still unreachable`,
|
||||||
|
oauthRequired: false,
|
||||||
|
serverName,
|
||||||
|
oauthUrl: null,
|
||||||
|
tools: null,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const customUserVars = userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];
|
const customUserVars = userMCPAuthMap?.[`${Constants.mcp_prefix}${serverName}`];
|
||||||
const flowManager = _flowManager ?? getFlowStateManager(getLogStores(CacheKeys.FLOWS));
|
const flowManager = _flowManager ?? getFlowStateManager(getLogStores(CacheKeys.FLOWS));
|
||||||
const mcpManager = getMCPManager();
|
const mcpManager = getMCPManager();
|
||||||
|
|
|
||||||
|
|
@ -140,6 +140,9 @@ export class ConnectionsRepository {
|
||||||
}
|
}
|
||||||
|
|
||||||
private isAllowedToConnectToServer(config: t.ParsedServerConfig) {
|
private isAllowedToConnectToServer(config: t.ParsedServerConfig) {
|
||||||
|
if (config.inspectionFailed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
//the repository is not allowed to be connected in case the Connection repository is shared (ownerId is undefined/null) and the server requires Auth or startup false.
|
//the repository is not allowed to be connected in case the Connection repository is shared (ownerId is undefined/null) and the server requires Auth or startup false.
|
||||||
if (this.ownerId === undefined && (config.startup === false || config.requiresOAuth)) {
|
if (this.ownerId === undefined && (config.startup === false || config.requiresOAuth)) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
import { registryStatusCache as statusCache } from './cache/RegistryStatusCache';
|
|
||||||
import { isLeader } from '~/cluster';
|
|
||||||
import { withTimeout } from '~/utils';
|
|
||||||
import { logger } from '@librechat/data-schemas';
|
import { logger } from '@librechat/data-schemas';
|
||||||
import { ParsedServerConfig } from '~/mcp/types';
|
|
||||||
import { sanitizeUrlForLogging } from '~/mcp/utils';
|
|
||||||
import type * as t from '~/mcp/types';
|
import type * as t from '~/mcp/types';
|
||||||
|
import { registryStatusCache as statusCache } from './cache/RegistryStatusCache';
|
||||||
import { MCPServersRegistry } from './MCPServersRegistry';
|
import { MCPServersRegistry } from './MCPServersRegistry';
|
||||||
|
import { sanitizeUrlForLogging } from '~/mcp/utils';
|
||||||
|
import { withTimeout } from '~/utils';
|
||||||
|
import { isLeader } from '~/cluster';
|
||||||
|
|
||||||
const MCP_INIT_TIMEOUT_MS =
|
const MCP_INIT_TIMEOUT_MS =
|
||||||
process.env.MCP_INIT_TIMEOUT_MS != null ? parseInt(process.env.MCP_INIT_TIMEOUT_MS) : 30_000;
|
process.env.MCP_INIT_TIMEOUT_MS != null ? parseInt(process.env.MCP_INIT_TIMEOUT_MS) : 30_000;
|
||||||
|
|
@ -80,11 +79,22 @@ export class MCPServersInitializer {
|
||||||
MCPServersInitializer.logParsedConfig(serverName, result.config);
|
MCPServersInitializer.logParsedConfig(serverName, result.config);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(`${MCPServersInitializer.prefix(serverName)} Failed to initialize:`, error);
|
logger.error(`${MCPServersInitializer.prefix(serverName)} Failed to initialize:`, error);
|
||||||
|
try {
|
||||||
|
await MCPServersRegistry.getInstance().addServerStub(serverName, rawConfig, 'CACHE');
|
||||||
|
logger.info(
|
||||||
|
`${MCPServersInitializer.prefix(serverName)} Stored stub config for recovery via reinitialize`,
|
||||||
|
);
|
||||||
|
} catch (stubError) {
|
||||||
|
logger.error(
|
||||||
|
`${MCPServersInitializer.prefix(serverName)} Failed to store stub config:`,
|
||||||
|
stubError,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logs server configuration summary after initialization
|
// Logs server configuration summary after initialization
|
||||||
private static logParsedConfig(serverName: string, config: ParsedServerConfig): void {
|
private static logParsedConfig(serverName: string, config: t.ParsedServerConfig): void {
|
||||||
const prefix = MCPServersInitializer.prefix(serverName);
|
const prefix = MCPServersInitializer.prefix(serverName);
|
||||||
logger.info(`${prefix} -------------------------------------------------┐`);
|
logger.info(`${prefix} -------------------------------------------------┐`);
|
||||||
logger.info(`${prefix} URL: ${config.url ? sanitizeUrlForLogging(config.url) : 'N/A'}`);
|
logger.info(`${prefix} URL: ${config.url ? sanitizeUrlForLogging(config.url) : 'N/A'}`);
|
||||||
|
|
|
||||||
|
|
@ -144,6 +144,24 @@ export class MCPServersRegistry {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores a minimal config stub so the server remains "known" to the registry
|
||||||
|
* even when inspection fails at startup. This enables reinitialize to recover.
|
||||||
|
*/
|
||||||
|
public async addServerStub(
|
||||||
|
serverName: string,
|
||||||
|
config: t.MCPOptions,
|
||||||
|
storageLocation: 'CACHE' | 'DB',
|
||||||
|
userId?: string,
|
||||||
|
): Promise<t.AddServerResult> {
|
||||||
|
const configRepo = this.getConfigRepository(storageLocation);
|
||||||
|
const stubConfig: t.ParsedServerConfig = { ...config, inspectionFailed: true };
|
||||||
|
const result = await configRepo.add(serverName, stubConfig, userId);
|
||||||
|
await this.readThroughCache.delete(this.getReadThroughCacheKey(serverName, userId));
|
||||||
|
await this.readThroughCache.delete(this.getReadThroughCacheKey(serverName));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
public async addServer(
|
public async addServer(
|
||||||
serverName: string,
|
serverName: string,
|
||||||
config: t.MCPOptions,
|
config: t.MCPOptions,
|
||||||
|
|
@ -170,6 +188,52 @@ export class MCPServersRegistry {
|
||||||
return await configRepo.add(serverName, parsedConfig, userId);
|
return await configRepo.add(serverName, parsedConfig, userId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Re-inspects a server that previously failed initialization.
|
||||||
|
* Uses the stored stub config to attempt a full inspection and replaces the stub on success.
|
||||||
|
*/
|
||||||
|
public async reinspectServer(
|
||||||
|
serverName: string,
|
||||||
|
storageLocation: 'CACHE' | 'DB',
|
||||||
|
userId?: string,
|
||||||
|
): Promise<t.AddServerResult> {
|
||||||
|
const configRepo = this.getConfigRepository(storageLocation);
|
||||||
|
const existing = await configRepo.get(serverName, userId);
|
||||||
|
if (!existing) {
|
||||||
|
throw new Error(`Server "${serverName}" not found in ${storageLocation} for reinspection.`);
|
||||||
|
}
|
||||||
|
if (!existing.inspectionFailed) {
|
||||||
|
throw new Error(
|
||||||
|
`Server "${serverName}" is not in a failed state. Use updateServer() instead.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const { inspectionFailed: _, ...configForInspection } = existing;
|
||||||
|
let parsedConfig: t.ParsedServerConfig;
|
||||||
|
try {
|
||||||
|
parsedConfig = await MCPServerInspector.inspect(
|
||||||
|
serverName,
|
||||||
|
configForInspection,
|
||||||
|
undefined,
|
||||||
|
this.allowedDomains,
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[MCPServersRegistry] Reinspection failed for server "${serverName}":`, error);
|
||||||
|
if (isMCPDomainNotAllowedError(error)) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
throw new MCPInspectionFailedError(serverName, error as Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
const updatedConfig = { ...parsedConfig, updatedAt: Date.now() };
|
||||||
|
await configRepo.update(serverName, updatedConfig, userId);
|
||||||
|
await this.readThroughCache.delete(this.getReadThroughCacheKey(serverName, userId));
|
||||||
|
await this.readThroughCache.delete(this.getReadThroughCacheKey(serverName));
|
||||||
|
// Full clear required: getAllServerConfigs is keyed by userId with no reverse index to enumerate cached keys
|
||||||
|
await this.readThroughCacheAll.clear();
|
||||||
|
return { serverName, config: updatedConfig };
|
||||||
|
}
|
||||||
|
|
||||||
public async updateServer(
|
public async updateServer(
|
||||||
serverName: string,
|
serverName: string,
|
||||||
config: t.MCPOptions,
|
config: t.MCPOptions,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,488 @@
|
||||||
|
/**
|
||||||
|
* Integration tests for MCP server reinitialize recovery (issue #12143).
|
||||||
|
*
|
||||||
|
* Reproduces the bug: when an MCP server is unreachable at startup,
|
||||||
|
* inspection fails and the server config is never stored — making the
|
||||||
|
* reinitialize button return 404 and blocking all recovery.
|
||||||
|
*
|
||||||
|
* These tests spin up a real in-process MCP server using the SDK's
|
||||||
|
* StreamableHTTPServerTransport and exercise the full
|
||||||
|
* MCPServersInitializer → MCPServersRegistry → MCPServerInspector pipeline
|
||||||
|
* with real connections — no mocked transports, no mocked inspections.
|
||||||
|
*
|
||||||
|
* Minimal mocks: only logger, auth/SSRF, cluster, mcpConfig, and DB repo
|
||||||
|
* (to avoid MongoDB). Everything else — the inspector, registry, cache,
|
||||||
|
* initializer, and MCP connection — runs for real.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import * as net from 'net';
|
||||||
|
import * as http from 'http';
|
||||||
|
import { Agent } from 'undici';
|
||||||
|
import { randomUUID } from 'crypto';
|
||||||
|
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
|
||||||
|
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
|
||||||
|
import { Keyv } from 'keyv';
|
||||||
|
import { Types } from 'mongoose';
|
||||||
|
import type { IUser } from '@librechat/data-schemas';
|
||||||
|
import type { Socket } from 'net';
|
||||||
|
import type * as t from '~/mcp/types';
|
||||||
|
import { MCPInspectionFailedError } from '~/mcp/errors';
|
||||||
|
import { registryStatusCache } from '~/mcp/registry/cache/RegistryStatusCache';
|
||||||
|
import { MCPServersInitializer } from '~/mcp/registry/MCPServersInitializer';
|
||||||
|
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
|
||||||
|
import { ConnectionsRepository } from '~/mcp/ConnectionsRepository';
|
||||||
|
import { FlowStateManager } from '~/flow/manager';
|
||||||
|
import { MCPConnection } from '~/mcp/connection';
|
||||||
|
import { MCPManager } from '~/mcp/MCPManager';
|
||||||
|
|
||||||
|
jest.mock('@librechat/data-schemas', () => ({
|
||||||
|
logger: {
|
||||||
|
info: jest.fn(),
|
||||||
|
warn: jest.fn(),
|
||||||
|
error: jest.fn(),
|
||||||
|
debug: jest.fn(),
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
jest.mock('~/auth', () => ({
|
||||||
|
createSSRFSafeUndiciConnect: jest.fn(() => undefined),
|
||||||
|
resolveHostnameSSRF: jest.fn(async () => false),
|
||||||
|
}));
|
||||||
|
|
||||||
|
jest.mock('~/cluster', () => ({
|
||||||
|
isLeader: jest.fn().mockResolvedValue(true),
|
||||||
|
}));
|
||||||
|
|
||||||
|
jest.mock('~/mcp/mcpConfig', () => ({
|
||||||
|
mcpConfig: { CONNECTION_CHECK_TTL: 0 },
|
||||||
|
}));
|
||||||
|
|
||||||
|
jest.mock('~/mcp/registry/db/ServerConfigsDB', () => ({
|
||||||
|
ServerConfigsDB: jest.fn().mockImplementation(() => ({
|
||||||
|
get: jest.fn().mockResolvedValue(undefined),
|
||||||
|
getAll: jest.fn().mockResolvedValue({}),
|
||||||
|
add: jest.fn().mockResolvedValue(undefined),
|
||||||
|
update: jest.fn().mockResolvedValue(undefined),
|
||||||
|
remove: jest.fn().mockResolvedValue(undefined),
|
||||||
|
reset: jest.fn().mockResolvedValue(undefined),
|
||||||
|
})),
|
||||||
|
}));
|
||||||
|
|
||||||
|
const mockMongoose = {} as typeof import('mongoose');
|
||||||
|
|
||||||
|
const allAgentsCreated: Agent[] = [];
|
||||||
|
const OriginalAgent = Agent;
|
||||||
|
const PatchedAgent = new Proxy(OriginalAgent, {
|
||||||
|
construct(target, args) {
|
||||||
|
const instance = new target(...(args as [Agent.Options?]));
|
||||||
|
allAgentsCreated.push(instance);
|
||||||
|
return instance;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
(global as Record<string, unknown>).__undiciAgent = PatchedAgent;
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
const destroying = allAgentsCreated.map((a) => {
|
||||||
|
if (!a.destroyed && !a.closed) {
|
||||||
|
return a.destroy().catch(() => undefined);
|
||||||
|
}
|
||||||
|
return Promise.resolve();
|
||||||
|
});
|
||||||
|
allAgentsCreated.length = 0;
|
||||||
|
await Promise.all(destroying);
|
||||||
|
});
|
||||||
|
|
||||||
|
async function safeDisconnect(conn: MCPConnection | null): Promise<void> {
|
||||||
|
if (!conn) return;
|
||||||
|
(conn as unknown as { shouldStopReconnecting: boolean }).shouldStopReconnecting = true;
|
||||||
|
conn.removeAllListeners();
|
||||||
|
await conn.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
function getFreePort(): Promise<number> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const srv = net.createServer();
|
||||||
|
srv.listen(0, '127.0.0.1', () => {
|
||||||
|
const addr = srv.address() as net.AddressInfo;
|
||||||
|
srv.close((err) => (err ? reject(err) : resolve(addr.port)));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function trackSockets(httpServer: http.Server): () => Promise<void> {
|
||||||
|
const sockets = new Set<Socket>();
|
||||||
|
httpServer.on('connection', (socket: Socket) => {
|
||||||
|
sockets.add(socket);
|
||||||
|
socket.once('close', () => sockets.delete(socket));
|
||||||
|
});
|
||||||
|
return () =>
|
||||||
|
new Promise<void>((resolve) => {
|
||||||
|
for (const socket of sockets) socket.destroy();
|
||||||
|
sockets.clear();
|
||||||
|
httpServer.close(() => resolve());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
interface TestServer {
|
||||||
|
url: string;
|
||||||
|
port: number;
|
||||||
|
close: () => Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createMCPServerOnPort(port: number): Promise<TestServer> {
|
||||||
|
const sessions = new Map<string, StreamableHTTPServerTransport>();
|
||||||
|
|
||||||
|
const httpServer = http.createServer(async (req, res) => {
|
||||||
|
const sid = req.headers['mcp-session-id'] as string | undefined;
|
||||||
|
let transport = sid ? sessions.get(sid) : undefined;
|
||||||
|
|
||||||
|
if (!transport) {
|
||||||
|
transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID() });
|
||||||
|
const mcp = new McpServer({ name: 'recovery-test-server', version: '0.0.1' });
|
||||||
|
mcp.tool('echo', 'Echo tool for testing', {}, async () => ({
|
||||||
|
content: [{ type: 'text', text: 'ok' }],
|
||||||
|
}));
|
||||||
|
mcp.tool('greet', 'Greeting tool', {}, async () => ({
|
||||||
|
content: [{ type: 'text', text: 'hello' }],
|
||||||
|
}));
|
||||||
|
await mcp.connect(transport);
|
||||||
|
}
|
||||||
|
|
||||||
|
await transport.handleRequest(req, res);
|
||||||
|
|
||||||
|
if (transport.sessionId && !sessions.has(transport.sessionId)) {
|
||||||
|
sessions.set(transport.sessionId, transport);
|
||||||
|
transport.onclose = () => sessions.delete(transport!.sessionId!);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const destroySockets = trackSockets(httpServer);
|
||||||
|
await new Promise<void>((resolve) => httpServer.listen(port, '127.0.0.1', resolve));
|
||||||
|
|
||||||
|
return {
|
||||||
|
url: `http://127.0.0.1:${port}/`,
|
||||||
|
port,
|
||||||
|
close: async () => {
|
||||||
|
const closing = [...sessions.values()].map((t) => t.close().catch(() => undefined));
|
||||||
|
sessions.clear();
|
||||||
|
await Promise.all(closing);
|
||||||
|
await destroySockets();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('MCP reinitialize recovery – integration (issue #12143)', () => {
|
||||||
|
let server: TestServer | null = null;
|
||||||
|
let conn: MCPConnection | null = null;
|
||||||
|
let registry: MCPServersRegistry;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
(MCPServersRegistry as unknown as { instance: undefined }).instance = undefined;
|
||||||
|
MCPServersRegistry.createInstance(mockMongoose, ['127.0.0.1']);
|
||||||
|
registry = MCPServersRegistry.getInstance();
|
||||||
|
await registryStatusCache.reset();
|
||||||
|
await registry.reset();
|
||||||
|
MCPServersInitializer.resetProcessFlag();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
await safeDisconnect(conn);
|
||||||
|
conn = null;
|
||||||
|
// Reset MCPManager if it was created during the test
|
||||||
|
try {
|
||||||
|
const mgr = MCPManager.getInstance();
|
||||||
|
await Promise.all(mgr.appConnections?.disconnectAll() ?? []);
|
||||||
|
} catch {
|
||||||
|
// Not initialized — nothing to clean up
|
||||||
|
}
|
||||||
|
(MCPManager as unknown as { instance: null }).instance = null;
|
||||||
|
if (server) {
|
||||||
|
await server.close();
|
||||||
|
server = null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should store a stub config when the MCP server is unreachable at startup', async () => {
|
||||||
|
const deadPort = await getFreePort();
|
||||||
|
const configs: t.MCPServers = {
|
||||||
|
'speedy-mcp': {
|
||||||
|
type: 'streamable-http',
|
||||||
|
url: `http://127.0.0.1:${deadPort}/`,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
await MCPServersInitializer.initialize(configs);
|
||||||
|
|
||||||
|
// Before the fix: getServerConfig would return undefined here
|
||||||
|
// After the fix: a stub with inspectionFailed=true is stored
|
||||||
|
const config = await registry.getServerConfig('speedy-mcp');
|
||||||
|
expect(config).toBeDefined();
|
||||||
|
expect(config!.inspectionFailed).toBe(true);
|
||||||
|
expect(config!.url).toBe(`http://127.0.0.1:${deadPort}/`);
|
||||||
|
expect(config!.tools).toBeUndefined();
|
||||||
|
expect(config!.capabilities).toBeUndefined();
|
||||||
|
expect(config!.toolFunctions).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should recover via reinspectServer after the MCP server comes back online', async () => {
|
||||||
|
// Phase 1: Server is down at startup
|
||||||
|
const deadPort = await getFreePort();
|
||||||
|
const configs: t.MCPServers = {
|
||||||
|
'speedy-mcp': {
|
||||||
|
type: 'streamable-http',
|
||||||
|
url: `http://127.0.0.1:${deadPort}/`,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
await MCPServersInitializer.initialize(configs);
|
||||||
|
|
||||||
|
const stubConfig = await registry.getServerConfig('speedy-mcp');
|
||||||
|
expect(stubConfig).toBeDefined();
|
||||||
|
expect(stubConfig!.inspectionFailed).toBe(true);
|
||||||
|
|
||||||
|
// Phase 2: Start the real server on the same (previously dead) port
|
||||||
|
server = await createMCPServerOnPort(deadPort);
|
||||||
|
|
||||||
|
// Phase 3: Reinspect — this is what the reinitialize button triggers
|
||||||
|
const result = await registry.reinspectServer('speedy-mcp', 'CACHE');
|
||||||
|
|
||||||
|
// Verify the stub was replaced with a fully inspected config
|
||||||
|
expect(result.config.inspectionFailed).toBeUndefined();
|
||||||
|
expect(result.config.tools).toContain('echo');
|
||||||
|
expect(result.config.tools).toContain('greet');
|
||||||
|
expect(result.config.capabilities).toBeDefined();
|
||||||
|
expect(result.config.toolFunctions).toBeDefined();
|
||||||
|
|
||||||
|
// Verify the registry now returns the real config
|
||||||
|
const realConfig = await registry.getServerConfig('speedy-mcp');
|
||||||
|
expect(realConfig).toBeDefined();
|
||||||
|
expect(realConfig!.inspectionFailed).toBeUndefined();
|
||||||
|
expect(realConfig!.tools).toContain('echo');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should allow a real client connection after reinspection succeeds', async () => {
|
||||||
|
// Phase 1: Server is down at startup
|
||||||
|
const deadPort = await getFreePort();
|
||||||
|
const configs: t.MCPServers = {
|
||||||
|
'speedy-mcp': {
|
||||||
|
type: 'streamable-http',
|
||||||
|
url: `http://127.0.0.1:${deadPort}/`,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
await MCPServersInitializer.initialize(configs);
|
||||||
|
expect((await registry.getServerConfig('speedy-mcp'))!.inspectionFailed).toBe(true);
|
||||||
|
|
||||||
|
// Phase 2: Server comes back online on the same port
|
||||||
|
server = await createMCPServerOnPort(deadPort);
|
||||||
|
|
||||||
|
// Phase 3: Reinspect
|
||||||
|
await registry.reinspectServer('speedy-mcp', 'CACHE');
|
||||||
|
|
||||||
|
// Phase 4: Establish a real client connection
|
||||||
|
conn = new MCPConnection({
|
||||||
|
serverName: 'speedy-mcp',
|
||||||
|
serverConfig: { type: 'streamable-http', url: server.url },
|
||||||
|
useSSRFProtection: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await conn.connect();
|
||||||
|
const tools = await conn.fetchTools();
|
||||||
|
|
||||||
|
expect(tools).toHaveLength(2);
|
||||||
|
expect(tools.map((t) => t.name)).toContain('echo');
|
||||||
|
expect(tools.map((t) => t.name)).toContain('greet');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not attempt connections to stub servers via ConnectionsRepository', async () => {
|
||||||
|
const deadPort = await getFreePort();
|
||||||
|
await MCPServersInitializer.initialize({
|
||||||
|
'stub-srv': { type: 'streamable-http', url: `http://127.0.0.1:${deadPort}/` },
|
||||||
|
});
|
||||||
|
expect((await registry.getServerConfig('stub-srv'))!.inspectionFailed).toBe(true);
|
||||||
|
|
||||||
|
const repo = new ConnectionsRepository(undefined);
|
||||||
|
expect(await repo.has('stub-srv')).toBe(false);
|
||||||
|
expect(await repo.get('stub-srv')).toBeNull();
|
||||||
|
|
||||||
|
const all = await repo.getAll();
|
||||||
|
expect(all.has('stub-srv')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('addServerStub should clear negative read-through cache entries', async () => {
|
||||||
|
// Query a server that doesn't exist — result is negative-cached
|
||||||
|
const config1 = await registry.getServerConfig('late-server');
|
||||||
|
expect(config1).toBeUndefined();
|
||||||
|
|
||||||
|
// Store a stub (simulating a failed init that runs after the lookup)
|
||||||
|
await registry.addServerStub(
|
||||||
|
'late-server',
|
||||||
|
{ type: 'streamable-http', url: 'http://127.0.0.1:9999/' },
|
||||||
|
'CACHE',
|
||||||
|
);
|
||||||
|
|
||||||
|
// The stub should be found despite the earlier negative cache entry
|
||||||
|
const config2 = await registry.getServerConfig('late-server');
|
||||||
|
expect(config2).toBeDefined();
|
||||||
|
expect(config2!.inspectionFailed).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('concurrent reinspectServer calls should not crash or corrupt state', async () => {
|
||||||
|
const deadPort = await getFreePort();
|
||||||
|
await MCPServersInitializer.initialize({
|
||||||
|
'race-server': {
|
||||||
|
type: 'streamable-http',
|
||||||
|
url: `http://127.0.0.1:${deadPort}/`,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
expect((await registry.getServerConfig('race-server'))!.inspectionFailed).toBe(true);
|
||||||
|
|
||||||
|
server = await createMCPServerOnPort(deadPort);
|
||||||
|
|
||||||
|
// Simulate multiple users clicking Reinitialize at the same time.
|
||||||
|
// reinitMCPServer calls reinspectServer internally — this tests the critical section.
|
||||||
|
const n = 3 + Math.floor(Math.random() * 8); // 3–10 concurrent calls
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
Array.from({ length: n }, () => registry.reinspectServer('race-server', 'CACHE')),
|
||||||
|
);
|
||||||
|
|
||||||
|
const successes = results.filter((r) => r.status === 'fulfilled');
|
||||||
|
const failures = results.filter((r) => r.status === 'rejected');
|
||||||
|
|
||||||
|
// At least one must succeed
|
||||||
|
expect(successes.length).toBeGreaterThanOrEqual(1);
|
||||||
|
|
||||||
|
// Any failure must be the "not in a failed state" guard (the first call already
|
||||||
|
// replaced the stub), not an unhandled crash or data corruption.
|
||||||
|
for (const f of failures) {
|
||||||
|
expect((f as PromiseRejectedResult).reason.message).toMatch(/not in a failed state/);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Final state must be fully recovered regardless of how many succeeded
|
||||||
|
const config = await registry.getServerConfig('race-server');
|
||||||
|
expect(config).toBeDefined();
|
||||||
|
expect(config!.inspectionFailed).toBeUndefined();
|
||||||
|
expect(config!.tools).toContain('echo');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('concurrent reinitMCPServer-equivalent flows should not crash or corrupt state', async () => {
|
||||||
|
const deadPort = await getFreePort();
|
||||||
|
const serverName = 'concurrent-reinit';
|
||||||
|
const configs: t.MCPServers = {
|
||||||
|
[serverName]: {
|
||||||
|
type: 'streamable-http',
|
||||||
|
url: `http://127.0.0.1:${deadPort}/`,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Reset MCPManager singleton so createInstance works
|
||||||
|
(MCPManager as unknown as { instance: null }).instance = null;
|
||||||
|
|
||||||
|
// Initialize with dead server — this sets up both registry (stub) and MCPManager
|
||||||
|
await MCPManager.createInstance(configs);
|
||||||
|
const mcpManager = MCPManager.getInstance();
|
||||||
|
|
||||||
|
expect((await registry.getServerConfig(serverName))!.inspectionFailed).toBe(true);
|
||||||
|
|
||||||
|
// Server comes back online
|
||||||
|
server = await createMCPServerOnPort(deadPort);
|
||||||
|
|
||||||
|
const flowManager = new FlowStateManager<null>(new Keyv(), { ttl: 60_000 });
|
||||||
|
const makeUser = (): IUser =>
|
||||||
|
({
|
||||||
|
_id: new Types.ObjectId(),
|
||||||
|
id: new Types.ObjectId().toString(),
|
||||||
|
username: 'testuser',
|
||||||
|
email: 'test@example.com',
|
||||||
|
name: 'Test',
|
||||||
|
avatar: '',
|
||||||
|
provider: 'email',
|
||||||
|
role: 'user',
|
||||||
|
emailVerified: true,
|
||||||
|
createdAt: new Date(),
|
||||||
|
updatedAt: new Date(),
|
||||||
|
}) as IUser;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replicate reinitMCPServer logic: check inspectionFailed → reinspect → getConnection.
|
||||||
|
* Each call uses a distinct user to simulate concurrent requests from different users.
|
||||||
|
*/
|
||||||
|
async function simulateReinitMCPServer(): Promise<{ success: boolean; tools: number }> {
|
||||||
|
const user = makeUser();
|
||||||
|
const config = await registry.getServerConfig(serverName, user.id);
|
||||||
|
if (config?.inspectionFailed) {
|
||||||
|
try {
|
||||||
|
const storageLocation = config.dbId ? 'DB' : 'CACHE';
|
||||||
|
await registry.reinspectServer(serverName, storageLocation, user.id);
|
||||||
|
} catch {
|
||||||
|
// Mirrors reinitMCPServer early return on failed reinspection
|
||||||
|
return { success: false, tools: 0 };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const connection = await mcpManager.getConnection({
|
||||||
|
serverName,
|
||||||
|
user,
|
||||||
|
flowManager,
|
||||||
|
forceNew: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const tools = await connection.fetchTools();
|
||||||
|
return { success: true, tools: tools.length };
|
||||||
|
}
|
||||||
|
|
||||||
|
const n = 3 + Math.floor(Math.random() * 5); // 3–7 concurrent calls
|
||||||
|
const results = await Promise.allSettled(
|
||||||
|
Array.from({ length: n }, () => simulateReinitMCPServer()),
|
||||||
|
);
|
||||||
|
|
||||||
|
// All promises should resolve (no unhandled throws)
|
||||||
|
for (const r of results) {
|
||||||
|
expect(r.status).toBe('fulfilled');
|
||||||
|
}
|
||||||
|
|
||||||
|
const values = (results as PromiseFulfilledResult<{ success: boolean; tools: number }>[]).map(
|
||||||
|
(r) => r.value,
|
||||||
|
);
|
||||||
|
|
||||||
|
// At least one full reinit must succeed with tools
|
||||||
|
const succeeded = values.filter((v) => v.success);
|
||||||
|
expect(succeeded.length).toBeGreaterThanOrEqual(1);
|
||||||
|
for (const s of succeeded) {
|
||||||
|
expect(s.tools).toBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Any that returned success: false hit the reinspect guard — that's fine
|
||||||
|
const earlyReturned = values.filter((v) => !v.success);
|
||||||
|
expect(earlyReturned.every((v) => v.tools === 0)).toBe(true);
|
||||||
|
|
||||||
|
// Final registry state must be fully recovered
|
||||||
|
const finalConfig = await registry.getServerConfig(serverName);
|
||||||
|
expect(finalConfig).toBeDefined();
|
||||||
|
expect(finalConfig!.inspectionFailed).toBeUndefined();
|
||||||
|
expect(finalConfig!.tools).toContain('echo');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('reinspectServer should throw MCPInspectionFailedError when the server is still unreachable', async () => {
|
||||||
|
const deadPort = await getFreePort();
|
||||||
|
const configs: t.MCPServers = {
|
||||||
|
'still-broken': {
|
||||||
|
type: 'streamable-http',
|
||||||
|
url: `http://127.0.0.1:${deadPort}/`,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
await MCPServersInitializer.initialize(configs);
|
||||||
|
expect((await registry.getServerConfig('still-broken'))!.inspectionFailed).toBe(true);
|
||||||
|
|
||||||
|
// Server is STILL down — reinspection should fail with MCPInspectionFailedError
|
||||||
|
await expect(registry.reinspectServer('still-broken', 'CACHE')).rejects.toThrow(
|
||||||
|
MCPInspectionFailedError,
|
||||||
|
);
|
||||||
|
|
||||||
|
// The stub should remain intact for future retry
|
||||||
|
const config = await registry.getServerConfig('still-broken');
|
||||||
|
expect(config).toBeDefined();
|
||||||
|
expect(config!.inspectionFailed).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -303,9 +303,10 @@ describe('MCPServersInitializer Redis Integration Tests', () => {
|
||||||
const searchToolsServer = await registry.getServerConfig('search_tools_server');
|
const searchToolsServer = await registry.getServerConfig('search_tools_server');
|
||||||
expect(searchToolsServer).toBeDefined();
|
expect(searchToolsServer).toBeDefined();
|
||||||
|
|
||||||
// Verify file_tools_server was not added (due to inspection failure)
|
// Verify file_tools_server was stored as a stub (for recovery via reinitialize)
|
||||||
const fileToolsServer = await registry.getServerConfig('file_tools_server');
|
const fileToolsServer = await registry.getServerConfig('file_tools_server');
|
||||||
expect(fileToolsServer).toBeUndefined();
|
expect(fileToolsServer).toBeDefined();
|
||||||
|
expect(fileToolsServer?.inspectionFailed).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should set initialized status after completion', async () => {
|
it('should set initialized status after completion', async () => {
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,11 @@
|
||||||
import { logger } from '@librechat/data-schemas';
|
import { logger } from '@librechat/data-schemas';
|
||||||
import * as t from '~/mcp/types';
|
import * as t from '~/mcp/types';
|
||||||
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
|
|
||||||
import { MCPServersInitializer } from '~/mcp/registry/MCPServersInitializer';
|
|
||||||
import { MCPConnection } from '~/mcp/connection';
|
|
||||||
import { registryStatusCache } from '~/mcp/registry/cache/RegistryStatusCache';
|
import { registryStatusCache } from '~/mcp/registry/cache/RegistryStatusCache';
|
||||||
|
import { MCPServersInitializer } from '~/mcp/registry/MCPServersInitializer';
|
||||||
import { MCPServerInspector } from '~/mcp/registry/MCPServerInspector';
|
import { MCPServerInspector } from '~/mcp/registry/MCPServerInspector';
|
||||||
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
|
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
|
||||||
|
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
|
||||||
|
import { MCPConnection } from '~/mcp/connection';
|
||||||
|
|
||||||
const FIXED_TIME = 1699564800000;
|
const FIXED_TIME = 1699564800000;
|
||||||
const originalDateNow = Date.now;
|
const originalDateNow = Date.now;
|
||||||
|
|
@ -296,9 +296,10 @@ describe('MCPServersInitializer', () => {
|
||||||
const searchToolsServer = await registry.getServerConfig('search_tools_server');
|
const searchToolsServer = await registry.getServerConfig('search_tools_server');
|
||||||
expect(searchToolsServer).toBeDefined();
|
expect(searchToolsServer).toBeDefined();
|
||||||
|
|
||||||
// Verify file_tools_server was not added (due to inspection failure)
|
// Verify file_tools_server was stored as a stub (for recovery via reinitialize)
|
||||||
const fileToolsServer = await registry.getServerConfig('file_tools_server');
|
const fileToolsServer = await registry.getServerConfig('file_tools_server');
|
||||||
expect(fileToolsServer).toBeUndefined();
|
expect(fileToolsServer).toBeDefined();
|
||||||
|
expect(fileToolsServer?.inspectionFailed).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should log server configuration after initialization', async () => {
|
it('should log server configuration after initialization', async () => {
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
import * as t from '~/mcp/types';
|
import type * as t from '~/mcp/types';
|
||||||
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
|
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
|
||||||
import { MCPServerInspector } from '~/mcp/registry/MCPServerInspector';
|
import { MCPServerInspector } from '~/mcp/registry/MCPServerInspector';
|
||||||
|
|
||||||
|
|
@ -193,6 +193,22 @@ describe('MCPServersRegistry', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('reinspectServer', () => {
|
||||||
|
it('should throw when called on a healthy (non-stub) server', async () => {
|
||||||
|
await registry.addServer('healthy_server', testParsedConfig, 'CACHE');
|
||||||
|
|
||||||
|
await expect(registry.reinspectServer('healthy_server', 'CACHE')).rejects.toThrow(
|
||||||
|
'is not in a failed state',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw when the server does not exist', async () => {
|
||||||
|
await expect(registry.reinspectServer('ghost_server', 'CACHE')).rejects.toThrow(
|
||||||
|
'not found in CACHE',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('Read-through cache', () => {
|
describe('Read-through cache', () => {
|
||||||
describe('getServerConfig', () => {
|
describe('getServerConfig', () => {
|
||||||
it('should cache repeated calls for the same server', async () => {
|
it('should cache repeated calls for the same server', async () => {
|
||||||
|
|
|
||||||
|
|
@ -156,6 +156,8 @@ export type ParsedServerConfig = MCPOptions & {
|
||||||
dbId?: string;
|
dbId?: string;
|
||||||
/** True if access is only via agent (not directly shared with user) */
|
/** True if access is only via agent (not directly shared with user) */
|
||||||
consumeOnly?: boolean;
|
consumeOnly?: boolean;
|
||||||
|
/** True when inspection failed at startup; the server is known but not fully initialized */
|
||||||
|
inspectionFailed?: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type AddServerResult = {
|
export type AddServerResult = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue