diff --git a/.env.example b/.env.example index f6d2ec271f..3e94a0c63a 100644 --- a/.env.example +++ b/.env.example @@ -65,6 +65,9 @@ CONSOLE_JSON=false DEBUG_LOGGING=true DEBUG_CONSOLE=false +# Enable memory diagnostics (logs heap/RSS snapshots every 60s, auto-enabled with --inspect) +# MEM_DIAG=true + #=============# # Permissions # #=============# diff --git a/.github/workflows/backend-review.yml b/.github/workflows/backend-review.yml index 2379b8fee7..e151087790 100644 --- a/.github/workflows/backend-review.yml +++ b/.github/workflows/backend-review.yml @@ -42,8 +42,14 @@ jobs: - name: Install Data Schemas Package run: npm run build:data-schemas - - name: Install API Package - run: npm run build:api + - name: Build API Package & Detect Circular Dependencies + run: | + output=$(npm run build:api 2>&1) + echo "$output" + if echo "$output" | grep -q "Circular depend"; then + echo "Error: Circular dependency detected in @librechat/api!" + exit 1 + fi - name: Create empty auth.json file run: | diff --git a/api/package.json b/api/package.json index 7c3c1045ed..1447087b38 100644 --- a/api/package.json +++ b/api/package.json @@ -44,7 +44,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/api": "*", "@librechat/data-schemas": "*", "@microsoft/microsoft-graph-client": "^3.0.7", diff --git a/api/server/cleanup.js b/api/server/cleanup.js index c482a2267e..364c02cd8a 100644 --- a/api/server/cleanup.js +++ b/api/server/cleanup.js @@ -35,7 +35,6 @@ const graphPropsToClean = [ 'tools', 'signal', 'config', - 'agentContexts', 'messages', 'contentData', 'stepKeyIds', @@ -277,7 +276,16 @@ function disposeClient(client) { if (client.run) { if (client.run.Graph) { - client.run.Graph.resetValues(); + if (typeof client.run.Graph.clearHeavyState === 'function') { + client.run.Graph.clearHeavyState(); + } else { + client.run.Graph.resetValues(); + } + + if (client.run.Graph.agentContexts) { + client.run.Graph.agentContexts.clear(); + client.run.Graph.agentContexts = null; + } graphPropsToClean.forEach((prop) => { if (client.run.Graph[prop] !== undefined) { diff --git a/api/server/controllers/agents/client.js b/api/server/controllers/agents/client.js index 49240a6b3b..7aea6d1e8f 100644 --- a/api/server/controllers/agents/client.js +++ b/api/server/controllers/agents/client.js @@ -891,9 +891,10 @@ class AgentClient extends BaseClient { config.signal = null; }; + const hideSequentialOutputs = config.configurable.hide_sequential_outputs; await runAgents(initialMessages); /** @deprecated Agent Chain */ - if (config.configurable.hide_sequential_outputs) { + if (hideSequentialOutputs) { this.contentParts = this.contentParts.filter((part, index) => { // Include parts that are either: // 1. At or after the finalContentStart index diff --git a/api/server/index.js b/api/server/index.js index 193eb423ad..2aff26ceaf 100644 --- a/api/server/index.js +++ b/api/server/index.js @@ -13,11 +13,12 @@ const mongoSanitize = require('express-mongo-sanitize'); const { isEnabled, ErrorController, + memoryDiagnostics, performStartupChecks, handleJsonParseError, - initializeFileStorage, GenerationJobManager, createStreamServices, + initializeFileStorage, } = require('@librechat/api'); const { connectDb, indexSync } = require('~/db'); const initializeOAuthReconnectManager = require('./services/initializeOAuthReconnectManager'); @@ -201,6 +202,11 @@ const startServer = async () => { const streamServices = createStreamServices(); GenerationJobManager.configure(streamServices); GenerationJobManager.initialize(); + + const inspectFlags = process.execArgv.some((arg) => arg.startsWith('--inspect')); + if (inspectFlags || isEnabled(process.env.MEM_DIAG)) { + memoryDiagnostics.start(); + } }); }; diff --git a/package-lock.json b/package-lock.json index 3a875f9fb7..bbb379c4d4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -59,7 +59,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/api": "*", "@librechat/data-schemas": "*", "@microsoft/microsoft-graph-client": "^3.0.7", @@ -11859,9 +11859,9 @@ } }, "node_modules/@librechat/agents": { - "version": "3.1.51", - "resolved": "https://registry.npmjs.org/@librechat/agents/-/agents-3.1.51.tgz", - "integrity": "sha512-inEcLCuD7YF0yCBrnxCgemg2oyRWJtCq49tLtokrD+WyWT97benSB+UyopjWh5woOsxSws3oc60d5mxRtifoLg==", + "version": "3.1.52", + "resolved": "https://registry.npmjs.org/@librechat/agents/-/agents-3.1.52.tgz", + "integrity": "sha512-Bg35zp+vEDZ0AEJQPZ+ukWb/UqBrsLcr3YQWRQpuvpftEgfQz0fHM5Wrxn6l5P7PvaD1ViolxoG44nggjCt7Hw==", "license": "MIT", "dependencies": { "@anthropic-ai/sdk": "^0.73.0", @@ -43719,7 +43719,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/data-schemas": "*", "@modelcontextprotocol/sdk": "^1.27.1", "@smithy/node-http-handler": "^4.4.5", diff --git a/package.json b/package.json index 4791843326..02a46df399 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,7 @@ "update-banner": "node config/update-banner.js", "delete-banner": "node config/delete-banner.js", "backend": "cross-env NODE_ENV=production node api/server/index.js", - "backend:inspect": "cross-env NODE_ENV=production node --inspect api/server/index.js", + "backend:inspect": "cross-env NODE_ENV=production node --inspect --expose-gc api/server/index.js", "backend:dev": "cross-env NODE_ENV=development npx nodemon api/server/index.js", "backend:experimental": "cross-env NODE_ENV=production node api/server/experimental.js", "backend:stop": "node config/stop-backend.js", diff --git a/packages/api/package.json b/packages/api/package.json index 8e55d8d901..1854457b42 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -90,7 +90,7 @@ "@google/genai": "^1.19.0", "@keyv/redis": "^4.3.3", "@langchain/core": "^0.3.80", - "@librechat/agents": "^3.1.51", + "@librechat/agents": "^3.1.52", "@librechat/data-schemas": "*", "@modelcontextprotocol/sdk": "^1.27.1", "@smithy/node-http-handler": "^4.4.5", diff --git a/packages/api/src/agents/handlers.ts b/packages/api/src/agents/handlers.ts index 62200b1a46..07c68c9d8a 100644 --- a/packages/api/src/agents/handlers.ts +++ b/packages/api/src/agents/handlers.ts @@ -9,6 +9,7 @@ import type { ToolExecuteBatchRequest, } from '@librechat/agents'; import type { StructuredToolInterface } from '@langchain/core/tools'; +import { runOutsideTracing } from '~/utils'; export interface ToolEndCallbackData { output: { @@ -57,110 +58,122 @@ export function createToolExecuteHandler(options: ToolExecuteOptions): EventHand const { toolCalls, agentId, configurable, metadata, resolve, reject } = data; try { - const toolNames = [...new Set(toolCalls.map((tc: ToolCallRequest) => tc.name))]; - const { loadedTools, configurable: toolConfigurable } = await loadTools(toolNames, agentId); - const toolMap = new Map(loadedTools.map((t) => [t.name, t])); - const mergedConfigurable = { ...configurable, ...toolConfigurable }; + await runOutsideTracing(async () => { + try { + const toolNames = [...new Set(toolCalls.map((tc: ToolCallRequest) => tc.name))]; + const { loadedTools, configurable: toolConfigurable } = await loadTools( + toolNames, + agentId, + ); + const toolMap = new Map(loadedTools.map((t) => [t.name, t])); + const mergedConfigurable = { ...configurable, ...toolConfigurable }; - const results: ToolExecuteResult[] = await Promise.all( - toolCalls.map(async (tc: ToolCallRequest) => { - const tool = toolMap.get(tc.name); + const results: ToolExecuteResult[] = await Promise.all( + toolCalls.map(async (tc: ToolCallRequest) => { + const tool = toolMap.get(tc.name); - if (!tool) { - logger.warn( - `[ON_TOOL_EXECUTE] Tool "${tc.name}" not found. Available: ${[...toolMap.keys()].join(', ')}`, - ); - return { - toolCallId: tc.id, - status: 'error' as const, - content: '', - errorMessage: `Tool ${tc.name} not found`, - }; - } - - try { - const toolCallConfig: Record = { - id: tc.id, - stepId: tc.stepId, - turn: tc.turn, - }; - - if ( - tc.codeSessionContext && - (tc.name === Constants.EXECUTE_CODE || - tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) - ) { - toolCallConfig.session_id = tc.codeSessionContext.session_id; - if (tc.codeSessionContext.files && tc.codeSessionContext.files.length > 0) { - toolCallConfig._injected_files = tc.codeSessionContext.files; - } - } - - if (tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) { - const toolRegistry = mergedConfigurable?.toolRegistry as LCToolRegistry | undefined; - const ptcToolMap = mergedConfigurable?.ptcToolMap as - | Map - | undefined; - if (toolRegistry) { - const toolDefs: LCTool[] = Array.from(toolRegistry.values()).filter( - (t) => - t.name !== Constants.PROGRAMMATIC_TOOL_CALLING && - t.name !== Constants.TOOL_SEARCH, + if (!tool) { + logger.warn( + `[ON_TOOL_EXECUTE] Tool "${tc.name}" not found. Available: ${[...toolMap.keys()].join(', ')}`, ); - toolCallConfig.toolDefs = toolDefs; - toolCallConfig.toolMap = ptcToolMap ?? toolMap; + return { + toolCallId: tc.id, + status: 'error' as const, + content: '', + errorMessage: `Tool ${tc.name} not found`, + }; } - } - const result = await tool.invoke(tc.args, { - toolCall: toolCallConfig, - configurable: mergedConfigurable, - metadata, - } as Record); + try { + const toolCallConfig: Record = { + id: tc.id, + stepId: tc.stepId, + turn: tc.turn, + }; - if (toolEndCallback) { - await toolEndCallback( - { - output: { - name: tc.name, - tool_call_id: tc.id, - content: result.content, - artifact: result.artifact, - }, - }, - { - run_id: (metadata as Record)?.run_id as string | undefined, - thread_id: (metadata as Record)?.thread_id as - | string - | undefined, - ...metadata, - }, - ); - } + if ( + tc.codeSessionContext && + (tc.name === Constants.EXECUTE_CODE || + tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) + ) { + toolCallConfig.session_id = tc.codeSessionContext.session_id; + if (tc.codeSessionContext.files && tc.codeSessionContext.files.length > 0) { + toolCallConfig._injected_files = tc.codeSessionContext.files; + } + } - return { - toolCallId: tc.id, - content: result.content, - artifact: result.artifact, - status: 'success' as const, - }; - } catch (toolError) { - const error = toolError as Error; - logger.error(`[ON_TOOL_EXECUTE] Tool ${tc.name} error:`, error); - return { - toolCallId: tc.id, - status: 'error' as const, - content: '', - errorMessage: error.message, - }; - } - }), - ); + if (tc.name === Constants.PROGRAMMATIC_TOOL_CALLING) { + const toolRegistry = mergedConfigurable?.toolRegistry as + | LCToolRegistry + | undefined; + const ptcToolMap = mergedConfigurable?.ptcToolMap as + | Map + | undefined; + if (toolRegistry) { + const toolDefs: LCTool[] = Array.from(toolRegistry.values()).filter( + (t) => + t.name !== Constants.PROGRAMMATIC_TOOL_CALLING && + t.name !== Constants.TOOL_SEARCH, + ); + toolCallConfig.toolDefs = toolDefs; + toolCallConfig.toolMap = ptcToolMap ?? toolMap; + } + } - resolve(results); - } catch (error) { - logger.error('[ON_TOOL_EXECUTE] Fatal error:', error); - reject(error as Error); + const result = await tool.invoke(tc.args, { + toolCall: toolCallConfig, + configurable: mergedConfigurable, + metadata, + } as Record); + + if (toolEndCallback) { + await toolEndCallback( + { + output: { + name: tc.name, + tool_call_id: tc.id, + content: result.content, + artifact: result.artifact, + }, + }, + { + run_id: (metadata as Record)?.run_id as string | undefined, + thread_id: (metadata as Record)?.thread_id as + | string + | undefined, + ...metadata, + }, + ); + } + + return { + toolCallId: tc.id, + content: result.content, + artifact: result.artifact, + status: 'success' as const, + }; + } catch (toolError) { + const error = toolError as Error; + logger.error(`[ON_TOOL_EXECUTE] Tool ${tc.name} error:`, error); + return { + toolCallId: tc.id, + status: 'error' as const, + content: '', + errorMessage: error.message, + }; + } + }), + ); + + resolve(results); + } catch (error) { + logger.error('[ON_TOOL_EXECUTE] Fatal error:', error); + reject(error as Error); + } + }); + } catch (outerError) { + logger.error('[ON_TOOL_EXECUTE] Unexpected error:', outerError); + reject(outerError as Error); } }, }; diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index fefdafaefd..a7edb3882d 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -43,6 +43,8 @@ export * from './web'; export * from './cache'; /* Stream */ export * from './stream'; +/* Diagnostics */ +export { memoryDiagnostics } from './utils/memory'; /* types */ export type * from './mcp/types'; export type * from './flow/types'; diff --git a/packages/api/src/mcp/ConnectionsRepository.ts b/packages/api/src/mcp/ConnectionsRepository.ts index b14af57b29..3e0c2aca2d 100644 --- a/packages/api/src/mcp/ConnectionsRepository.ts +++ b/packages/api/src/mcp/ConnectionsRepository.ts @@ -25,6 +25,11 @@ export class ConnectionsRepository { this.oauthOpts = oauthOpts; } + /** Returns the number of active connections in this repository */ + public getConnectionCount(): number { + return this.connections.size; + } + /** Checks whether this repository can connect to a specific server */ async has(serverName: string): Promise { const config = await MCPServersRegistry.getInstance().getServerConfig(serverName, this.ownerId); diff --git a/packages/api/src/mcp/UserConnectionManager.ts b/packages/api/src/mcp/UserConnectionManager.ts index e5d94689a0..1b90072618 100644 --- a/packages/api/src/mcp/UserConnectionManager.ts +++ b/packages/api/src/mcp/UserConnectionManager.ts @@ -237,4 +237,23 @@ export abstract class UserConnectionManager { } } } + + /** Returns counts of tracked users and connections for diagnostics */ + public getConnectionStats(): { + trackedUsers: number; + totalConnections: number; + activityEntries: number; + appConnectionCount: number; + } { + let totalConnections = 0; + for (const serverMap of this.userConnections.values()) { + totalConnections += serverMap.size; + } + return { + trackedUsers: this.userConnections.size, + totalConnections, + activityEntries: this.userLastActivity.size, + appConnectionCount: this.appConnections?.getConnectionCount() ?? 0, + }; + } } diff --git a/packages/api/src/mcp/connection.ts b/packages/api/src/mcp/connection.ts index 5744059708..8ac55224f8 100644 --- a/packages/api/src/mcp/connection.ts +++ b/packages/api/src/mcp/connection.ts @@ -18,10 +18,11 @@ import type { Response as UndiciResponse, } from 'undici'; import type { MCPOAuthTokens } from './oauth/types'; -import { withTimeout } from '~/utils/promise'; import type * as t from './types'; import { createSSRFSafeUndiciConnect, resolveHostnameSSRF } from '~/auth'; +import { runOutsideTracing } from '~/utils/tracing'; import { sanitizeUrlForLogging } from './utils'; +import { withTimeout } from '~/utils/promise'; import { mcpConfig } from './mcpConfig'; type FetchLike = (url: string | URL, init?: RequestInit) => Promise; @@ -698,14 +699,16 @@ export class MCPConnection extends EventEmitter { await this.closeAgents(); } - this.transport = await this.constructTransport(this.options); + this.transport = await runOutsideTracing(() => this.constructTransport(this.options)); this.setupTransportDebugHandlers(); const connectTimeout = this.options.initTimeout ?? 120000; - await withTimeout( - this.client.connect(this.transport), - connectTimeout, - `Connection timeout after ${connectTimeout}ms`, + await runOutsideTracing(() => + withTimeout( + this.client.connect(this.transport!), + connectTimeout, + `Connection timeout after ${connectTimeout}ms`, + ), ); this.connectionState = 'connected'; diff --git a/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts b/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts index ca9ce5c71f..f14c4abf15 100644 --- a/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts +++ b/packages/api/src/mcp/oauth/OAuthReconnectionManager.ts @@ -147,6 +147,10 @@ export class OAuthReconnectionManager { } } + public getTrackerStats() { + return this.reconnectionsTracker.getStats(); + } + private async canReconnect(userId: string, serverName: string) { if (this.mcpManager == null) { return false; diff --git a/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts b/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts index b65f8ad115..9f6ef4abd3 100644 --- a/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts +++ b/packages/api/src/mcp/oauth/OAuthReconnectionTracker.ts @@ -86,4 +86,17 @@ export class OAuthReconnectionTracker { const key = `${userId}:${serverName}`; this.activeTimestamps.delete(key); } + + /** Returns map sizes for diagnostics */ + public getStats(): { + usersWithFailedServers: number; + usersWithActiveReconnections: number; + activeTimestamps: number; + } { + return { + usersWithFailedServers: this.failed.size, + usersWithActiveReconnections: this.active.size, + activeTimestamps: this.activeTimestamps.size, + }; + } } diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 815133d616..cd5ff04eb0 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -1142,6 +1142,19 @@ class GenerationJobManagerClass { return this.jobStore.getJobCount(); } + /** Returns sizes of internal runtime maps for diagnostics */ + getRuntimeStats(): { + runtimeStateSize: number; + runStepBufferSize: number; + eventTransportStreams: number; + } { + return { + runtimeStateSize: this.runtimeState.size, + runStepBufferSize: this.runStepBuffers?.size ?? 0, + eventTransportStreams: this.eventTransport.getTrackedStreamIds().length, + }; + } + /** * Get job count by status. */ diff --git a/packages/api/src/utils/__tests__/memory.test.ts b/packages/api/src/utils/__tests__/memory.test.ts new file mode 100644 index 0000000000..c821088856 --- /dev/null +++ b/packages/api/src/utils/__tests__/memory.test.ts @@ -0,0 +1,173 @@ +jest.mock('@librechat/data-schemas', () => ({ + logger: { + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }, +})); + +jest.mock('~/stream', () => ({ + GenerationJobManager: { + getRuntimeStats: jest.fn(() => null), + }, +})); + +jest.mock('~/mcp/oauth/OAuthReconnectionManager', () => ({ + OAuthReconnectionManager: { + getInstance: jest.fn(() => ({ + getTrackerStats: jest.fn(() => null), + })), + }, +})); + +jest.mock('~/mcp/MCPManager', () => ({ + MCPManager: { + getInstance: jest.fn(() => ({ + getConnectionStats: jest.fn(() => null), + })), + }, +})); + +import { logger } from '@librechat/data-schemas'; +import { memoryDiagnostics } from '../memory'; + +type MockFn = jest.Mock; + +const debugMock = logger.debug as unknown as MockFn; +const infoMock = logger.info as unknown as MockFn; +const warnMock = logger.warn as unknown as MockFn; + +function callsContaining(mock: MockFn, substring: string): unknown[][] { + return mock.mock.calls.filter( + (args) => typeof args[0] === 'string' && (args[0] as string).includes(substring), + ); +} + +beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers(); + memoryDiagnostics.stop(); + + const snaps = memoryDiagnostics.getSnapshots() as unknown[]; + snaps.length = 0; +}); + +afterEach(() => { + memoryDiagnostics.stop(); + jest.useRealTimers(); +}); + +describe('memoryDiagnostics', () => { + describe('collectSnapshot', () => { + it('pushes a snapshot with expected shape', () => { + memoryDiagnostics.collectSnapshot(); + + const snaps = memoryDiagnostics.getSnapshots(); + expect(snaps).toHaveLength(1); + expect(snaps[0]).toEqual( + expect.objectContaining({ + ts: expect.any(Number), + rss: expect.any(Number), + heapUsed: expect.any(Number), + heapTotal: expect.any(Number), + external: expect.any(Number), + arrayBuffers: expect.any(Number), + }), + ); + }); + + it('caps history at 120 snapshots', () => { + for (let i = 0; i < 130; i++) { + memoryDiagnostics.collectSnapshot(); + } + expect(memoryDiagnostics.getSnapshots()).toHaveLength(120); + }); + + it('does not log trend with fewer than 3 snapshots', () => { + memoryDiagnostics.collectSnapshot(); + memoryDiagnostics.collectSnapshot(); + + expect(callsContaining(debugMock, 'Trend')).toHaveLength(0); + }); + + it('skips trend when elapsed time is under 0.1 minutes', () => { + memoryDiagnostics.collectSnapshot(); + memoryDiagnostics.collectSnapshot(); + memoryDiagnostics.collectSnapshot(); + + expect(callsContaining(debugMock, 'Trend')).toHaveLength(0); + }); + + it('logs trend data when enough time has elapsed', () => { + memoryDiagnostics.collectSnapshot(); + + jest.advanceTimersByTime(7_000); + memoryDiagnostics.collectSnapshot(); + + jest.advanceTimersByTime(7_000); + memoryDiagnostics.collectSnapshot(); + + const trendCalls = callsContaining(debugMock, 'Trend'); + expect(trendCalls.length).toBeGreaterThanOrEqual(1); + + const trendPayload = trendCalls[0][1] as Record; + expect(trendPayload).toHaveProperty('rssRate'); + expect(trendPayload).toHaveProperty('heapRate'); + expect(trendPayload.rssRate).toMatch(/MB\/hr$/); + expect(trendPayload.heapRate).toMatch(/MB\/hr$/); + expect(trendPayload.rssRate).not.toBe('Infinity MB/hr'); + expect(trendPayload.heapRate).not.toBe('Infinity MB/hr'); + }); + }); + + describe('start / stop', () => { + it('start is idempotent — calling twice does not create two intervals', () => { + memoryDiagnostics.start(); + memoryDiagnostics.start(); + + expect(callsContaining(infoMock, 'Starting')).toHaveLength(1); + }); + + it('stop is idempotent — calling twice does not error', () => { + memoryDiagnostics.start(); + memoryDiagnostics.stop(); + memoryDiagnostics.stop(); + + expect(callsContaining(infoMock, 'Stopped')).toHaveLength(1); + }); + + it('collects an immediate snapshot on start', () => { + expect(memoryDiagnostics.getSnapshots()).toHaveLength(0); + memoryDiagnostics.start(); + expect(memoryDiagnostics.getSnapshots().length).toBeGreaterThanOrEqual(1); + }); + }); + + describe('forceGC', () => { + it('returns false and warns when gc is not exposed', () => { + const origGC = global.gc; + global.gc = undefined; + + const result = memoryDiagnostics.forceGC(); + + expect(result).toBe(false); + expect(warnMock).toHaveBeenCalledWith(expect.stringContaining('GC not exposed')); + + global.gc = origGC; + }); + + it('calls gc and returns true when gc is exposed', () => { + const mockGC = jest.fn(); + global.gc = mockGC; + + const result = memoryDiagnostics.forceGC(); + + expect(result).toBe(true); + expect(mockGC).toHaveBeenCalledTimes(1); + expect(infoMock).toHaveBeenCalledWith(expect.stringContaining('Forced garbage collection')); + + global.gc = undefined; + }); + }); +}); diff --git a/packages/api/src/utils/__tests__/tracing.test.ts b/packages/api/src/utils/__tests__/tracing.test.ts new file mode 100644 index 0000000000..679b28e327 --- /dev/null +++ b/packages/api/src/utils/__tests__/tracing.test.ts @@ -0,0 +1,137 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; + +const TRACING_ALS_KEY = Symbol.for('ls:tracing_async_local_storage'); +const typedGlobal = globalThis as typeof globalThis & Record>; + +let originalStorage: AsyncLocalStorage | undefined; + +beforeEach(() => { + originalStorage = typedGlobal[TRACING_ALS_KEY]; + jest.restoreAllMocks(); +}); + +afterEach(() => { + if (originalStorage) { + typedGlobal[TRACING_ALS_KEY] = originalStorage; + } else { + delete typedGlobal[TRACING_ALS_KEY]; + } + delete process.env.LANGCHAIN_TRACING_V2; +}); + +async function freshImport(): Promise { + jest.resetModules(); + return import('../tracing'); +} + +describe('runOutsideTracing', () => { + it('clears the ALS context to undefined inside fn', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + let captured: string | undefined = 'NOT_CLEARED'; + als.run('should-not-propagate', () => { + runOutsideTracing(() => { + captured = als.getStore(); + }); + }); + + expect(captured).toBeUndefined(); + }); + + it('returns the value produced by fn (sync)', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + const result = als.run('ctx', () => runOutsideTracing(() => 42)); + expect(result).toBe(42); + }); + + it('returns the promise produced by fn (async)', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + const result = await als.run('ctx', () => + runOutsideTracing(async () => { + await Promise.resolve(); + return 'async-value'; + }), + ); + expect(result).toBe('async-value'); + }); + + it('propagates sync errors thrown inside fn', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + expect(() => + runOutsideTracing(() => { + throw new Error('boom'); + }), + ).toThrow('boom'); + }); + + it('propagates async rejections from fn', async () => { + const als = new AsyncLocalStorage(); + typedGlobal[TRACING_ALS_KEY] = als as AsyncLocalStorage; + + const { runOutsideTracing } = await freshImport(); + + await expect( + runOutsideTracing(async () => { + throw new Error('async-boom'); + }), + ).rejects.toThrow('async-boom'); + }); + + it('falls back to fn() when ALS is not on globalThis', async () => { + delete typedGlobal[TRACING_ALS_KEY]; + + const { runOutsideTracing } = await freshImport(); + + const result = runOutsideTracing(() => 'fallback'); + expect(result).toBe('fallback'); + }); + + it('does not warn when LANGCHAIN_TRACING_V2 is not set', async () => { + delete typedGlobal[TRACING_ALS_KEY]; + delete process.env.LANGCHAIN_TRACING_V2; + + const warnSpy = jest.fn(); + jest.resetModules(); + jest.doMock('@librechat/data-schemas', () => ({ + logger: { warn: warnSpy }, + })); + const { runOutsideTracing } = await import('../tracing'); + + runOutsideTracing(() => 'ok'); + expect(warnSpy).not.toHaveBeenCalled(); + }); + + it('warns once when LANGCHAIN_TRACING_V2 is set but ALS is missing', async () => { + delete typedGlobal[TRACING_ALS_KEY]; + process.env.LANGCHAIN_TRACING_V2 = 'true'; + + const warnSpy = jest.fn(); + jest.resetModules(); + jest.doMock('@librechat/data-schemas', () => ({ + logger: { warn: warnSpy }, + })); + const { runOutsideTracing } = await import('../tracing'); + + runOutsideTracing(() => 'first'); + runOutsideTracing(() => 'second'); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('LANGCHAIN_TRACING_V2 is set but ALS not found'), + ); + }); +}); diff --git a/packages/api/src/utils/index.ts b/packages/api/src/utils/index.ts index d4351eb5a0..470780cd5c 100644 --- a/packages/api/src/utils/index.ts +++ b/packages/api/src/utils/index.ts @@ -25,3 +25,4 @@ export * from './http'; export * from './tokens'; export * from './url'; export * from './message'; +export * from './tracing'; diff --git a/packages/api/src/utils/memory.ts b/packages/api/src/utils/memory.ts new file mode 100644 index 0000000000..214548d14b --- /dev/null +++ b/packages/api/src/utils/memory.ts @@ -0,0 +1,150 @@ +import { logger } from '@librechat/data-schemas'; +import { GenerationJobManager } from '~/stream'; +import { OAuthReconnectionManager } from '~/mcp/oauth/OAuthReconnectionManager'; +import { MCPManager } from '~/mcp/MCPManager'; + +type ConnectionStats = ReturnType['getConnectionStats']>; +type TrackerStats = ReturnType['getTrackerStats']>; +type RuntimeStats = ReturnType<(typeof GenerationJobManager)['getRuntimeStats']>; + +const INTERVAL_MS = 60_000; +const SNAPSHOT_HISTORY_LIMIT = 120; + +interface MemorySnapshot { + ts: number; + rss: number; + heapUsed: number; + heapTotal: number; + external: number; + arrayBuffers: number; + mcpConnections: ConnectionStats | null; + oauthTracker: TrackerStats | null; + generationJobs: RuntimeStats | null; +} + +const snapshots: MemorySnapshot[] = []; +let interval: NodeJS.Timeout | null = null; + +function toMB(bytes: number): string { + return (bytes / 1024 / 1024).toFixed(2); +} + +function getMCPStats(): { + mcpConnections: ConnectionStats | null; + oauthTracker: TrackerStats | null; +} { + let mcpConnections: ConnectionStats | null = null; + let oauthTracker: TrackerStats | null = null; + + try { + mcpConnections = MCPManager.getInstance().getConnectionStats(); + } catch { + /* not initialized yet */ + } + + try { + oauthTracker = OAuthReconnectionManager.getInstance().getTrackerStats(); + } catch { + /* not initialized yet */ + } + + return { mcpConnections, oauthTracker }; +} + +function getJobStats(): { generationJobs: RuntimeStats | null } { + try { + return { generationJobs: GenerationJobManager.getRuntimeStats() }; + } catch { + return { generationJobs: null }; + } +} + +function collectSnapshot(): void { + const mem = process.memoryUsage(); + const mcpStats = getMCPStats(); + const jobStats = getJobStats(); + + const snapshot: MemorySnapshot = { + ts: Date.now(), + rss: mem.rss, + heapUsed: mem.heapUsed, + heapTotal: mem.heapTotal, + external: mem.external, + arrayBuffers: mem.arrayBuffers ?? 0, + ...mcpStats, + ...jobStats, + }; + + snapshots.push(snapshot); + if (snapshots.length > SNAPSHOT_HISTORY_LIMIT) { + snapshots.shift(); + } + + logger.debug('[MemDiag] Snapshot', { + rss: `${toMB(mem.rss)} MB`, + heapUsed: `${toMB(mem.heapUsed)} MB`, + heapTotal: `${toMB(mem.heapTotal)} MB`, + external: `${toMB(mem.external)} MB`, + arrayBuffers: `${toMB(mem.arrayBuffers ?? 0)} MB`, + mcp: mcpStats, + jobs: jobStats, + snapshotCount: snapshots.length, + }); + + if (snapshots.length < 3) { + return; + } + + const first = snapshots[0]; + const last = snapshots[snapshots.length - 1]; + const elapsedMin = (last.ts - first.ts) / 60_000; + if (elapsedMin < 0.1) { + return; + } + const rssDelta = last.rss - first.rss; + const heapDelta = last.heapUsed - first.heapUsed; + logger.debug('[MemDiag] Trend', { + overMinutes: elapsedMin.toFixed(1), + rssDelta: `${toMB(rssDelta)} MB`, + heapDelta: `${toMB(heapDelta)} MB`, + rssRate: `${toMB((rssDelta / elapsedMin) * 60)} MB/hr`, + heapRate: `${toMB((heapDelta / elapsedMin) * 60)} MB/hr`, + }); +} + +function forceGC(): boolean { + if (global.gc) { + global.gc(); + logger.info('[MemDiag] Forced garbage collection'); + return true; + } + logger.warn('[MemDiag] GC not exposed. Start with --expose-gc to enable.'); + return false; +} + +function getSnapshots(): readonly MemorySnapshot[] { + return snapshots; +} + +function start(): void { + if (interval) { + return; + } + logger.info(`[MemDiag] Starting memory diagnostics (interval: ${INTERVAL_MS / 1000}s)`); + collectSnapshot(); + interval = setInterval(collectSnapshot, INTERVAL_MS); + if (interval.unref) { + interval.unref(); + } +} + +function stop(): void { + if (!interval) { + return; + } + clearInterval(interval); + interval = null; + logger.info('[MemDiag] Stopped memory diagnostics'); +} + +export const memoryDiagnostics = { start, stop, forceGC, getSnapshots, collectSnapshot }; diff --git a/packages/api/src/utils/tracing.ts b/packages/api/src/utils/tracing.ts new file mode 100644 index 0000000000..6a82caf092 --- /dev/null +++ b/packages/api/src/utils/tracing.ts @@ -0,0 +1,31 @@ +import { logger } from '@librechat/data-schemas'; +import { AsyncLocalStorage } from 'node:async_hooks'; +import { isEnabled } from '~/utils/common'; + +/** @see https://github.com/langchain-ai/langchainjs — @langchain/core RunTree ALS */ +const TRACING_ALS_KEY = Symbol.for('ls:tracing_async_local_storage'); + +let warnedMissing = false; + +/** + * Runs `fn` outside the LangGraph/LangSmith tracing AsyncLocalStorage context + * so I/O handles (child processes, sockets, timers) created during `fn` + * do not permanently retain the RunTree → graph config → message data chain. + * + * Relies on the private symbol `ls:tracing_async_local_storage` from `@langchain/core`. + * If the symbol is absent, falls back to calling `fn()` directly. + */ +export function runOutsideTracing(fn: () => T): T { + const storage = (globalThis as typeof globalThis & Record>)[ + TRACING_ALS_KEY + ]; + if (!storage && !warnedMissing && isEnabled(process.env.LANGCHAIN_TRACING_V2)) { + warnedMissing = true; + logger.warn( + '[runOutsideTracing] LANGCHAIN_TRACING_V2 is set but ALS not found — ' + + 'runOutsideTracing will be a no-op. ' + + 'Verify @langchain/core version still uses Symbol.for("ls:tracing_async_local_storage").', + ); + } + return storage ? storage.run(undefined as unknown, fn) : fn(); +}