fix(agents): handle 'new' conversationId and improve abort reliability

- Treat 'new' as placeholder that needs UUID in request controller
- Send JSON response immediately before tool loading for faster SSE connection
- Use job's abort controller instead of prelimAbortController
- Emit errors to stream if headers already sent
- Skip 'new' as valid ID in abort endpoint
- Add fallback to find active jobs by userId when conversationId is 'new'
This commit is contained in:
Danny Avila 2025-12-18 19:26:24 -05:00
parent 39adeac86e
commit d9a5893d4b
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
2 changed files with 39 additions and 16 deletions

View file

@ -48,22 +48,25 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
const userId = req.user.id; const userId = req.user.id;
// Generate conversationId upfront if not provided - streamId === conversationId always // Generate conversationId upfront if not provided - streamId === conversationId always
const conversationId = reqConversationId || crypto.randomUUID(); // Treat "new" as a placeholder that needs a real UUID (frontend may send "new" for new convos)
const conversationId =
!reqConversationId || reqConversationId === 'new' ? crypto.randomUUID() : reqConversationId;
const streamId = conversationId; const streamId = conversationId;
let client = null; let client = null;
try { try {
const prelimAbortController = new AbortController();
res.on('close', () => {
if (!prelimAbortController.signal.aborted) {
prelimAbortController.abort();
}
});
const job = await GenerationJobManager.createJob(streamId, userId, conversationId); const job = await GenerationJobManager.createJob(streamId, userId, conversationId);
req._resumableStreamId = streamId; req._resumableStreamId = streamId;
// Send JSON response IMMEDIATELY so client can connect to SSE stream
// This is critical: tool loading (MCP OAuth) may emit events that the client needs to receive
res.json({ streamId, conversationId, status: 'started' });
// Note: We no longer use res.on('close') to abort since we send JSON immediately.
// The response closes normally after res.json(), which is not an abort condition.
// Abort handling is done through GenerationJobManager via the SSE stream connection.
// Track if partial response was already saved to avoid duplicates // Track if partial response was already saved to avoid duplicates
let partialResponseSaved = false; let partialResponseSaved = false;
@ -128,12 +131,13 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
req, req,
res, res,
endpointOption, endpointOption,
signal: prelimAbortController.signal, // Use the job's abort controller signal - allows abort via GenerationJobManager.abortJob()
signal: job.abortController.signal,
}); });
if (prelimAbortController.signal.aborted) { if (job.abortController.signal.aborted) {
GenerationJobManager.completeJob(streamId, 'Request aborted during initialization'); GenerationJobManager.completeJob(streamId, 'Request aborted during initialization');
return res.status(400).json({ error: 'Request aborted during initialization' }); return;
} }
client = result.client; client = result.client;
@ -147,8 +151,6 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
GenerationJobManager.setContentParts(streamId, client.contentParts); GenerationJobManager.setContentParts(streamId, client.contentParts);
} }
res.json({ streamId, conversationId, status: 'started' });
let userMessage; let userMessage;
const getReqData = (data = {}) => { const getReqData = (data = {}) => {
@ -339,6 +341,9 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
logger.error('[ResumableAgentController] Initialization error:', error); logger.error('[ResumableAgentController] Initialization error:', error);
if (!res.headersSent) { if (!res.headersSent) {
res.status(500).json({ error: error.message || 'Failed to start generation' }); res.status(500).json({ error: error.message || 'Failed to start generation' });
} else {
// JSON already sent, emit error to stream so client can receive it
GenerationJobManager.emitError(streamId, error.message || 'Failed to start generation');
} }
GenerationJobManager.completeJob(streamId, error.message); GenerationJobManager.completeJob(streamId, error.message);
if (client) { if (client) {
@ -374,7 +379,9 @@ const _LegacyAgentController = async (req, res, next, initializeClient, addTitle
} = req.body; } = req.body;
// Generate conversationId upfront if not provided - streamId === conversationId always // Generate conversationId upfront if not provided - streamId === conversationId always
const conversationId = reqConversationId || crypto.randomUUID(); // Treat "new" as a placeholder that needs a real UUID (frontend may send "new" for new convos)
const conversationId =
!reqConversationId || reqConversationId === 'new' ? crypto.randomUUID() : reqConversationId;
const streamId = conversationId; const streamId = conversationId;
let userMessage; let userMessage;

View file

@ -171,10 +171,26 @@ router.post('/chat/abort', async (req, res) => {
logger.debug(`[AgentStream] Body:`, req.body); logger.debug(`[AgentStream] Body:`, req.body);
const { streamId, conversationId, abortKey } = req.body; const { streamId, conversationId, abortKey } = req.body;
const userId = req.user?.id;
// streamId === conversationId, so try any of the provided IDs // streamId === conversationId, so try any of the provided IDs
const jobStreamId = streamId || conversationId || abortKey?.split(':')[0]; // Skip "new" as it's a placeholder for new conversations, not an actual ID
const job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null; let jobStreamId =
streamId || (conversationId !== 'new' ? conversationId : null) || abortKey?.split(':')[0];
let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null;
// Fallback: if job not found and we have a userId, look up active jobs for user
// This handles the case where frontend sends "new" but job was created with a UUID
if (!job && userId) {
logger.debug(`[AgentStream] Job not found by ID, checking active jobs for user: ${userId}`);
const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(userId);
if (activeJobIds.length > 0) {
// Abort the most recent active job for this user
jobStreamId = activeJobIds[0];
job = await GenerationJobManager.getJob(jobStreamId);
logger.debug(`[AgentStream] Found active job for user: ${jobStreamId}`);
}
}
logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`); logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`);