import { logger } from '@librechat/data-schemas';
import type * as t from './types';
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
import { hasCustomUserVars } from './utils';
import { MCPConnection } from './connection';

/** Maximum number of servers that `getMany` resolves in parallel (size of one batch). */
const CONNECT_CONCURRENCY = 3;

/**
 * Manages MCP connections with lazy loading and reconnection.
 * Maintains a pool of connections and handles connection lifecycle management.
 * Queries server configurations dynamically from the MCPServersRegistry (single source of truth).
 *
 * Scope-aware: Each repository is tied to a specific owner scope:
 * - ownerId = undefined → manages app-level servers only
 * - ownerId = userId → manages user-level and private servers for that user
 */
export class ConnectionsRepository {
  /** Pool of connections created by this repository, keyed by server name. */
  protected connections: Map<string, MCPConnection> = new Map();
  /** OAuth options forwarded unchanged to `MCPConnectionFactory.create`. */
  protected oauthOpts: t.OAuthConnectionOptions | undefined;
  /** Owner scope passed to every registry lookup; `undefined` means app-level. */
  private readonly ownerId: string | undefined;

  /**
   * @param ownerId - Owner scope for registry lookups; omit for an app-level repository.
   * @param oauthOpts - OAuth options handed to the connection factory when connecting.
   */
  constructor(ownerId?: string, oauthOpts?: t.OAuthConnectionOptions) {
    this.ownerId = ownerId;
    this.oauthOpts = oauthOpts;
  }

  /** Returns the number of connections currently held in this repository's pool. */
  public getConnectionCount(): number {
    return this.connections.size;
  }

  /**
   * Checks whether this repository can connect to a specific server.
   *
   * @param serverName - Name of the server to look up in the registry.
   * @returns `true` when a config exists for this scope and connecting is allowed.
   */
  async has(serverName: string): Promise<boolean> {
    const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, this.ownerId);
    const canConnect = !!config && this.isAllowedToConnectToServer(config);
    if (!canConnect) {
      // If connecting is no longer possible, drop any leftover pooled connection.
      await this.disconnect(serverName);
    }
    return canConnect;
  }

  /**
   * Gets or creates a connection for the specified server with lazy loading.
   *
   * A pooled connection is reused only when it is not stale relative to the
   * config's `updatedAt` and still reports itself as connected; otherwise it is
   * disconnected, removed from the pool and replaced by a fresh one.
   *
   * @param serverName - Name of the server to connect to.
   * @returns The connection, or `null` when the server has no config in this
   *   scope or this repository is not allowed to connect to it.
   */
  async get(serverName: string): Promise<MCPConnection | null> {
    const serverConfig = await MCPServersRegistry.getInstance().getServerConfig(
      serverName,
      this.ownerId,
    );

    const existingConnection = this.connections.get(serverName);
    if (!serverConfig || !this.isAllowedToConnectToServer(serverConfig)) {
      // Route through disconnect() so the entry is also removed from the pool;
      // otherwise a dead connection would linger and keep being counted/re-processed.
      await this.disconnect(serverName);
      return null;
    }

    if (existingConnection) {
      // Check if config was cached/updated since connection was created
      if (serverConfig.updatedAt && existingConnection.isStale(serverConfig.updatedAt)) {
        logger.info(
          `${this.prefix(serverName)} Existing connection for ${serverName} is outdated. Recreating a new connection.`,
          {
            connectionCreated: new Date(existingConnection.createdAt).toISOString(),
            configCachedAt: new Date(serverConfig.updatedAt).toISOString(),
          },
        );
        // Remove the stale connection from the pool (disconnect errors are logged,
        // not thrown), then fall through to create a new connection.
        await this.disconnect(serverName);
      } else if (await existingConnection.isConnected()) {
        return existingConnection;
      } else {
        await this.disconnect(serverName);
      }
    }

    const connection = await MCPConnectionFactory.create(
      {
        serverName,
        serverConfig,
        dbSourced: !!(serverConfig as t.ParsedServerConfig).dbId,
        useSSRFProtection: MCPServersRegistry.getInstance().shouldEnableSSRFProtection(),
      },
      this.oauthOpts,
    );
    this.connections.set(serverName, connection);
    return connection;
  }

  /**
   * Gets or creates connections for multiple servers, `CONNECT_CONCURRENCY` at a time.
   *
   * @param serverNames - Server names to resolve; duplicates are resolved once.
   * @returns Map of server name to connection, omitting servers for which
   *   `get` returned `null`.
   */
  async getMany(serverNames: string[]): Promise<Map<string, MCPConnection>> {
    // De-duplicate so the same server is never connected twice within one batch.
    const uniqueNames = Array.from(new Set(serverNames));
    const results: [string, MCPConnection | null][] = [];
    for (let i = 0; i < uniqueNames.length; i += CONNECT_CONCURRENCY) {
      const batch = uniqueNames.slice(i, i + CONNECT_CONCURRENCY);
      const batchResults = await Promise.all(
        batch.map(
          async (name): Promise<[string, MCPConnection | null]> => [name, await this.get(name)],
        ),
      );
      results.push(...batchResults);
    }
    return new Map(results.filter((v): v is [string, MCPConnection] => v[1] != null));
  }

  /**
   * Resolves every server name currently in the pool through `get`, so each
   * pooled entry is re-validated (and possibly recreated or dropped).
   */
  async getLoaded(): Promise<Map<string, MCPConnection>> {
    return this.getMany(Array.from(this.connections.keys()));
  }

  /** Gets or creates connections for all configured servers in this repository's scope */
  async getAll(): Promise<Map<string, MCPConnection>> {
    // TODO: in the future we should use a scoped config getter (APPLevel, UserLevel, Private).
    // For now an absent config will not throw an error.
    const allConfigs = await MCPServersRegistry.getInstance().getAllServerConfigs(this.ownerId);
    return this.getMany(Object.keys(allConfigs));
  }

  /**
   * Disconnects and removes a specific server connection from the pool.
   *
   * The entry is removed before disconnecting; a failing disconnect is logged
   * and never rejects. Does nothing when no connection is pooled for the name.
   *
   * @param serverName - Name of the server whose connection should be dropped.
   */
  async disconnect(serverName: string): Promise<void> {
    const connection = this.connections.get(serverName);
    if (!connection) {
      return;
    }
    this.connections.delete(serverName);
    try {
      await connection.disconnect();
    } catch (err) {
      logger.error(`${this.prefix(serverName)} Error disconnecting`, err);
    }
  }

  /**
   * Starts disconnecting every pooled connection.
   *
   * @returns One promise per connection; callers decide how to await them.
   */
  disconnectAll(): Promise<void>[] {
    const serverNames = Array.from(this.connections.keys());
    return serverNames.map((serverName) => this.disconnect(serverName));
  }

  /** Returns the formatted log prefix for messages about a server. */
  protected prefix(serverName: string): string {
    return `[MCP][${serverName}]`;
  }

  /**
   * App-level (shared) connections cannot serve servers that need per-user context:
   * env/header placeholders like `{{MY_KEY}}` are only resolved by `processMCPEnv()`
   * when real `customUserVars` values exist — which requires a user-level connection.
   *
   * @param config - Parsed server config to evaluate.
   * @returns `false` when inspection failed, or when this is an app-level
   *   repository and the server has `startup === false`, requires OAuth, or
   *   uses custom user vars; `true` otherwise.
   */
  private isAllowedToConnectToServer(config: t.ParsedServerConfig): boolean {
    if (config.inspectionFailed) {
      return false;
    }
    if (
      this.ownerId === undefined &&
      (config.startup === false || config.requiresOAuth || hasCustomUserVars(config))
    ) {
      return false;
    }
    return true;
  }
}