From d9a5893d4b5605ec9fee99ae9246089f412a9d06 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Thu, 18 Dec 2025 19:26:24 -0500 Subject: [PATCH] fix(agents): handle 'new' conversationId and improve abort reliability - Treat 'new' as placeholder that needs UUID in request controller - Send JSON response immediately before tool loading for faster SSE connection - Use job's abort controller instead of prelimAbortController - Emit errors to stream if headers already sent - Skip 'new' as valid ID in abort endpoint - Add fallback to find active jobs by userId when conversationId is 'new' --- api/server/controllers/agents/request.js | 35 ++++++++++++++---------- api/server/routes/agents/index.js | 20 ++++++++++++-- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index f5d8ae6c77..2460a37240 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -48,22 +48,25 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit const userId = req.user.id; // Generate conversationId upfront if not provided - streamId === conversationId always - const conversationId = reqConversationId || crypto.randomUUID(); + // Treat "new" as a placeholder that needs a real UUID (frontend may send "new" for new convos) + const conversationId = + !reqConversationId || reqConversationId === 'new' ? crypto.randomUUID() : reqConversationId; const streamId = conversationId; let client = null; try { - const prelimAbortController = new AbortController(); - res.on('close', () => { - if (!prelimAbortController.signal.aborted) { - prelimAbortController.abort(); - } - }); - const job = await GenerationJobManager.createJob(streamId, userId, conversationId); req._resumableStreamId = streamId; + // Send JSON response IMMEDIATELY so client can connect to SSE stream + // This is critical: tool loading (MCP OAuth) may emit events that the client needs to receive + res.json({ streamId, conversationId, status: 'started' }); + + // Note: We no longer use res.on('close') to abort since we send JSON immediately. + // The response closes normally after res.json(), which is not an abort condition. + // Abort handling is done through GenerationJobManager via the SSE stream connection. + // Track if partial response was already saved to avoid duplicates let partialResponseSaved = false; @@ -128,12 +131,13 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit req, res, endpointOption, - signal: prelimAbortController.signal, + // Use the job's abort controller signal - allows abort via GenerationJobManager.abortJob() + signal: job.abortController.signal, }); - if (prelimAbortController.signal.aborted) { + if (job.abortController.signal.aborted) { GenerationJobManager.completeJob(streamId, 'Request aborted during initialization'); - return res.status(400).json({ error: 'Request aborted during initialization' }); + return; } client = result.client; @@ -147,8 +151,6 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit GenerationJobManager.setContentParts(streamId, client.contentParts); } - res.json({ streamId, conversationId, status: 'started' }); - let userMessage; const getReqData = (data = {}) => { @@ -339,6 +341,9 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit logger.error('[ResumableAgentController] Initialization error:', error); if (!res.headersSent) { res.status(500).json({ error: error.message || 'Failed to start generation' }); + } else { + // JSON already sent, emit error to stream so client can receive it + GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation'); } GenerationJobManager.completeJob(streamId, error.message); if (client) { @@ -374,7 +379,9 @@ const _LegacyAgentController = async (req, res, next, initializeClient, addTitle } = req.body; // Generate conversationId upfront if not provided - streamId === conversationId always - const conversationId = reqConversationId || crypto.randomUUID(); + // Treat "new" as a placeholder that needs a real UUID (frontend may send "new" for new convos) + const conversationId = + !reqConversationId || reqConversationId === 'new' ? crypto.randomUUID() : reqConversationId; const streamId = conversationId; let userMessage; diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index 70570d6e38..21af27d0bc 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -171,10 +171,26 @@ router.post('/chat/abort', async (req, res) => { logger.debug(`[AgentStream] Body:`, req.body); const { streamId, conversationId, abortKey } = req.body; + const userId = req.user?.id; // streamId === conversationId, so try any of the provided IDs - const jobStreamId = streamId || conversationId || abortKey?.split(':')[0]; - const job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null; + // Skip "new" as it's a placeholder for new conversations, not an actual ID + let jobStreamId = + streamId || (conversationId !== 'new' ? conversationId : null) || abortKey?.split(':')[0]; + let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null; + + // Fallback: if job not found and we have a userId, look up active jobs for user + // This handles the case where frontend sends "new" but job was created with a UUID + if (!job && userId) { + logger.debug(`[AgentStream] Job not found by ID, checking active jobs for user: ${userId}`); + const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(userId); + if (activeJobIds.length > 0) { + // Abort the most recent active job for this user + jobStreamId = activeJobIds[0]; + job = await GenerationJobManager.getJob(jobStreamId); + logger.debug(`[AgentStream] Found active job for user: ${jobStreamId}`); + } + } logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`);