diff --git a/api/app/clients/tools/util/handleTools.js b/api/app/clients/tools/util/handleTools.js index bae7255d97..e9361a70d9 100644 --- a/api/app/clients/tools/util/handleTools.js +++ b/api/app/clients/tools/util/handleTools.js @@ -434,6 +434,7 @@ Anchor pattern: \\ue202turn{N}{type}{index} where N=turn number, type=search|new user: safeUser, userMCPAuthMap, res: options.res, + streamId: options.req?._resumableStreamId || null, model: agent?.model ?? model, serverName: config.serverName, provider: agent?.provider ?? endpoint, diff --git a/api/server/services/ActionService.js b/api/server/services/ActionService.js index 79586f0cf2..a2a515d14a 100644 --- a/api/server/services/ActionService.js +++ b/api/server/services/ActionService.js @@ -3,7 +3,12 @@ const { nanoid } = require('nanoid'); const { tool } = require('@langchain/core/tools'); const { GraphEvents, sleep } = require('@librechat/agents'); const { logger, encryptV2, decryptV2 } = require('@librechat/data-schemas'); -const { sendEvent, logAxiosError, refreshAccessToken } = require('@librechat/api'); +const { + sendEvent, + logAxiosError, + refreshAccessToken, + GenerationJobManager, +} = require('@librechat/api'); const { Time, CacheKeys, @@ -127,6 +132,7 @@ async function loadActionSets(searchParams) { * @param {string | undefined} [params.description] - The description for the tool. * @param {import('zod').ZodTypeAny | undefined} [params.zodSchema] - The Zod schema for tool input validation/definition * @param {{ oauth_client_id?: string; oauth_client_secret?: string; }} params.encrypted - The encrypted values for the action. + * @param {string | null} [params.streamId] - The stream ID for resumable streams. * @returns { Promise unknown}> } An object with `_call` method to execute the tool input. */ async function createActionTool({ @@ -138,6 +144,7 @@ async function createActionTool({ name, description, encrypted, + streamId = null, }) { /** @type {(toolInput: Object | string, config: GraphRunnableConfig) => Promise} */ const _call = async (toolInput, config) => { @@ -192,7 +199,12 @@ async function createActionTool({ `${identifier}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`, 'oauth_login', async () => { - sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data }); + const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; + if (streamId) { + GenerationJobManager.emitChunk(streamId, eventData); + } else { + sendEvent(res, eventData); + } logger.debug('Sent OAuth login request to client', { action_id, identifier }); return true; }, @@ -217,7 +229,12 @@ async function createActionTool({ logger.debug('Received OAuth Authorization response', { action_id, identifier }); data.delta.auth = undefined; data.delta.expires_at = undefined; - sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data }); + const successEventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; + if (streamId) { + GenerationJobManager.emitChunk(streamId, successEventData); + } else { + sendEvent(res, successEventData); + } await sleep(3000); metadata.oauth_access_token = result.access_token; metadata.oauth_refresh_token = result.refresh_token; diff --git a/api/server/services/MCP.js b/api/server/services/MCP.js index 72db447d3d..3d5c91866d 100644 --- a/api/server/services/MCP.js +++ b/api/server/services/MCP.js @@ -13,6 +13,7 @@ const { isMCPDomainAllowed, normalizeServerName, convertWithResolvedRefs, + GenerationJobManager, } = require('@librechat/api'); const { Time, @@ -37,8 +38,9 @@ const { getLogStores } = require('~/cache'); * @param {ServerResponse} params.res - The Express response object for sending events. * @param {string} params.stepId - The ID of the step in the flow. * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information. + * @param {string | null} [params.streamId] - The stream ID for resumable mode. */ -function createRunStepDeltaEmitter({ res, stepId, toolCall }) { +function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) { /** * @param {string} authURL - The URL to redirect the user for OAuth authentication. * @returns {void} @@ -54,7 +56,12 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall }) { expires_at: Date.now() + Time.TWO_MINUTES, }, }; - sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data }); + const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; + if (streamId) { + GenerationJobManager.emitChunk(streamId, eventData); + } else { + sendEvent(res, eventData); + } }; } @@ -65,8 +72,9 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall }) { * @param {string} params.stepId - The ID of the step in the flow. * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information. * @param {number} [params.index] + * @param {string | null} [params.streamId] - The stream ID for resumable mode. */ -function createRunStepEmitter({ res, runId, stepId, toolCall, index }) { +function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) { return function () { /** @type {import('@librechat/agents').RunStep} */ const data = { @@ -79,7 +87,12 @@ function createRunStepEmitter({ res, runId, stepId, toolCall, index }) { tool_calls: [toolCall], }, }; - sendEvent(res, { event: GraphEvents.ON_RUN_STEP, data }); + const eventData = { event: GraphEvents.ON_RUN_STEP, data }; + if (streamId) { + GenerationJobManager.emitChunk(streamId, eventData); + } else { + sendEvent(res, eventData); + } }; } @@ -110,10 +123,9 @@ function createOAuthStart({ flowId, flowManager, callback }) { * @param {ServerResponse} params.res - The Express response object for sending events. * @param {string} params.stepId - The ID of the step in the flow. * @param {ToolCallChunk} params.toolCall - The tool call object containing tool information. - * @param {string} params.loginFlowId - The ID of the login flow. - * @param {FlowStateManager} params.flowManager - The flow manager instance. + * @param {string | null} [params.streamId] - The stream ID for resumable mode. */ -function createOAuthEnd({ res, stepId, toolCall }) { +function createOAuthEnd({ res, stepId, toolCall, streamId = null }) { return async function () { /** @type {{ id: string; delta: AgentToolCallDelta }} */ const data = { @@ -123,7 +135,12 @@ function createOAuthEnd({ res, stepId, toolCall }) { tool_calls: [{ ...toolCall }], }, }; - sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data }); + const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data }; + if (streamId) { + GenerationJobManager.emitChunk(streamId, eventData); + } else { + sendEvent(res, eventData); + } logger.debug('Sent OAuth login success to client'); }; } @@ -164,10 +181,19 @@ function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) { * @param {AbortSignal} params.signal * @param {string} params.model * @param {number} [params.index] + * @param {string | null} [params.streamId] - The stream ID for resumable mode. * @param {Record>} [params.userMCPAuthMap] * @returns { Promise unknown}>> } An object with `_call` method to execute the tool input. */ -async function reconnectServer({ res, user, index, signal, serverName, userMCPAuthMap }) { +async function reconnectServer({ + res, + user, + index, + signal, + serverName, + userMCPAuthMap, + streamId = null, +}) { const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID; const flowId = `${user.id}:${serverName}:${Date.now()}`; const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS)); @@ -184,11 +210,13 @@ async function reconnectServer({ res, user, index, signal, serverName, userMCPAu runId, stepId, toolCall, + streamId, }); const runStepDeltaEmitter = createRunStepDeltaEmitter({ res, stepId, toolCall, + streamId, }); const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }); const oauthStart = createOAuthStart({ @@ -224,6 +252,7 @@ async function reconnectServer({ res, user, index, signal, serverName, userMCPAu * @param {Providers | EModelEndpoint} params.provider - The provider for the tool. * @param {number} [params.index] * @param {AbortSignal} [params.signal] + * @param {string | null} [params.streamId] - The stream ID for resumable mode. * @param {import('@librechat/api').ParsedServerConfig} [params.config] * @param {Record>} [params.userMCPAuthMap] * @returns { Promise unknown}>> } An object with `_call` method to execute the tool input. @@ -237,6 +266,7 @@ async function createMCPTools({ provider, serverName, userMCPAuthMap, + streamId = null, }) { // Early domain validation before reconnecting server (avoid wasted work on disallowed domains) // Use getAppConfig() to support per-user/role domain restrictions @@ -252,7 +282,15 @@ async function createMCPTools({ } } - const result = await reconnectServer({ res, user, index, signal, serverName, userMCPAuthMap }); + const result = await reconnectServer({ + res, + user, + index, + signal, + serverName, + userMCPAuthMap, + streamId, + }); if (!result || !result.tools) { logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`); return; @@ -265,6 +303,7 @@ async function createMCPTools({ user, provider, userMCPAuthMap, + streamId, availableTools: result.availableTools, toolKey: `${tool.name}${Constants.mcp_delimiter}${serverName}`, config: serverConfig, @@ -286,6 +325,7 @@ async function createMCPTools({ * @param {string} params.model - The model for the tool. * @param {number} [params.index] * @param {AbortSignal} [params.signal] + * @param {string | null} [params.streamId] - The stream ID for resumable mode. * @param {Providers | EModelEndpoint} params.provider - The provider for the tool. * @param {LCAvailableTools} [params.availableTools] * @param {Record>} [params.userMCPAuthMap] @@ -302,6 +342,7 @@ async function createMCPTool({ userMCPAuthMap, availableTools, config, + streamId = null, }) { const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter); @@ -332,6 +373,7 @@ async function createMCPTool({ signal, serverName, userMCPAuthMap, + streamId, }); toolDefinition = result?.availableTools?.[toolKey]?.function; } @@ -347,10 +389,18 @@ async function createMCPTool({ toolName, serverName, toolDefinition, + streamId, }); } -function createToolInstance({ res, toolName, serverName, toolDefinition, provider: _provider }) { +function createToolInstance({ + res, + toolName, + serverName, + toolDefinition, + provider: _provider, + streamId = null, +}) { /** @type {LCTool} */ const { description, parameters } = toolDefinition; const isGoogle = _provider === Providers.VERTEXAI || _provider === Providers.GOOGLE; @@ -386,6 +436,7 @@ function createToolInstance({ res, toolName, serverName, toolDefinition, provide res, stepId, toolCall, + streamId, }); const oauthStart = createOAuthStart({ flowId, @@ -396,6 +447,7 @@ function createToolInstance({ res, toolName, serverName, toolDefinition, provide res, stepId, toolCall, + streamId, }); if (derivedSignal) { diff --git a/api/server/services/ToolService.js b/api/server/services/ToolService.js index cb6d3ae667..b8028742ca 100644 --- a/api/server/services/ToolService.js +++ b/api/server/services/ToolService.js @@ -630,6 +630,7 @@ async function loadAgentTools({ encrypted, name: toolName, description: functionSig.description, + streamId, }); if (!tool) {