refactor: Integrate streamId handling for improved resumable functionality for attachments

- Added streamId parameter to various functions to support resumable mode in tool loading and memory processing.
- Updated related methods to ensure proper handling of attachments and responses based on the presence of streamId, enhancing the overall streaming experience.
- Improved logging and attachment management to accommodate both standard and resumable modes.
This commit is contained in:
Danny Avila 2025-12-12 20:06:55 -05:00
parent e2305a4a76
commit fe1cc4a61d
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
6 changed files with 55 additions and 19 deletions

View file

@ -594,10 +594,12 @@ class AgentClient extends BaseClient {
const userId = this.options.req.user.id + '';
const messageId = this.responseMessageId + '';
const conversationId = this.conversationId + '';
const streamId = this.options.req?._resumableStreamId || null;
const [withoutKeys, processMemory] = await createMemoryProcessor({
userId,
config,
messageId,
streamId,
conversationId,
memoryMethods: {
setMemory: db.setMemory,

View file

@ -25,9 +25,11 @@ const { logViolation } = require('~/cache');
const db = require('~/models');
/**
* @param {AbortSignal} signal
* Creates a tool loader function for the agent.
* @param {AbortSignal} signal - The abort signal
* @param {string | null} [streamId] - The stream ID for resumable mode
*/
function createToolLoader(signal) {
function createToolLoader(signal, streamId = null) {
/**
* @param {object} params
* @param {ServerRequest} params.req
@ -52,6 +54,7 @@ function createToolLoader(signal) {
agent,
signal,
tool_resources,
streamId,
});
} catch (error) {
logger.error('Error loading tools for agent ' + agentId, error);
@ -108,7 +111,7 @@ const initializeClient = async ({ req, res, signal, endpointOption }) => {
const agentConfigs = new Map();
const allowedProviders = new Set(appConfig?.endpoints?.[EModelEndpoint.agents]?.allowedProviders);
const loadTools = createToolLoader(signal);
const loadTools = createToolLoader(signal, streamId);
/** @type {Array<MongoFile>} */
const requestFiles = req.body.files ?? [];
/** @type {string} */

View file

@ -369,7 +369,15 @@ async function processRequiredActions(client, requiredActions) {
* @param {string | undefined} [params.openAIApiKey] - The OpenAI API key.
* @returns {Promise<{ tools?: StructuredTool[]; userMCPAuthMap?: Record<string, Record<string, string>> }>} The agent tools.
*/
async function loadAgentTools({ req, res, agent, signal, tool_resources, openAIApiKey }) {
async function loadAgentTools({
req,
res,
agent,
signal,
tool_resources,
openAIApiKey,
streamId = null,
}) {
if (!agent.tools || agent.tools.length === 0) {
return {};
} else if (
@ -422,7 +430,7 @@ async function loadAgentTools({ req, res, agent, signal, tool_resources, openAIA
/** @type {ReturnType<typeof createOnSearchResults>} */
let webSearchCallbacks;
if (includesWebSearch) {
webSearchCallbacks = createOnSearchResults(res);
webSearchCallbacks = createOnSearchResults(res, streamId);
}
/** @type {Record<string, Record<string, string>>} */

View file

@ -1,13 +1,29 @@
const { nanoid } = require('nanoid');
const { Tools } = require('librechat-data-provider');
const { logger } = require('@librechat/data-schemas');
const { GenerationJobManager } = require('@librechat/api');
/**
* Helper to write attachment events either to res or to job emitter.
* @param {import('http').ServerResponse} res - The server response object
* @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
* @param {Object} attachment - The attachment data
*/
function writeAttachment(res, streamId, attachment) {
if (streamId) {
GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
} else {
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
}
}
/**
* Creates a function to handle search results and stream them as attachments
* @param {import('http').ServerResponse} res - The HTTP server response object
* @param {string | null} [streamId] - The stream ID for resumable mode, or null for standard mode
* @returns {{ onSearchResults: function(SearchResult, GraphRunnableConfig): void; onGetHighlights: function(string): void}} - Function that takes search results and returns or streams an attachment
*/
function createOnSearchResults(res) {
function createOnSearchResults(res, streamId = null) {
const context = {
sourceMap: new Map(),
searchResultData: undefined,
@ -70,7 +86,7 @@ function createOnSearchResults(res) {
if (!res.headersSent) {
return attachment;
}
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
}
/**
@ -92,7 +108,7 @@ function createOnSearchResults(res) {
}
const attachment = buildAttachment(context);
res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
writeAttachment(res, streamId, attachment);
}
return {