From a0f9782e6082675a4e2ae7e07a61d5094fcc1fbb Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Wed, 25 Feb 2026 17:41:23 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=A3=20fix:=20Prevent=20Memory=20Retent?= =?UTF-8?q?ion=20from=20AsyncLocalStorage=20Context=20Propagation=20(#1194?= =?UTF-8?q?2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: store hide_sequential_outputs before processStream clears config processStream now clears config.configurable after completion to break memory retention chains. Save hide_sequential_outputs to a local variable before calling runAgents so the post-stream filter still works. * feat: memory diagnostics * chore: expose garbage collection in backend inspect command Updated the backend inspect command in package.json to include the --expose-gc flag, enabling garbage collection diagnostics for improved memory management during development. * chore: update @librechat/agents dependency to version 3.1.52 Bumped the version of @librechat/agents in package.json and package-lock.json to ensure compatibility and access to the latest features and fixes. * fix: clear heavy config state after processStream to prevent memory leaks Break the reference chain from LangGraph's internal __pregel_scratchpad through @langchain/core RunTree.extra[lc:child_config] into the AsyncLocalStorage context captured by timers and I/O handles. After stream completion, null out symbol-keyed scratchpad properties (currentTaskInput), config.configurable, and callbacks. Also call Graph.clearHeavyState() to release config, signal, content maps, handler registry, and tool sessions. * chore: fix imports for memory utils * chore: add circular dependency check in API build step Enhanced the backend review workflow to include a check for circular dependencies during the API build process. If a circular dependency is detected, an error message is displayed, and the process exits with a failure status. 
* chore: update API build step to include circular dependency detection Modified the backend review workflow to rename the API package installation step to reflect its new functionality, which now includes detection of circular dependencies during the build process. * chore: add memory diagnostics option to .env.example Included a commented-out configuration option for enabling memory diagnostics in the .env.example file, which logs heap and RSS snapshots every 60 seconds when activated. * chore: remove redundant agentContexts cleanup in disposeClient function Streamlined the disposeClient function by eliminating duplicate cleanup logic for agentContexts, ensuring efficient memory management during client disposal. * refactor: move runOutsideTracing utility to utils and update its usage Refactored the runOutsideTracing function by relocating it to the utils module for better organization. Updated the tool execution handler to utilize the new import, ensuring consistent tracing behavior during tool execution. * refactor: enhance connection management and diagnostics Added a method to ConnectionsRepository for retrieving the active connection count. Updated UserConnectionManager to utilize this new method for app connection count reporting. Refined the OAuthReconnectionTracker's getStats method to improve clarity in diagnostics. Introduced a new tracing utility in the utils module to streamline tracing context management. Additionally, added a safeguard in memory diagnostics to prevent unnecessary snapshot collection for very short intervals. * refactor: enhance tracing utility and add memory diagnostics tests Refactored the runOutsideTracing function to improve warning logic when the AsyncLocalStorage context is missing. Added tests for memory diagnostics and tracing utilities to ensure proper functionality and error handling. Introduced a new test suite for memory diagnostics, covering snapshot collection and garbage collection behavior. 
--- .env.example | 3 + .github/workflows/backend-review.yml | 10 +- api/package.json | 2 +- api/server/cleanup.js | 12 +- api/server/controllers/agents/client.js | 3 +- api/server/index.js | 8 +- package-lock.json | 10 +- package.json | 2 +- packages/api/package.json | 2 +- packages/api/src/agents/handlers.ts | 205 ++++++++++-------- packages/api/src/index.ts | 2 + packages/api/src/mcp/ConnectionsRepository.ts | 5 + packages/api/src/mcp/UserConnectionManager.ts | 19 ++ packages/api/src/mcp/connection.ts | 15 +- .../src/mcp/oauth/OAuthReconnectionManager.ts | 4 + .../src/mcp/oauth/OAuthReconnectionTracker.ts | 13 ++ .../api/src/stream/GenerationJobManager.ts | 13 ++ .../api/src/utils/__tests__/memory.test.ts | 173 +++++++++++++++ .../api/src/utils/__tests__/tracing.test.ts | 137 ++++++++++++ packages/api/src/utils/index.ts | 1 + packages/api/src/utils/memory.ts | 150 +++++++++++++ packages/api/src/utils/tracing.ts | 31 +++ 22 files changed, 704 insertions(+), 116 deletions(-) create mode 100644 packages/api/src/utils/__tests__/memory.test.ts create mode 100644 packages/api/src/utils/__tests__/tracing.test.ts create mode 100644 packages/api/src/utils/memory.ts create mode 100644 packages/api/src/utils/tracing.ts diff --git a/.env.example b/.env.example index f6d2ec271f..3e94a0c63a 100644 --- a/.env.example +++ b/.env.example @@ -65,6 +65,9 @@ CONSOLE_JSON=false DEBUG_LOGGING=true DEBUG_CONSOLE=false +# Enable memory diagnostics (logs heap/RSS snapshots every 60s, auto-enabled with --inspect) +# MEM_DIAG=true + #=============# # Permissions # #=============# diff --git a/.github/workflows/backend-review.yml b/.github/workflows/backend-review.yml index 2379b8fee7..e151087790 100644 --- a/.github/workflows/backend-review.yml +++ b/.github/workflows/backend-review.yml @@ -42,8 +42,14 @@ jobs: - name: Install Data Schemas Package run: npm run build:data-schemas - - name: Install API Package - run: npm run build:api + - name: Build API Package & Detect Circular 
Dependencies + run: | + output=$(npm run build:api 2>&1) + echo "$output" + if echo "$output" | grep -q "Circular depend"; then + echo "Error: Circular dependency detected in @librechat/api!" + exit 1 + fi - name: Create empty auth.json file run: | diff --git a/api/package.json b/api/package.json index 7c3c1045ed..1447087b38 100644 --- a/api/package.json +++ b/api/package.json @@ -44,7 +44,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/api": "*", "@librechat/data-schemas": "*", "@microsoft/microsoft-graph-client": "^3.0.7", diff --git a/api/server/cleanup.js b/api/server/cleanup.js index c482a2267e..364c02cd8a 100644 --- a/api/server/cleanup.js +++ b/api/server/cleanup.js @@ -35,7 +35,6 @@ const graphPropsToClean = [ 'tools', 'signal', 'config', - 'agentContexts', 'messages', 'contentData', 'stepKeyIds', @@ -277,7 +276,16 @@ function disposeClient(client) { if (client.run) { if (client.run.Graph) { - client.run.Graph.resetValues(); + if (typeof client.run.Graph.clearHeavyState === 'function') { + client.run.Graph.clearHeavyState(); + } else { + client.run.Graph.resetValues(); + } + + if (client.run.Graph.agentContexts) { + client.run.Graph.agentContexts.clear(); + client.run.Graph.agentContexts = null; + } graphPropsToClean.forEach((prop) => { if (client.run.Graph[prop] !== undefined) { diff --git a/api/server/controllers/agents/client.js b/api/server/controllers/agents/client.js index 49240a6b3b..7aea6d1e8f 100644 --- a/api/server/controllers/agents/client.js +++ b/api/server/controllers/agents/client.js @@ -891,9 +891,10 @@ class AgentClient extends BaseClient { config.signal = null; }; + const hideSequentialOutputs = config.configurable.hide_sequential_outputs; await runAgents(initialMessages); /** @deprecated Agent Chain */ - if (config.configurable.hide_sequential_outputs) { + if (hideSequentialOutputs) { this.contentParts = 
this.contentParts.filter((part, index) => { // Include parts that are either: // 1. At or after the finalContentStart index diff --git a/api/server/index.js b/api/server/index.js index 193eb423ad..2aff26ceaf 100644 --- a/api/server/index.js +++ b/api/server/index.js @@ -13,11 +13,12 @@ const mongoSanitize = require('express-mongo-sanitize'); const { isEnabled, ErrorController, + memoryDiagnostics, performStartupChecks, handleJsonParseError, - initializeFileStorage, GenerationJobManager, createStreamServices, + initializeFileStorage, } = require('@librechat/api'); const { connectDb, indexSync } = require('~/db'); const initializeOAuthReconnectManager = require('./services/initializeOAuthReconnectManager'); @@ -201,6 +202,11 @@ const startServer = async () => { const streamServices = createStreamServices(); GenerationJobManager.configure(streamServices); GenerationJobManager.initialize(); + + const inspectFlags = process.execArgv.some((arg) => arg.startsWith('--inspect')); + if (inspectFlags || isEnabled(process.env.MEM_DIAG)) { + memoryDiagnostics.start(); + } }); }; diff --git a/package-lock.json b/package-lock.json index 3a875f9fb7..bbb379c4d4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -59,7 +59,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/api": "*", "@librechat/data-schemas": "*", "@microsoft/microsoft-graph-client": "^3.0.7", @@ -11859,9 +11859,9 @@ } }, "node_modules/@librechat/agents": { - "version": "3.1.51", - "resolved": "https://registry.npmjs.org/@librechat/agents/-/agents-3.1.51.tgz", - "integrity": "sha512-inEcLCuD7YF0yCBrnxCgemg2oyRWJtCq49tLtokrD+WyWT97benSB+UyopjWh5woOsxSws3oc60d5mxRtifoLg==", + "version": "3.1.52", + "resolved": "https://registry.npmjs.org/@librechat/agents/-/agents-3.1.52.tgz", + "integrity": "sha512-Bg35zp+vEDZ0AEJQPZ+ukWb/UqBrsLcr3YQWRQpuvpftEgfQz0fHM5Wrxn6l5P7PvaD1ViolxoG44nggjCt7Hw==", 
"license": "MIT", "dependencies": { "@anthropic-ai/sdk": "^0.73.0", @@ -43719,7 +43719,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/data-schemas": "*", "@modelcontextprotocol/sdk": "^1.27.1", "@smithy/node-http-handler": "^4.4.5", diff --git a/package.json b/package.json index 4791843326..02a46df399 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,7 @@ "update-banner": "node config/update-banner.js", "delete-banner": "node config/delete-banner.js", "backend": "cross-env NODE_ENV=production node api/server/index.js", - "backend:inspect": "cross-env NODE_ENV=production node --inspect api/server/index.js", + "backend:inspect": "cross-env NODE_ENV=production node --inspect --expose-gc api/server/index.js", "backend:dev": "cross-env NODE_ENV=development npx nodemon api/server/index.js", "backend:experimental": "cross-env NODE_ENV=production node api/server/experimental.js", "backend:stop": "node config/stop-backend.js", diff --git a/packages/api/package.json b/packages/api/package.json index 8e55d8d901..1854457b42 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -90,7 +90,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/data-schemas": "*", "@modelcontextprotocol/sdk": "^1.27.1", "@smithy/node-http-handler": "^4.4.5", diff --git a/packages/api/src/agents/handlers.ts b/packages/api/src/agents/handlers.ts index 62200b1a46..07c68c9d8a 100644 --- a/packages/api/src/agents/handlers.ts +++ b/packages/api/src/agents/handlers.ts @@ -9,6 +9,7 @@ import type { ToolExecuteBatchRequest, } from '@librechat/agents'; import type { StructuredToolInterface } from '@langchain/core/tools'; +import { runOutsideTracing } from '~/utils'; export interface ToolEndCallbackData { output: { @@ -57,110 +58,122 @@ export 
function createToolExecuteHandler(options: ToolExecuteOptions): EventHand const { toolCalls, agentId, configurable, metadata, resolve, reject } = data; try { - const toolNames = [...new Set(toolCalls.map((tc: ToolCallRequest) => tc.name))]; - const { loadedTools, configurable: toolConfigurable } = await loadTools(toolNames, agentId); - const toolMap = new Map(loadedTools.map((t) => [t.name, t])); - const mergedConfigurable = { ...configurable, ...toolConfigurable }; + await runOutsideTracing(async () => { + try { + const toolNames = [...new Set(toolCalls.map((tc: ToolCallRequest) => tc.name))]; + const { loadedTools, configurable: toolConfigurable } = await loadTools( + toolNames, + agentId, + ); + const toolMap = new Map(loadedTools.map((t) => [t.name, t])); + const mergedConfigurable = { ...configurable, ...toolConfigurable }; - const results: ToolExecuteResult[] = await Promise.all( - toolCalls.map(async (tc: ToolCallRequest) => { - const tool = toolMap.get(tc.name); + const results: ToolExecuteResult[] = await Promise.all( + toolCalls.map(async (tc: ToolCallRequest) => { + const tool = toolMap.get(tc.name); - if (!tool) { - logger.warn( - `[ON_TOOL_EXECUTE] Tool "${tc.name}" not found. 
Available: ${[...toolMap.keys()].join(', ')}`, - ); - return { - toolCallId: tc.id, - status: 'error' as const, - content: '', - errorMessage: `Tool ${tc.name} not found`, - }; - } - - try { - const toolCallConfig: Record = { - id: tc.id, - stepId: tc.stepId, - turn: tc.turn, - }; - - if ( - tc.codeSessionContext && - (tc.name === Constants.EXECUTE_CODE || - tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) - ) { - toolCallConfig.session_id = tc.codeSessionContext.session_id; - if (tc.codeSessionContext.files && tc.codeSessionContext.files.length > 0) { - toolCallConfig._injected_files = tc.codeSessionContext.files; - } - } - - if (tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) { - const toolRegistry = mergedConfigurable?.toolRegistry as LCToolRegistry | undefined; - const ptcToolMap = mergedConfigurable?.ptcToolMap as - | Map - | undefined; - if (toolRegistry) { - const toolDefs: LCTool[] = Array.from(toolRegistry.values()).filter( - (t) => - t.name !== Constants.PROGRAMMATIC_TOOL_CALLING && - t.name !== Constants.TOOL_SEARCH, + if (!tool) { + logger.warn( + `[ON_TOOL_EXECUTE] Tool "${tc.name}" not found. Available: ${[...toolMap.keys()].join(', ')}`, ); - toolCallConfig.toolDefs = toolDefs; - toolCallConfig.toolMap = ptcToolMap ?? 
toolMap; + return { + toolCallId: tc.id, + status: 'error' as const, + content: '', + errorMessage: `Tool ${tc.name} not found`, + }; } - } - const result = await tool.invoke(tc.args, { - toolCall: toolCallConfig, - configurable: mergedConfigurable, - metadata, - } as Record); + try { + const toolCallConfig: Record = { + id: tc.id, + stepId: tc.stepId, + turn: tc.turn, + }; - if (toolEndCallback) { - await toolEndCallback( - { - output: { - name: tc.name, - tool_call_id: tc.id, - content: result.content, - artifact: result.artifact, - }, - }, - { - run_id: (metadata as Record)?.run_id as string | undefined, - thread_id: (metadata as Record)?.thread_id as - | string - | undefined, - ...metadata, - }, - ); - } + if ( + tc.codeSessionContext && + (tc.name === Constants.EXECUTE_CODE || + tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) + ) { + toolCallConfig.session_id = tc.codeSessionContext.session_id; + if (tc.codeSessionContext.files && tc.codeSessionContext.files.length > 0) { + toolCallConfig._injected_files = tc.codeSessionContext.files; + } + } - return { - toolCallId: tc.id, - content: result.content, - artifact: result.artifact, - status: 'success' as const, - }; - } catch (toolError) { - const error = toolError as Error; - logger.error(`[ON_TOOL_EXECUTE] Tool ${tc.name} error:`, error); - return { - toolCallId: tc.id, - status: 'error' as const, - content: '', - errorMessage: error.message, - }; - } - }), - ); + if (tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) { + const toolRegistry = mergedConfigurable?.toolRegistry as + | LCToolRegistry + | undefined; + const ptcToolMap = mergedConfigurable?.ptcToolMap as + | Map + | undefined; + if (toolRegistry) { + const toolDefs: LCTool[] = Array.from(toolRegistry.values()).filter( + (t) => + t.name !== Constants.PROGRAMMATIC_TOOL_CALLING && + t.name !== Constants.TOOL_SEARCH, + ); + toolCallConfig.toolDefs = toolDefs; + toolCallConfig.toolMap = ptcToolMap ?? 
toolMap; + } + } - resolve(results); - } catch (error) { - logger.error('[ON_TOOL_EXECUTE] Fatal error:', error); - reject(error as Error); + const result = await tool.invoke(tc.args, { + toolCall: toolCallConfig, + configurable: mergedConfigurable, + metadata, + } as Record); + + if (toolEndCallback) { + await toolEndCallback( + { + output: { + name: tc.name, + tool_call_id: tc.id, + content: result.content, + artifact: result.artifact, + }, + }, + { + run_id: (metadata as Record)?.run_id as string | undefined, + thread_id: (metadata as Record)?.thread_id as + | string + | undefined, + ...metadata, + }, + ); + } + + return { + toolCallId: tc.id, + content: result.content, + artifact: result.artifact, + status: 'success' as const, + }; + } catch (toolError) { + const error = toolError as Error; + logger.error(`[ON_TOOL_EXECUTE] Tool ${tc.name} error:`, error); + return { + toolCallId: tc.id, + status: 'error' as const, + content: '', + errorMessage: error.message, + }; + } + }), + ); + + resolve(results); + } catch (error) { + logger.error('[ON_TOOL_EXECUTE] Fatal error:', error); + reject(error as Error); + } + }); + } catch (outerError) { + logger.error('[ON_TOOL_EXECUTE] Unexpected error:', outerError); + reject(outerError as Error); } }, }; diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index fefdafaefd..a7edb3882d 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -43,6 +43,8 @@ export * from './web'; export * from './cache'; /* Stream */ export * from './stream'; +/* Diagnostics */ +export { memoryDiagnostics } from './utils/memory'; /* types */ export type * from './mcp/types'; export type * from './flow/types'; diff --git a/packages/api/src/mcp/ConnectionsRepository.ts b/packages/api/src/mcp/ConnectionsRepository.ts index b14af57b29..3e0c2aca2d 100644 --- a/packages/api/src/mcp/ConnectionsRepository.ts +++ b/packages/api/src/mcp/ConnectionsRepository.ts @@ -25,6 +25,11 @@ export class ConnectionsRepository { 
this.oauthOpts = oauthOpts; } + /** Returns the number of active connections in this repository */ + public getConnectionCount(): number { + return this.connections.size; + } + /** Checks whether this repository can connect to a specific server */ async has(serverName: string): Promise { const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, this.ownerId); diff --git a/packages/api/src/mcp/UserConnectionManager.ts b/packages/api/src/mcp/UserConnectionManager.ts index e5d94689a0..1b90072618 100644 --- a/packages/api/src/mcp/UserConnectionManager.ts +++ b/packages/api/src/mcp/UserConnectionManager.ts @@ -237,4 +237,23 @@ export abstract class UserConnectionManager { } } } + + /** Returns counts of tracked users and connections for diagnostics */ + public getConnectionStats(): { + trackedUsers: number; + totalConnections: number; + activityEntries: number; + appConnectionCount: number; + } { + let totalConnections = 0; + for (const serverMap of this.userConnections.values()) { + totalConnections += serverMap.size; + } + return { + trackedUsers: this.userConnections.size, + totalConnections, + activityEntries: this.userLastActivity.size, + appConnectionCount: this.appConnections?.getConnectionCount() ?? 
0, + }; + } } diff --git a/packages/api/src/mcp/connection.ts b/packages/api/src/mcp/connection.ts index 5744059708..8ac55224f8 100644 --- a/packages/api/src/mcp/connection.ts +++ b/packages/api/src/mcp/connection.ts @@ -18,10 +18,11 @@ import type { Response as UndiciResponse, } from 'undici'; import type { MCPOAuthTokens } from './oauth/types'; -import { withTimeout } from '~/utils/promise'; import type * as t from './types'; import { createSSRFSafeUndiciConnect, resolveHostnameSSRF } from '~/auth'; +import { runOutsideTracing } from '~/utils/tracing'; import { sanitizeUrlForLogging } from './utils'; +import { withTimeout } from '~/utils/promise'; import { mcpConfig } from './mcpConfig'; type FetchLike = (url: string | URL, init?: RequestInit) => Promise; @@ -698,14 +699,16 @@ export class MCPConnection extends EventEmitter { await this.closeAgents(); } - this.transport = await this.constructTransport(this.options); + this.transport = await runOutsideTracing(() => this.constructTransport(this.options)); this.setupTransportDebugHandlers(); const connectTimeout = this.options.initTimeout ?? 
120000; - await withTimeout( - this.client.connect(this.transport), - connectTimeout, - `Connection timeout after ${connectTimeout}ms`, + await runOutsideTracing(() => + withTimeout( + this.client.connect(this.transport!), + connectTimeout, + `Connection timeout after ${connectTimeout}ms`, + ), ); this.connectionState = 'connected'; diff --git a/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts b/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts index ca9ce5c71f..f14c4abf15 100644 --- a/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts +++ b/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts @@ -147,6 +147,10 @@ export class OAuthReconnectionManager { } } + public getTrackerStats() { + return this.reconnectionsTracker.getStats(); + } + private async canReconnect(userId: string, serverName: string) { if (this.mcpManager == null) { return false; diff --git a/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts b/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts index b65f8ad115..9f6ef4abd3 100644 --- a/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts +++ b/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts @@ -86,4 +86,17 @@ export class OAuthReconnectionTracker { const key = `${userId}:${serverName}`; this.activeTimestamps.delete(key); } + + /** Returns map sizes for diagnostics */ + public getStats(): { + usersWithFailedServers: number; + usersWithActiveReconnections: number; + activeTimestamps: number; + } { + return { + usersWithFailedServers: this.failed.size, + usersWithActiveReconnections: this.active.size, + activeTimestamps: this.activeTimestamps.size, + }; + } } diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 815133d616..cd5ff04eb0 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -1142,6 +1142,19 @@ class GenerationJobManagerClass { return this.jobStore.getJobCount(); } + /** Returns 
sizes of internal runtime maps for diagnostics */ + getRuntimeStats(): { + runtimeStateSize: number; + runStepBufferSize: number; + eventTransportStreams: number; + } { + return { + runtimeStateSize: this.runtimeState.size, + runStepBufferSize: this.runStepBuffers?.size ?? 0, + eventTransportStreams: this.eventTransport.getTrackedStreamIds().length, + }; + } + /** * Get job count by status. */ diff --git a/packages/api/src/utils/__tests__/memory.test.ts b/packages/api/src/utils/__tests__/memory.test.ts new file mode 100644 index 0000000000..c821088856 --- /dev/null +++ b/packages/api/src/utils/__tests__/memory.test.ts @@ -0,0 +1,173 @@ +jest.mock('@librechat/data-schemas', () => ({ + logger: { + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }, +})); + +jest.mock('~/stream', () => ({ + GenerationJobManager: { + getRuntimeStats: jest.fn(() => null), + }, +})); + +jest.mock('~/mcp/oauth/OAuthReconnectionManager', () => ({ + OAuthReconnectionManager: { + getInstance: jest.fn(() => ({ + getTrackerStats: jest.fn(() => null), + })), + }, +})); + +jest.mock('~/mcp/MCPManager', () => ({ + MCPManager: { + getInstance: jest.fn(() => ({ + getConnectionStats: jest.fn(() => null), + })), + }, +})); + +import { logger } from '@librechat/data-schemas'; +import { memoryDiagnostics } from '../memory'; + +type MockFn = jest.Mock; + +const debugMock = logger.debug as unknown as MockFn; +const infoMock = logger.info as unknown as MockFn; +const warnMock = logger.warn as unknown as MockFn; + +function callsContaining(mock: MockFn, substring: string): unknown[][] { + return mock.mock.calls.filter( + (args) => typeof args[0] === 'string' && (args[0] as string).includes(substring), + ); +} + +beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers(); + memoryDiagnostics.stop(); + + const snaps = memoryDiagnostics.getSnapshots() as unknown[]; + snaps.length = 0; +}); + +afterEach(() => { + memoryDiagnostics.stop(); + jest.useRealTimers(); +}); + 
+describe('memoryDiagnostics', () => { + describe('collectSnapshot', () => { + it('pushes a snapshot with expected shape', () => { + memoryDiagnostics.collectSnapshot(); + + const snaps = memoryDiagnostics.getSnapshots(); + expect(snaps).toHaveLength(1); + expect(snaps[0]).toEqual( + expect.objectContaining({ + ts: expect.any(Number), + rss: expect.any(Number), + heapUsed: expect.any(Number), + heapTotal: expect.any(Number), + external: expect.any(Number), + arrayBuffers: expect.any(Number), + }), + ); + }); + + it('caps history at 120 snapshots', () => { + for (let i = 0; i < 130; i++) { + memoryDiagnostics.collectSnapshot(); + } + expect(memoryDiagnostics.getSnapshots()).toHaveLength(120); + }); + + it('does not log trend with fewer than 3 snapshots', () => { + memoryDiagnostics.collectSnapshot(); + memoryDiagnostics.collectSnapshot(); + + expect(callsContaining(debugMock, 'Trend')).toHaveLength(0); + }); + + it('skips trend when elapsed time is under 0.1 minutes', () => { + memoryDiagnostics.collectSnapshot(); + memoryDiagnostics.collectSnapshot(); + memoryDiagnostics.collectSnapshot(); + + expect(callsContaining(debugMock, 'Trend')).toHaveLength(0); + }); + + it('logs trend data when enough time has elapsed', () => { + memoryDiagnostics.collectSnapshot(); + + jest.advanceTimersByTime(7_000); + memoryDiagnostics.collectSnapshot(); + + jest.advanceTimersByTime(7_000); + memoryDiagnostics.collectSnapshot(); + + const trendCalls = callsContaining(debugMock, 'Trend'); + expect(trendCalls.length).toBeGreaterThanOrEqual(1); + + const trendPayload = trendCalls[0][1] as Record; + expect(trendPayload).toHaveProperty('rssRate'); + expect(trendPayload).toHaveProperty('heapRate'); + expect(trendPayload.rssRate).toMatch(/MB\/hr$/); + expect(trendPayload.heapRate).toMatch(/MB\/hr$/); + expect(trendPayload.rssRate).not.toBe('Infinity MB/hr'); + expect(trendPayload.heapRate).not.toBe('Infinity MB/hr'); + }); + }); + + describe('start / stop', () => { + it('start is idempotent — 
calling twice does not create two intervals', () => { + memoryDiagnostics.start(); + memoryDiagnostics.start(); + + expect(callsContaining(infoMock, 'Starting')).toHaveLength(1); + }); + + it('stop is idempotent — calling twice does not error', () => { + memoryDiagnostics.start(); + memoryDiagnostics.stop(); + memoryDiagnostics.stop(); + + expect(callsContaining(infoMock, 'Stopped')).toHaveLength(1); + }); + + it('collects an immediate snapshot on start', () => { + expect(memoryDiagnostics.getSnapshots()).toHaveLength(0); + memoryDiagnostics.start(); + expect(memoryDiagnostics.getSnapshots().length).toBeGreaterThanOrEqual(1); + }); + }); + + describe('forceGC', () => { + it('returns false and warns when gc is not exposed', () => { + const origGC = global.gc; + global.gc = undefined; + + const result = memoryDiagnostics.forceGC(); + + expect(result).toBe(false); + expect(warnMock).toHaveBeenCalledWith(expect.stringContaining('GC not exposed')); + + global.gc = origGC; + }); + + it('calls gc and returns true when gc is exposed', () => { + const mockGC = jest.fn(); + global.gc = mockGC; + + const result = memoryDiagnostics.forceGC(); + + expect(result).toBe(true); + expect(mockGC).toHaveBeenCalledTimes(1); + expect(infoMock).toHaveBeenCalledWith(expect.stringContaining('Forced garbage collection')); + + global.gc = undefined; + }); + }); +}); diff --git a/packages/api/src/utils/__tests__/tracing.test.ts b/packages/api/src/utils/__tests__/tracing.test.ts new file mode 100644 index 0000000000..679b28e327 --- /dev/null +++ b/packages/api/src/utils/__tests__/tracing.test.ts @@ -0,0 +1,137 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; + +const TRACING_ALS_KEY = Symbol.for('ls:tracing_async_local_storage'); +const typedGlobal = globalThis as typeof globalThis & Record>; + +let originalStorage: AsyncLocalStorage | undefined; + +beforeEach(() => { + originalStorage = typedGlobal[TRACING_ALS_KEY]; + jest.restoreAllMocks(); +}); + +afterEach(() => { + if 
(originalStorage) { + typedGlobal[TRACING_ALS_KEY] = originalStorage; + } else { + delete typedGlobal[TRACING_ALS_KEY]; + } + delete process.env.LANGCHAIN_TRACING_V2; +}); + +async function freshImport(): Promise { + jest.resetModules(); + return import('../tracing'); +} + +describe('runOutsideTracing', () => { + it('clears the ALS context to undefined inside fn', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + let captured: string | undefined = 'NOT_CLEARED'; + als.run('should-not-propagate', () => { + runOutsideTracing(() => { + captured = als.getStore(); + }); + }); + + expect(captured).toBeUndefined(); + }); + + it('returns the value produced by fn (sync)', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + const result = als.run('ctx', () => runOutsideTracing(() => 42)); + expect(result).toBe(42); + }); + + it('returns the promise produced by fn (async)', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + const result = await als.run('ctx', () => + runOutsideTracing(async () => { + await Promise.resolve(); + return 'async-value'; + }), + ); + expect(result).toBe('async-value'); + }); + + it('propagates sync errors thrown inside fn', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + expect(() => + runOutsideTracing(() => { + throw new Error('boom'); + }), + ).toThrow('boom'); + }); + + it('propagates async rejections from fn', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await 
freshImport(); + + await expect( + runOutsideTracing(async () => { + throw new Error('async-boom'); + }), + ).rejects.toThrow('async-boom'); + }); + + it('falls back to fn() when ALS is not on globalThis', async () => { + delete typedGlobal[TRACING_ALS_KEY]; + + const { runOutsideTracing } = await freshImport(); + + const result = runOutsideTracing(() => 'fallback'); + expect(result).toBe('fallback'); + }); + + it('does not warn when LANGCHAIN_TRACING_V2 is not set', async () => { + delete typedGlobal[TRACING_ALS_KEY]; + delete process.env.LANGCHAIN_TRACING_V2; + + const warnSpy = jest.fn(); + jest.resetModules(); + jest.doMock('@librechat/data-schemas', () => ({ + logger: { warn: warnSpy }, + })); + const { runOutsideTracing } = await import('../tracing'); + + runOutsideTracing(() => 'ok'); + expect(warnSpy).not.toHaveBeenCalled(); + }); + + it('warns once when LANGCHAIN_TRACING_V2 is set but ALS is missing', async () => { + delete typedGlobal[TRACING_ALS_KEY]; + process.env.LANGCHAIN_TRACING_V2 = 'true'; + + const warnSpy = jest.fn(); + jest.resetModules(); + jest.doMock('@librechat/data-schemas', () => ({ + logger: { warn: warnSpy }, + })); + const { runOutsideTracing } = await import('../tracing'); + + runOutsideTracing(() => 'first'); + runOutsideTracing(() => 'second'); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('LANGCHAIN_TRACING_V2 is set but ALS not found'), + ); + }); +}); diff --git a/packages/api/src/utils/index.ts b/packages/api/src/utils/index.ts index d4351eb5a0..470780cd5c 100644 --- a/packages/api/src/utils/index.ts +++ b/packages/api/src/utils/index.ts @@ -25,3 +25,4 @@ export * from './http'; export * from './tokens'; export * from './url'; export * from './message'; +export * from './tracing'; diff --git a/packages/api/src/utils/memory.ts b/packages/api/src/utils/memory.ts new file mode 100644 index 0000000000..214548d14b --- /dev/null +++ b/packages/api/src/utils/memory.ts @@ 
// ─── packages/api/src/utils/memory.ts (new file in this patch) ───────────────
import { logger } from '@librechat/data-schemas';
import { GenerationJobManager } from '~/stream';
import { OAuthReconnectionManager } from '~/mcp/oauth/OAuthReconnectionManager';
import { MCPManager } from '~/mcp/MCPManager';

/*
 * Stat shapes are derived from the exact call chains used further down
 * (`X.getInstance().method()` / `GenerationJobManager.getRuntimeStats()`),
 * so they track whatever those methods return without redeclaring them here.
 */
type ConnectionStats = ReturnType<
  ReturnType<(typeof MCPManager)['getInstance']>['getConnectionStats']
>;
type TrackerStats = ReturnType<
  ReturnType<(typeof OAuthReconnectionManager)['getInstance']>['getTrackerStats']
>;
type RuntimeStats = ReturnType<(typeof GenerationJobManager)['getRuntimeStats']>;

/** Delay between automatic snapshots once `start()` has been called. */
const INTERVAL_MS = 60_000;
/** Maximum number of snapshots retained; the oldest one is dropped beyond this. */
const SNAPSHOT_HISTORY_LIMIT = 120;

/** One sample of process memory usage plus the subsystem stats gathered with it. */
interface MemorySnapshot {
  /** `Date.now()` at the moment the sample was taken. */
  ts: number;
  rss: number;
  heapUsed: number;
  heapTotal: number;
  external: number;
  arrayBuffers: number;
  /** `null` when the corresponding stats call threw. */
  mcpConnections: ConnectionStats | null;
  oauthTracker: TrackerStats | null;
  generationJobs: RuntimeStats | null;
}

const snapshots: MemorySnapshot[] = [];
let interval: NodeJS.Timeout | null = null;

/** Formats a byte count as mebibytes with two decimals (no unit suffix). */
function toMB(bytes: number): string {
  return (bytes / 1024 / 1024).toFixed(2);
}

/**
 * Collects MCP connection stats and OAuth reconnection tracker stats.
 * Each lookup is attempted independently; one that throws yields `null`
 * for its field and does not affect the other.
 */
function getMCPStats(): {
  mcpConnections: ConnectionStats | null;
  oauthTracker: TrackerStats | null;
} {
  let mcpConnections: ConnectionStats | null = null;
  let oauthTracker: TrackerStats | null = null;

  try {
    mcpConnections = MCPManager.getInstance().getConnectionStats();
  } catch {
    /* not initialized yet */
  }

  try {
    oauthTracker = OAuthReconnectionManager.getInstance().getTrackerStats();
  } catch {
    /* not initialized yet */
  }

  return { mcpConnections, oauthTracker };
}

/** Collects generation-job runtime stats, or `null` if the lookup throws. */
function getJobStats(): { generationJobs: RuntimeStats | null } {
  try {
    return { generationJobs: GenerationJobManager.getRuntimeStats() };
  } catch {
    return { generationJobs: null };
  }
}

/**
 * Logs the RSS / heap change between the oldest and newest retained snapshots.
 * Does nothing until at least three snapshots exist, or while the covered
 * window is shorter than 0.1 minutes (the hourly rates divide by that window).
 */
function logTrend(): void {
  if (snapshots.length < 3) {
    return;
  }

  const first = snapshots[0];
  const last = snapshots[snapshots.length - 1];
  const elapsedMin = (last.ts - first.ts) / 60_000;
  if (elapsedMin < 0.1) {
    return;
  }

  const rssDelta = last.rss - first.rss;
  const heapDelta = last.heapUsed - first.heapUsed;
  logger.debug('[MemDiag] Trend', {
    overMinutes: elapsedMin.toFixed(1),
    rssDelta: `${toMB(rssDelta)} MB`,
    heapDelta: `${toMB(heapDelta)} MB`,
    rssRate: `${toMB((rssDelta / elapsedMin) * 60)} MB/hr`,
    heapRate: `${toMB((heapDelta / elapsedMin) * 60)} MB/hr`,
  });
}

/**
 * Takes one memory snapshot, appends it to the bounded history, logs it at
 * debug level, and then logs the trend across the retained history.
 */
function collectSnapshot(): void {
  const mem = process.memoryUsage();
  const mcpStats = getMCPStats();
  const jobStats = getJobStats();

  const snapshot: MemorySnapshot = {
    ts: Date.now(),
    rss: mem.rss,
    heapUsed: mem.heapUsed,
    heapTotal: mem.heapTotal,
    external: mem.external,
    arrayBuffers: mem.arrayBuffers ?? 0,
    ...mcpStats,
    ...jobStats,
  };

  snapshots.push(snapshot);
  if (snapshots.length > SNAPSHOT_HISTORY_LIMIT) {
    snapshots.shift();
  }

  logger.debug('[MemDiag] Snapshot', {
    rss: `${toMB(mem.rss)} MB`,
    heapUsed: `${toMB(mem.heapUsed)} MB`,
    heapTotal: `${toMB(mem.heapTotal)} MB`,
    external: `${toMB(mem.external)} MB`,
    arrayBuffers: `${toMB(mem.arrayBuffers ?? 0)} MB`,
    mcp: mcpStats,
    jobs: jobStats,
    snapshotCount: snapshots.length,
  });

  logTrend();
}

/**
 * Triggers a garbage collection when `global.gc` is available.
 *
 * @returns `true` if a collection was triggered, `false` if `global.gc` is not exposed.
 */
function forceGC(): boolean {
  if (global.gc) {
    global.gc();
    logger.info('[MemDiag] Forced garbage collection');
    return true;
  }
  logger.warn('[MemDiag] GC not exposed. Start with --expose-gc to enable.');
  return false;
}

/** Returns the retained snapshot history (the live array, typed read-only). */
function getSnapshots(): readonly MemorySnapshot[] {
  return snapshots;
}

/**
 * Starts periodic snapshot collection: one snapshot immediately, then one
 * every `INTERVAL_MS`. Calling it while already running is a no-op.
 */
function start(): void {
  if (interval) {
    return;
  }
  logger.info(`[MemDiag] Starting memory diagnostics (interval: ${INTERVAL_MS / 1000}s)`);
  collectSnapshot();
  interval = setInterval(collectSnapshot, INTERVAL_MS);
  if (interval.unref) {
    interval.unref();
  }
}

/** Stops periodic snapshot collection; a no-op when not running. */
function stop(): void {
  if (!interval) {
    return;
  }
  clearInterval(interval);
  interval = null;
  logger.info('[MemDiag] Stopped memory diagnostics');
}

export const memoryDiagnostics = { start, stop, forceGC, getSnapshots, collectSnapshot };

// ─── packages/api/src/utils/tracing.ts (new file in this patch) ──────────────
import { logger } from '@librechat/data-schemas';
import { AsyncLocalStorage } from 'node:async_hooks';
import { isEnabled } from '~/utils/common';

/** @see https://github.com/langchain-ai/langchainjs — @langchain/core RunTree ALS */
const TRACING_ALS_KEY = Symbol.for('ls:tracing_async_local_storage');

/** Ensures the "ALS not found" warning is logged at most once per module load. */
let warnedMissing = false;

/**
 * Runs `fn` outside the LangGraph/LangSmith tracing AsyncLocalStorage context
 * so I/O handles (child processes, sockets, timers) created during `fn`
 * do not permanently retain the RunTree → graph config → message data chain.
 *
 * Relies on the private symbol `ls:tracing_async_local_storage` from `@langchain/core`.
 * If the symbol is absent, falls back to calling `fn()` directly; in that case a
 * warning is logged once, and only when `LANGCHAIN_TRACING_V2` is enabled.
 *
 * @param fn - Callback to execute with the tracing store set to `undefined`.
 * @returns Whatever `fn` returns (a promise is passed through un-awaited);
 *   anything `fn` throws propagates to the caller.
 */
export function runOutsideTracing<T>(fn: () => T): T {
  const storage = (
    globalThis as typeof globalThis & Record<symbol, AsyncLocalStorage<unknown> | undefined>
  )[TRACING_ALS_KEY];

  if (!storage && !warnedMissing && isEnabled(process.env.LANGCHAIN_TRACING_V2)) {
    warnedMissing = true;
    logger.warn(
      '[runOutsideTracing] LANGCHAIN_TRACING_V2 is set but ALS not found — ' +
        'runOutsideTracing will be a no-op. ' +
        'Verify @langchain/core version still uses Symbol.for("ls:tracing_async_local_storage").',
    );
  }

  return storage ? storage.run(undefined as unknown, fn) : fn();
}