From db9399aae69c0ee41b6d1de6e3ee723781b90d8f Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Fri, 12 Dec 2025 20:06:55 -0500 Subject: [PATCH] refactor: Integrate streamId handling for improved resumable functionality for attachments - Added streamId parameter to various functions to support resumable mode in tool loading and memory processing. - Updated related methods to ensure proper handling of attachments and responses based on the presence of streamId, enhancing the overall streaming experience. - Improved logging and attachment management to accommodate both standard and resumable modes. --- api/server/controllers/agents/client.js | 2 ++ .../services/Endpoints/agents/initialize.js | 9 +++++--- api/server/services/ToolService.js | 12 ++++++++-- api/server/services/Tools/search.js | 22 ++++++++++++++++--- client/src/hooks/SSE/useResumableSSE.ts | 8 ------- packages/api/src/agents/memory.ts | 22 ++++++++++++++++--- 6 files changed, 56 insertions(+), 19 deletions(-) diff --git a/api/server/controllers/agents/client.js b/api/server/controllers/agents/client.js index 449bf1b08b..7945acd378 100644 --- a/api/server/controllers/agents/client.js +++ b/api/server/controllers/agents/client.js @@ -594,10 +594,12 @@ class AgentClient extends BaseClient { const userId = this.options.req.user.id + ''; const messageId = this.responseMessageId + ''; const conversationId = this.conversationId + ''; + const streamId = this.options.req?._resumableStreamId || null; const [withoutKeys, processMemory] = await createMemoryProcessor({ userId, config, messageId, + streamId, conversationId, memoryMethods: { setMemory: db.setMemory, diff --git a/api/server/services/Endpoints/agents/initialize.js b/api/server/services/Endpoints/agents/initialize.js index 624253a961..c9a9538ca2 100644 --- a/api/server/services/Endpoints/agents/initialize.js +++ b/api/server/services/Endpoints/agents/initialize.js @@ -25,9 +25,11 @@ const { logViolation } = require('~/cache'); const db = require('~/models'); /** - * @param {AbortSignal} signal + * Creates a tool loader function for the agent. + * @param {AbortSignal} signal - The abort signal + * @param {string | null} [streamId] - The stream ID for resumable mode */ -function createToolLoader(signal) { +function createToolLoader(signal, streamId = null) { /** * @param {object} params * @param {ServerRequest} params.req @@ -52,6 +54,7 @@ function createToolLoader(signal) { agent, signal, tool_resources, + streamId, }); } catch (error) { logger.error('Error loading tools for agent ' + agentId, error); @@ -108,7 +111,7 @@ const initializeClient = async ({ req, res, signal, endpointOption }) => { const agentConfigs = new Map(); const allowedProviders = new Set(appConfig?.endpoints?.[EModelEndpoint.agents]?.allowedProviders); - const loadTools = createToolLoader(signal); + const loadTools = createToolLoader(signal, streamId); /** @type {Array} */ const requestFiles = req.body.files ?? []; /** @type {string} */ diff --git a/api/server/services/ToolService.js b/api/server/services/ToolService.js index 352f573aaa..cb6d3ae667 100644 --- a/api/server/services/ToolService.js +++ b/api/server/services/ToolService.js @@ -369,7 +369,15 @@ async function processRequiredActions(client, requiredActions) { * @param {string | undefined} [params.openAIApiKey] - The OpenAI API key. * @returns {Promise<{ tools?: StructuredTool[]; userMCPAuthMap?: Record> }>} The agent tools. */ -async function loadAgentTools({ req, res, agent, signal, tool_resources, openAIApiKey }) { +async function loadAgentTools({ + req, + res, + agent, + signal, + tool_resources, + openAIApiKey, + streamId = null, +}) { if (!agent.tools || agent.tools.length === 0) { return {}; } else if ( @@ -422,7 +430,7 @@ async function loadAgentTools({ req, res, agent, signal, tool_resources, openAIA /** @type {ReturnType} */ let webSearchCallbacks; if (includesWebSearch) { - webSearchCallbacks = createOnSearchResults(res); + webSearchCallbacks = createOnSearchResults(res, streamId); } /** @type {Record>} */ diff --git a/api/server/services/Tools/search.js b/api/server/services/Tools/search.js index c10c543141..c4cdfc752f 100644 --- a/api/server/services/Tools/search.js +++ b/api/server/services/Tools/search.js @@ -1,13 +1,29 @@ const { nanoid } = require('nanoid'); const { Tools } = require('librechat-data-provider'); const { logger } = require('@librechat/data-schemas'); +const { GenerationJobManager } = require('@librechat/api'); + +/** + * Helper to write attachment events either to res or to job emitter. + * @param {import('http').ServerResponse} res - The server response object + * @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode + * @param {Object} attachment - The attachment data + */ +function writeAttachment(res, streamId, attachment) { + if (streamId) { + GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment }); + } else { + res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`); + } +} /** * Creates a function to handle search results and stream them as attachments * @param {import('http').ServerResponse} res - The HTTP server response object + * @param {string | null} [streamId] - The stream ID for resumable mode, or null for standard mode * @returns {{ onSearchResults: function(SearchResult, GraphRunnableConfig): void; onGetHighlights: function(string): void}} - Function that takes search results and returns or streams an attachment */ -function createOnSearchResults(res) { +function createOnSearchResults(res, streamId = null) { const context = { sourceMap: new Map(), searchResultData: undefined, @@ -70,7 +86,7 @@ function createOnSearchResults(res) { if (!res.headersSent) { return attachment; } - res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`); + writeAttachment(res, streamId, attachment); } /** @@ -92,7 +108,7 @@ function createOnSearchResults(res) { } const attachment = buildAttachment(context); - res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`); + writeAttachment(res, streamId, attachment); } return { diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts index fd5a6f31d4..7ce0777a4a 100644 --- a/client/src/hooks/SSE/useResumableSSE.ts +++ b/client/src/hooks/SSE/useResumableSSE.ts @@ -189,12 +189,8 @@ export default function useResumableSSE( } if (data.sync != null) { - const textPart = data.resumeState?.aggregatedContent?.find( - (p: { type: string }) => p.type === 'text', - ); console.log('[ResumableSSE] SYNC received', { runSteps: data.resumeState?.runSteps?.length ?? 0, - contentLength: textPart?.text?.length ?? 0, }); const runId = v4(); @@ -231,9 +227,6 @@ export default function useResumableSSE( ); } - const textPart = data.resumeState.aggregatedContent?.find( - (p: { type: string }) => p.type === 'text', - ); console.log('[ResumableSSE] SYNC update', { userMsgId, serverResponseId, @@ -241,7 +234,6 @@ export default function useResumableSSE( foundMessageId: responseIdx >= 0 ? messages[responseIdx]?.messageId : null, messagesCount: messages.length, aggregatedContentLength: data.resumeState.aggregatedContent?.length, - textContentLength: textPart?.text?.length ?? 0, }); if (responseIdx >= 0) { diff --git a/packages/api/src/agents/memory.ts b/packages/api/src/agents/memory.ts index 865aaea7b8..2d5076381a 100644 --- a/packages/api/src/agents/memory.ts +++ b/packages/api/src/agents/memory.ts @@ -17,6 +17,7 @@ import type { TAttachment, MemoryArtifact } from 'librechat-data-provider'; import type { ObjectId, MemoryMethods } from '@librechat/data-schemas'; import type { BaseMessage, ToolMessage } from '@langchain/core/messages'; import type { Response as ServerResponse } from 'express'; +import { GenerationJobManager } from '~/stream/GenerationJobManager'; import { Tokenizer } from '~/utils'; type RequiredMemoryMethods = Pick< @@ -250,6 +251,7 @@ export class BasicToolEndHandler implements EventHandler { constructor(callback?: ToolEndCallback) { this.callback = callback; } + handle( event: string, data: StreamEventData | undefined, @@ -282,6 +284,7 @@ export async function processMemory({ llmConfig, tokenLimit, totalTokens = 0, + streamId = null, }: { res: ServerResponse; setMemory: MemoryMethods['setMemory']; @@ -296,6 +299,7 @@ export async function processMemory({ tokenLimit?: number; totalTokens?: number; llmConfig?: Partial; + streamId?: string | null; }): Promise<(TAttachment | null)[] | undefined> { try { const memoryTool = createMemoryTool({ @@ -363,7 +367,7 @@ ${memory ?? 'No existing memories'}`; } const artifactPromises: Promise[] = []; - const memoryCallback = createMemoryCallback({ res, artifactPromises }); + const memoryCallback = createMemoryCallback({ res, artifactPromises, streamId }); const customHandlers = { [GraphEvents.TOOL_END]: new BasicToolEndHandler(memoryCallback), }; @@ -416,6 +420,7 @@ export async function createMemoryProcessor({ memoryMethods, conversationId, config = {}, + streamId = null, }: { res: ServerResponse; messageId: string; @@ -423,6 +428,7 @@ export async function createMemoryProcessor({ userId: string | ObjectId; memoryMethods: RequiredMemoryMethods; config?: MemoryConfig; + streamId?: string | null; }): Promise<[string, (messages: BaseMessage[]) => Promise<(TAttachment | null)[] | undefined>]> { const { validKeys, instructions, llmConfig, tokenLimit } = config; const finalInstructions = instructions || getDefaultInstructions(validKeys, tokenLimit); @@ -443,6 +449,7 @@ export async function createMemoryProcessor({ llmConfig, messageId, tokenLimit, + streamId, conversationId, memory: withKeys, totalTokens: totalTokens || 0, @@ -461,10 +468,12 @@ async function handleMemoryArtifact({ res, data, metadata, + streamId = null, }: { res: ServerResponse; data: ToolEndData; metadata?: ToolEndMetadata; + streamId?: string | null; }) { const output = data?.output as ToolMessage | undefined; if (!output) { @@ -490,7 +499,11 @@ async function handleMemoryArtifact({ if (!res.headersSent) { return attachment; } - res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`); + if (streamId) { + GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment }); + } else { + res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`); + } return attachment; } @@ -499,14 +512,17 @@ async function handleMemoryArtifact({ * @param params - The parameters object * @param params.res - The server response object * @param params.artifactPromises - Array to collect artifact promises + * @param params.streamId - The stream ID for resumable mode, or null for standard mode * @returns The memory callback function */ export function createMemoryCallback({ res, artifactPromises, + streamId = null, }: { res: ServerResponse; artifactPromises: Promise | null>[]; + streamId?: string | null; }): ToolEndCallback { return async (data: ToolEndData, metadata?: Record) => { const output = data?.output as ToolMessage | undefined; @@ -515,7 +531,7 @@ export function createMemoryCallback({ return; } artifactPromises.push( - handleMemoryArtifact({ res, data, metadata }).catch((error) => { + handleMemoryArtifact({ res, data, metadata, streamId }).catch((error) => { logger.error('Error processing memory artifact content:', error); return null; }),