mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-01-23 10:46:12 +01:00
* 🔧 fix: Prevent race conditions in message saving during abort scenarios * Added logic to save partial responses before returning from the abort endpoint to ensure parentMessageId exists in the database. * Updated the ResumableAgentController to save response messages before emitting final events, preventing orphaned parentMessageIds. * Enhanced handling of unfinished responses to improve stability and data integrity in agent interactions. * 🔧 fix: logging and job replacement handling in ResumableAgentController * Added detailed logging for job creation and final event emissions to improve traceability. * Implemented logic to check for job replacement before emitting events, preventing stale requests from affecting newer jobs. * Updated abort handling to log additional context about the abort result, enhancing debugging capabilities. * refactor: abort handling and token spending logic in AgentStream * Added authorization check for abort attempts to prevent unauthorized access. * Improved response message saving logic to ensure valid message IDs are stored. * Implemented token spending for aborted requests to prevent double-spending across parallel agents. * Enhanced logging for better traceability of token spending operations during abort scenarios. * refactor: remove TODO comments for token spending in abort handling * Removed outdated TODO comments regarding token spending for aborted requests in the abort endpoint. * This change streamlines the code and clarifies the current implementation status. * ✅ test: Add comprehensive tests for job replacement and abort handling * Introduced unit tests for job replacement detection in ResumableAgentController, covering job creation timestamp tracking, stale job detection, and response message saving order. * Added tests for the agent abort endpoint, ensuring proper authorization checks, early abort handling, and partial response saving. * Enhanced logging and error handling in tests to improve traceability and robustness of the abort functionality.
266 lines
8.9 KiB
JavaScript
266 lines
8.9 KiB
JavaScript
const express = require('express');
|
|
const { isEnabled, GenerationJobManager } = require('@librechat/api');
|
|
const { logger } = require('@librechat/data-schemas');
|
|
const {
|
|
uaParser,
|
|
checkBan,
|
|
requireJwtAuth,
|
|
messageIpLimiter,
|
|
configMiddleware,
|
|
messageUserLimiter,
|
|
} = require('~/server/middleware');
|
|
const { saveMessage } = require('~/models');
|
|
const { v1 } = require('./v1');
|
|
const chat = require('./chat');
|
|
|
|
const { LIMIT_MESSAGE_IP, LIMIT_MESSAGE_USER } = process.env ?? {};
|
|
|
|
const router = express.Router();
|
|
|
|
router.use(requireJwtAuth);
|
|
router.use(checkBan);
|
|
router.use(uaParser);
|
|
|
|
router.use('/', v1);
|
|
|
|
/**
|
|
* Stream endpoints - mounted before chatRouter to bypass rate limiters
|
|
* These are GET requests and don't need message body validation or rate limiting
|
|
*/
|
|
|
|
/**
|
|
* @route GET /chat/stream/:streamId
|
|
* @desc Subscribe to an ongoing generation job's SSE stream with replay support
|
|
* @access Private
|
|
* @description Sends sync event with resume state, replays missed chunks, then streams live
|
|
* @query resume=true - Indicates this is a reconnection (sends sync event)
|
|
*/
|
|
router.get('/chat/stream/:streamId', async (req, res) => {
|
|
const { streamId } = req.params;
|
|
const isResume = req.query.resume === 'true';
|
|
|
|
const job = await GenerationJobManager.getJob(streamId);
|
|
if (!job) {
|
|
return res.status(404).json({
|
|
error: 'Stream not found',
|
|
message: 'The generation job does not exist or has expired.',
|
|
});
|
|
}
|
|
|
|
res.setHeader('Content-Encoding', 'identity');
|
|
res.setHeader('Content-Type', 'text/event-stream');
|
|
res.setHeader('Cache-Control', 'no-cache, no-transform');
|
|
res.setHeader('Connection', 'keep-alive');
|
|
res.setHeader('X-Accel-Buffering', 'no');
|
|
res.flushHeaders();
|
|
|
|
logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`);
|
|
|
|
// Send sync event with resume state for ALL reconnecting clients
|
|
// This supports multi-tab scenarios where each tab needs run step data
|
|
if (isResume) {
|
|
const resumeState = await GenerationJobManager.getResumeState(streamId);
|
|
if (resumeState && !res.writableEnded) {
|
|
// Send sync event with run steps AND aggregatedContent
|
|
// Client will use aggregatedContent to initialize message state
|
|
res.write(`event: message\ndata: ${JSON.stringify({ sync: true, resumeState })}\n\n`);
|
|
if (typeof res.flush === 'function') {
|
|
res.flush();
|
|
}
|
|
logger.debug(
|
|
`[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps`,
|
|
);
|
|
}
|
|
}
|
|
|
|
const result = await GenerationJobManager.subscribe(
|
|
streamId,
|
|
(event) => {
|
|
if (!res.writableEnded) {
|
|
res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
|
|
if (typeof res.flush === 'function') {
|
|
res.flush();
|
|
}
|
|
}
|
|
},
|
|
(event) => {
|
|
if (!res.writableEnded) {
|
|
res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
|
|
if (typeof res.flush === 'function') {
|
|
res.flush();
|
|
}
|
|
res.end();
|
|
}
|
|
},
|
|
(error) => {
|
|
if (!res.writableEnded) {
|
|
res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`);
|
|
if (typeof res.flush === 'function') {
|
|
res.flush();
|
|
}
|
|
res.end();
|
|
}
|
|
},
|
|
);
|
|
|
|
if (!result) {
|
|
return res.status(404).json({ error: 'Failed to subscribe to stream' });
|
|
}
|
|
|
|
req.on('close', () => {
|
|
logger.debug(`[AgentStream] Client disconnected from ${streamId}`);
|
|
result.unsubscribe();
|
|
});
|
|
});
|
|
|
|
/**
|
|
* @route GET /chat/active
|
|
* @desc Get all active generation job IDs for the current user
|
|
* @access Private
|
|
* @returns { activeJobIds: string[] }
|
|
*/
|
|
router.get('/chat/active', async (req, res) => {
|
|
const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(req.user.id);
|
|
res.json({ activeJobIds });
|
|
});
|
|
|
|
/**
|
|
* @route GET /chat/status/:conversationId
|
|
* @desc Check if there's an active generation job for a conversation
|
|
* @access Private
|
|
* @returns { active, streamId, status, aggregatedContent, createdAt, resumeState }
|
|
*/
|
|
router.get('/chat/status/:conversationId', async (req, res) => {
|
|
const { conversationId } = req.params;
|
|
|
|
// streamId === conversationId, so we can use getJob directly
|
|
const job = await GenerationJobManager.getJob(conversationId);
|
|
|
|
if (!job) {
|
|
return res.json({ active: false });
|
|
}
|
|
|
|
if (job.metadata.userId !== req.user.id) {
|
|
return res.status(403).json({ error: 'Unauthorized' });
|
|
}
|
|
|
|
// Get resume state which contains aggregatedContent
|
|
// Avoid calling both getStreamInfo and getResumeState (both fetch content)
|
|
const resumeState = await GenerationJobManager.getResumeState(conversationId);
|
|
const isActive = job.status === 'running';
|
|
|
|
res.json({
|
|
active: isActive,
|
|
streamId: conversationId,
|
|
status: job.status,
|
|
aggregatedContent: resumeState?.aggregatedContent ?? [],
|
|
createdAt: job.createdAt,
|
|
resumeState,
|
|
});
|
|
});
|
|
|
|
/**
|
|
* @route POST /chat/abort
|
|
* @desc Abort an ongoing generation job
|
|
* @access Private
|
|
* @description Mounted before chatRouter to bypass buildEndpointOption middleware
|
|
*/
|
|
router.post('/chat/abort', async (req, res) => {
|
|
logger.debug(`[AgentStream] ========== ABORT ENDPOINT HIT ==========`);
|
|
logger.debug(`[AgentStream] Method: ${req.method}, Path: ${req.path}`);
|
|
logger.debug(`[AgentStream] Body:`, req.body);
|
|
|
|
const { streamId, conversationId, abortKey } = req.body;
|
|
const userId = req.user?.id;
|
|
|
|
// streamId === conversationId, so try any of the provided IDs
|
|
// Skip "new" as it's a placeholder for new conversations, not an actual ID
|
|
let jobStreamId =
|
|
streamId || (conversationId !== 'new' ? conversationId : null) || abortKey?.split(':')[0];
|
|
let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null;
|
|
|
|
// Fallback: if job not found and we have a userId, look up active jobs for user
|
|
// This handles the case where frontend sends "new" but job was created with a UUID
|
|
if (!job && userId) {
|
|
logger.debug(`[AgentStream] Job not found by ID, checking active jobs for user: ${userId}`);
|
|
const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(userId);
|
|
if (activeJobIds.length > 0) {
|
|
// Abort the most recent active job for this user
|
|
jobStreamId = activeJobIds[0];
|
|
job = await GenerationJobManager.getJob(jobStreamId);
|
|
logger.debug(`[AgentStream] Found active job for user: ${jobStreamId}`);
|
|
}
|
|
}
|
|
|
|
logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`);
|
|
|
|
if (job && jobStreamId) {
|
|
if (job.metadata?.userId && job.metadata.userId !== userId) {
|
|
logger.warn(`[AgentStream] Unauthorized abort attempt for ${jobStreamId} by user ${userId}`);
|
|
return res.status(403).json({ error: 'Unauthorized' });
|
|
}
|
|
|
|
logger.debug(`[AgentStream] Job found, aborting: ${jobStreamId}`);
|
|
const abortResult = await GenerationJobManager.abortJob(jobStreamId);
|
|
logger.debug(`[AgentStream] Job aborted successfully: ${jobStreamId}`, {
|
|
abortResultSuccess: abortResult.success,
|
|
abortResultUserMessageId: abortResult.jobData?.userMessage?.messageId,
|
|
abortResultResponseMessageId: abortResult.jobData?.responseMessageId,
|
|
});
|
|
|
|
// CRITICAL: Save partial response BEFORE returning to prevent race condition.
|
|
// If user sends a follow-up immediately after abort, the parentMessageId must exist in DB.
|
|
// Only save if we have a valid responseMessageId (skip early aborts before generation started)
|
|
if (
|
|
abortResult.success &&
|
|
abortResult.jobData?.userMessage?.messageId &&
|
|
abortResult.jobData?.responseMessageId
|
|
) {
|
|
const { jobData, content, text } = abortResult;
|
|
const responseMessage = {
|
|
messageId: jobData.responseMessageId,
|
|
parentMessageId: jobData.userMessage.messageId,
|
|
conversationId: jobData.conversationId,
|
|
content: content || [],
|
|
text: text || '',
|
|
sender: jobData.sender || 'AI',
|
|
endpoint: jobData.endpoint,
|
|
model: jobData.model,
|
|
unfinished: true,
|
|
error: false,
|
|
isCreatedByUser: false,
|
|
user: userId,
|
|
};
|
|
|
|
try {
|
|
await saveMessage(req, responseMessage, {
|
|
context: 'api/server/routes/agents/index.js - abort endpoint',
|
|
});
|
|
logger.debug(`[AgentStream] Saved partial response for: ${jobStreamId}`);
|
|
} catch (saveError) {
|
|
logger.error(`[AgentStream] Failed to save partial response: ${saveError.message}`);
|
|
}
|
|
}
|
|
|
|
return res.json({ success: true, aborted: jobStreamId });
|
|
}
|
|
|
|
logger.warn(`[AgentStream] Job not found for streamId: ${jobStreamId}`);
|
|
return res.status(404).json({ error: 'Job not found', streamId: jobStreamId });
|
|
});
|
|
|
|
const chatRouter = express.Router();
|
|
chatRouter.use(configMiddleware);
|
|
|
|
if (isEnabled(LIMIT_MESSAGE_IP)) {
|
|
chatRouter.use(messageIpLimiter);
|
|
}
|
|
|
|
if (isEnabled(LIMIT_MESSAGE_USER)) {
|
|
chatRouter.use(messageUserLimiter);
|
|
}
|
|
|
|
chatRouter.use('/', chat);
|
|
router.use('/chat', chatRouter);
|
|
|
|
module.exports = router;
|