mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-03-25 17:16:33 +01:00
* fix: store hide_sequential_outputs before processStream clears config processStream now clears config.configurable after completion to break memory retention chains. Save hide_sequential_outputs to a local variable before calling runAgents so the post-stream filter still works. * feat: memory diagnostics * chore: expose garbage collection in backend inspect command Updated the backend inspect command in package.json to include the --expose-gc flag, enabling garbage collection diagnostics for improved memory management during development. * chore: update @librechat/agents dependency to version 3.1.52 Bumped the version of @librechat/agents in package.json and package-lock.json to ensure compatibility and access to the latest features and fixes. * fix: clear heavy config state after processStream to prevent memory leaks Break the reference chain from LangGraph's internal __pregel_scratchpad through @langchain/core RunTree.extra[lc:child_config] into the AsyncLocalStorage context captured by timers and I/O handles. After stream completion, null out symbol-keyed scratchpad properties (currentTaskInput), config.configurable, and callbacks. Also call Graph.clearHeavyState() to release config, signal, content maps, handler registry, and tool sessions. * chore: fix imports for memory utils * chore: add circular dependency check in API build step Enhanced the backend review workflow to include a check for circular dependencies during the API build process. If a circular dependency is detected, an error message is displayed, and the process exits with a failure status. * chore: update API build step to include circular dependency detection Modified the backend review workflow to rename the API package installation step to reflect its new functionality, which now includes detection of circular dependencies during the build process. 
* chore: add memory diagnostics option to .env.example Included a commented-out configuration option for enabling memory diagnostics in the .env.example file, which logs heap and RSS snapshots every 60 seconds when activated. * chore: remove redundant agentContexts cleanup in disposeClient function Streamlined the disposeClient function by eliminating duplicate cleanup logic for agentContexts, ensuring efficient memory management during client disposal. * refactor: move runOutsideTracing utility to utils and update its usage Refactored the runOutsideTracing function by relocating it to the utils module for better organization. Updated the tool execution handler to utilize the new import, ensuring consistent tracing behavior during tool execution. * refactor: enhance connection management and diagnostics Added a method to ConnectionsRepository for retrieving the active connection count. Updated UserConnectionManager to utilize this new method for app connection count reporting. Refined the OAuthReconnectionTracker's getStats method to improve clarity in diagnostics. Introduced a new tracing utility in the utils module to streamline tracing context management. Additionally, added a safeguard in memory diagnostics to prevent unnecessary snapshot collection for very short intervals. * refactor: enhance tracing utility and add memory diagnostics tests Refactored the runOutsideTracing function to improve warning logic when the AsyncLocalStorage context is missing. Added tests for memory diagnostics and tracing utilities to ensure proper functionality and error handling. Introduced a new test suite for memory diagnostics, covering snapshot collection and garbage collection behavior.
148 lines
5.8 KiB
TypeScript
148 lines
5.8 KiB
TypeScript
import { logger } from '@librechat/data-schemas';
|
|
import { MCPConnectionFactory } from '~/mcp/MCPConnectionFactory';
|
|
import { MCPConnection } from './connection';
|
|
import { MCPServersRegistry } from '~/mcp/registry/MCPServersRegistry';
|
|
import type * as t from './types';
|
|
|
|
/** Batch size used by `ConnectionsRepository.getMany`: at most this many `get()` calls run in parallel. */
const CONNECT_CONCURRENCY = 3;
|
|
|
|
/**
 * Manages MCP connections with lazy loading and reconnection.
 * Maintains a pool of connections and handles connection lifecycle management.
 * Queries server configurations dynamically from the MCPServersRegistry (single source of truth).
 *
 * Scope-aware: Each repository is tied to a specific owner scope:
 * - ownerId = undefined → manages app-level servers only
 * - ownerId = userId → manages user-level and private servers for that user
 */
export class ConnectionsRepository {
  /** Pool of connections keyed by server name. */
  protected connections: Map<string, MCPConnection> = new Map();
  /** OAuth options forwarded to the connection factory when a connection is created. */
  protected oauthOpts: t.OAuthConnectionOptions | undefined;
  /** Owner scope of this repository; `undefined` means the shared (app-level) scope. */
  private readonly ownerId: string | undefined;

  constructor(ownerId?: string, oauthOpts?: t.OAuthConnectionOptions) {
    this.ownerId = ownerId;
    this.oauthOpts = oauthOpts;
  }

  /** Returns the number of connections currently held in this repository's pool */
  public getConnectionCount(): number {
    return this.connections.size;
  }

  /**
   * Checks whether this repository can connect to a specific server.
   * When it cannot, any leftover pooled connection for that server is disconnected and removed.
   */
  async has(serverName: string): Promise<boolean> {
    const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, this.ownerId);
    const canConnect = !!config && this.isAllowedToConnectToServer(config);
    if (!canConnect) {
      // If connection is no longer possible we attempt to disconnect any leftover connection.
      await this.disconnect(serverName);
    }
    return canConnect;
  }

  /**
   * Gets or creates a connection for the specified server with lazy loading.
   *
   * @returns the pooled or newly created connection, or `null` when the server has no
   * configuration in this scope or this repository is not allowed to connect to it.
   */
  async get(serverName: string): Promise<MCPConnection | null> {
    const serverConfig = await MCPServersRegistry.getInstance().getServerConfig(
      serverName,
      this.ownerId,
    );

    if (!serverConfig || !this.isAllowedToConnectToServer(serverConfig)) {
      // Go through disconnect() so the entry is also removed from the pool; otherwise a
      // dead connection would keep being counted and re-processed by getLoaded().
      await this.disconnect(serverName);
      return null;
    }

    const existingConnection = this.connections.get(serverName);
    if (existingConnection) {
      // Check if config was cached/updated since connection was created
      if (serverConfig.updatedAt && existingConnection.isStale(serverConfig.updatedAt)) {
        logger.info(
          `${this.prefix(serverName)} Existing connection for ${serverName} is outdated. Recreating a new connection.`,
          {
            connectionCreated: new Date(existingConnection.createdAt).toISOString(),
            configCachedAt: new Date(serverConfig.updatedAt).toISOString(),
          },
        );
        // Drop the stale connection from the pool first; a failing teardown is logged by
        // disconnect() and must not prevent the replacement from being created below.
        await this.disconnect(serverName);
      } else if (await existingConnection.isConnected()) {
        return existingConnection;
      } else {
        // Pooled connection is no longer connected: discard it and reconnect below.
        await this.disconnect(serverName);
      }
    }

    const connection = await MCPConnectionFactory.create(
      {
        serverName,
        serverConfig,
        useSSRFProtection: MCPServersRegistry.getInstance().shouldEnableSSRFProtection(),
      },
      this.oauthOpts,
    );

    this.connections.set(serverName, connection);
    return connection;
  }

  /**
   * Gets or creates connections for multiple servers, processing the names in batches of
   * `CONNECT_CONCURRENCY` parallel `get()` calls. Servers for which `get()` yields `null`
   * are omitted from the result; a rejected `get()` rejects the whole call.
   */
  async getMany(serverNames: string[]): Promise<Map<string, MCPConnection>> {
    const results: [string, MCPConnection | null][] = [];
    for (let i = 0; i < serverNames.length; i += CONNECT_CONCURRENCY) {
      const batch = serverNames.slice(i, i + CONNECT_CONCURRENCY);
      const batchResults = await Promise.all(
        batch.map(
          async (name): Promise<[string, MCPConnection | null]> => [name, await this.get(name)],
        ),
      );
      results.push(...batchResults);
    }
    return new Map(results.filter((v): v is [string, MCPConnection] => v[1] != null));
  }

  /**
   * Returns connections for the servers that are currently in the pool.
   * Each name is resolved through `get()`, so no new server names are added, but a pooled
   * connection that is stale or no longer connected may be recreated along the way.
   */
  async getLoaded(): Promise<Map<string, MCPConnection>> {
    return this.getMany(Array.from(this.connections.keys()));
  }

  /** Gets or creates connections for all configured servers in this repository's scope */
  async getAll(): Promise<Map<string, MCPConnection>> {
    // TODO in the future we should use a scoped config getter (APPLevel, UserLevel, Private)
    // for now the absent config will not throw error
    const allConfigs = await MCPServersRegistry.getInstance().getAllServerConfigs(this.ownerId);
    return this.getMany(Object.keys(allConfigs));
  }

  /**
   * Disconnects and removes a specific server connection from the pool.
   * The entry is removed before the teardown starts; teardown errors are logged, not thrown.
   */
  async disconnect(serverName: string): Promise<void> {
    const connection = this.connections.get(serverName);
    if (!connection) {
      return;
    }
    this.connections.delete(serverName);
    try {
      await connection.disconnect();
    } catch (err) {
      logger.error(`${this.prefix(serverName)} Error disconnecting`, err);
    }
  }

  /** Disconnects all pooled connections and returns the array of disconnect promises */
  disconnectAll(): Promise<void>[] {
    const serverNames = Array.from(this.connections.keys());
    return serverNames.map((serverName) => this.disconnect(serverName));
  }

  /** Returns formatted log prefix for server messages */
  protected prefix(serverName: string): string {
    return `[MCP][${serverName}]`;
  }

  /**
   * Decides whether this repository may connect to the given server.
   * The shared repository (ownerId is undefined) must not connect to servers that
   * require OAuth or that are configured with `startup: false`.
   */
  private isAllowedToConnectToServer(config: t.ParsedServerConfig): boolean {
    if (this.ownerId === undefined && (config.startup === false || config.requiresOAuth)) {
      return false;
    }
    return true;
  }
}
|