LibreChat/api/server/controllers/agents/callbacks.js
Danny Avila e391347b9e
🔧 feat: Initial MCP Support (Tools) (#5015)
* 📝 chore: Add comment to clarify purpose of check_updates.sh script

* feat: mcp package

* feat: add librechat-mcp package and update dependencies

* feat: refactor MCPConnectionSingleton to handle transport initialization and connection management

* feat: change private methods to public in MCPConnectionSingleton for improved accessibility

* feat: filesystem demo

* chore: everything demo and move everything under mcp workspace

* chore: move ts-node to mcp workspace

* feat: mcp examples

* feat: working sse MCP example

* refactor: rename MCPConnectionSingleton to MCPConnection for clarity

* refactor: replace MCPConnectionSingleton with MCPConnection for consistency

* refactor: manager/connections

* refactor: update MCPConnection to use type definitions from mcp types

* refactor: update MCPManager to use winston logger and enhance server initialization

* refactor: share logger between connections and manager

* refactor: add schema definitions and update MCPManager to accept logger parameter

* feat: map available MCP tools

* feat: load manifest tools

* feat: add MCP tools delimiter constant and update plugin key generation

* feat: call MCP tools

* feat: update librechat-data-provider version to 0.7.63 and enhance StdioOptionsSchema with additional properties

* refactor: simplify typing

* chore: update types/packages

* feat: MCP Tool Content parsing

* chore: update dependencies and improve package configurations

* feat: add 'mcp' directory to package and update configurations

* refactor: return CONTENT_AND_ARTIFACT format for MCP callTool

* chore: bump @librechat/agents

* WIP: MCP artifacts

* chore: bump @librechat/agents to v1.8.7

* fix: ensure filename has extension when saving base64 image

* fix: move base64 buffer conversion before filename extension check

* chore: update backend review workflow to install MCP package

* fix: use correct `mime` method

* fix: enhance file metadata with message and tool call IDs in image saving process

* fix: refactor ToolCall component to handle MCP tool calls and improve domain extraction

* fix: update ToolItem component for default isInstalled value and improve localization in ToolSelectDialog

* fix: update ToolItem component to use consistent text color for tool description

* style: add theming to ToolSelectDialog

* fix: improve domain extraction logic in ToolCall component

* refactor: conversation item theming, fix rename UI bug, optimize props, add missing types

* feat: enhance MCP options schema with base options (iconPath to start) and make transport type optional, infer based on other option fields

* fix: improve reconnection logic with parallel init and exponential backoff and enhance transport debug logging

* refactor: improve logging format

* refactor: improve logging of available tools by displaying tool names

* refactor: improve reconnection/connection logic

* feat: add MCP package build process to Dockerfile

* feat: add fallback icon for tools without an image in ToolItem component

* feat: Assistants Support for MCP Tools

* fix(build): configure rollup to use output.dir for dynamic imports

* chore: update @librechat/agents to version 1.8.8 and add @langchain/anthropic dependency

* fix: update CONFIG_VERSION to 1.2.0
2024-12-17 13:12:57 -05:00

317 lines
10 KiB
JavaScript

const { Tools, StepTypes, imageGenTools, FileContext } = require('librechat-data-provider');
const {
EnvVar,
GraphEvents,
ToolEndHandler,
ChatModelStreamHandler,
} = require('@librechat/agents');
const { processCodeOutput } = require('~/server/services/Files/Code/process');
const { saveBase64Image } = require('~/server/services/Files/process');
const { loadAuthValues } = require('~/app/clients/tools/util');
const { logger } = require('~/config');
/** @typedef {import('@librechat/agents').Graph} Graph */
/** @typedef {import('@librechat/agents').EventHandler} EventHandler */
/** @typedef {import('@librechat/agents').ModelEndData} ModelEndData */
/** @typedef {import('@librechat/agents').ToolEndData} ToolEndData */
/** @typedef {import('@librechat/agents').ToolEndCallback} ToolEndCallback */
/** @typedef {import('@librechat/agents').ChatModelStreamHandler} ChatModelStreamHandler */
/** @typedef {import('@librechat/agents').ContentAggregatorResult['aggregateContent']} ContentAggregator */
/** @typedef {import('@librechat/agents').GraphEvents} GraphEvents */
/**
* Sends message data in Server Sent Events format.
* @param {ServerResponse} res - The server response.
* @param {{ data: string | Record<string, unknown>, event?: string }} event - The message event.
* @param {string} event.event - The type of event.
* @param {string} event.data - The message to be sent.
*/
const sendEvent = (res, event) => {
if (typeof event.data === 'string' && event.data.length === 0) {
return;
}
res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
};
class ModelEndHandler {
/**
* @param {Array<UsageMetadata>} collectedUsage
*/
constructor(collectedUsage) {
if (!Array.isArray(collectedUsage)) {
throw new Error('collectedUsage must be an array');
}
this.collectedUsage = collectedUsage;
}
/**
* @param {string} event
* @param {ModelEndData | undefined} data
* @param {Record<string, unknown> | undefined} metadata
* @param {Graph} graph
* @returns
*/
handle(event, data, metadata, graph) {
if (!graph || !metadata) {
console.warn(`Graph or metadata not found in ${event} event`);
return;
}
const usage = data?.output?.usage_metadata;
if (metadata?.model) {
usage.model = metadata.model;
}
if (usage) {
this.collectedUsage.push(usage);
}
}
}
/**
* Get default handlers for stream events.
* @param {Object} options - The options object.
* @param {ServerResponse} options.res - The options object.
* @param {ContentAggregator} options.aggregateContent - The options object.
* @param {ToolEndCallback} options.toolEndCallback - Callback to use when tool ends.
* @param {Array<UsageMetadata>} options.collectedUsage - The list of collected usage metadata.
* @returns {Record<string, t.EventHandler>} The default handlers.
* @throws {Error} If the request is not found.
*/
function getDefaultHandlers({ res, aggregateContent, toolEndCallback, collectedUsage }) {
if (!res || !aggregateContent) {
throw new Error(
`[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`,
);
}
const handlers = {
[GraphEvents.CHAT_MODEL_END]: new ModelEndHandler(collectedUsage),
[GraphEvents.TOOL_END]: new ToolEndHandler(toolEndCallback),
[GraphEvents.CHAT_MODEL_STREAM]: new ChatModelStreamHandler(),
[GraphEvents.ON_RUN_STEP]: {
/**
* Handle ON_RUN_STEP event.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
if (data?.stepDetails.type === StepTypes.TOOL_CALLS) {
sendEvent(res, { event, data });
} else if (metadata?.last_agent_index === metadata?.agent_index) {
sendEvent(res, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
} else {
const agentName = metadata?.name ?? 'Agent';
const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS;
const action = isToolCall ? 'performing a task...' : 'thinking...';
sendEvent(res, {
event: 'on_agent_update',
data: {
runId: metadata?.run_id,
message: `${agentName} is ${action}`,
},
});
}
aggregateContent({ event, data });
},
},
[GraphEvents.ON_RUN_STEP_DELTA]: {
/**
* Handle ON_RUN_STEP_DELTA event.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
if (data?.delta.type === StepTypes.TOOL_CALLS) {
sendEvent(res, { event, data });
} else if (metadata?.last_agent_index === metadata?.agent_index) {
sendEvent(res, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
}
aggregateContent({ event, data });
},
},
[GraphEvents.ON_RUN_STEP_COMPLETED]: {
/**
* Handle ON_RUN_STEP_COMPLETED event.
* @param {string} event - The event name.
* @param {StreamEventData & { result: ToolEndData }} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
if (data?.result != null) {
sendEvent(res, { event, data });
} else if (metadata?.last_agent_index === metadata?.agent_index) {
sendEvent(res, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
}
aggregateContent({ event, data });
},
},
[GraphEvents.ON_MESSAGE_DELTA]: {
/**
* Handle ON_MESSAGE_DELTA event.
* @param {string} event - The event name.
* @param {StreamEventData} data - The event data.
* @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
*/
handle: (event, data, metadata) => {
if (metadata?.last_agent_index === metadata?.agent_index) {
sendEvent(res, { event, data });
} else if (!metadata?.hide_sequential_outputs) {
sendEvent(res, { event, data });
}
aggregateContent({ event, data });
},
},
};
return handlers;
}
/**
*
* @param {Object} params
* @param {ServerRequest} params.req
* @param {ServerResponse} params.res
* @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
* @returns {ToolEndCallback} The tool end callback.
*/
function createToolEndCallback({ req, res, artifactPromises }) {
/**
* @type {ToolEndCallback}
*/
return async (data, metadata) => {
const output = data?.output;
if (!output) {
return;
}
if (!output.artifact) {
return;
}
if (imageGenTools.has(output.name)) {
artifactPromises.push(
(async () => {
const fileMetadata = Object.assign(output.artifact, {
messageId: metadata.run_id,
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
});
if (!res.headersSent) {
return fileMetadata;
}
if (!fileMetadata) {
return null;
}
res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing code output:', error);
return null;
}),
);
return;
}
if (output.artifact.content) {
/** @type {FormattedContent[]} */
const content = output.artifact.content;
for (const part of content) {
if (part.type !== 'image_url') {
continue;
}
const { url } = part.image_url;
artifactPromises.push(
(async () => {
const filename = `${output.tool_call_id}-image-${new Date().getTime()}`;
const file = await saveBase64Image(url, {
req,
filename,
endpoint: metadata.provider,
context: FileContext.image_generation,
});
const fileMetadata = Object.assign(file, {
messageId: metadata.run_id,
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
});
if (!res.headersSent) {
return fileMetadata;
}
if (!fileMetadata) {
return null;
}
res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing artifact content:', error);
return null;
}),
);
}
return;
}
{
if (output.name !== Tools.execute_code) {
return;
}
}
if (!output.artifact.files) {
return;
}
for (const file of output.artifact.files) {
const { id, name } = file;
artifactPromises.push(
(async () => {
const result = await loadAuthValues({
userId: req.user.id,
authFields: [EnvVar.CODE_API_KEY],
});
const fileMetadata = await processCodeOutput({
req,
id,
name,
apiKey: result[EnvVar.CODE_API_KEY],
messageId: metadata.run_id,
toolCallId: output.tool_call_id,
conversationId: metadata.thread_id,
session_id: output.artifact.session_id,
});
if (!res.headersSent) {
return fileMetadata;
}
if (!fileMetadata) {
return null;
}
res.write(`event: attachment\ndata: ${JSON.stringify(fileMetadata)}\n\n`);
return fileMetadata;
})().catch((error) => {
logger.error('Error processing code output:', error);
return null;
}),
);
}
};
}
module.exports = {
sendEvent,
getDefaultHandlers,
createToolEndCallback,
};