LibreChat/api/server/routes/agents/index.js
Danny Avila 7bc793b18d
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
🌊 fix: Prevent Buffered Event Duplication on SSE Resume Connections (#12225)
* fix: skipBufferReplay for job resume connections

- Introduced a new option `skipBufferReplay` in the `subscribe` method of `GenerationJobManagerClass` to prevent duplication of events when resuming a connection.
- Updated the logic to conditionally skip replaying buffered events if a sync event has already been sent, enhancing the efficiency of event handling during reconnections.
- Added integration tests to verify the correct behavior of the new option, ensuring that no buffered events are replayed when `skipBufferReplay` is true, while still allowing for normal replay behavior when false.

* refactor: Update GenerationJobManager to handle sync events more efficiently

- Modified the `subscribe` method to utilize a new `skipBufferReplay` option, allowing for the prevention of duplicate events during resume connections.
- Enhanced the logic in the `chat/stream` route to conditionally skip replaying buffered events if a sync event has already been sent, improving event handling efficiency.
- Updated integration tests to verify the correct behavior of the new option, ensuring that no buffered events are replayed when `skipBufferReplay` is true, while maintaining normal replay behavior when false.

* test: Enhance GenerationJobManager integration tests for Redis mode

- Updated integration tests to conditionally run based on the USE_REDIS environment variable, allowing for better control over Redis-related tests.
- Refactored test descriptions to utilize a dynamic `describeRedis` function, improving clarity and organization of tests related to Redis functionality.
- Removed redundant checks for Redis availability within individual tests, streamlining the test logic and enhancing readability.

* fix: sync handler state for new messages on resume

The sync event's else branch (new response message) was missing
resetContentHandler() and syncStepMessage() calls, leaving stale
handler state that caused subsequent deltas to build on partial
content instead of the synced aggregatedContent.

* feat: atomic subscribeWithResume to close resume event gap

Replaces separate getResumeState() + subscribe() calls with a single
subscribeWithResume() that atomically drains earlyEventBuffer between
the resume snapshot and the subscribe. In in-memory mode, drained events
are returned as pendingEvents for the client to replay after sync.
In Redis mode, pendingEvents is empty since chunks are already persisted.

The route handler now uses the atomic method for resume connections and
extracted shared SSE write helpers to reduce duplication. The client
replays any pendingEvents through the existing step/content handlers
after applying aggregatedContent from the sync payload.

* fix: only capture gap events in subscribeWithResume, not pre-snapshot buffer

The previous implementation drained the entire earlyEventBuffer into
pendingEvents, but pre-snapshot events are already reflected in
aggregatedContent. Replaying them re-introduced the duplication bug
through a different vector.

Now records buffer length before getResumeState() and slices from that
index, so only events arriving during the async gap are returned as
pendingEvents.

Also:
- Handle pendingEvents when resumeState is null (replay directly)
- Hoist duplicate test helpers to shared scope
- Remove redundant writableEnded guard in onDone
2026-03-14 10:54:26 -04:00

296 lines
9.7 KiB
JavaScript

const express = require('express');
const { isEnabled, GenerationJobManager } = require('@librechat/api');
const { logger } = require('@librechat/data-schemas');
const {
uaParser,
checkBan,
requireJwtAuth,
messageIpLimiter,
configMiddleware,
messageUserLimiter,
} = require('~/server/middleware');
const { saveMessage } = require('~/models');
const openai = require('./openai');
const responses = require('./responses');
const { v1 } = require('./v1');
const chat = require('./chat');
const { LIMIT_MESSAGE_IP, LIMIT_MESSAGE_USER } = process.env ?? {};
const router = express.Router();
/**
* Open Responses API routes (API key authentication handled in route file)
* Mounted at /agents/v1/responses (full path: /api/agents/v1/responses)
* NOTE: Must be mounted BEFORE /v1 to avoid being caught by the less specific route
* @see https://openresponses.org/specification
*/
router.use('/v1/responses', responses);
/**
* OpenAI-compatible API routes (API key authentication handled in route file)
* Mounted at /agents/v1 (full path: /api/agents/v1/chat/completions)
*/
router.use('/v1', openai);
router.use(requireJwtAuth);
router.use(checkBan);
router.use(uaParser);
router.use('/', v1);
/**
* Stream endpoints - mounted before chatRouter to bypass rate limiters
* These are GET requests and don't need message body validation or rate limiting
*/
/**
 * @route GET /chat/stream/:streamId
 * @desc Subscribe to an ongoing generation job's SSE stream with replay support
 * @access Private
 * @description Sends sync event with resume state, replays missed chunks, then streams live
 * @query resume=true - Indicates this is a reconnection (sends sync event)
 */
router.get('/chat/stream/:streamId', async (req, res) => {
  const { streamId } = req.params;
  const isResume = req.query.resume === 'true';
  const job = await GenerationJobManager.getJob(streamId);
  if (!job) {
    return res.status(404).json({
      error: 'Stream not found',
      message: 'The generation job does not exist or has expired.',
    });
  }
  // Only the job owner may attach; jobs without a userId in metadata are open
  if (job.metadata?.userId && job.metadata.userId !== req.user.id) {
    return res.status(403).json({ error: 'Unauthorized' });
  }
  // SSE headers: disable compression and proxy buffering so chunks flush immediately
  res.setHeader('Content-Encoding', 'identity');
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache, no-transform');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');
  res.flushHeaders();
  logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`);
  /** Writes a single SSE `message` event, flushing when the transport supports it */
  const writeEvent = (event) => {
    if (!res.writableEnded) {
      res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
      if (typeof res.flush === 'function') {
        res.flush();
      }
    }
  };
  /** Terminal event: forward it, then close the SSE connection */
  const onDone = (event) => {
    writeEvent(event);
    res.end();
  };
  /** Stream-level failure: emit an SSE `error` event and close */
  const onError = (error) => {
    if (!res.writableEnded) {
      res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`);
      if (typeof res.flush === 'function') {
        res.flush();
      }
      res.end();
    }
  };
  let result;
  if (isResume) {
    const { subscription, resumeState, pendingEvents } =
      await GenerationJobManager.subscribeWithResume(streamId, writeEvent, onDone, onError);
    if (!res.writableEnded) {
      if (resumeState) {
        // Reuse writeEvent so the sync payload shares the same framing/flush path
        writeEvent({ sync: true, resumeState, pendingEvents });
        GenerationJobManager.markSyncSent(streamId);
        logger.debug(
          `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps, ${pendingEvents.length} pending events`,
        );
      } else if (pendingEvents.length > 0) {
        // No resume snapshot available; replay the gap events directly
        for (const event of pendingEvents) {
          writeEvent(event);
        }
        logger.warn(
          `[AgentStream] Resume state null for ${streamId}, replayed ${pendingEvents.length} gap events directly`,
        );
      }
    }
    result = subscription;
  } else {
    result = await GenerationJobManager.subscribe(streamId, writeEvent, onDone, onError);
  }
  if (!result) {
    // FIX: headers were already flushed above, so `res.status(404).json(...)`
    // would throw ERR_HTTP_HEADERS_SENT and never reach the client.
    // Report the subscription failure over the SSE channel instead.
    onError('Failed to subscribe to stream');
    return;
  }
  req.on('close', () => {
    logger.debug(`[AgentStream] Client disconnected from ${streamId}`);
    result.unsubscribe();
  });
});
/**
 * @route GET /chat/active
 * @desc List the current user's active generation job IDs
 * @access Private
 * @returns { activeJobIds: string[] }
 */
router.get('/chat/active', async (req, res) => {
  const { id: userId } = req.user;
  const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(userId);
  res.json({ activeJobIds });
});
/**
 * @route GET /chat/status/:conversationId
 * @desc Check if there's an active generation job for a conversation
 * @access Private
 * @returns { active, streamId, status, aggregatedContent, createdAt, resumeState }
 */
router.get('/chat/status/:conversationId', async (req, res) => {
  const { conversationId } = req.params;
  // streamId === conversationId, so we can use getJob directly
  const job = await GenerationJobManager.getJob(conversationId);
  if (!job) {
    return res.json({ active: false });
  }
  // FIX: use optional chaining, consistent with the stream and abort handlers.
  // The previous `job.metadata.userId` threw a TypeError (unhandled rejection)
  // whenever a job had no metadata object.
  if (job.metadata?.userId && job.metadata.userId !== req.user.id) {
    return res.status(403).json({ error: 'Unauthorized' });
  }
  // Get resume state which contains aggregatedContent
  // Avoid calling both getStreamInfo and getResumeState (both fetch content)
  const resumeState = await GenerationJobManager.getResumeState(conversationId);
  const isActive = job.status === 'running';
  res.json({
    active: isActive,
    streamId: conversationId,
    status: job.status,
    aggregatedContent: resumeState?.aggregatedContent ?? [],
    createdAt: job.createdAt,
    resumeState,
  });
});
/**
 * @route POST /chat/abort
 * @desc Abort an ongoing generation job and persist the partial response
 * @access Private
 * @description Mounted before chatRouter to bypass buildEndpointOption middleware.
 * Resolves the job from streamId/conversationId/abortKey (in that order), falling
 * back to the user's most recent active job, then aborts and saves partial output
 * BEFORE responding so follow-up messages can reference it.
 */
router.post('/chat/abort', async (req, res) => {
  logger.debug(`[AgentStream] ========== ABORT ENDPOINT HIT ==========`);
  logger.debug(`[AgentStream] Method: ${req.method}, Path: ${req.path}`);
  logger.debug(`[AgentStream] Body:`, req.body);
  const { streamId, conversationId, abortKey } = req.body;
  const userId = req.user?.id;
  // streamId === conversationId, so try any of the provided IDs
  // Skip "new" as it's a placeholder for new conversations, not an actual ID
  // abortKey looks like "<id>:..." — take the segment before the first colon
  let jobStreamId =
    streamId || (conversationId !== 'new' ? conversationId : null) || abortKey?.split(':')[0];
  let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null;
  // Fallback: if job not found and we have a userId, look up active jobs for user
  // This handles the case where frontend sends "new" but job was created with a UUID
  if (!job && userId) {
    logger.debug(`[AgentStream] Job not found by ID, checking active jobs for user: ${userId}`);
    const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(userId);
    if (activeJobIds.length > 0) {
      // Abort the most recent active job for this user
      jobStreamId = activeJobIds[0];
      job = await GenerationJobManager.getJob(jobStreamId);
      logger.debug(`[AgentStream] Found active job for user: ${jobStreamId}`);
    }
  }
  logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`);
  if (job && jobStreamId) {
    // Ownership check: jobs without a userId in metadata can be aborted by anyone authenticated
    if (job.metadata?.userId && job.metadata.userId !== userId) {
      logger.warn(`[AgentStream] Unauthorized abort attempt for ${jobStreamId} by user ${userId}`);
      return res.status(403).json({ error: 'Unauthorized' });
    }
    logger.debug(`[AgentStream] Job found, aborting: ${jobStreamId}`);
    const abortResult = await GenerationJobManager.abortJob(jobStreamId);
    logger.debug(`[AgentStream] Job aborted successfully: ${jobStreamId}`, {
      abortResultSuccess: abortResult.success,
      abortResultUserMessageId: abortResult.jobData?.userMessage?.messageId,
      abortResultResponseMessageId: abortResult.jobData?.responseMessageId,
    });
    // CRITICAL: Save partial response BEFORE returning to prevent race condition.
    // If user sends a follow-up immediately after abort, the parentMessageId must exist in DB.
    // Only save if we have a valid responseMessageId (skip early aborts before generation started)
    if (
      abortResult.success &&
      abortResult.jobData?.userMessage?.messageId &&
      abortResult.jobData?.responseMessageId
    ) {
      const { jobData, content, text } = abortResult;
      // `unfinished: true` marks the message as a partial (aborted) response
      const responseMessage = {
        messageId: jobData.responseMessageId,
        parentMessageId: jobData.userMessage.messageId,
        conversationId: jobData.conversationId,
        content: content || [],
        text: text || '',
        sender: jobData.sender || 'AI',
        endpoint: jobData.endpoint,
        model: jobData.model,
        unfinished: true,
        error: false,
        isCreatedByUser: false,
        user: userId,
      };
      try {
        await saveMessage(req, responseMessage, {
          context: 'api/server/routes/agents/index.js - abort endpoint',
        });
        logger.debug(`[AgentStream] Saved partial response for: ${jobStreamId}`);
      } catch (saveError) {
        // Best-effort save: the abort itself already succeeded, so still report success
        logger.error(`[AgentStream] Failed to save partial response: ${saveError.message}`);
      }
    }
    return res.json({ success: true, aborted: jobStreamId });
  }
  logger.warn(`[AgentStream] Job not found for streamId: ${jobStreamId}`);
  return res.status(404).json({ error: 'Job not found', streamId: jobStreamId });
});
/**
 * Chat sub-router: config middleware always runs; the per-IP and per-user
 * message rate limiters are attached only when enabled via env flags.
 */
const chatRouter = express.Router();
const chatMiddleware = [configMiddleware];
if (isEnabled(LIMIT_MESSAGE_IP)) {
  chatMiddleware.push(messageIpLimiter);
}
if (isEnabled(LIMIT_MESSAGE_USER)) {
  chatMiddleware.push(messageUserLimiter);
}
for (const middleware of chatMiddleware) {
  chatRouter.use(middleware);
}
chatRouter.use('/', chat);
router.use('/chat', chatRouter);
module.exports = router;