🪣 fix: Prevent Memory Retention from AsyncLocalStorage Context Propagation (#11942)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run

* fix: store hide_sequential_outputs before processStream clears config

processStream now clears config.configurable after completion to break
memory retention chains. Save hide_sequential_outputs to a local
variable before calling runAgents so the post-stream filter still works.

* feat: memory diagnostics

* chore: expose garbage collection in backend inspect command

Updated the backend inspect command in package.json to include the --expose-gc flag, enabling garbage collection diagnostics for improved memory management during development.

* chore: update @librechat/agents dependency to version 3.1.52

Bumped the version of @librechat/agents in package.json and package-lock.json to ensure compatibility and access to the latest features and fixes.

* fix: clear heavy config state after processStream to prevent memory leaks

Break the reference chain from LangGraph's internal __pregel_scratchpad
through @langchain/core RunTree.extra[lc:child_config] into the
AsyncLocalStorage context captured by timers and I/O handles.

After stream completion, null out symbol-keyed scratchpad properties
(currentTaskInput), config.configurable, and callbacks. Also call
Graph.clearHeavyState() to release config, signal, content maps,
handler registry, and tool sessions.

* chore: fix imports for memory utils

* chore: add circular dependency check in API build step

Enhanced the backend review workflow to include a check for circular dependencies during the API build process. If a circular dependency is detected, an error message is displayed, and the process exits with a failure status.

* chore: update API build step to include circular dependency detection

Modified the backend review workflow to rename the API package installation step to reflect its new functionality, which now includes detection of circular dependencies during the build process.

* chore: add memory diagnostics option to .env.example

Included a commented-out configuration option for enabling memory diagnostics in the .env.example file, which logs heap and RSS snapshots every 60 seconds when activated.

* chore: remove redundant agentContexts cleanup in disposeClient function

Streamlined the disposeClient function by eliminating duplicate cleanup logic for agentContexts, ensuring efficient memory management during client disposal.

* refactor: move runOutsideTracing utility to utils and update its usage

Refactored the runOutsideTracing function by relocating it to the utils module for better organization. Updated the tool execution handler to utilize the new import, ensuring consistent tracing behavior during tool execution.

* refactor: enhance connection management and diagnostics

Added a method to ConnectionsRepository for retrieving the active connection count. Updated UserConnectionManager to utilize this new method for app connection count reporting. Refined the OAuthReconnectionTracker's getStats method to improve clarity in diagnostics. Introduced a new tracing utility in the utils module to streamline tracing context management. Additionally, added a safeguard in memory diagnostics to prevent unnecessary snapshot collection for very short intervals.

* refactor: enhance tracing utility and add memory diagnostics tests

Refactored the runOutsideTracing function to improve warning logic when the AsyncLocalStorage context is missing. Added tests for memory diagnostics and tracing utilities to ensure proper functionality and error handling. Introduced a new test suite for memory diagnostics, covering snapshot collection and garbage collection behavior.
This commit is contained in:
Danny Avila 2026-02-25 17:41:23 -05:00 committed by GitHub
parent 59bd27b4f4
commit a0f9782e60
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 704 additions and 116 deletions

View file

@ -65,6 +65,9 @@ CONSOLE_JSON=false
DEBUG_LOGGING=true DEBUG_LOGGING=true
DEBUG_CONSOLE=false DEBUG_CONSOLE=false
# Enable memory diagnostics (logs heap/RSS snapshots every 60s, auto-enabled with --inspect)
# MEM_DIAG=true
#=============# #=============#
# Permissions # # Permissions #
#=============# #=============#

View file

@ -42,8 +42,14 @@ jobs:
- name: Install Data Schemas Package - name: Install Data Schemas Package
run: npm run build:data-schemas run: npm run build:data-schemas
- name: Install API Package - name: Build API Package & Detect Circular Dependencies
run: npm run build:api run: |
output=$(npm run build:api 2>&1)
echo "$output"
if echo "$output" | grep -q "Circular depend"; then
echo "Error: Circular dependency detected in @librechat/api!"
exit 1
fi
- name: Create empty auth.json file - name: Create empty auth.json file
run: | run: |

View file

@ -44,7 +44,7 @@
"@google/genai": "^1.19.0", "@google/genai": "^1.19.0",
"@keyv/redis": "^4.3.3", "@keyv/redis": "^4.3.3",
"@langchain/core": "^0.3.80", "@langchain/core": "^0.3.80",
"@librechat/agents": "^3.1.51", "@librechat/agents": "^3.1.52",
"@librechat/api": "*", "@librechat/api": "*",
"@librechat/data-schemas": "*", "@librechat/data-schemas": "*",
"@microsoft/microsoft-graph-client": "^3.0.7", "@microsoft/microsoft-graph-client": "^3.0.7",

View file

@ -35,7 +35,6 @@ const graphPropsToClean = [
'tools', 'tools',
'signal', 'signal',
'config', 'config',
'agentContexts',
'messages', 'messages',
'contentData', 'contentData',
'stepKeyIds', 'stepKeyIds',
@ -277,7 +276,16 @@ function disposeClient(client) {
if (client.run) { if (client.run) {
if (client.run.Graph) { if (client.run.Graph) {
client.run.Graph.resetValues(); if (typeof client.run.Graph.clearHeavyState === 'function') {
client.run.Graph.clearHeavyState();
} else {
client.run.Graph.resetValues();
}
if (client.run.Graph.agentContexts) {
client.run.Graph.agentContexts.clear();
client.run.Graph.agentContexts = null;
}
graphPropsToClean.forEach((prop) => { graphPropsToClean.forEach((prop) => {
if (client.run.Graph[prop] !== undefined) { if (client.run.Graph[prop] !== undefined) {

View file

@ -891,9 +891,10 @@ class AgentClient extends BaseClient {
config.signal = null; config.signal = null;
}; };
const hideSequentialOutputs = config.configurable.hide_sequential_outputs;
await runAgents(initialMessages); await runAgents(initialMessages);
/** @deprecated Agent Chain */ /** @deprecated Agent Chain */
if (config.configurable.hide_sequential_outputs) { if (hideSequentialOutputs) {
this.contentParts = this.contentParts.filter((part, index) => { this.contentParts = this.contentParts.filter((part, index) => {
// Include parts that are either: // Include parts that are either:
// 1. At or after the finalContentStart index // 1. At or after the finalContentStart index

View file

@ -13,11 +13,12 @@ const mongoSanitize = require('express-mongo-sanitize');
const { const {
isEnabled, isEnabled,
ErrorController, ErrorController,
memoryDiagnostics,
performStartupChecks, performStartupChecks,
handleJsonParseError, handleJsonParseError,
initializeFileStorage,
GenerationJobManager, GenerationJobManager,
createStreamServices, createStreamServices,
initializeFileStorage,
} = require('@librechat/api'); } = require('@librechat/api');
const { connectDb, indexSync } = require('~/db'); const { connectDb, indexSync } = require('~/db');
const initializeOAuthReconnectManager = require('./services/initializeOAuthReconnectManager'); const initializeOAuthReconnectManager = require('./services/initializeOAuthReconnectManager');
@ -201,6 +202,11 @@ const startServer = async () => {
const streamServices = createStreamServices(); const streamServices = createStreamServices();
GenerationJobManager.configure(streamServices); GenerationJobManager.configure(streamServices);
GenerationJobManager.initialize(); GenerationJobManager.initialize();
const inspectFlags = process.execArgv.some((arg) => arg.startsWith('--inspect'));
if (inspectFlags || isEnabled(process.env.MEM_DIAG)) {
memoryDiagnostics.start();
}
}); });
}; };

10
package-lock.json generated
View file

@ -59,7 +59,7 @@
"@google/genai": "^1.19.0", "@google/genai": "^1.19.0",
"@keyv/redis": "^4.3.3", "@keyv/redis": "^4.3.3",
"@langchain/core": "^0.3.80", "@langchain/core": "^0.3.80",
"@librechat/agents": "^3.1.51", "@librechat/agents": "^3.1.52",
"@librechat/api": "*", "@librechat/api": "*",
"@librechat/data-schemas": "*", "@librechat/data-schemas": "*",
"@microsoft/microsoft-graph-client": "^3.0.7", "@microsoft/microsoft-graph-client": "^3.0.7",
@ -11859,9 +11859,9 @@
} }
}, },
"node_modules/@librechat/agents": { "node_modules/@librechat/agents": {
"version": "3.1.51", "version": "3.1.52",
"resolved": "https://registry.npmjs.org/@librechat/agents/-/agents-3.1.51.tgz", "resolved": "https://registry.npmjs.org/@librechat/agents/-/agents-3.1.52.tgz",
"integrity": "sha512-inEcLCuD7YF0yCBrnxCgemg2oyRWJtCq49tLtokrD+WyWT97benSB+UyopjWh5woOsxSws3oc60d5mxRtifoLg==", "integrity": "sha512-Bg35zp+vEDZ0AEJQPZ+ukWb/UqBrsLcr3YQWRQpuvpftEgfQz0fHM5Wrxn6l5P7PvaD1ViolxoG44nggjCt7Hw==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@anthropic-ai/sdk": "^0.73.0", "@anthropic-ai/sdk": "^0.73.0",
@ -43719,7 +43719,7 @@
"@google/genai": "^1.19.0", "@google/genai": "^1.19.0",
"@keyv/redis": "^4.3.3", "@keyv/redis": "^4.3.3",
"@langchain/core": "^0.3.80", "@langchain/core": "^0.3.80",
"@librechat/agents": "^3.1.51", "@librechat/agents": "^3.1.52",
"@librechat/data-schemas": "*", "@librechat/data-schemas": "*",
"@modelcontextprotocol/sdk": "^1.27.1", "@modelcontextprotocol/sdk": "^1.27.1",
"@smithy/node-http-handler": "^4.4.5", "@smithy/node-http-handler": "^4.4.5",

View file

@ -38,7 +38,7 @@
"update-banner": "node config/update-banner.js", "update-banner": "node config/update-banner.js",
"delete-banner": "node config/delete-banner.js", "delete-banner": "node config/delete-banner.js",
"backend": "cross-env NODE_ENV=production node api/server/index.js", "backend": "cross-env NODE_ENV=production node api/server/index.js",
"backend:inspect": "cross-env NODE_ENV=production node --inspect api/server/index.js", "backend:inspect": "cross-env NODE_ENV=production node --inspect --expose-gc api/server/index.js",
"backend:dev": "cross-env NODE_ENV=development npx nodemon api/server/index.js", "backend:dev": "cross-env NODE_ENV=development npx nodemon api/server/index.js",
"backend:experimental": "cross-env NODE_ENV=production node api/server/experimental.js", "backend:experimental": "cross-env NODE_ENV=production node api/server/experimental.js",
"backend:stop": "node config/stop-backend.js", "backend:stop": "node config/stop-backend.js",

View file

@ -90,7 +90,7 @@
"@google/genai": "^1.19.0", "@google/genai": "^1.19.0",
"@keyv/redis": "^4.3.3", "@keyv/redis": "^4.3.3",
"@langchain/core": "^0.3.80", "@langchain/core": "^0.3.80",
"@librechat/agents": "^3.1.51", "@librechat/agents": "^3.1.52",
"@librechat/data-schemas": "*", "@librechat/data-schemas": "*",
"@modelcontextprotocol/sdk": "^1.27.1", "@modelcontextprotocol/sdk": "^1.27.1",
"@smithy/node-http-handler": "^4.4.5", "@smithy/node-http-handler": "^4.4.5",

View file

@ -9,6 +9,7 @@ import type {
ToolExecuteBatchRequest, ToolExecuteBatchRequest,
} from '@librechat/agents'; } from '@librechat/agents';
import type { StructuredToolInterface } from '@langchain/core/tools'; import type { StructuredToolInterface } from '@langchain/core/tools';
import { runOutsideTracing } from '~/utils';
export interface ToolEndCallbackData { export interface ToolEndCallbackData {
output: { output: {
@ -57,110 +58,122 @@ export function createToolExecuteHandler(options: ToolExecuteOptions): EventHand
const { toolCalls, agentId, configurable, metadata, resolve, reject } = data; const { toolCalls, agentId, configurable, metadata, resolve, reject } = data;
try { try {
const toolNames = [...new Set(toolCalls.map((tc: ToolCallRequest) => tc.name))]; await runOutsideTracing(async () => {
const { loadedTools, configurable: toolConfigurable } = await loadTools(toolNames, agentId); try {
const toolMap = new Map(loadedTools.map((t) => [t.name, t])); const toolNames = [...new Set(toolCalls.map((tc: ToolCallRequest) => tc.name))];
const mergedConfigurable = { ...configurable, ...toolConfigurable }; const { loadedTools, configurable: toolConfigurable } = await loadTools(
toolNames,
agentId,
);
const toolMap = new Map(loadedTools.map((t) => [t.name, t]));
const mergedConfigurable = { ...configurable, ...toolConfigurable };
const results: ToolExecuteResult[] = await Promise.all( const results: ToolExecuteResult[] = await Promise.all(
toolCalls.map(async (tc: ToolCallRequest) => { toolCalls.map(async (tc: ToolCallRequest) => {
const tool = toolMap.get(tc.name); const tool = toolMap.get(tc.name);
if (!tool) { if (!tool) {
logger.warn( logger.warn(
`[ON_TOOL_EXECUTE] Tool "${tc.name}" not found. Available: ${[...toolMap.keys()].join(', ')}`, `[ON_TOOL_EXECUTE] Tool "${tc.name}" not found. Available: ${[...toolMap.keys()].join(', ')}`,
);
return {
toolCallId: tc.id,
status: 'error' as const,
content: '',
errorMessage: `Tool ${tc.name} not found`,
};
}
try {
const toolCallConfig: Record<string, unknown> = {
id: tc.id,
stepId: tc.stepId,
turn: tc.turn,
};
if (
tc.codeSessionContext &&
(tc.name === Constants.EXECUTE_CODE ||
tc.name === Constants.PROGRAMMATIC_TOOL_CALLING)
) {
toolCallConfig.session_id = tc.codeSessionContext.session_id;
if (tc.codeSessionContext.files && tc.codeSessionContext.files.length > 0) {
toolCallConfig._injected_files = tc.codeSessionContext.files;
}
}
if (tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) {
const toolRegistry = mergedConfigurable?.toolRegistry as LCToolRegistry | undefined;
const ptcToolMap = mergedConfigurable?.ptcToolMap as
| Map<string, StructuredToolInterface>
| undefined;
if (toolRegistry) {
const toolDefs: LCTool[] = Array.from(toolRegistry.values()).filter(
(t) =>
t.name !== Constants.PROGRAMMATIC_TOOL_CALLING &&
t.name !== Constants.TOOL_SEARCH,
); );
toolCallConfig.toolDefs = toolDefs; return {
toolCallConfig.toolMap = ptcToolMap ?? toolMap; toolCallId: tc.id,
status: 'error' as const,
content: '',
errorMessage: `Tool ${tc.name} not found`,
};
} }
}
const result = await tool.invoke(tc.args, { try {
toolCall: toolCallConfig, const toolCallConfig: Record<string, unknown> = {
configurable: mergedConfigurable, id: tc.id,
metadata, stepId: tc.stepId,
} as Record<string, unknown>); turn: tc.turn,
};
if (toolEndCallback) { if (
await toolEndCallback( tc.codeSessionContext &&
{ (tc.name === Constants.EXECUTE_CODE ||
output: { tc.name === Constants.PROGRAMMATIC_TOOL_CALLING)
name: tc.name, ) {
tool_call_id: tc.id, toolCallConfig.session_id = tc.codeSessionContext.session_id;
content: result.content, if (tc.codeSessionContext.files && tc.codeSessionContext.files.length > 0) {
artifact: result.artifact, toolCallConfig._injected_files = tc.codeSessionContext.files;
}, }
}, }
{
run_id: (metadata as Record<string, unknown>)?.run_id as string | undefined,
thread_id: (metadata as Record<string, unknown>)?.thread_id as
| string
| undefined,
...metadata,
},
);
}
return { if (tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) {
toolCallId: tc.id, const toolRegistry = mergedConfigurable?.toolRegistry as
content: result.content, | LCToolRegistry
artifact: result.artifact, | undefined;
status: 'success' as const, const ptcToolMap = mergedConfigurable?.ptcToolMap as
}; | Map<string, StructuredToolInterface>
} catch (toolError) { | undefined;
const error = toolError as Error; if (toolRegistry) {
logger.error(`[ON_TOOL_EXECUTE] Tool ${tc.name} error:`, error); const toolDefs: LCTool[] = Array.from(toolRegistry.values()).filter(
return { (t) =>
toolCallId: tc.id, t.name !== Constants.PROGRAMMATIC_TOOL_CALLING &&
status: 'error' as const, t.name !== Constants.TOOL_SEARCH,
content: '', );
errorMessage: error.message, toolCallConfig.toolDefs = toolDefs;
}; toolCallConfig.toolMap = ptcToolMap ?? toolMap;
} }
}), }
);
resolve(results); const result = await tool.invoke(tc.args, {
} catch (error) { toolCall: toolCallConfig,
logger.error('[ON_TOOL_EXECUTE] Fatal error:', error); configurable: mergedConfigurable,
reject(error as Error); metadata,
} as Record<string, unknown>);
if (toolEndCallback) {
await toolEndCallback(
{
output: {
name: tc.name,
tool_call_id: tc.id,
content: result.content,
artifact: result.artifact,
},
},
{
run_id: (metadata as Record<string, unknown>)?.run_id as string | undefined,
thread_id: (metadata as Record<string, unknown>)?.thread_id as
| string
| undefined,
...metadata,
},
);
}
return {
toolCallId: tc.id,
content: result.content,
artifact: result.artifact,
status: 'success' as const,
};
} catch (toolError) {
const error = toolError as Error;
logger.error(`[ON_TOOL_EXECUTE] Tool ${tc.name} error:`, error);
return {
toolCallId: tc.id,
status: 'error' as const,
content: '',
errorMessage: error.message,
};
}
}),
);
resolve(results);
} catch (error) {
logger.error('[ON_TOOL_EXECUTE] Fatal error:', error);
reject(error as Error);
}
});
} catch (outerError) {
logger.error('[ON_TOOL_EXECUTE] Unexpected error:', outerError);
reject(outerError as Error);
} }
}, },
}; };

View file

@ -43,6 +43,8 @@ export * from './web';
export * from './cache'; export * from './cache';
/* Stream */ /* Stream */
export * from './stream'; export * from './stream';
/* Diagnostics */
export { memoryDiagnostics } from './utils/memory';
/* types */ /* types */
export type * from './mcp/types'; export type * from './mcp/types';
export type * from './flow/types'; export type * from './flow/types';

View file

@ -25,6 +25,11 @@ export class ConnectionsRepository {
this.oauthOpts = oauthOpts; this.oauthOpts = oauthOpts;
} }
/** Returns the number of active connections in this repository */
public getConnectionCount(): number {
return this.connections.size;
}
/** Checks whether this repository can connect to a specific server */ /** Checks whether this repository can connect to a specific server */
async has(serverName: string): Promise<boolean> { async has(serverName: string): Promise<boolean> {
const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, this.ownerId); const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, this.ownerId);

View file

@ -237,4 +237,23 @@ export abstract class UserConnectionManager {
} }
} }
} }
/** Returns counts of tracked users and connections for diagnostics */
public getConnectionStats(): {
trackedUsers: number;
totalConnections: number;
activityEntries: number;
appConnectionCount: number;
} {
let totalConnections = 0;
for (const serverMap of this.userConnections.values()) {
totalConnections += serverMap.size;
}
return {
trackedUsers: this.userConnections.size,
totalConnections,
activityEntries: this.userLastActivity.size,
appConnectionCount: this.appConnections?.getConnectionCount() ?? 0,
};
}
} }

View file

@ -18,10 +18,11 @@ import type {
Response as UndiciResponse, Response as UndiciResponse,
} from 'undici'; } from 'undici';
import type { MCPOAuthTokens } from './oauth/types'; import type { MCPOAuthTokens } from './oauth/types';
import { withTimeout } from '~/utils/promise';
import type * as t from './types'; import type * as t from './types';
import { createSSRFSafeUndiciConnect, resolveHostnameSSRF } from '~/auth'; import { createSSRFSafeUndiciConnect, resolveHostnameSSRF } from '~/auth';
import { runOutsideTracing } from '~/utils/tracing';
import { sanitizeUrlForLogging } from './utils'; import { sanitizeUrlForLogging } from './utils';
import { withTimeout } from '~/utils/promise';
import { mcpConfig } from './mcpConfig'; import { mcpConfig } from './mcpConfig';
type FetchLike = (url: string | URL, init?: RequestInit) => Promise<Response>; type FetchLike = (url: string | URL, init?: RequestInit) => Promise<Response>;
@ -698,14 +699,16 @@ export class MCPConnection extends EventEmitter {
await this.closeAgents(); await this.closeAgents();
} }
this.transport = await this.constructTransport(this.options); this.transport = await runOutsideTracing(() => this.constructTransport(this.options));
this.setupTransportDebugHandlers(); this.setupTransportDebugHandlers();
const connectTimeout = this.options.initTimeout ?? 120000; const connectTimeout = this.options.initTimeout ?? 120000;
await withTimeout( await runOutsideTracing(() =>
this.client.connect(this.transport), withTimeout(
connectTimeout, this.client.connect(this.transport!),
`Connection timeout after ${connectTimeout}ms`, connectTimeout,
`Connection timeout after ${connectTimeout}ms`,
),
); );
this.connectionState = 'connected'; this.connectionState = 'connected';

View file

@ -147,6 +147,10 @@ export class OAuthReconnectionManager {
} }
} }
public getTrackerStats() {
return this.reconnectionsTracker.getStats();
}
private async canReconnect(userId: string, serverName: string) { private async canReconnect(userId: string, serverName: string) {
if (this.mcpManager == null) { if (this.mcpManager == null) {
return false; return false;

View file

@ -86,4 +86,17 @@ export class OAuthReconnectionTracker {
const key = `${userId}:${serverName}`; const key = `${userId}:${serverName}`;
this.activeTimestamps.delete(key); this.activeTimestamps.delete(key);
} }
/** Returns map sizes for diagnostics */
public getStats(): {
usersWithFailedServers: number;
usersWithActiveReconnections: number;
activeTimestamps: number;
} {
return {
usersWithFailedServers: this.failed.size,
usersWithActiveReconnections: this.active.size,
activeTimestamps: this.activeTimestamps.size,
};
}
} }

View file

@ -1142,6 +1142,19 @@ class GenerationJobManagerClass {
return this.jobStore.getJobCount(); return this.jobStore.getJobCount();
} }
/** Returns sizes of internal runtime maps for diagnostics */
getRuntimeStats(): {
runtimeStateSize: number;
runStepBufferSize: number;
eventTransportStreams: number;
} {
return {
runtimeStateSize: this.runtimeState.size,
runStepBufferSize: this.runStepBuffers?.size ?? 0,
eventTransportStreams: this.eventTransport.getTrackedStreamIds().length,
};
}
/** /**
* Get job count by status. * Get job count by status.
*/ */

View file

@ -0,0 +1,173 @@
jest.mock('@librechat/data-schemas', () => ({
logger: {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
},
}));
jest.mock('~/stream', () => ({
GenerationJobManager: {
getRuntimeStats: jest.fn(() => null),
},
}));
jest.mock('~/mcp/oauth/OAuthReconnectionManager', () => ({
OAuthReconnectionManager: {
getInstance: jest.fn(() => ({
getTrackerStats: jest.fn(() => null),
})),
},
}));
jest.mock('~/mcp/MCPManager', () => ({
MCPManager: {
getInstance: jest.fn(() => ({
getConnectionStats: jest.fn(() => null),
})),
},
}));
import { logger } from '@librechat/data-schemas';
import { memoryDiagnostics } from '../memory';
type MockFn = jest.Mock<void, unknown[]>;
const debugMock = logger.debug as unknown as MockFn;
const infoMock = logger.info as unknown as MockFn;
const warnMock = logger.warn as unknown as MockFn;
function callsContaining(mock: MockFn, substring: string): unknown[][] {
return mock.mock.calls.filter(
(args) => typeof args[0] === 'string' && (args[0] as string).includes(substring),
);
}
beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
memoryDiagnostics.stop();
const snaps = memoryDiagnostics.getSnapshots() as unknown[];
snaps.length = 0;
});
afterEach(() => {
memoryDiagnostics.stop();
jest.useRealTimers();
});
describe('memoryDiagnostics', () => {
describe('collectSnapshot', () => {
it('pushes a snapshot with expected shape', () => {
memoryDiagnostics.collectSnapshot();
const snaps = memoryDiagnostics.getSnapshots();
expect(snaps).toHaveLength(1);
expect(snaps[0]).toEqual(
expect.objectContaining({
ts: expect.any(Number),
rss: expect.any(Number),
heapUsed: expect.any(Number),
heapTotal: expect.any(Number),
external: expect.any(Number),
arrayBuffers: expect.any(Number),
}),
);
});
it('caps history at 120 snapshots', () => {
for (let i = 0; i < 130; i++) {
memoryDiagnostics.collectSnapshot();
}
expect(memoryDiagnostics.getSnapshots()).toHaveLength(120);
});
it('does not log trend with fewer than 3 snapshots', () => {
memoryDiagnostics.collectSnapshot();
memoryDiagnostics.collectSnapshot();
expect(callsContaining(debugMock, 'Trend')).toHaveLength(0);
});
it('skips trend when elapsed time is under 0.1 minutes', () => {
memoryDiagnostics.collectSnapshot();
memoryDiagnostics.collectSnapshot();
memoryDiagnostics.collectSnapshot();
expect(callsContaining(debugMock, 'Trend')).toHaveLength(0);
});
it('logs trend data when enough time has elapsed', () => {
memoryDiagnostics.collectSnapshot();
jest.advanceTimersByTime(7_000);
memoryDiagnostics.collectSnapshot();
jest.advanceTimersByTime(7_000);
memoryDiagnostics.collectSnapshot();
const trendCalls = callsContaining(debugMock, 'Trend');
expect(trendCalls.length).toBeGreaterThanOrEqual(1);
const trendPayload = trendCalls[0][1] as Record<string, string>;
expect(trendPayload).toHaveProperty('rssRate');
expect(trendPayload).toHaveProperty('heapRate');
expect(trendPayload.rssRate).toMatch(/MB\/hr$/);
expect(trendPayload.heapRate).toMatch(/MB\/hr$/);
expect(trendPayload.rssRate).not.toBe('Infinity MB/hr');
expect(trendPayload.heapRate).not.toBe('Infinity MB/hr');
});
});
describe('start / stop', () => {
it('start is idempotent — calling twice does not create two intervals', () => {
memoryDiagnostics.start();
memoryDiagnostics.start();
expect(callsContaining(infoMock, 'Starting')).toHaveLength(1);
});
it('stop is idempotent — calling twice does not error', () => {
memoryDiagnostics.start();
memoryDiagnostics.stop();
memoryDiagnostics.stop();
expect(callsContaining(infoMock, 'Stopped')).toHaveLength(1);
});
it('collects an immediate snapshot on start', () => {
expect(memoryDiagnostics.getSnapshots()).toHaveLength(0);
memoryDiagnostics.start();
expect(memoryDiagnostics.getSnapshots().length).toBeGreaterThanOrEqual(1);
});
});
describe('forceGC', () => {
it('returns false and warns when gc is not exposed', () => {
const origGC = global.gc;
global.gc = undefined;
const result = memoryDiagnostics.forceGC();
expect(result).toBe(false);
expect(warnMock).toHaveBeenCalledWith(expect.stringContaining('GC not exposed'));
global.gc = origGC;
});
it('calls gc and returns true when gc is exposed', () => {
const mockGC = jest.fn();
global.gc = mockGC;
const result = memoryDiagnostics.forceGC();
expect(result).toBe(true);
expect(mockGC).toHaveBeenCalledTimes(1);
expect(infoMock).toHaveBeenCalledWith(expect.stringContaining('Forced garbage collection'));
global.gc = undefined;
});
});
});

View file

@ -0,0 +1,137 @@
import { AsyncLocalStorage } from 'node:async_hooks';
const TRACING_ALS_KEY = Symbol.for('ls:tracing_async_local_storage');
const typedGlobal = globalThis as typeof globalThis & Record<symbol, AsyncLocalStorage<unknown>>;
let originalStorage: AsyncLocalStorage<unknown> | undefined;
beforeEach(() => {
originalStorage = typedGlobal[TRACING_ALS_KEY];
jest.restoreAllMocks();
});
afterEach(() => {
if (originalStorage) {
typedGlobal[TRACING_ALS_KEY] = originalStorage;
} else {
delete typedGlobal[TRACING_ALS_KEY];
}
delete process.env.LANGCHAIN_TRACING_V2;
});
async function freshImport(): Promise<typeof import('../tracing')> {
jest.resetModules();
return import('../tracing');
}
describe('runOutsideTracing', () => {
it('clears the ALS context to undefined inside fn', async () => {
const als = new AsyncLocalStorage<string>();
typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage<unknown>;
const { runOutsideTracing } = await freshImport();
let captured: string | undefined = 'NOT_CLEARED';
als.run('should-not-propagate', () => {
runOutsideTracing(() => {
captured = als.getStore();
});
});
expect(captured).toBeUndefined();
});
it('returns the value produced by fn (sync)', async () => {
const als = new AsyncLocalStorage<string>();
typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage<unknown>;
const { runOutsideTracing } = await freshImport();
const result = als.run('ctx', () => runOutsideTracing(() => 42));
expect(result).toBe(42);
});
it('returns the promise produced by fn (async)', async () => {
const als = new AsyncLocalStorage<string>();
typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage<unknown>;
const { runOutsideTracing } = await freshImport();
const result = await als.run('ctx', () =>
runOutsideTracing(async () => {
await Promise.resolve();
return 'async-value';
}),
);
expect(result).toBe('async-value');
});
it('propagates sync errors thrown inside fn', async () => {
const als = new AsyncLocalStorage<string>();
typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage<unknown>;
const { runOutsideTracing } = await freshImport();
expect(() =>
runOutsideTracing(() => {
throw new Error('boom');
}),
).toThrow('boom');
});
it('propagates async rejections from fn', async () => {
const als = new AsyncLocalStorage<string>();
typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage<unknown>;
const { runOutsideTracing } = await freshImport();
await expect(
runOutsideTracing(async () => {
throw new Error('async-boom');
}),
).rejects.toThrow('async-boom');
});
it('falls back to fn() when ALS is not on globalThis', async () => {
delete typedGlobal[TRACING_ALS_KEY];
const { runOutsideTracing } = await freshImport();
const result = runOutsideTracing(() => 'fallback');
expect(result).toBe('fallback');
});
it('does not warn when LANGCHAIN_TRACING_V2 is not set', async () => {
delete typedGlobal[TRACING_ALS_KEY];
delete process.env.LANGCHAIN_TRACING_V2;
const warnSpy = jest.fn();
jest.resetModules();
jest.doMock('@librechat/data-schemas', () => ({
logger: { warn: warnSpy },
}));
const { runOutsideTracing } = await import('../tracing');
runOutsideTracing(() => 'ok');
expect(warnSpy).not.toHaveBeenCalled();
});
it('warns once when LANGCHAIN_TRACING_V2 is set but ALS is missing', async () => {
delete typedGlobal[TRACING_ALS_KEY];
process.env.LANGCHAIN_TRACING_V2 = 'true';
const warnSpy = jest.fn();
jest.resetModules();
jest.doMock('@librechat/data-schemas', () => ({
logger: { warn: warnSpy },
}));
const { runOutsideTracing } = await import('../tracing');
runOutsideTracing(() => 'first');
runOutsideTracing(() => 'second');
expect(warnSpy).toHaveBeenCalledTimes(1);
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('LANGCHAIN_TRACING_V2 is set but ALS not found'),
);
});
});

View file

@ -25,3 +25,4 @@ export * from './http';
export * from './tokens'; export * from './tokens';
export * from './url'; export * from './url';
export * from './message'; export * from './message';
export * from './tracing';

View file

@ -0,0 +1,150 @@
import { logger } from '@librechat/data-schemas';
import { GenerationJobManager } from '~/stream';
import { OAuthReconnectionManager } from '~/mcp/oauth/OAuthReconnectionManager';
import { MCPManager } from '~/mcp/MCPManager';
type ConnectionStats = ReturnType<InstanceType<typeof MCPManager>['getConnectionStats']>;
type TrackerStats = ReturnType<InstanceType<typeof OAuthReconnectionManager>['getTrackerStats']>;
type RuntimeStats = ReturnType<(typeof GenerationJobManager)['getRuntimeStats']>;
const INTERVAL_MS = 60_000;
const SNAPSHOT_HISTORY_LIMIT = 120;
interface MemorySnapshot {
ts: number;
rss: number;
heapUsed: number;
heapTotal: number;
external: number;
arrayBuffers: number;
mcpConnections: ConnectionStats | null;
oauthTracker: TrackerStats | null;
generationJobs: RuntimeStats | null;
}
const snapshots: MemorySnapshot[] = [];
let interval: NodeJS.Timeout | null = null;
function toMB(bytes: number): string {
return (bytes / 1024 / 1024).toFixed(2);
}
function getMCPStats(): {
mcpConnections: ConnectionStats | null;
oauthTracker: TrackerStats | null;
} {
let mcpConnections: ConnectionStats | null = null;
let oauthTracker: TrackerStats | null = null;
try {
mcpConnections = MCPManager.getInstance().getConnectionStats();
} catch {
/* not initialized yet */
}
try {
oauthTracker = OAuthReconnectionManager.getInstance().getTrackerStats();
} catch {
/* not initialized yet */
}
return { mcpConnections, oauthTracker };
}
function getJobStats(): { generationJobs: RuntimeStats | null } {
try {
return { generationJobs: GenerationJobManager.getRuntimeStats() };
} catch {
return { generationJobs: null };
}
}
function collectSnapshot(): void {
const mem = process.memoryUsage();
const mcpStats = getMCPStats();
const jobStats = getJobStats();
const snapshot: MemorySnapshot = {
ts: Date.now(),
rss: mem.rss,
heapUsed: mem.heapUsed,
heapTotal: mem.heapTotal,
external: mem.external,
arrayBuffers: mem.arrayBuffers ?? 0,
...mcpStats,
...jobStats,
};
snapshots.push(snapshot);
if (snapshots.length > SNAPSHOT_HISTORY_LIMIT) {
snapshots.shift();
}
logger.debug('[MemDiag] Snapshot', {
rss: `${toMB(mem.rss)} MB`,
heapUsed: `${toMB(mem.heapUsed)} MB`,
heapTotal: `${toMB(mem.heapTotal)} MB`,
external: `${toMB(mem.external)} MB`,
arrayBuffers: `${toMB(mem.arrayBuffers ?? 0)} MB`,
mcp: mcpStats,
jobs: jobStats,
snapshotCount: snapshots.length,
});
if (snapshots.length < 3) {
return;
}
const first = snapshots[0];
const last = snapshots[snapshots.length - 1];
const elapsedMin = (last.ts - first.ts) / 60_000;
if (elapsedMin < 0.1) {
return;
}
const rssDelta = last.rss - first.rss;
const heapDelta = last.heapUsed - first.heapUsed;
logger.debug('[MemDiag] Trend', {
overMinutes: elapsedMin.toFixed(1),
rssDelta: `${toMB(rssDelta)} MB`,
heapDelta: `${toMB(heapDelta)} MB`,
rssRate: `${toMB((rssDelta / elapsedMin) * 60)} MB/hr`,
heapRate: `${toMB((heapDelta / elapsedMin) * 60)} MB/hr`,
});
}
function forceGC(): boolean {
if (global.gc) {
global.gc();
logger.info('[MemDiag] Forced garbage collection');
return true;
}
logger.warn('[MemDiag] GC not exposed. Start with --expose-gc to enable.');
return false;
}
function getSnapshots(): readonly MemorySnapshot[] {
return snapshots;
}
function start(): void {
if (interval) {
return;
}
logger.info(`[MemDiag] Starting memory diagnostics (interval: ${INTERVAL_MS / 1000}s)`);
collectSnapshot();
interval = setInterval(collectSnapshot, INTERVAL_MS);
if (interval.unref) {
interval.unref();
}
}
function stop(): void {
if (!interval) {
return;
}
clearInterval(interval);
interval = null;
logger.info('[MemDiag] Stopped memory diagnostics');
}
export const memoryDiagnostics = { start, stop, forceGC, getSnapshots, collectSnapshot };

View file

@ -0,0 +1,31 @@
import { logger } from '@librechat/data-schemas';
import { AsyncLocalStorage } from 'node:async_hooks';
import { isEnabled } from '~/utils/common';
/** @see https://github.com/langchain-ai/langchainjs — @langchain/core RunTree ALS */
const TRACING_ALS_KEY = Symbol.for('ls:tracing_async_local_storage');
let warnedMissing = false;
/**
* Runs `fn` outside the LangGraph/LangSmith tracing AsyncLocalStorage context
* so I/O handles (child processes, sockets, timers) created during `fn`
* do not permanently retain the RunTree graph config message data chain.
*
* Relies on the private symbol `ls:tracing_async_local_storage` from `@langchain/core`.
* If the symbol is absent, falls back to calling `fn()` directly.
*/
export function runOutsideTracing<T>(fn: () => T): T {
const storage = (globalThis as typeof globalThis & Record<symbol, AsyncLocalStorage<unknown>>)[
TRACING_ALS_KEY
];
if (!storage && !warnedMissing && isEnabled(process.env.LANGCHAIN_TRACING_V2)) {
warnedMissing = true;
logger.warn(
'[runOutsideTracing] LANGCHAIN_TRACING_V2 is set but ALS not found — ' +
'runOutsideTracing will be a no-op. ' +
'Verify @langchain/core version still uses Symbol.for("ls:tracing_async_local_storage").',
);
}
return storage ? storage.run(undefined as unknown, fn) : fn();
}