const { Tools, StepTypes, imageGenTools, FileContext } = require('librechat-data-provider'); const { EnvVar, GraphEvents, ToolEndHandler, ChatModelStreamHandler, } = require('@librechat/agents'); const { processCodeOutput } = require('~/server/services/Files/Code/process'); const { saveBase64Image } = require('~/server/services/Files/process'); const { loadAuthValues } = require('~/app/clients/tools/util'); const { logger } = require('~/config'); /** @typedef {import('@librechat/agents').Graph} Graph */ /** @typedef {import('@librechat/agents').EventHandler} EventHandler */ /** @typedef {import('@librechat/agents').ModelEndData} ModelEndData */ /** @typedef {import('@librechat/agents').ToolEndData} ToolEndData */ /** @typedef {import('@librechat/agents').ToolEndCallback} ToolEndCallback */ /** @typedef {import('@librechat/agents').ChatModelStreamHandler} ChatModelStreamHandler */ /** @typedef {import('@librechat/agents').ContentAggregatorResult['aggregateContent']} ContentAggregator */ /** @typedef {import('@librechat/agents').GraphEvents} GraphEvents */ /** * Sends message data in Server Sent Events format. * @param {ServerResponse} res - The server response. * @param {{ data: string | Record, event?: string }} event - The message event. * @param {string} event.event - The type of event. * @param {string} event.data - The message to be sent. */ const sendEvent = (res, event) => { if (typeof event.data === 'string' && event.data.length === 0) { return; } res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); }; class ModelEndHandler { /** * @param {Array} collectedUsage */ constructor(collectedUsage) { if (!Array.isArray(collectedUsage)) { throw new Error('collectedUsage must be an array'); } this.collectedUsage = collectedUsage; } /** * @param {string} event * @param {ModelEndData | undefined} data * @param {Record | undefined} metadata * @param {Graph} graph * @returns */ handle(event, data, metadata, graph) { if (!graph || !metadata) { console.warn(`Graph or metadata not found in ${event} event`); return; } const usage = data?.output?.usage_metadata; if (metadata?.model) { usage.model = metadata.model; } if (usage) { this.collectedUsage.push(usage); } } } /** * Get default handlers for stream events. * @param {Object} options - The options object. * @param {ServerResponse} options.res - The options object. * @param {ContentAggregator} options.aggregateContent - The options object. * @param {ToolEndCallback} options.toolEndCallback - Callback to use when tool ends. * @param {Array} options.collectedUsage - The list of collected usage metadata. * @returns {Record} The default handlers. * @throws {Error} If the request is not found. */ function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedUsage }) { if (!res || !aggregateContent) { throw new Error( `[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`, ); } const handlers = { [GraphEvents.CHAT_MODEL_END]: new ModelEndHandler(collectedUsage), [GraphEvents.TOOL_END]: new ToolEndHandler(toolEndCallback), [GraphEvents.CHAT_MODEL_STREAM]: new ChatModelStreamHandler(), [GraphEvents.ON_RUN_STEP]: { /** * Handle ON_RUN_STEP event. * @param {string} event - The event name. * @param {StreamEventData} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ handle: (event, data, metadata) => { if (data?.stepDetails.type === StepTypes.TOOL_CALLS) { sendEvent(res, { event, data }); } else if (metadata?.last_agent_index === metadata?.agent_index) { sendEvent(res, { event, data }); } else if (!metadata?.hide_sequential_outputs) { sendEvent(res, { event, data }); } else { const agentName = metadata?.name ?? 'Agent'; const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS; const action = isToolCall ? 'performing a task...' : 'thinking...'; sendEvent(res, { event: 'on_agent_update', data: { runId: metadata?.run_id, message: `${agentName} is ${action}`, }, }); } aggregateContent({ event, data }); }, }, [GraphEvents.ON_RUN_STEP_DELTA]: { /** * Handle ON_RUN_STEP_DELTA event. * @param {string} event - The event name. * @param {StreamEventData} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ handle: (event, data, metadata) => { if (data?.delta.type === StepTypes.TOOL_CALLS) { sendEvent(res, { event, data }); } else if (metadata?.last_agent_index === metadata?.agent_index) { sendEvent(res, { event, data }); } else if (!metadata?.hide_sequential_outputs) { sendEvent(res, { event, data }); } aggregateContent({ event, data }); }, }, [GraphEvents.ON_RUN_STEP_COMPLETED]: { /** * Handle ON_RUN_STEP_COMPLETED event. * @param {string} event - The event name. * @param {StreamEventData & { result: ToolEndData }} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ handle: (event, data, metadata) => { if (data?.result != null) { sendEvent(res, { event, data }); } else if (metadata?.last_agent_index === metadata?.agent_index) { sendEvent(res, { event, data }); } else if (!metadata?.hide_sequential_outputs) { sendEvent(res, { event, data }); } aggregateContent({ event, data }); }, }, [GraphEvents.ON_MESSAGE_DELTA]: { /** * Handle ON_MESSAGE_DELTA event. * @param {string} event - The event name. * @param {StreamEventData} data - The event data. * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata. */ handle: (event, data, metadata) => { if (metadata?.last_agent_index === metadata?.agent_index) { sendEvent(res, { event, data }); } else if (!metadata?.hide_sequential_outputs) { sendEvent(res, { event, data }); } aggregateContent({ event, data }); }, }, }; return handlers; } /** * * @param {Object} params * @param {ServerRequest} params.req * @param {ServerResponse} params.res * @param {Promise[]} params.artifactPromises * @returns {ToolEndCallback} The tool end callback. */ function createToolEndCallback({ req, res, artifactPromises }) { /** * @type {ToolEndCallback} */ return async (data, metadata) => { const output = data?.output; if (!output) { return; } if (!output.artifact) { return; } if (imageGenTools.has(output.name)) { artifactPromises.push( (async () => { const fileMetadata = Object.assign(output.artifact, { messageId: metadata.run_id, toolCallId: output.tool_call_id, conversationId: metadata.thread_id, }); if (!res.headersSent) { return fileMetadata; } if (!fileMetadata) { return null; } res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`); return fileMetadata; })().catch((error) => { logger.error('Error processing code output:', error); return null; }), ); return; } if (output.artifact.content) { /** @type {FormattedContent[]} */ const content = output.artifact.content; for (const part of content) { if (part.type !== 'image_url') { continue; } const { url } = part.image_url; artifactPromises.push( (async () => { const filename = `${output.tool_call_id}-image-${new Date().getTime()}`; const file = await saveBase64Image(url, { req, filename, endpoint: metadata.provider, context: FileContext.image_generation, }); const fileMetadata = Object.assign(file, { messageId: metadata.run_id, toolCallId: output.tool_call_id, conversationId: metadata.thread_id, }); if (!res.headersSent) { return fileMetadata; } if (!fileMetadata) { return null; } res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`); return fileMetadata; })().catch((error) => { logger.error('Error processing artifact content:', error); return null; }), ); } return; } { if (output.name !== Tools.execute_code) { return; } } if (!output.artifact.files) { return; } for (const file of output.artifact.files) { const { id, name } = file; artifactPromises.push( (async () => { const result = await loadAuthValues({ userId: req.user.id, authFields: [EnvVar.CODE_API_KEY], }); const fileMetadata = await processCodeOutput({ req, id, name, apiKey: result[EnvVar.CODE_API_KEY], messageId: metadata.run_id, toolCallId: output.tool_call_id, conversationId: metadata.thread_id, session_id: output.artifact.session_id, }); if (!res.headersSent) { return fileMetadata; } if (!fileMetadata) { return null; } res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`); return fileMetadata; })().catch((error) => { logger.error('Error processing code output:', error); return null; }), ); } }; } module.exports = { sendEvent, getDefaultHandlers, createToolEndCallback, };