fix: Add streamId parameter for resumable stream handling across services (actions, mcp oauth)

This commit is contained in:
Danny Avila 2025-12-18 17:42:34 -05:00
parent ca1e7c316c
commit 228562c9f0
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
4 changed files with 85 additions and 14 deletions

View file

@ -434,6 +434,7 @@ Anchor pattern: \\ue202turn{N}{type}{index} where N=turn number, type=search|new
user: safeUser,
userMCPAuthMap,
res: options.res,
streamId: options.req?._resumableStreamId || null,
model: agent?.model ?? model,
serverName: config.serverName,
provider: agent?.provider ?? endpoint,

View file

@ -3,7 +3,12 @@ const { nanoid } = require('nanoid');
const { tool } = require('@langchain/core/tools');
const { GraphEvents, sleep } = require('@librechat/agents');
const { logger, encryptV2, decryptV2 } = require('@librechat/data-schemas');
const { sendEvent, logAxiosError, refreshAccessToken } = require('@librechat/api');
const {
sendEvent,
logAxiosError,
refreshAccessToken,
GenerationJobManager,
} = require('@librechat/api');
const {
Time,
CacheKeys,
@ -127,6 +132,7 @@ async function loadActionSets(searchParams) {
* @param {string | undefined} [params.description] - The description for the tool.
* @param {import('zod').ZodTypeAny | undefined} [params.zodSchema] - The Zod schema for tool input validation/definition
* @param {{ oauth_client_id?: string; oauth_client_secret?: string; }} params.encrypted - The encrypted values for the action.
* @param {string | null} [params.streamId] - The stream ID for resumable streams.
* @returns { Promise<typeof tool | { _call: (toolInput: Object | string) => unknown}> } An object with `_call` method to execute the tool input.
*/
async function createActionTool({
@ -138,6 +144,7 @@ async function createActionTool({
name,
description,
encrypted,
streamId = null,
}) {
/** @type {(toolInput: Object | string, config: GraphRunnableConfig) => Promise<unknown>} */
const _call = async (toolInput, config) => {
@ -192,7 +199,12 @@ async function createActionTool({
`${identifier}:oauth_login:${config.metadata.thread_id}:${config.metadata.run_id}`,
'oauth_login',
async () => {
sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data });
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
logger.debug('Sent OAuth login request to client', { action_id, identifier });
return true;
},
@ -217,7 +229,12 @@ async function createActionTool({
logger.debug('Received OAuth Authorization response', { action_id, identifier });
data.delta.auth = undefined;
data.delta.expires_at = undefined;
sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data });
const successEventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, successEventData);
} else {
sendEvent(res, successEventData);
}
await sleep(3000);
metadata.oauth_access_token = result.access_token;
metadata.oauth_refresh_token = result.refresh_token;

View file

@ -13,6 +13,7 @@ const {
isMCPDomainAllowed,
normalizeServerName,
convertWithResolvedRefs,
GenerationJobManager,
} = require('@librechat/api');
const {
Time,
@ -37,8 +38,9 @@ const { getLogStores } = require('~/cache');
* @param {ServerResponse} params.res - The Express response object for sending events.
* @param {string} params.stepId - The ID of the step in the flow.
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
*/
function createRunStepDeltaEmitter({ res, stepId, toolCall }) {
function createRunStepDeltaEmitter({ res, stepId, toolCall, streamId = null }) {
/**
* @param {string} authURL - The URL to redirect the user for OAuth authentication.
* @returns {void}
@ -54,7 +56,12 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall }) {
expires_at: Date.now() + Time.TWO_MINUTES,
},
};
sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data });
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
};
}
@ -65,8 +72,9 @@ function createRunStepDeltaEmitter({ res, stepId, toolCall }) {
* @param {string} params.stepId - The ID of the step in the flow.
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
* @param {number} [params.index]
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
*/
function createRunStepEmitter({ res, runId, stepId, toolCall, index }) {
function createRunStepEmitter({ res, runId, stepId, toolCall, index, streamId = null }) {
return function () {
/** @type {import('@librechat/agents').RunStep} */
const data = {
@ -79,7 +87,12 @@ function createRunStepEmitter({ res, runId, stepId, toolCall, index }) {
tool_calls: [toolCall],
},
};
sendEvent(res, { event: GraphEvents.ON_RUN_STEP, data });
const eventData = { event: GraphEvents.ON_RUN_STEP, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
};
}
@ -110,10 +123,9 @@ function createOAuthStart({ flowId, flowManager, callback }) {
* @param {ServerResponse} params.res - The Express response object for sending events.
* @param {string} params.stepId - The ID of the step in the flow.
* @param {ToolCallChunk} params.toolCall - The tool call object containing tool information.
* @param {string} params.loginFlowId - The ID of the login flow.
* @param {FlowStateManager<any>} params.flowManager - The flow manager instance.
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
*/
function createOAuthEnd({ res, stepId, toolCall }) {
function createOAuthEnd({ res, stepId, toolCall, streamId = null }) {
return async function () {
/** @type {{ id: string; delta: AgentToolCallDelta }} */
const data = {
@ -123,7 +135,12 @@ function createOAuthEnd({ res, stepId, toolCall }) {
tool_calls: [{ ...toolCall }],
},
};
sendEvent(res, { event: GraphEvents.ON_RUN_STEP_DELTA, data });
const eventData = { event: GraphEvents.ON_RUN_STEP_DELTA, data };
if (streamId) {
GenerationJobManager.emitChunk(streamId, eventData);
} else {
sendEvent(res, eventData);
}
logger.debug('Sent OAuth login success to client');
};
}
@ -164,10 +181,19 @@ function createOAuthCallback({ runStepEmitter, runStepDeltaEmitter }) {
* @param {AbortSignal} params.signal
* @param {string} params.model
* @param {number} [params.index]
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
*/
async function reconnectServer({ res, user, index, signal, serverName, userMCPAuthMap }) {
async function reconnectServer({
res,
user,
index,
signal,
serverName,
userMCPAuthMap,
streamId = null,
}) {
const runId = Constants.USE_PRELIM_RESPONSE_MESSAGE_ID;
const flowId = `${user.id}:${serverName}:${Date.now()}`;
const flowManager = getFlowStateManager(getLogStores(CacheKeys.FLOWS));
@ -184,11 +210,13 @@ async function reconnectServer({ res, user, index, signal, serverName, userMCPAu
runId,
stepId,
toolCall,
streamId,
});
const runStepDeltaEmitter = createRunStepDeltaEmitter({
res,
stepId,
toolCall,
streamId,
});
const callback = createOAuthCallback({ runStepEmitter, runStepDeltaEmitter });
const oauthStart = createOAuthStart({
@ -224,6 +252,7 @@ async function reconnectServer({ res, user, index, signal, serverName, userMCPAu
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
* @param {number} [params.index]
* @param {AbortSignal} [params.signal]
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
* @param {import('@librechat/api').ParsedServerConfig} [params.config]
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
* @returns { Promise<Array<typeof tool | { _call: (toolInput: Object | string) => unknown}>> } An object with `_call` method to execute the tool input.
@ -237,6 +266,7 @@ async function createMCPTools({
provider,
serverName,
userMCPAuthMap,
streamId = null,
}) {
// Early domain validation before reconnecting server (avoid wasted work on disallowed domains)
// Use getAppConfig() to support per-user/role domain restrictions
@ -252,7 +282,15 @@ async function createMCPTools({
}
}
const result = await reconnectServer({ res, user, index, signal, serverName, userMCPAuthMap });
const result = await reconnectServer({
res,
user,
index,
signal,
serverName,
userMCPAuthMap,
streamId,
});
if (!result || !result.tools) {
logger.warn(`[MCP][${serverName}] Failed to reinitialize MCP server.`);
return;
@ -265,6 +303,7 @@ async function createMCPTools({
user,
provider,
userMCPAuthMap,
streamId,
availableTools: result.availableTools,
toolKey: `${tool.name}${Constants.mcp_delimiter}${serverName}`,
config: serverConfig,
@ -286,6 +325,7 @@ async function createMCPTools({
* @param {string} params.model - The model for the tool.
* @param {number} [params.index]
* @param {AbortSignal} [params.signal]
* @param {string | null} [params.streamId] - The stream ID for resumable mode.
* @param {Providers | EModelEndpoint} params.provider - The provider for the tool.
* @param {LCAvailableTools} [params.availableTools]
* @param {Record<string, Record<string, string>>} [params.userMCPAuthMap]
@ -302,6 +342,7 @@ async function createMCPTool({
userMCPAuthMap,
availableTools,
config,
streamId = null,
}) {
const [toolName, serverName] = toolKey.split(Constants.mcp_delimiter);
@ -332,6 +373,7 @@ async function createMCPTool({
signal,
serverName,
userMCPAuthMap,
streamId,
});
toolDefinition = result?.availableTools?.[toolKey]?.function;
}
@ -347,10 +389,18 @@ async function createMCPTool({
toolName,
serverName,
toolDefinition,
streamId,
});
}
function createToolInstance({ res, toolName, serverName, toolDefinition, provider: _provider }) {
function createToolInstance({
res,
toolName,
serverName,
toolDefinition,
provider: _provider,
streamId = null,
}) {
/** @type {LCTool} */
const { description, parameters } = toolDefinition;
const isGoogle = _provider === Providers.VERTEXAI || _provider === Providers.GOOGLE;
@ -386,6 +436,7 @@ function createToolInstance({ res, toolName, serverName, toolDefinition, provide
res,
stepId,
toolCall,
streamId,
});
const oauthStart = createOAuthStart({
flowId,
@ -396,6 +447,7 @@ function createToolInstance({ res, toolName, serverName, toolDefinition, provide
res,
stepId,
toolCall,
streamId,
});
if (derivedSignal) {

View file

@ -630,6 +630,7 @@ async function loadAgentTools({
encrypted,
name: toolName,
description: functionSig.description,
streamId,
});
if (!tool) {