LibreChat/packages/api/src/mcp/ConnectionsRepository.ts
Danny Avila 32cadb1cc5
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions
🩹 fix: MCP Server Recovery from Startup Inspection Failures (#12145)
* feat: MCP server reinitialization recovery mechanism

- Added functionality to store a stub configuration for MCP servers that fail inspection at startup, allowing for recovery via reinitialization.
- Introduced `reinspectServer` method in `MCPServersRegistry` to handle reinspection of previously failed servers.
- Enhanced `MCPServersInitializer` to log and manage server initialization failures, ensuring proper handling of inspection failures.
- Added integration tests to verify the recovery process for unreachable MCP servers, ensuring that stub configurations are stored and can be reinitialized successfully.
- Updated type definitions to include `inspectionFailed` flag in server configurations for better state management.

* fix: MCP server handling for inspection failures

- Updated `reinitMCPServer` to return a structured response when the server is unreachable, providing clearer feedback on the failure.
- Modified `ConnectionsRepository` to prevent connections to servers marked as inspection failed, improving error handling.
- Adjusted `MCPServersRegistry` methods to ensure proper management of server states, including throwing errors for non-failed servers during reinspection.
- Enhanced integration tests to validate the behavior of the system when dealing with unreachable MCP servers and inspection failures, ensuring robust recovery mechanisms.

* fix: Clear all cached server configurations in MCPServersRegistry

- Added a comment to clarify the necessity of clearing all cached server configurations when updating a server's configuration, as the cache is keyed by userId without a reverse index for enumeration.

* fix: Update integration test for file_tools_server inspection handling

- Modified the test to verify that the `file_tools_server` is stored as a stub when inspection fails, ensuring it can be reinitialized correctly.
- Adjusted expectations to confirm that the `inspectionFailed` flag is set to true for the stub configuration, enhancing the robustness of the recovery mechanism.

* test: Add unit tests for reinspecting servers in MCPServersRegistry

- Introduced tests for the `reinspectServer` method to validate error handling when called on a healthy server and when the server does not exist.
- Ensured that appropriate exceptions are thrown for both scenarios, enhancing the robustness of server state management.

* test: Add integration test for concurrent reinspectServer calls

- Introduced a new test to validate that multiple concurrent calls to reinspectServer do not crash or corrupt the server state.
- Ensured that at least one call succeeds and any failures are due to the server not being in a failed state, enhancing the reliability of the reinitialization process.

* test: Enhance integration test for concurrent MCP server reinitialization

- Added a new test to validate that concurrent calls to reinitialize the MCP server do not crash or corrupt the server state.
- Ensured that at least one call succeeds and that failures are handled gracefully, improving the reliability of the reinitialization process.
- Reset MCPManager instance after each test to maintain a clean state for subsequent tests.
2026-03-08 21:49:04 -04:00

152 lines
6 KiB
TypeScript

import { logger } from '@librechat/data-schemas';
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
import { MCPConnection } from './connection';
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
import type * as t from './types';
/** Maximum number of `get()` calls that `getMany` runs in parallel per batch. */
const CONNECT_CONCURRENCY = 3;

/**
 * Manages MCP connections with lazy loading and reconnection.
 * Maintains a pool of connections and handles connection lifecycle management.
 * Queries server configurations dynamically from the MCPServersRegistry (single source of truth).
 *
 * Scope-aware: Each repository is tied to a specific owner scope:
 * - ownerId = undefined → manages app-level servers only
 * - ownerId = userId → manages user-level and private servers for that user
 */
export class ConnectionsRepository {
  /** Pool of connections owned by this repository, keyed by server name. */
  protected connections: Map<string, MCPConnection> = new Map();
  /** OAuth options forwarded to `MCPConnectionFactory.create` for every new connection. */
  protected oauthOpts: t.OAuthConnectionOptions | undefined;
  /** Owner scope used for every registry lookup; `undefined` means the shared (app-level) scope. */
  private readonly ownerId: string | undefined;

  /**
   * @param ownerId - Owner scope passed to the registry when resolving server configs.
   * @param oauthOpts - OAuth options handed to the connection factory when connecting.
   */
  constructor(ownerId?: string, oauthOpts?: t.OAuthConnectionOptions) {
    this.ownerId = ownerId;
    this.oauthOpts = oauthOpts;
  }

  /** Returns the number of connections currently held in the pool. */
  public getConnectionCount(): number {
    return this.connections.size;
  }

  /**
   * Checks whether this repository can connect to a specific server.
   *
   * Side effect: when the server is missing from the registry or not allowed for
   * this scope, any leftover pooled connection for it is disconnected and removed.
   */
  async has(serverName: string): Promise<boolean> {
    const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, this.ownerId);
    const canConnect = !!config && this.isAllowedToConnectToServer(config);
    if (!canConnect) {
      await this.disconnect(serverName);
    }
    return canConnect;
  }

  /**
   * Gets or creates a connection for the specified server (lazy loading).
   *
   * @returns The pooled or newly created connection, or `null` when the server has
   *   no config in this scope or this repository is not allowed to connect to it.
   */
  async get(serverName: string): Promise<MCPConnection | null> {
    const serverConfig = await MCPServersRegistry.getInstance().getServerConfig(
      serverName,
      this.ownerId,
    );

    if (!serverConfig || !this.isAllowedToConnectToServer(serverConfig)) {
      // Tear down AND evict any leftover connection (same handling as `has`), so the
      // pool never keeps an entry for a server we can no longer connect to.
      await this.disconnect(serverName);
      return null;
    }

    const existingConnection = this.connections.get(serverName);
    if (existingConnection) {
      // Check if config was cached/updated since connection was created
      if (serverConfig.updatedAt && existingConnection.isStale(serverConfig.updatedAt)) {
        logger.info(
          `${this.prefix(serverName)} Existing connection for ${serverName} is outdated. Recreating a new connection.`,
          {
            connectionCreated: new Date(existingConnection.createdAt).toISOString(),
            configCachedAt: new Date(serverConfig.updatedAt).toISOString(),
          },
        );
        // Evict the stale connection; a failing teardown is logged by `disconnect`
        // and must not prevent the replacement below from being created.
        await this.disconnect(serverName);
      } else if (await existingConnection.isConnected()) {
        return existingConnection;
      } else {
        // Pooled connection dropped: evict it and reconnect below.
        await this.disconnect(serverName);
      }
    }

    const connection = await MCPConnectionFactory.create(
      {
        serverName,
        serverConfig,
        // A config carrying a `dbId` is flagged as database-sourced.
        dbSourced: !!(serverConfig as t.ParsedServerConfig).dbId,
        useSSRFProtection: MCPServersRegistry.getInstance().shouldEnableSSRFProtection(),
      },
      this.oauthOpts,
    );
    this.connections.set(serverName, connection);
    return connection;
  }

  /**
   * Gets or creates connections for multiple servers, processing the names in
   * batches of `CONNECT_CONCURRENCY` (batches run one after another, entries
   * within a batch run concurrently).
   *
   * @returns A map containing only the servers for which `get` returned a connection.
   */
  async getMany(serverNames: string[]): Promise<Map<string, MCPConnection>> {
    const results: [string, MCPConnection | null][] = [];
    for (let i = 0; i < serverNames.length; i += CONNECT_CONCURRENCY) {
      const batch = serverNames.slice(i, i + CONNECT_CONCURRENCY);
      const batchResults = await Promise.all(
        batch.map(
          async (name): Promise<[string, MCPConnection | null]> => [name, await this.get(name)],
        ),
      );
      results.push(...batchResults);
    }
    return new Map(results.filter((v): v is [string, MCPConnection] => v[1] != null));
  }

  /**
   * Returns connections for every server name currently in the pool.
   *
   * Each name is resolved through `get`, so no new server names are added, but a
   * pooled connection that is stale or no longer connected is replaced, and one
   * whose server is no longer available is dropped from the result.
   */
  async getLoaded(): Promise<Map<string, MCPConnection>> {
    return this.getMany(Array.from(this.connections.keys()));
  }

  /** Gets or creates connections for all configured servers in this repository's scope */
  async getAll(): Promise<Map<string, MCPConnection>> {
    // TODO: in the future we should use a scoped config getter (APPLevel, UserLevel, Private).
    // For now a config that `get` cannot use simply yields no connection instead of throwing.
    const allConfigs = await MCPServersRegistry.getInstance().getAllServerConfigs(this.ownerId);
    return this.getMany(Object.keys(allConfigs));
  }

  /**
   * Disconnects and removes a specific server connection from the pool.
   *
   * The entry is removed before the teardown is awaited. Teardown errors are
   * logged and swallowed, so this method does not reject because of them.
   */
  async disconnect(serverName: string): Promise<void> {
    const connection = this.connections.get(serverName);
    if (!connection) {
      return;
    }
    this.connections.delete(serverName);
    try {
      await connection.disconnect();
    } catch (err) {
      logger.error(`${this.prefix(serverName)} Error disconnecting`, err);
    }
  }

  /** Starts disconnecting every pooled connection and returns the array of disconnect promises. */
  disconnectAll(): Promise<void>[] {
    const serverNames = Array.from(this.connections.keys());
    return serverNames.map((serverName) => this.disconnect(serverName));
  }

  /** Returns the formatted log prefix for messages about `serverName`. */
  protected prefix(serverName: string): string {
    return `[MCP][${serverName}]`;
  }

  /**
   * Decides whether this repository may hold a connection to the given server.
   *
   * - Servers flagged `inspectionFailed` are never connectable.
   * - A shared repository (`ownerId === undefined`) must not connect to servers
   *   that require OAuth or that are configured with `startup: false`.
   */
  private isAllowedToConnectToServer(config: t.ParsedServerConfig): boolean {
    if (config.inspectionFailed) {
      return false;
    }
    if (this.ownerId === undefined && (config.startup === false || config.requiresOAuth)) {
      return false;
    }
    return true;
  }
}