From ff14cd3b44a57d60011a49c6acd98f388b1de100 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Thu, 11 Dec 2025 09:52:15 -0500 Subject: [PATCH] WIP: resumable stream --- api/server/controllers/agents/request.js | 93 +++++- api/server/middleware/buildEndpointOption.js | 7 +- api/server/routes/agents/chat.js | 22 +- api/server/routes/agents/index.js | 53 +++- client/src/components/Chat/ChatView.tsx | 2 +- client/src/hooks/Chat/useChatFunctions.ts | 25 +- client/src/hooks/Input/useTextarea.ts | 4 +- client/src/hooks/SSE/useResumeOnLoad.ts | 289 ++++++++---------- client/src/hooks/SSE/useStepHandler.ts | 15 +- .../api/src/stream/GenerationJobManager.ts | 157 +++++++++- packages/api/src/stream/types.ts | 16 +- packages/data-provider/src/types/agents.ts | 24 ++ 12 files changed, 498 insertions(+), 209 deletions(-) diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index aacab578a7..80ff52fb3a 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -66,6 +66,65 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit const job = GenerationJobManager.createJob(streamId, userId, reqConversationId); req._resumableStreamId = streamId; + // Track if partial response was already saved to avoid duplicates + let partialResponseSaved = false; + + /** + * Listen for all subscribers leaving to save partial response. + * This ensures the response is saved to DB even if all clients disconnect + * while generation continues. + * + * Note: The messageId used here falls back to `${userMessage.messageId}_` if the + * actual response messageId isn't available yet. The final response save will + * overwrite this with the complete response using the same messageId pattern. + */ + job.emitter.on('allSubscribersLeft', async (aggregatedContent) => { + if (partialResponseSaved || !aggregatedContent || aggregatedContent.length === 0) { + return; + } + + const resumeState = GenerationJobManager.getResumeState(streamId); + if (!resumeState?.userMessage) { + logger.debug('[ResumableAgentController] No user message to save partial response for'); + return; + } + + partialResponseSaved = true; + const responseConversationId = resumeState.conversationId || reqConversationId; + + try { + const partialMessage = { + messageId: resumeState.responseMessageId || `${resumeState.userMessage.messageId}_`, + conversationId: responseConversationId, + parentMessageId: resumeState.userMessage.messageId, + sender: client?.sender ?? 'AI', + content: aggregatedContent, + unfinished: true, + error: false, + isCreatedByUser: false, + user: userId, + endpoint: endpointOption.endpoint, + model: endpointOption.modelOptions?.model || endpointOption.model_parameters?.model, + }; + + if (req.body?.agent_id) { + partialMessage.agent_id = req.body.agent_id; + } + + await saveMessage(req, partialMessage, { + context: 'api/server/controllers/agents/request.js - partial response on disconnect', + }); + + logger.debug( + `[ResumableAgentController] Saved partial response for ${streamId}, content parts: ${aggregatedContent.length}`, + ); + } catch (error) { + logger.error('[ResumableAgentController] Error saving partial response:', error); + // Reset flag so we can try again if subscribers reconnect and leave again + partialResponseSaved = false; + } + }); + /** @type {{ client: TAgentClient; userMCPAuthMap?: Record> }} */ const result = await initializeClient({ req, @@ -106,9 +165,14 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit } try { - const onStart = (userMsg, _respMsgId, _isNewConvo) => { + const onStart = (userMsg, respMsgId, _isNewConvo) => { userMessage = userMsg; + // Store the response messageId upfront so partial saves use the same ID + if (respMsgId) { + GenerationJobManager.updateMetadata(streamId, { responseMessageId: respMsgId }); + } + GenerationJobManager.emitChunk(streamId, { created: true, message: userMessage, @@ -203,8 +267,15 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit }); } + // Skip title generation if job was aborted const newConvo = !reqConversationId; - if (addTitle && parentMessageId === Constants.NO_PARENT && newConvo) { + const shouldGenerateTitle = + addTitle && + parentMessageId === Constants.NO_PARENT && + newConvo && + !job.abortController.signal.aborted; + + if (shouldGenerateTitle) { addTitle(req, { text, response: { ...response }, @@ -224,12 +295,24 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit } } } catch (error) { - logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error); - GenerationJobManager.emitError(streamId, error.message || 'Generation failed'); - GenerationJobManager.completeJob(streamId, error.message); + // Check if this was an abort (not a real error) + const wasAborted = job.abortController.signal.aborted || error.message?.includes('abort'); + + if (wasAborted) { + logger.debug(`[ResumableAgentController] Generation aborted for ${streamId}`); + // abortJob already handled emitDone and completeJob + } else { + logger.error(`[ResumableAgentController] Generation error for ${streamId}:`, error); + GenerationJobManager.emitError(streamId, error.message || 'Generation failed'); + GenerationJobManager.completeJob(streamId, error.message); + } + if (client) { disposeClient(client); } + + // Don't continue to title generation after error/abort + return; } }; diff --git a/api/server/middleware/buildEndpointOption.js b/api/server/middleware/buildEndpointOption.js index 202bf7d921..f56d850120 100644 --- a/api/server/middleware/buildEndpointOption.js +++ b/api/server/middleware/buildEndpointOption.js @@ -23,9 +23,10 @@ async function buildEndpointOption(req, res, next) { try { parsedBody = parseCompactConvo({ endpoint, endpointType, conversation: req.body }); } catch (error) { - logger.warn( - `Error parsing conversation for endpoint ${endpoint}${error?.message ? `: ${error.message}` : ''}`, - ); + logger.error(`Error parsing compact conversation for endpoint ${endpoint}`, error); + logger.debug({ + 'Error parsing compact conversation': { endpoint, endpointType, conversation: req.body }, + }); return handleError(res, { text: 'Error parsing conversation' }); } diff --git a/api/server/routes/agents/chat.js b/api/server/routes/agents/chat.js index d05dd5baf0..7b51882385 100644 --- a/api/server/routes/agents/chat.js +++ b/api/server/routes/agents/chat.js @@ -1,6 +1,5 @@ const express = require('express'); -const { generateCheckAccess, skipAgentCheck, GenerationJobManager } = require('@librechat/api'); -const { logger } = require('@librechat/data-schemas'); +const { generateCheckAccess, skipAgentCheck } = require('@librechat/api'); const { PermissionTypes, Permissions, PermissionBits } = require('librechat-data-provider'); const { setHeaders, @@ -35,25 +34,6 @@ router.use(validateConvoAccess); router.use(buildEndpointOption); router.use(setHeaders); -/** - * @route POST /abort - * @desc Abort an ongoing generation job - * @access Private - */ -router.post('/abort', (req, res) => { - const { streamId, abortKey } = req.body; - - const jobStreamId = streamId || abortKey?.split(':')?.[0]; - - if (jobStreamId && GenerationJobManager.hasJob(jobStreamId)) { - GenerationJobManager.abortJob(jobStreamId); - logger.debug(`[AgentStream] Job aborted: ${jobStreamId}`); - return res.json({ success: true, aborted: jobStreamId }); - } - - res.status(404).json({ error: 'Job not found' }); -}); - const controller = async (req, res, next) => { await AgentController(req, res, next, initializeClient, addTitle); }; diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index 1f501d75bb..5e727eb90c 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -32,10 +32,12 @@ router.use('/', v1); * @route GET /chat/stream/:streamId * @desc Subscribe to an ongoing generation job's SSE stream with replay support * @access Private - * @description Replays any chunks missed during disconnect, then streams live + * @description Sends sync event with resume state, replays missed chunks, then streams live + * @query resume=true - Indicates this is a reconnection (sends sync event) */ router.get('/chat/stream/:streamId', (req, res) => { const { streamId } = req.params; + const isResume = req.query.resume === 'true'; const job = GenerationJobManager.getJob(streamId); if (!job) { @@ -52,7 +54,22 @@ router.get('/chat/stream/:streamId', (req, res) => { res.setHeader('X-Accel-Buffering', 'no'); res.flushHeaders(); - logger.debug(`[AgentStream] Client subscribed to ${streamId}`); + logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`); + + // Send sync event with resume state for reconnecting clients + if (isResume && !GenerationJobManager.wasSyncSent(streamId)) { + const resumeState = GenerationJobManager.getResumeState(streamId); + if (resumeState && !res.writableEnded) { + res.write(`event: message\ndata: ${JSON.stringify({ sync: true, resumeState })}\n\n`); + if (typeof res.flush === 'function') { + res.flush(); + } + GenerationJobManager.markSyncSent(streamId); + logger.debug( + `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps`, + ); + } + } const result = GenerationJobManager.subscribe( streamId, @@ -98,7 +115,7 @@ router.get('/chat/stream/:streamId', (req, res) => { * @route GET /chat/status/:conversationId * @desc Check if there's an active generation job for a conversation * @access Private - * @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt } + * @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt, resumeState } */ router.get('/chat/status/:conversationId', (req, res) => { const { conversationId } = req.params; @@ -114,17 +131,47 @@ router.get('/chat/status/:conversationId', (req, res) => { } const info = GenerationJobManager.getStreamInfo(job.streamId); + const resumeState = GenerationJobManager.getResumeState(job.streamId); res.json({ active: info?.active ?? false, streamId: job.streamId, status: info?.status ?? job.status, chunkCount: info?.chunkCount ?? 0, + runStepCount: info?.runStepCount ?? 0, aggregatedContent: info?.aggregatedContent, createdAt: info?.createdAt ?? job.createdAt, + resumeState, }); }); +/** + * @route POST /chat/abort + * @desc Abort an ongoing generation job + * @access Private + * @description Mounted before chatRouter to bypass buildEndpointOption middleware + */ +router.post('/chat/abort', (req, res) => { + logger.debug(`[AgentStream] ========== ABORT ENDPOINT HIT ==========`); + logger.debug(`[AgentStream] Method: ${req.method}, Path: ${req.path}`); + logger.debug(`[AgentStream] Body:`, req.body); + + const { streamId, abortKey } = req.body; + + const jobStreamId = streamId || abortKey?.split(':')?.[0]; + logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`); + + if (jobStreamId && GenerationJobManager.hasJob(jobStreamId)) { + logger.debug(`[AgentStream] Job found, aborting: ${jobStreamId}`); + GenerationJobManager.abortJob(jobStreamId); + logger.debug(`[AgentStream] Job aborted successfully: ${jobStreamId}`); + return res.json({ success: true, aborted: jobStreamId }); + } + + logger.warn(`[AgentStream] Job not found for streamId: ${jobStreamId}`); + return res.status(404).json({ error: 'Job not found', streamId: jobStreamId }); +}); + const chatRouter = express.Router(); chatRouter.use(configMiddleware); diff --git a/client/src/components/Chat/ChatView.tsx b/client/src/components/Chat/ChatView.tsx index b40c7003c8..03d1533c23 100644 --- a/client/src/components/Chat/ChatView.tsx +++ b/client/src/components/Chat/ChatView.tsx @@ -55,7 +55,7 @@ function ChatView({ index = 0 }: { index?: number }) { useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1); // Auto-resume if navigating back to conversation with active job - useResumeOnLoad(conversationId, chatHelpers, index); + useResumeOnLoad(conversationId, chatHelpers.getMessages, index); const methods = useForm({ defaultValues: { text: '' }, diff --git a/client/src/hooks/Chat/useChatFunctions.ts b/client/src/hooks/Chat/useChatFunctions.ts index 8a61cd91c1..c717209ec5 100644 --- a/client/src/hooks/Chat/useChatFunctions.ts +++ b/client/src/hooks/Chat/useChatFunctions.ts @@ -283,14 +283,25 @@ export default function useChatFunctions({ } } } else { - initialResponse.content = [ - { - type: ContentTypes.TEXT, - [ContentTypes.TEXT]: { - value: '', + // Assistants endpoint uses nested format: { type: 'text', text: { value: 'content' } } + // Agents and other endpoints use flat format: { type: 'text', text: 'content' } + if (isAssistantsEndpoint(endpoint)) { + initialResponse.content = [ + { + type: ContentTypes.TEXT, + [ContentTypes.TEXT]: { + value: '', + }, }, - }, - ]; + ]; + } else { + initialResponse.content = [ + { + type: ContentTypes.TEXT, + text: '', + }, + ]; + } } setShowStopButton(true); } diff --git a/client/src/hooks/Input/useTextarea.ts b/client/src/hooks/Input/useTextarea.ts index 7d32cbbe02..4eae002430 100644 --- a/client/src/hooks/Input/useTextarea.ts +++ b/client/src/hooks/Input/useTextarea.ts @@ -56,9 +56,7 @@ export default function useTextarea({ }); const entityName = entity?.name ?? ''; - const isNotAppendable = - (((latestMessage?.unfinished ?? false) && !isSubmitting) || (latestMessage?.error ?? false)) && - !isAssistant; + const isNotAppendable = latestMessage?.error === true && !isAssistant; // && (conversationId?.length ?? 0) > 6; // also ensures that we don't show the wrong placeholder useEffect(() => { diff --git a/client/src/hooks/SSE/useResumeOnLoad.ts b/client/src/hooks/SSE/useResumeOnLoad.ts index ba980c5dc2..370d8c4ca2 100644 --- a/client/src/hooks/SSE/useResumeOnLoad.ts +++ b/client/src/hooks/SSE/useResumeOnLoad.ts @@ -1,182 +1,163 @@ -import { useEffect, useState, useRef } from 'react'; -import { SSE } from 'sse.js'; +import { useEffect, useRef } from 'react'; import { useSetRecoilState, useRecoilValue } from 'recoil'; -import { request } from 'librechat-data-provider'; -import type { TMessage, EventSubmission } from 'librechat-data-provider'; -import type { EventHandlerParams } from './useEventHandlers'; -import { useAuthContext } from '~/hooks/AuthContext'; -import { useGetStartupConfig, useGetUserBalance } from '~/data-provider'; -import useEventHandlers from './useEventHandlers'; +import { Constants, tMessageSchema } from 'librechat-data-provider'; +import type { TMessage, TConversation, TSubmission, Agents } from 'librechat-data-provider'; import store from '~/store'; -type ChatHelpers = Pick< - EventHandlerParams, - | 'setMessages' - | 'getMessages' - | 'setConversation' - | 'setIsSubmitting' - | 'newConversation' - | 'resetLatestMessage' ->; +/** + * Build a submission object from resume state for reconnected streams. + * This provides the minimum data needed for useResumableSSE to subscribe. + */ +function buildSubmissionFromResumeState( + resumeState: Agents.ResumeState, + streamId: string, + messages: TMessage[], + conversationId: string, +): TSubmission { + const userMessageData = resumeState.userMessage; + const responseMessageId = + resumeState.responseMessageId ?? `${userMessageData?.messageId ?? 'resume'}_`; + + // Try to find existing user message in the messages array (from database) + const existingUserMessage = messages.find( + (m) => m.isCreatedByUser && m.messageId === userMessageData?.messageId, + ); + + // Try to find existing response message in the messages array (from database) + const existingResponseMessage = messages.find( + (m) => + !m.isCreatedByUser && + (m.messageId === responseMessageId || m.parentMessageId === userMessageData?.messageId), + ); + + // Create or use existing user message + const userMessage: TMessage = + existingUserMessage ?? + (userMessageData + ? (tMessageSchema.parse({ + messageId: userMessageData.messageId, + parentMessageId: userMessageData.parentMessageId ?? Constants.NO_PARENT, + conversationId: userMessageData.conversationId ?? conversationId, + text: userMessageData.text ?? '', + isCreatedByUser: true, + role: 'user', + }) as TMessage) + : (messages[messages.length - 2] ?? + ({ + messageId: 'resume_user_msg', + conversationId, + text: '', + isCreatedByUser: true, + } as TMessage))); + + // Use existing response from DB if available (preserves already-saved content) + const initialResponse: TMessage = + existingResponseMessage ?? + ({ + messageId: responseMessageId, + parentMessageId: userMessage.messageId, + conversationId, + text: '', + content: (resumeState.aggregatedContent as TMessage['content']) ?? [], + isCreatedByUser: false, + role: 'assistant', + } as TMessage); + + const conversation: TConversation = { + conversationId, + title: 'Resumed Chat', + endpoint: null, + } as TConversation; + + return { + messages, + userMessage, + initialResponse, + conversation, + isRegenerate: false, + isTemporary: false, + endpointOption: {}, + } as TSubmission; +} /** - * Hook to resume streaming if navigating back to a conversation with active generation. - * Checks for active jobs on mount and auto-subscribes if found. + * Hook to resume streaming if navigating to a conversation with active generation. + * Checks stream status via React Query and sets submission if active job found. + * + * This hook: + * 1. Uses useStreamStatus to check for active jobs on navigation + * 2. If active job found, builds a submission with streamId and sets it + * 3. useResumableSSE picks up the submission and subscribes to the stream */ export default function useResumeOnLoad( conversationId: string | undefined, - chatHelpers: ChatHelpers, + getMessages: () => TMessage[] | undefined, runIndex = 0, ) { const resumableEnabled = useRecoilValue(store.resumableStreams); - const { token, isAuthenticated } = useAuthContext(); - const sseRef = useRef(null); - const checkedConvoRef = useRef(null); - const [completed, setCompleted] = useState(new Set()); - const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex)); - const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex)); + const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex)); + const currentSubmission = useRecoilValue(store.submissionByIndex(runIndex)); + const hasResumedRef = useRef(null); - const { getMessages, setIsSubmitting } = chatHelpers; + // Check for active stream when conversation changes + // const { data: streamStatus, isSuccess } = useStreamStatus( + // conversationId, + // resumableEnabled && !currentSubmission, // Only check if no active submission + // ); - const { stepHandler, finalHandler, contentHandler } = useEventHandlers({ - ...chatHelpers, - setCompleted, - setShowStopButton, - }); - - const { data: startupConfig } = useGetStartupConfig(); - const balanceQuery = useGetUserBalance({ - enabled: !!isAuthenticated && startupConfig?.balance?.enabled, - }); - - /** - * Check for active job when conversation loads - */ useEffect(() => { - if (!resumableEnabled || !conversationId || !token) { - checkedConvoRef.current = null; + // if (!resumableEnabled || !conversationId || !isSuccess || !streamStatus) { + if (!resumableEnabled || !conversationId) { return; } - // Only check once per conversationId to prevent loops - if (checkedConvoRef.current === conversationId) { + // Don't resume if we already have an active submission + if (currentSubmission) { return; } - checkedConvoRef.current = conversationId; + // Don't resume the same conversation twice + if (hasResumedRef.current === conversationId) { + return; + } - const checkAndResume = async () => { - try { - const response = await fetch(`/api/agents/chat/status/${conversationId}`, { - headers: { Authorization: `Bearer ${token}` }, - }); + // Check if there's an active job to resume + // if (!streamStatus.active || !streamStatus.streamId) { + // return; + // } - if (!response.ok) { - return; - } + // console.log('[ResumeOnLoad] Found active job, creating submission...', { + // streamId: streamStatus.streamId, + // status: streamStatus.status, + // }); - const { active, streamId } = await response.json(); + hasResumedRef.current = conversationId; - if (!active || !streamId) { - return; - } + const messages = getMessages() || []; - console.log('[ResumeOnLoad] Found active job, resuming...', { streamId }); + // Minimal submission without resume state + const lastMessage = messages[messages.length - 1]; + const submission: TSubmission = { + messages, + userMessage: lastMessage ?? ({ messageId: 'resume', conversationId, text: '' } as TMessage), + initialResponse: { + messageId: 'resume_', + conversationId, + text: '', + content: [{ type: 'text', text: '' }], + } as TMessage, + conversation: { conversationId, title: 'Resumed Chat' } as TConversation, + isRegenerate: false, + isTemporary: false, + endpointOption: {}, + } as TSubmission; + setSubmission(submission); + }, [conversationId, resumableEnabled, currentSubmission, getMessages, setSubmission]); - const messages = getMessages() || []; - const lastMessage = messages[messages.length - 1]; - let textIndex: number | null = null; - - const url = `/api/agents/chat/stream/${encodeURIComponent(streamId)}`; - - const sse = new SSE(url, { - headers: { Authorization: `Bearer ${token}` }, - method: 'GET', - }); - sseRef.current = sse; - - sse.addEventListener('open', () => { - console.log('[ResumeOnLoad] Reconnected to stream'); - setAbortScroll(false); - setShowStopButton(true); - setIsSubmitting(true); - }); - - sse.addEventListener('message', (e: MessageEvent) => { - try { - const data = JSON.parse(e.data); - - if (data.final != null) { - try { - finalHandler(data, { messages } as unknown as EventSubmission); - } catch (error) { - console.error('[ResumeOnLoad] Error in finalHandler:', error); - setIsSubmitting(false); - setShowStopButton(false); - } - (startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch(); - sse.close(); - sseRef.current = null; - return; - } - - if (data.event != null) { - stepHandler(data, { - messages, - userMessage: lastMessage, - } as unknown as EventSubmission); - return; - } - - if (data.type != null) { - const { text, index } = data; - if (text != null && index !== textIndex) { - textIndex = index; - } - contentHandler({ data, submission: { messages } as unknown as EventSubmission }); - return; - } - } catch (error) { - console.error('[ResumeOnLoad] Error processing message:', error); - } - }); - - sse.addEventListener('error', async (e: MessageEvent) => { - console.log('[ResumeOnLoad] Stream error'); - sse.close(); - sseRef.current = null; - setIsSubmitting(false); - setShowStopButton(false); - - /* @ts-ignore */ - if (e.responseCode === 401) { - try { - const refreshResponse = await request.refreshToken(); - const newToken = refreshResponse?.token ?? ''; - if (newToken) { - request.dispatchTokenUpdatedEvent(newToken); - } - } catch (error) { - console.log('[ResumeOnLoad] Token refresh failed:', error); - } - } - }); - - sse.stream(); - } catch (error) { - console.error('[ResumeOnLoad] Error checking job status:', error); - } - }; - - checkAndResume(); - - return () => { - if (sseRef.current) { - sseRef.current.close(); - sseRef.current = null; - } - }; - // Only re-run when conversationId changes - // eslint-disable-next-line react-hooks/exhaustive-deps + // Reset hasResumedRef when conversation changes + useEffect(() => { + if (conversationId !== hasResumedRef.current) { + hasResumedRef.current = null; + } }, [conversationId]); } diff --git a/client/src/hooks/SSE/useStepHandler.ts b/client/src/hooks/SSE/useStepHandler.ts index 52ae53a460..87786ab444 100644 --- a/client/src/hooks/SSE/useStepHandler.ts +++ b/client/src/hooks/SSE/useStepHandler.ts @@ -21,7 +21,8 @@ type TUseStepHandler = { announcePolite: (options: AnnounceOptions) => void; setMessages: (messages: TMessage[]) => void; getMessages: () => TMessage[] | undefined; - setIsSubmitting: SetterOrUpdater; + /** @deprecated - isSubmitting should be derived from submission state */ + setIsSubmitting?: SetterOrUpdater; lastAnnouncementTimeRef: React.MutableRefObject; }; @@ -50,10 +51,12 @@ type AllContentTypes = | ContentTypes.IMAGE_URL | ContentTypes.ERROR; +const noop = () => {}; + export default function useStepHandler({ setMessages, getMessages, - setIsSubmitting, + setIsSubmitting = noop, announcePolite, lastAnnouncementTimeRef, }: TUseStepHandler) { @@ -198,7 +201,6 @@ export default function useStepHandler({ ({ event, data }: TStepEvent, submission: EventSubmission) => { const messages = getMessages() || []; const { userMessage } = submission; - setIsSubmitting(true); let parentMessageId = userMessage.messageId; const currentTime = Date.now(); @@ -230,12 +232,17 @@ export default function useStepHandler({ if (!response) { const responseMessage = messages[messages.length - 1] as TMessage; + // Preserve existing content from DB (partial response) and prepend initialContent if provided + const existingContent = responseMessage?.content ?? []; + const mergedContent = + initialContent.length > 0 ? [...initialContent, ...existingContent] : existingContent; + response = { ...responseMessage, parentMessageId, conversationId: userMessage.conversationId, messageId: responseMessageId, - content: initialContent, + content: mergedContent, }; messageMap.current.set(responseMessageId, response); diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 0597c66f5b..9df9c4b1ad 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -1,5 +1,6 @@ import { EventEmitter } from 'events'; import { logger } from '@librechat/data-schemas'; +import type { Agents } from 'librechat-data-provider'; import type { ServerSentEvent } from '~/types'; import type { GenerationJob, @@ -9,6 +10,8 @@ import type { ErrorHandler, UnsubscribeFn, ContentPart, + ResumeState, + GenerationJobMetadata, } from './types'; /** @@ -71,6 +74,7 @@ class GenerationJobManagerClass { resolveReady: resolveReady!, chunks: [], aggregatedContent: [], + runSteps: new Map(), }; job.emitter.setMaxListeners(100); @@ -152,18 +156,55 @@ class GenerationJobManagerClass { /** * Abort a job (user-initiated). + * Emits both error event and a final done event with aborted flag. * @param streamId - The stream identifier */ abortJob(streamId: string): void { const job = this.jobs.get(streamId); if (!job) { + logger.warn(`[GenerationJobManager] Cannot abort - job not found: ${streamId}`); return; } + logger.debug( + `[GenerationJobManager] Aborting job ${streamId}, signal already aborted: ${job.abortController.signal.aborted}`, + ); job.abortController.abort(); job.status = 'aborted'; job.completedAt = Date.now(); - job.emitter.emit('error', 'Request aborted by user'); + logger.debug( + `[GenerationJobManager] AbortController.abort() called for ${streamId}, signal.aborted: ${job.abortController.signal.aborted}`, + ); + + // Create a final event for abort so clients can properly handle UI cleanup + const abortFinalEvent = { + final: true, + conversation: { + conversationId: job.metadata.conversationId, + }, + title: 'New Chat', + requestMessage: job.metadata.userMessage + ? { + messageId: job.metadata.userMessage.messageId, + conversationId: job.metadata.conversationId, + text: job.metadata.userMessage.text ?? '', + } + : null, + responseMessage: { + messageId: + job.metadata.responseMessageId ?? `${job.metadata.userMessage?.messageId ?? 'aborted'}_`, + conversationId: job.metadata.conversationId, + content: job.aggregatedContent ?? [], + unfinished: true, + error: true, + }, + aborted: true, + } as unknown as ServerSentEvent; + + job.finalEvent = abortFinalEvent; + job.emitter.emit('done', abortFinalEvent); + // Don't emit error event - it causes unhandled error warnings + // The done event with error:true and aborted:true is sufficient logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`); } @@ -249,6 +290,7 @@ class GenerationJobManagerClass { /** * Emit a chunk event to all subscribers. * Only buffers chunks when no subscribers are listening (for reconnect replay). + * Also tracks run steps and user message for reconnection state. * @param streamId - The stream identifier * @param event - The event data to emit */ @@ -264,15 +306,121 @@ class GenerationJobManagerClass { job.chunks.push(event); } + // Track run steps for reconnection + this.trackRunStep(job, event); + + // Track user message from created event + this.trackUserMessage(job, event); + // Always aggregate content (for partial response saving) this.aggregateContent(job, event); job.emitter.emit('chunk', event); } + /** + * Track run step events for reconnection state. + * This allows reconnecting clients to rebuild their stepMap. + */ + private trackRunStep(job: GenerationJob, event: ServerSentEvent): void { + const data = event as Record; + if (data.event !== 'on_run_step') { + return; + } + + const runStep = data.data as Agents.RunStep; + if (!runStep?.id) { + return; + } + + job.runSteps.set(runStep.id, runStep); + logger.debug(`[GenerationJobManager] Tracked run step: ${runStep.id} for ${job.streamId}`); + } + + /** + * Track user message from created event for reconnection. + */ + private trackUserMessage(job: GenerationJob, event: ServerSentEvent): void { + const data = event as Record; + if (!data.created || !data.message) { + return; + } + + const message = data.message as Record; + job.metadata.userMessage = { + messageId: message.messageId as string, + parentMessageId: message.parentMessageId as string | undefined, + conversationId: message.conversationId as string | undefined, + text: message.text as string | undefined, + }; + + // Update conversationId in metadata if not set + if (!job.metadata.conversationId && message.conversationId) { + job.metadata.conversationId = message.conversationId as string; + } + + logger.debug(`[GenerationJobManager] Tracked user message for ${job.streamId}`); + } + + /** + * Update job metadata with additional information. + * Called when more information becomes available during generation. + * @param streamId - The stream identifier + * @param metadata - Partial metadata to merge + */ + updateMetadata(streamId: string, metadata: Partial): void { + const job = this.jobs.get(streamId); + if (!job) { + return; + } + job.metadata = { ...job.metadata, ...metadata }; + logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`); + } + + /** + * Get resume state for reconnecting clients. + * Includes run steps, aggregated content, and user message data. + * @param streamId - The stream identifier + * @returns Resume state or null if job not found + */ + getResumeState(streamId: string): ResumeState | null { + const job = this.jobs.get(streamId); + if (!job) { + return null; + } + + return { + runSteps: Array.from(job.runSteps.values()), + aggregatedContent: job.aggregatedContent, + userMessage: job.metadata.userMessage, + responseMessageId: job.metadata.responseMessageId, + conversationId: job.metadata.conversationId, + }; + } + + /** + * Mark that sync has been sent for this job to prevent duplicate replays. + * @param streamId - The stream identifier + */ + markSyncSent(streamId: string): void { + const job = this.jobs.get(streamId); + if (job) { + job.syncSent = true; + } + } + + /** + * Check if sync has been sent for this job. + * @param streamId - The stream identifier + */ + wasSyncSent(streamId: string): boolean { + return this.jobs.get(streamId)?.syncSent ?? false; + } + /** * Aggregate content parts from message delta events. * Used to save partial response when subscribers disconnect. + * Uses flat format: { type: 'text', text: 'content' } */ private aggregateContent(job: GenerationJob, event: ServerSentEvent): void { // Check for on_message_delta events which contain content @@ -283,7 +431,7 @@ class GenerationJobManagerClass { if (delta?.content && Array.isArray(delta.content)) { for (const part of delta.content) { if (part.type === 'text' && part.text) { - // Find or create text content part + // Find or create text content part in flat format let textPart = job.aggregatedContent?.find((p) => p.type === 'text'); if (!textPart) { textPart = { type: 'text', text: '' }; @@ -354,6 +502,7 @@ class GenerationJobManagerClass { const job = this.jobs.get(streamId); if (job) { job.emitter.removeAllListeners(); + job.runSteps.clear(); this.jobs.delete(streamId); } } @@ -380,12 +529,13 @@ class GenerationJobManagerClass { /** * Get stream info for status endpoint. - * Returns chunk count, status, and aggregated content. + * Returns chunk count, status, aggregated content, and run step count. */ getStreamInfo(streamId: string): { active: boolean; status: GenerationJobStatus; chunkCount: number; + runStepCount: number; aggregatedContent?: ContentPart[]; createdAt: number; } | null { @@ -398,6 +548,7 @@ class GenerationJobManagerClass { active: job.status === 'running', status: job.status, chunkCount: job.chunks.length, + runStepCount: job.runSteps.size, aggregatedContent: job.aggregatedContent, createdAt: job.createdAt, }; diff --git a/packages/api/src/stream/types.ts b/packages/api/src/stream/types.ts index e65c29157f..337ebcc17c 100644 --- a/packages/api/src/stream/types.ts +++ b/packages/api/src/stream/types.ts @@ -1,9 +1,14 @@ import type { EventEmitter } from 'events'; +import type { Agents } from 'librechat-data-provider'; import type { ServerSentEvent } from '~/types'; export interface GenerationJobMetadata { userId: string; conversationId?: string; + /** User message data for rebuilding submission on reconnect */ + userMessage?: Agents.UserMessageMeta; + /** Response message ID for tracking */ + responseMessageId?: string; } export type GenerationJobStatus = 'running' | 'complete' | 'error' | 'aborted'; @@ -25,13 +30,14 @@ export interface GenerationJob { finalEvent?: ServerSentEvent; /** Aggregated content parts for saving partial response */ aggregatedContent?: ContentPart[]; + /** Tracked run steps for reconnection - maps step ID to step data */ + runSteps: Map; + /** Flag to indicate if a sync event was already sent (prevent duplicate replays) */ + syncSent?: boolean; } -export interface ContentPart { - type: string; - text?: string; - [key: string]: unknown; -} +export type ContentPart = Agents.ContentPart; +export type ResumeState = Agents.ResumeState; export type ChunkHandler = (event: ServerSentEvent) => void; export type DoneHandler = (event: ServerSentEvent) => void; diff --git a/packages/data-provider/src/types/agents.ts b/packages/data-provider/src/types/agents.ts index f9101e782e..3c822cee8b 100644 --- a/packages/data-provider/src/types/agents.ts +++ b/packages/data-provider/src/types/agents.ts @@ -171,6 +171,30 @@ export namespace Agents { stepDetails: StepDetails; usage: null | object; }; + + /** Content part for aggregated message content */ + export interface ContentPart { + type: string; + text?: string; + [key: string]: unknown; + } + + /** User message metadata for rebuilding submission on reconnect */ + export interface UserMessageMeta { + messageId: string; + parentMessageId?: string; + conversationId?: string; + text?: string; + } + + /** State data sent to reconnecting clients */ + export interface ResumeState { + runSteps: RunStep[]; + aggregatedContent?: ContentPart[]; + userMessage?: UserMessageMeta; + responseMessageId?: string; + conversationId?: string; + } /** * Represents a run step delta i.e. any changed fields on a run step during * streaming.