WIP: resumable stream

This commit is contained in:
Danny Avila 2025-12-11 09:52:15 -05:00
parent 2522cf760f
commit ff14cd3b44
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
12 changed files with 498 additions and 209 deletions

View file

@ -66,6 +66,65 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
const job = GenerationJobManager.createJob(streamId, userId, reqConversationId); const job = GenerationJobManager.createJob(streamId, userId, reqConversationId);
req._resumableStreamId = streamId; req._resumableStreamId = streamId;
// Track if partial response was already saved to avoid duplicates
let partialResponseSaved = false;
/**
* Listen for all subscribers leaving to save partial response.
* This ensures the response is saved to DB even if all clients disconnect
* while generation continues.
*
* Note: The messageId used here falls back to `${userMessage.messageId}_` if the
* actual response messageId isn't available yet. The final response save will
* overwrite this with the complete response using the same messageId pattern.
*/
job.emitter.on('allSubscribersLeft', async (aggregatedContent) => {
if (partialResponseSaved || !aggregatedContent || aggregatedContent.length === 0) {
return;
}
const resumeState = GenerationJobManager.getResumeState(streamId);
if (!resumeState?.userMessage) {
logger.debug('[ResumableAgentController] No user message to save partial response for');
return;
}
partialResponseSaved = true;
const responseConversationId = resumeState.conversationId || reqConversationId;
try {
const partialMessage = {
messageId: resumeState.responseMessageId || `${resumeState.userMessage.messageId}_`,
conversationId: responseConversationId,
parentMessageId: resumeState.userMessage.messageId,
sender: client?.sender ?? 'AI',
content: aggregatedContent,
unfinished: true,
error: false,
isCreatedByUser: false,
user: userId,
endpoint: endpointOption.endpoint,
model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model,
};
if (req.body?.agent_id) {
partialMessage.agent_id = req.body.agent_id;
}
await saveMessage(req, partialMessage, {
context: 'api/server/controllers/agents/request.js - partial response on disconnect',
});
logger.debug(
`[ResumableAgentController] Saved partial response for ${streamId}, content parts: ${aggregatedContent.length}`,
);
} catch (error) {
logger.error('[ResumableAgentController] Error saving partial response:', error);
// Reset flag so we can try again if subscribers reconnect and leave again
partialResponseSaved = false;
}
});
/** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */ /** @type {{ client: TAgentClient; userMCPAuthMap?: Record<string, Record<string, string>> }} */
const result = await initializeClient({ const result = await initializeClient({
req, req,
@ -106,9 +165,14 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
} }
try { try {
const onStart = (userMsg, _respMsgId, _isNewConvo) => { const onStart = (userMsg, respMsgId, _isNewConvo) => {
userMessage = userMsg; userMessage = userMsg;
// Store the response messageId upfront so partial saves use the same ID
if (respMsgId) {
GenerationJobManager.updateMetadata(streamId, { responseMessageId: respMsgId });
}
GenerationJobManager.emitChunk(streamId, { GenerationJobManager.emitChunk(streamId, {
created: true, created: true,
message: userMessage, message: userMessage,
@ -203,8 +267,15 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
}); });
} }
// Skip title generation if job was aborted
const newConvo = !reqConversationId; const newConvo = !reqConversationId;
if (addTitle && parentMessageId === Constants.NO_PARENT && newConvo) { const shouldGenerateTitle =
addTitle &&
parentMessageId === Constants.NO_PARENT &&
newConvo &&
!job.abortController.signal.aborted;
if (shouldGenerateTitle) {
addTitle(req, { addTitle(req, {
text, text,
response: { ...response }, response: { ...response },
@ -224,12 +295,24 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
} }
} }
} catch (error) { } catch (error) {
// Check if this was an abort (not a real error)
const wasAborted = job.abortController.signal.aborted || error.message?.includes('abort');
if (wasAborted) {
logger.debug(`[ResumableAgentController] Generation aborted for ${streamId}`);
// abortJob already handled emitDone and completeJob
} else {
logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error); logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error);
GenerationJobManager.emitError(streamId, error.message || 'Generation failed'); GenerationJobManager.emitError(streamId, error.message || 'Generation failed');
GenerationJobManager.completeJob(streamId, error.message); GenerationJobManager.completeJob(streamId, error.message);
}
if (client) { if (client) {
disposeClient(client); disposeClient(client);
} }
// Don't continue to title generation after error/abort
return;
} }
}; };

View file

@ -23,9 +23,10 @@ async function buildEndpointOption(req, res, next) {
try { try {
parsedBody = parseCompactConvo({ endpoint, endpointType, conversation: req.body }); parsedBody = parseCompactConvo({ endpoint, endpointType, conversation: req.body });
} catch (error) { } catch (error) {
logger.warn( logger.error(`Error parsing compact conversation for endpoint ${endpoint}`, error);
`Error parsing conversation for endpoint ${endpoint}${error?.message ? `: ${error.message}` : ''}`, logger.debug({
); 'Error parsing compact conversation': { endpoint, endpointType, conversation: req.body },
});
return handleError(res, { text: 'Error parsing conversation' }); return handleError(res, { text: 'Error parsing conversation' });
} }

View file

@ -1,6 +1,5 @@
const express = require('express'); const express = require('express');
const { generateCheckAccess, skipAgentCheck, GenerationJobManager } = require('@librechat/api'); const { generateCheckAccess, skipAgentCheck } = require('@librechat/api');
const { logger } = require('@librechat/data-schemas');
const { PermissionTypes, Permissions, PermissionBits } = require('librechat-data-provider'); const { PermissionTypes, Permissions, PermissionBits } = require('librechat-data-provider');
const { const {
setHeaders, setHeaders,
@ -35,25 +34,6 @@ router.use(validateConvoAccess);
router.use(buildEndpointOption); router.use(buildEndpointOption);
router.use(setHeaders); router.use(setHeaders);
/**
* @route POST /abort
* @desc Abort an ongoing generation job
* @access Private
*/
router.post('/abort', (req, res) => {
const { streamId, abortKey } = req.body;
const jobStreamId = streamId || abortKey?.split(':')?.[0];
if (jobStreamId && GenerationJobManager.hasJob(jobStreamId)) {
GenerationJobManager.abortJob(jobStreamId);
logger.debug(`[AgentStream] Job aborted: ${jobStreamId}`);
return res.json({ success: true, aborted: jobStreamId });
}
res.status(404).json({ error: 'Job not found' });
});
const controller = async (req, res, next) => { const controller = async (req, res, next) => {
await AgentController(req, res, next, initializeClient, addTitle); await AgentController(req, res, next, initializeClient, addTitle);
}; };

View file

@ -32,10 +32,12 @@ router.use('/', v1);
* @route GET /chat/stream/:streamId * @route GET /chat/stream/:streamId
* @desc Subscribe to an ongoing generation job's SSE stream with replay support * @desc Subscribe to an ongoing generation job's SSE stream with replay support
* @access Private * @access Private
* @description Replays any chunks missed during disconnect, then streams live * @description Sends sync event with resume state, replays missed chunks, then streams live
* @query resume=true - Indicates this is a reconnection (sends sync event)
*/ */
router.get('/chat/stream/:streamId', (req, res) => { router.get('/chat/stream/:streamId', (req, res) => {
const { streamId } = req.params; const { streamId } = req.params;
const isResume = req.query.resume === 'true';
const job = GenerationJobManager.getJob(streamId); const job = GenerationJobManager.getJob(streamId);
if (!job) { if (!job) {
@ -52,7 +54,22 @@ router.get('/chat/stream/:streamId', (req, res) => {
res.setHeader('X-Accel-Buffering', 'no'); res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders(); res.flushHeaders();
logger.debug(`[AgentStream] Client subscribed to ${streamId}`); logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`);
// Send sync event with resume state for reconnecting clients
if (isResume && !GenerationJobManager.wasSyncSent(streamId)) {
const resumeState = GenerationJobManager.getResumeState(streamId);
if (resumeState && !res.writableEnded) {
res.write(`event: message\ndata: ${JSON.stringify({ sync: true, resumeState })}\n\n`);
if (typeof res.flush === 'function') {
res.flush();
}
GenerationJobManager.markSyncSent(streamId);
logger.debug(
`[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps`,
);
}
}
const result = GenerationJobManager.subscribe( const result = GenerationJobManager.subscribe(
streamId, streamId,
@ -98,7 +115,7 @@ router.get('/chat/stream/:streamId', (req, res) => {
* @route GET /chat/status/:conversationId * @route GET /chat/status/:conversationId
* @desc Check if there's an active generation job for a conversation * @desc Check if there's an active generation job for a conversation
* @access Private * @access Private
* @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt } * @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt, resumeState }
*/ */
router.get('/chat/status/:conversationId', (req, res) => { router.get('/chat/status/:conversationId', (req, res) => {
const { conversationId } = req.params; const { conversationId } = req.params;
@ -114,17 +131,47 @@ router.get('/chat/status/:conversationId', (req, res) => {
} }
const info = GenerationJobManager.getStreamInfo(job.streamId); const info = GenerationJobManager.getStreamInfo(job.streamId);
const resumeState = GenerationJobManager.getResumeState(job.streamId);
res.json({ res.json({
active: info?.active ?? false, active: info?.active ?? false,
streamId: job.streamId, streamId: job.streamId,
status: info?.status ?? job.status, status: info?.status ?? job.status,
chunkCount: info?.chunkCount ?? 0, chunkCount: info?.chunkCount ?? 0,
runStepCount: info?.runStepCount ?? 0,
aggregatedContent: info?.aggregatedContent, aggregatedContent: info?.aggregatedContent,
createdAt: info?.createdAt ?? job.createdAt, createdAt: info?.createdAt ?? job.createdAt,
resumeState,
}); });
}); });
/**
* @route POST /chat/abort
* @desc Abort an ongoing generation job
* @access Private
* @description Mounted before chatRouter to bypass buildEndpointOption middleware
*/
router.post('/chat/abort', (req, res) => {
logger.debug(`[AgentStream] ========== ABORT ENDPOINT HIT ==========`);
logger.debug(`[AgentStream] Method: ${req.method}, Path: ${req.path}`);
logger.debug(`[AgentStream] Body:`, req.body);
const { streamId, abortKey } = req.body;
const jobStreamId = streamId || abortKey?.split(':')?.[0];
logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`);
if (jobStreamId && GenerationJobManager.hasJob(jobStreamId)) {
logger.debug(`[AgentStream] Job found, aborting: ${jobStreamId}`);
GenerationJobManager.abortJob(jobStreamId);
logger.debug(`[AgentStream] Job aborted successfully: ${jobStreamId}`);
return res.json({ success: true, aborted: jobStreamId });
}
logger.warn(`[AgentStream] Job not found for streamId: ${jobStreamId}`);
return res.status(404).json({ error: 'Job not found', streamId: jobStreamId });
});
const chatRouter = express.Router(); const chatRouter = express.Router();
chatRouter.use(configMiddleware); chatRouter.use(configMiddleware);

View file

@ -55,7 +55,7 @@ function ChatView({ index = 0 }: { index?: number }) {
useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1); useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1);
// Auto-resume if navigating back to conversation with active job // Auto-resume if navigating back to conversation with active job
useResumeOnLoad(conversationId, chatHelpers, index); useResumeOnLoad(conversationId, chatHelpers.getMessages, index);
const methods = useForm<ChatFormValues>({ const methods = useForm<ChatFormValues>({
defaultValues: { text: '' }, defaultValues: { text: '' },

View file

@ -283,6 +283,9 @@ export default function useChatFunctions({
} }
} }
} else { } else {
// Assistants endpoint uses nested format: { type: 'text', text: { value: 'content' } }
// Agents and other endpoints use flat format: { type: 'text', text: 'content' }
if (isAssistantsEndpoint(endpoint)) {
initialResponse.content = [ initialResponse.content = [
{ {
type: ContentTypes.TEXT, type: ContentTypes.TEXT,
@ -291,6 +294,14 @@ export default function useChatFunctions({
}, },
}, },
]; ];
} else {
initialResponse.content = [
{
type: ContentTypes.TEXT,
text: '',
},
];
}
} }
setShowStopButton(true); setShowStopButton(true);
} }

View file

@ -56,9 +56,7 @@ export default function useTextarea({
}); });
const entityName = entity?.name ?? ''; const entityName = entity?.name ?? '';
const isNotAppendable = const isNotAppendable = latestMessage?.error === true && !isAssistant;
(((latestMessage?.unfinished ?? false) && !isSubmitting) || (latestMessage?.error ?? false)) &&
!isAssistant;
// && (conversationId?.length ?? 0) > 6; // also ensures that we don't show the wrong placeholder // && (conversationId?.length ?? 0) > 6; // also ensures that we don't show the wrong placeholder
useEffect(() => { useEffect(() => {

View file

@ -1,182 +1,163 @@
import { useEffect, useState, useRef } from 'react'; import { useEffect, useRef } from 'react';
import { SSE } from 'sse.js';
import { useSetRecoilState, useRecoilValue } from 'recoil'; import { useSetRecoilState, useRecoilValue } from 'recoil';
import { request } from 'librechat-data-provider'; import { Constants, tMessageSchema } from 'librechat-data-provider';
import type { TMessage, EventSubmission } from 'librechat-data-provider'; import type { TMessage, TConversation, TSubmission, Agents } from 'librechat-data-provider';
import type { EventHandlerParams } from './useEventHandlers';
import { useAuthContext } from '~/hooks/AuthContext';
import { useGetStartupConfig, useGetUserBalance } from '~/data-provider';
import useEventHandlers from './useEventHandlers';
import store from '~/store'; import store from '~/store';
type ChatHelpers = Pick< /**
EventHandlerParams, * Build a submission object from resume state for reconnected streams.
| 'setMessages' * This provides the minimum data needed for useResumableSSE to subscribe.
| 'getMessages' */
| 'setConversation' function buildSubmissionFromResumeState(
| 'setIsSubmitting' resumeState: Agents.ResumeState,
| 'newConversation' streamId: string,
| 'resetLatestMessage' messages: TMessage[],
>; conversationId: string,
): TSubmission {
const userMessageData = resumeState.userMessage;
const responseMessageId =
resumeState.responseMessageId ?? `${userMessageData?.messageId ?? 'resume'}_`;
// Try to find existing user message in the messages array (from database)
const existingUserMessage = messages.find(
(m) => m.isCreatedByUser && m.messageId === userMessageData?.messageId,
);
// Try to find existing response message in the messages array (from database)
const existingResponseMessage = messages.find(
(m) =>
!m.isCreatedByUser &&
(m.messageId === responseMessageId || m.parentMessageId === userMessageData?.messageId),
);
// Create or use existing user message
const userMessage: TMessage =
existingUserMessage ??
(userMessageData
? (tMessageSchema.parse({
messageId: userMessageData.messageId,
parentMessageId: userMessageData.parentMessageId ?? Constants.NO_PARENT,
conversationId: userMessageData.conversationId ?? conversationId,
text: userMessageData.text ?? '',
isCreatedByUser: true,
role: 'user',
}) as TMessage)
: (messages[messages.length - 2] ??
({
messageId: 'resume_user_msg',
conversationId,
text: '',
isCreatedByUser: true,
} as TMessage)));
// Use existing response from DB if available (preserves already-saved content)
const initialResponse: TMessage =
existingResponseMessage ??
({
messageId: responseMessageId,
parentMessageId: userMessage.messageId,
conversationId,
text: '',
content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
isCreatedByUser: false,
role: 'assistant',
} as TMessage);
const conversation: TConversation = {
conversationId,
title: 'Resumed Chat',
endpoint: null,
} as TConversation;
return {
messages,
userMessage,
initialResponse,
conversation,
isRegenerate: false,
isTemporary: false,
endpointOption: {},
} as TSubmission;
}
/** /**
* Hook to resume streaming if navigating back to a conversation with active generation. * Hook to resume streaming if navigating to a conversation with active generation.
* Checks for active jobs on mount and auto-subscribes if found. * Checks stream status via React Query and sets submission if active job found.
*
* This hook:
* 1. Uses useStreamStatus to check for active jobs on navigation
* 2. If active job found, builds a submission with streamId and sets it
* 3. useResumableSSE picks up the submission and subscribes to the stream
*/ */
export default function useResumeOnLoad( export default function useResumeOnLoad(
conversationId: string | undefined, conversationId: string | undefined,
chatHelpers: ChatHelpers, getMessages: () => TMessage[] | undefined,
runIndex = 0, runIndex = 0,
) { ) {
const resumableEnabled = useRecoilValue(store.resumableStreams); const resumableEnabled = useRecoilValue(store.resumableStreams);
const { token, isAuthenticated } = useAuthContext(); const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex));
const sseRef = useRef<SSE | null>(null); const currentSubmission = useRecoilValue(store.submissionByIndex(runIndex));
const checkedConvoRef = useRef<string | null>(null); const hasResumedRef = useRef<string | null>(null);
const [completed, setCompleted] = useState(new Set());
const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));
const { getMessages, setIsSubmitting } = chatHelpers; // Check for active stream when conversation changes
// const { data: streamStatus, isSuccess } = useStreamStatus(
// conversationId,
// resumableEnabled && !currentSubmission, // Only check if no active submission
// );
const { stepHandler, finalHandler, contentHandler } = useEventHandlers({
...chatHelpers,
setCompleted,
setShowStopButton,
});
const { data: startupConfig } = useGetStartupConfig();
const balanceQuery = useGetUserBalance({
enabled: !!isAuthenticated && startupConfig?.balance?.enabled,
});
/**
* Check for active job when conversation loads
*/
useEffect(() => { useEffect(() => {
if (!resumableEnabled || !conversationId || !token) { // if (!resumableEnabled || !conversationId || !isSuccess || !streamStatus) {
checkedConvoRef.current = null; if (!resumableEnabled || !conversationId) {
return; return;
} }
// Only check once per conversationId to prevent loops // Don't resume if we already have an active submission
if (checkedConvoRef.current === conversationId) { if (currentSubmission) {
return; return;
} }
checkedConvoRef.current = conversationId; // Don't resume the same conversation twice
if (hasResumedRef.current === conversationId) {
const checkAndResume = async () => {
try {
const response = await fetch(`/api/agents/chat/status/${conversationId}`, {
headers: { Authorization: `Bearer ${token}` },
});
if (!response.ok) {
return; return;
} }
const { active, streamId } = await response.json(); // Check if there's an active job to resume
// if (!streamStatus.active || !streamStatus.streamId) {
// return;
// }
if (!active || !streamId) { // console.log('[ResumeOnLoad] Found active job, creating submission...', {
return; // streamId: streamStatus.streamId,
} // status: streamStatus.status,
// });
console.log('[ResumeOnLoad] Found active job, resuming...', { streamId }); hasResumedRef.current = conversationId;
const messages = getMessages() || []; const messages = getMessages() || [];
// Minimal submission without resume state
const lastMessage = messages[messages.length - 1]; const lastMessage = messages[messages.length - 1];
let textIndex: number | null = null; const submission: TSubmission = {
const url = `/api/agents/chat/stream/${encodeURIComponent(streamId)}`;
const sse = new SSE(url, {
headers: { Authorization: `Bearer ${token}` },
method: 'GET',
});
sseRef.current = sse;
sse.addEventListener('open', () => {
console.log('[ResumeOnLoad] Reconnected to stream');
setAbortScroll(false);
setShowStopButton(true);
setIsSubmitting(true);
});
sse.addEventListener('message', (e: MessageEvent) => {
try {
const data = JSON.parse(e.data);
if (data.final != null) {
try {
finalHandler(data, { messages } as unknown as EventSubmission);
} catch (error) {
console.error('[ResumeOnLoad] Error in finalHandler:', error);
setIsSubmitting(false);
setShowStopButton(false);
}
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
sse.close();
sseRef.current = null;
return;
}
if (data.event != null) {
stepHandler(data, {
messages, messages,
userMessage: lastMessage, userMessage: lastMessage ?? ({ messageId: 'resume', conversationId, text: '' } as TMessage),
} as unknown as EventSubmission); initialResponse: {
return; messageId: 'resume_',
} conversationId,
text: '',
content: [{ type: 'text', text: '' }],
} as TMessage,
conversation: { conversationId, title: 'Resumed Chat' } as TConversation,
isRegenerate: false,
isTemporary: false,
endpointOption: {},
} as TSubmission;
setSubmission(submission);
}, [conversationId, resumableEnabled, currentSubmission, getMessages, setSubmission]);
if (data.type != null) { // Reset hasResumedRef when conversation changes
const { text, index } = data; useEffect(() => {
if (text != null && index !== textIndex) { if (conversationId !== hasResumedRef.current) {
textIndex = index; hasResumedRef.current = null;
} }
contentHandler({ data, submission: { messages } as unknown as EventSubmission });
return;
}
} catch (error) {
console.error('[ResumeOnLoad] Error processing message:', error);
}
});
sse.addEventListener('error', async (e: MessageEvent) => {
console.log('[ResumeOnLoad] Stream error');
sse.close();
sseRef.current = null;
setIsSubmitting(false);
setShowStopButton(false);
/* @ts-ignore */
if (e.responseCode === 401) {
try {
const refreshResponse = await request.refreshToken();
const newToken = refreshResponse?.token ?? '';
if (newToken) {
request.dispatchTokenUpdatedEvent(newToken);
}
} catch (error) {
console.log('[ResumeOnLoad] Token refresh failed:', error);
}
}
});
sse.stream();
} catch (error) {
console.error('[ResumeOnLoad] Error checking job status:', error);
}
};
checkAndResume();
return () => {
if (sseRef.current) {
sseRef.current.close();
sseRef.current = null;
}
};
// Only re-run when conversationId changes
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [conversationId]); }, [conversationId]);
} }

View file

@ -21,7 +21,8 @@ type TUseStepHandler = {
announcePolite: (options: AnnounceOptions) => void; announcePolite: (options: AnnounceOptions) => void;
setMessages: (messages: TMessage[]) => void; setMessages: (messages: TMessage[]) => void;
getMessages: () => TMessage[] | undefined; getMessages: () => TMessage[] | undefined;
setIsSubmitting: SetterOrUpdater<boolean>; /** @deprecated - isSubmitting should be derived from submission state */
setIsSubmitting?: SetterOrUpdater<boolean>;
lastAnnouncementTimeRef: React.MutableRefObject<number>; lastAnnouncementTimeRef: React.MutableRefObject<number>;
}; };
@ -50,10 +51,12 @@ type AllContentTypes =
| ContentTypes.IMAGE_URL | ContentTypes.IMAGE_URL
| ContentTypes.ERROR; | ContentTypes.ERROR;
const noop = () => {};
export default function useStepHandler({ export default function useStepHandler({
setMessages, setMessages,
getMessages, getMessages,
setIsSubmitting, setIsSubmitting = noop,
announcePolite, announcePolite,
lastAnnouncementTimeRef, lastAnnouncementTimeRef,
}: TUseStepHandler) { }: TUseStepHandler) {
@ -198,7 +201,6 @@ export default function useStepHandler({
({ event, data }: TStepEvent, submission: EventSubmission) => { ({ event, data }: TStepEvent, submission: EventSubmission) => {
const messages = getMessages() || []; const messages = getMessages() || [];
const { userMessage } = submission; const { userMessage } = submission;
setIsSubmitting(true);
let parentMessageId = userMessage.messageId; let parentMessageId = userMessage.messageId;
const currentTime = Date.now(); const currentTime = Date.now();
@ -230,12 +232,17 @@ export default function useStepHandler({
if (!response) { if (!response) {
const responseMessage = messages[messages.length - 1] as TMessage; const responseMessage = messages[messages.length - 1] as TMessage;
// Preserve existing content from DB (partial response) and prepend initialContent if provided
const existingContent = responseMessage?.content ?? [];
const mergedContent =
initialContent.length > 0 ? [...initialContent, ...existingContent] : existingContent;
response = { response = {
...responseMessage, ...responseMessage,
parentMessageId, parentMessageId,
conversationId: userMessage.conversationId, conversationId: userMessage.conversationId,
messageId: responseMessageId, messageId: responseMessageId,
content: initialContent, content: mergedContent,
}; };
messageMap.current.set(responseMessageId, response); messageMap.current.set(responseMessageId, response);

View file

@ -1,5 +1,6 @@
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { logger } from '@librechat/data-schemas'; import { logger } from '@librechat/data-schemas';
import type { Agents } from 'librechat-data-provider';
import type { ServerSentEvent } from '~/types'; import type { ServerSentEvent } from '~/types';
import type { import type {
GenerationJob, GenerationJob,
@ -9,6 +10,8 @@ import type {
ErrorHandler, ErrorHandler,
UnsubscribeFn, UnsubscribeFn,
ContentPart, ContentPart,
ResumeState,
GenerationJobMetadata,
} from './types'; } from './types';
/** /**
@ -71,6 +74,7 @@ class GenerationJobManagerClass {
resolveReady: resolveReady!, resolveReady: resolveReady!,
chunks: [], chunks: [],
aggregatedContent: [], aggregatedContent: [],
runSteps: new Map(),
}; };
job.emitter.setMaxListeners(100); job.emitter.setMaxListeners(100);
@ -152,18 +156,55 @@ class GenerationJobManagerClass {
/** /**
* Abort a job (user-initiated). * Abort a job (user-initiated).
* Emits both error event and a final done event with aborted flag.
* @param streamId - The stream identifier * @param streamId - The stream identifier
*/ */
abortJob(streamId: string): void { abortJob(streamId: string): void {
const job = this.jobs.get(streamId); const job = this.jobs.get(streamId);
if (!job) { if (!job) {
logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`);
return; return;
} }
logger.debug(
`[GenerationJobManager] Aborting job ${streamId}, signal already aborted: ${job.abortController.signal.aborted}`,
);
job.abortController.abort(); job.abortController.abort();
job.status = 'aborted'; job.status = 'aborted';
job.completedAt = Date.now(); job.completedAt = Date.now();
job.emitter.emit('error', 'Request aborted by user'); logger.debug(
`[GenerationJobManager] AbortController.abort() called for ${streamId}, signal.aborted: ${job.abortController.signal.aborted}`,
);
// Create a final event for abort so clients can properly handle UI cleanup
const abortFinalEvent = {
final: true,
conversation: {
conversationId: job.metadata.conversationId,
},
title: 'New Chat',
requestMessage: job.metadata.userMessage
? {
messageId: job.metadata.userMessage.messageId,
conversationId: job.metadata.conversationId,
text: job.metadata.userMessage.text ?? '',
}
: null,
responseMessage: {
messageId:
job.metadata.responseMessageId ?? `${job.metadata.userMessage?.messageId ?? 'aborted'}_`,
conversationId: job.metadata.conversationId,
content: job.aggregatedContent ?? [],
unfinished: true,
error: true,
},
aborted: true,
} as unknown as ServerSentEvent;
job.finalEvent = abortFinalEvent;
job.emitter.emit('done', abortFinalEvent);
// Don't emit error event - it causes unhandled error warnings
// The done event with error:true and aborted:true is sufficient
logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`); logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
} }
@ -249,6 +290,7 @@ class GenerationJobManagerClass {
/** /**
* Emit a chunk event to all subscribers. * Emit a chunk event to all subscribers.
* Only buffers chunks when no subscribers are listening (for reconnect replay). * Only buffers chunks when no subscribers are listening (for reconnect replay).
* Also tracks run steps and user message for reconnection state.
* @param streamId - The stream identifier * @param streamId - The stream identifier
* @param event - The event data to emit * @param event - The event data to emit
*/ */
@ -264,15 +306,121 @@ class GenerationJobManagerClass {
job.chunks.push(event); job.chunks.push(event);
} }
// Track run steps for reconnection
this.trackRunStep(job, event);
// Track user message from created event
this.trackUserMessage(job, event);
// Always aggregate content (for partial response saving) // Always aggregate content (for partial response saving)
this.aggregateContent(job, event); this.aggregateContent(job, event);
job.emitter.emit('chunk', event); job.emitter.emit('chunk', event);
} }
/**
* Track run step events for reconnection state.
* This allows reconnecting clients to rebuild their stepMap.
*/
private trackRunStep(job: GenerationJob, event: ServerSentEvent): void {
const data = event as Record<string, unknown>;
if (data.event !== 'on_run_step') {
return;
}
const runStep = data.data as Agents.RunStep;
if (!runStep?.id) {
return;
}
job.runSteps.set(runStep.id, runStep);
logger.debug(`[GenerationJobManager] Tracked run step: ${runStep.id} for ${job.streamId}`);
}
/**
* Track user message from created event for reconnection.
*/
private trackUserMessage(job: GenerationJob, event: ServerSentEvent): void {
const data = event as Record<string, unknown>;
if (!data.created || !data.message) {
return;
}
const message = data.message as Record<string, unknown>;
job.metadata.userMessage = {
messageId: message.messageId as string,
parentMessageId: message.parentMessageId as string | undefined,
conversationId: message.conversationId as string | undefined,
text: message.text as string | undefined,
};
// Update conversationId in metadata if not set
if (!job.metadata.conversationId && message.conversationId) {
job.metadata.conversationId = message.conversationId as string;
}
logger.debug(`[GenerationJobManager] Tracked user message for ${job.streamId}`);
}
/**
* Update job metadata with additional information.
* Called when more information becomes available during generation.
* @param streamId - The stream identifier
* @param metadata - Partial metadata to merge
*/
updateMetadata(streamId: string, metadata: Partial<GenerationJobMetadata>): void {
const job = this.jobs.get(streamId);
if (!job) {
return;
}
job.metadata = { ...job.metadata, ...metadata };
logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
}
/**
* Get resume state for reconnecting clients.
* Includes run steps, aggregated content, and user message data.
* @param streamId - The stream identifier
* @returns Resume state or null if job not found
*/
getResumeState(streamId: string): ResumeState | null {
const job = this.jobs.get(streamId);
if (!job) {
return null;
}
return {
runSteps: Array.from(job.runSteps.values()),
aggregatedContent: job.aggregatedContent,
userMessage: job.metadata.userMessage,
responseMessageId: job.metadata.responseMessageId,
conversationId: job.metadata.conversationId,
};
}
/**
* Mark that sync has been sent for this job to prevent duplicate replays.
* @param streamId - The stream identifier
*/
markSyncSent(streamId: string): void {
const job = this.jobs.get(streamId);
if (job) {
job.syncSent = true;
}
}
/**
* Check if sync has been sent for this job.
* @param streamId - The stream identifier
*/
wasSyncSent(streamId: string): boolean {
return this.jobs.get(streamId)?.syncSent ?? false;
}
/** /**
* Aggregate content parts from message delta events. * Aggregate content parts from message delta events.
* Used to save partial response when subscribers disconnect. * Used to save partial response when subscribers disconnect.
* Uses flat format: { type: 'text', text: 'content' }
*/ */
private aggregateContent(job: GenerationJob, event: ServerSentEvent): void { private aggregateContent(job: GenerationJob, event: ServerSentEvent): void {
// Check for on_message_delta events which contain content // Check for on_message_delta events which contain content
@ -283,7 +431,7 @@ class GenerationJobManagerClass {
if (delta?.content && Array.isArray(delta.content)) { if (delta?.content && Array.isArray(delta.content)) {
for (const part of delta.content) { for (const part of delta.content) {
if (part.type === 'text' && part.text) { if (part.type === 'text' && part.text) {
// Find or create text content part // Find or create text content part in flat format
let textPart = job.aggregatedContent?.find((p) => p.type === 'text'); let textPart = job.aggregatedContent?.find((p) => p.type === 'text');
if (!textPart) { if (!textPart) {
textPart = { type: 'text', text: '' }; textPart = { type: 'text', text: '' };
@ -354,6 +502,7 @@ class GenerationJobManagerClass {
const job = this.jobs.get(streamId); const job = this.jobs.get(streamId);
if (job) { if (job) {
job.emitter.removeAllListeners(); job.emitter.removeAllListeners();
job.runSteps.clear();
this.jobs.delete(streamId); this.jobs.delete(streamId);
} }
} }
@ -380,12 +529,13 @@ class GenerationJobManagerClass {
/** /**
* Get stream info for status endpoint. * Get stream info for status endpoint.
* Returns chunk count, status, and aggregated content. * Returns chunk count, status, aggregated content, and run step count.
*/ */
getStreamInfo(streamId: string): { getStreamInfo(streamId: string): {
active: boolean; active: boolean;
status: GenerationJobStatus; status: GenerationJobStatus;
chunkCount: number; chunkCount: number;
runStepCount: number;
aggregatedContent?: ContentPart[]; aggregatedContent?: ContentPart[];
createdAt: number; createdAt: number;
} | null { } | null {
@ -398,6 +548,7 @@ class GenerationJobManagerClass {
active: job.status === 'running', active: job.status === 'running',
status: job.status, status: job.status,
chunkCount: job.chunks.length, chunkCount: job.chunks.length,
runStepCount: job.runSteps.size,
aggregatedContent: job.aggregatedContent, aggregatedContent: job.aggregatedContent,
createdAt: job.createdAt, createdAt: job.createdAt,
}; };

View file

@ -1,9 +1,14 @@
import type { EventEmitter } from 'events'; import type { EventEmitter } from 'events';
import type { Agents } from 'librechat-data-provider';
import type { ServerSentEvent } from '~/types'; import type { ServerSentEvent } from '~/types';
export interface GenerationJobMetadata { export interface GenerationJobMetadata {
userId: string; userId: string;
conversationId?: string; conversationId?: string;
/** User message data for rebuilding submission on reconnect */
userMessage?: Agents.UserMessageMeta;
/** Response message ID for tracking */
responseMessageId?: string;
} }
export type GenerationJobStatus = 'running' | 'complete' | 'error' | 'aborted'; export type GenerationJobStatus = 'running' | 'complete' | 'error' | 'aborted';
@ -25,13 +30,14 @@ export interface GenerationJob {
finalEvent?: ServerSentEvent; finalEvent?: ServerSentEvent;
/** Aggregated content parts for saving partial response */ /** Aggregated content parts for saving partial response */
aggregatedContent?: ContentPart[]; aggregatedContent?: ContentPart[];
/** Tracked run steps for reconnection - maps step ID to step data */
runSteps: Map<string, Agents.RunStep>;
/** Flag to indicate if a sync event was already sent (prevent duplicate replays) */
syncSent?: boolean;
} }
export interface ContentPart { export type ContentPart = Agents.ContentPart;
type: string; export type ResumeState = Agents.ResumeState;
text?: string;
[key: string]: unknown;
}
export type ChunkHandler = (event: ServerSentEvent) => void; export type ChunkHandler = (event: ServerSentEvent) => void;
export type DoneHandler = (event: ServerSentEvent) => void; export type DoneHandler = (event: ServerSentEvent) => void;

View file

@ -171,6 +171,30 @@ export namespace Agents {
stepDetails: StepDetails; stepDetails: StepDetails;
usage: null | object; usage: null | object;
}; };
/** Content part for aggregated message content */
export interface ContentPart {
type: string;
text?: string;
[key: string]: unknown;
}
/** User message metadata for rebuilding submission on reconnect */
export interface UserMessageMeta {
messageId: string;
parentMessageId?: string;
conversationId?: string;
text?: string;
}
/** State data sent to reconnecting clients */
export interface ResumeState {
runSteps: RunStep[];
aggregatedContent?: ContentPart[];
userMessage?: UserMessageMeta;
responseMessageId?: string;
conversationId?: string;
}
/** /**
* Represents a run step delta i.e. any changed fields on a run step during * Represents a run step delta i.e. any changed fields on a run step during
* streaming. * streaming.