From 1b2d3f30ef10f42f94faa3619aec7c89cd8316a6 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Fri, 12 Dec 2025 01:10:08 -0500 Subject: [PATCH] fix: improve syncing when switching conversations --- api/server/controllers/agents/client.js | 7 + api/server/controllers/agents/request.js | 5 + api/server/routes/agents/index.js | 11 +- client/src/components/Chat/ChatView.tsx | 3 +- client/src/hooks/SSE/useContentHandler.ts | 15 +- client/src/hooks/SSE/useEventHandlers.ts | 8 +- client/src/hooks/SSE/useResumableSSE.ts | 115 +++++++++- client/src/hooks/SSE/useResumeOnLoad.ts | 84 +++++-- client/src/hooks/SSE/useStepHandler.ts | 19 +- .../api/src/stream/GenerationJobManager.ts | 209 ++++++++---------- packages/api/src/stream/index.ts | 1 - packages/api/src/types/index.ts | 1 + .../src/{stream/types.ts => types/stream.ts} | 9 +- packages/data-provider/src/types/agents.ts | 3 +- 14 files changed, 314 insertions(+), 176 deletions(-) rename packages/api/src/{stream/types.ts => types/stream.ts} (83%) diff --git a/api/server/controllers/agents/client.js b/api/server/controllers/agents/client.js index faf3c58399..449bf1b08b 100644 --- a/api/server/controllers/agents/client.js +++ b/api/server/controllers/agents/client.js @@ -14,6 +14,7 @@ const { getBalanceConfig, getProviderConfig, memoryInstructions, + GenerationJobManager, getTransactionsConfig, createMemoryProcessor, filterMalformedContentParts, @@ -953,6 +954,12 @@ class AgentClient extends BaseClient { } this.run = run; + + const streamId = this.options.req?._resumableStreamId; + if (streamId && run.Graph) { + GenerationJobManager.setGraph(streamId, run.Graph); + } + if (userMCPAuthMap != null) { config.configurable.userMCPAuthMap = userMCPAuthMap; } diff --git a/api/server/controllers/agents/request.js b/api/server/controllers/agents/request.js index 7f588f3472..2e8f9bd18d 100644 --- a/api/server/controllers/agents/request.js +++ b/api/server/controllers/agents/request.js @@ -144,6 +144,11 @@ const ResumableAgentController 
= async (req, res, next, initializeClient, addTit GenerationJobManager.updateMetadata(streamId, { sender: client.sender }); } + // Store reference to client's contentParts - graph will be set when run is created + if (client?.contentParts) { + GenerationJobManager.setContentParts(streamId, client.contentParts); + } + res.json({ streamId, status: 'started' }); let conversationId = reqConversationId; diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index 44a04e8db9..36d293afad 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -56,17 +56,20 @@ router.get('/chat/stream/:streamId', (req, res) => { logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`); - // Send sync event with resume state for reconnecting clients - if (isResume && !GenerationJobManager.wasSyncSent(streamId)) { + // Send sync event with resume state for ALL reconnecting clients + // This supports multi-tab scenarios where each tab needs run step data + if (isResume) { const resumeState = GenerationJobManager.getResumeState(streamId); if (resumeState && !res.writableEnded) { + // Send sync event with run steps AND aggregatedContent + // Client will use aggregatedContent to initialize message state res.write(`event: message\ndata: ${JSON.stringify({ sync: true, resumeState })}\n\n`); if (typeof res.flush === 'function') { res.flush(); } - GenerationJobManager.markSyncSent(streamId); + const textPart = resumeState.aggregatedContent?.find((p) => p.type === 'text'); logger.debug( - `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps`, + `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps, content length: ${textPart?.text?.length ?? 
0}`, ); } } diff --git a/client/src/components/Chat/ChatView.tsx b/client/src/components/Chat/ChatView.tsx index 03d1533c23..0c418af2c4 100644 --- a/client/src/components/Chat/ChatView.tsx +++ b/client/src/components/Chat/ChatView.tsx @@ -55,7 +55,8 @@ function ChatView({ index = 0 }: { index?: number }) { useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1); // Auto-resume if navigating back to conversation with active job - useResumeOnLoad(conversationId, chatHelpers.getMessages, index); + // Wait for messages to load before resuming to avoid race condition + useResumeOnLoad(conversationId, chatHelpers.getMessages, index, !isLoading); const methods = useForm({ defaultValues: { text: '' }, diff --git a/client/src/hooks/SSE/useContentHandler.ts b/client/src/hooks/SSE/useContentHandler.ts index d51cb1e016..458c304be4 100644 --- a/client/src/hooks/SSE/useContentHandler.ts +++ b/client/src/hooks/SSE/useContentHandler.ts @@ -27,7 +27,13 @@ type TContentHandler = { export default function useContentHandler({ setMessages, getMessages }: TUseContentHandler) { const queryClient = useQueryClient(); const messageMap = useMemo(() => new Map(), []); - return useCallback( + + /** Reset the message map - call this after sync to prevent stale state from overwriting synced content */ + const resetMessageMap = useCallback(() => { + messageMap.clear(); + }, [messageMap]); + + const handler = useCallback( ({ data, submission }: TContentHandler) => { const { type, messageId, thread_id, conversationId, index } = data; @@ -41,8 +47,11 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont let response = messageMap.get(messageId); if (!response) { + // Check if message already exists in current messages (e.g., after sync) + // Use that as base instead of stale initialResponse + const existingMessage = _messages?.find((m) => m.messageId === messageId); response = { - ...(initialResponse as TMessage), + ...(existingMessage ?? 
(initialResponse as TMessage)), parentMessageId: userMessage?.messageId ?? '', conversationId, messageId, @@ -82,4 +91,6 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont }, [queryClient, getMessages, messageMap, setMessages], ); + + return { contentHandler: handler, resetContentHandler: resetMessageMap }; } diff --git a/client/src/hooks/SSE/useEventHandlers.ts b/client/src/hooks/SSE/useEventHandlers.ts index 199482998f..9ca8da4dec 100644 --- a/client/src/hooks/SSE/useEventHandlers.ts +++ b/client/src/hooks/SSE/useEventHandlers.ts @@ -189,8 +189,8 @@ export default function useEventHandlers({ const { conversationId: paramId } = useParams(); const { token } = useAuthContext(); - const contentHandler = useContentHandler({ setMessages, getMessages }); - const { stepHandler, clearStepMaps } = useStepHandler({ + const { contentHandler, resetContentHandler } = useContentHandler({ setMessages, getMessages }); + const { stepHandler, clearStepMaps, syncStepMessage } = useStepHandler({ setMessages, getMessages, announcePolite, @@ -827,15 +827,17 @@ export default function useEventHandlers({ ); return { - clearStepMaps, stepHandler, syncHandler, finalHandler, errorHandler, + clearStepMaps, messageHandler, contentHandler, createdHandler, + syncStepMessage, attachmentHandler, abortConversation, + resetContentHandler, }; } diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts index 1495a81b98..ccfe406b1c 100644 --- a/client/src/hooks/SSE/useResumableSSE.ts +++ b/client/src/hooks/SSE/useResumableSSE.ts @@ -58,7 +58,7 @@ export default function useResumableSSE( const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex)); const { token, isAuthenticated } = useAuthContext(); - const [completed, setCompleted] = useState(new Set()); + const [_completed, setCompleted] = useState(new Set()); const [streamId, setStreamId] = useState(null); const setAbortScroll = 
useSetRecoilState(store.abortScrollFamily(runIndex)); const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex)); @@ -78,15 +78,16 @@ export default function useResumableSSE( } = chatHelpers; const { - clearStepMaps, stepHandler, - syncHandler, finalHandler, errorHandler, + clearStepMaps, messageHandler, contentHandler, createdHandler, + syncStepMessage, attachmentHandler, + resetContentHandler, } = useEventHandlers({ genTitle, setMessages, @@ -108,14 +109,16 @@ export default function useResumableSSE( /** * Subscribe to stream via SSE library (supports custom headers) * Follows same auth pattern as useSSE + * @param isResume - If true, adds ?resume=true to trigger sync event from server */ const subscribeToStream = useCallback( - (currentStreamId: string, currentSubmission: TSubmission) => { + (currentStreamId: string, currentSubmission: TSubmission, isResume = false) => { let { userMessage } = currentSubmission; let textIndex: number | null = null; - const url = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`; - console.log('[ResumableSSE] Subscribing to stream:', url); + const baseUrl = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`; + const url = isResume ? `${baseUrl}?resume=true` : baseUrl; + console.log('[ResumableSSE] Subscribing to stream:', url, { isResume }); const sse = new SSE(url, { headers: { Authorization: `Bearer ${token}` }, @@ -184,13 +187,98 @@ export default function useResumableSSE( } if (data.sync != null) { - console.log('[ResumableSSE] Received SYNC event', { - conversationId: data.conversationId, - hasResumeState: !!data.resumeState, + const textPart = data.resumeState?.aggregatedContent?.find( + (p: { type: string }) => p.type === 'text', + ); + console.log('[ResumableSSE] SYNC received', { + runSteps: data.resumeState?.runSteps?.length ?? 0, + contentLength: textPart?.text?.length ?? 
0, }); + const runId = v4(); setActiveRunId(runId); - syncHandler(data, { ...currentSubmission, userMessage } as EventSubmission); + + // Replay run steps + if (data.resumeState?.runSteps) { + for (const runStep of data.resumeState.runSteps) { + stepHandler({ event: 'on_run_step', data: runStep }, { + ...currentSubmission, + userMessage, + } as EventSubmission); + } + } + + // Set message content from aggregatedContent + if (data.resumeState?.aggregatedContent && userMessage?.messageId) { + const messages = getMessages() ?? []; + const userMsgId = userMessage.messageId; + const serverResponseId = data.resumeState.responseMessageId; + + // Find the EXACT response message - prioritize responseMessageId from server + // This is critical when there are multiple responses to the same user message + let responseIdx = -1; + if (serverResponseId) { + responseIdx = messages.findIndex((m) => m.messageId === serverResponseId); + } + // Fallback: find by parentMessageId pattern (for new messages) + if (responseIdx < 0) { + responseIdx = messages.findIndex( + (m) => + !m.isCreatedByUser && + (m.messageId === `${userMsgId}_` || m.parentMessageId === userMsgId), + ); + } + + const textPart = data.resumeState.aggregatedContent?.find( + (p: { type: string }) => p.type === 'text', + ); + console.log('[ResumableSSE] SYNC update', { + userMsgId, + serverResponseId, + responseIdx, + foundMessageId: responseIdx >= 0 ? messages[responseIdx]?.messageId : null, + messagesCount: messages.length, + aggregatedContentLength: data.resumeState.aggregatedContent?.length, + textContentLength: textPart?.text?.length ?? 
0, + }); + + if (responseIdx >= 0) { + // Update existing response message with aggregatedContent + const updated = [...messages]; + const oldContent = updated[responseIdx]?.content; + updated[responseIdx] = { + ...updated[responseIdx], + content: data.resumeState.aggregatedContent, + }; + console.log('[ResumableSSE] SYNC updating message', { + messageId: updated[responseIdx]?.messageId, + oldContentLength: Array.isArray(oldContent) ? oldContent.length : 0, + newContentLength: data.resumeState.aggregatedContent?.length, + }); + setMessages(updated); + // Sync both content handler and step handler with the updated message + // so subsequent deltas build on synced content, not stale content + resetContentHandler(); + syncStepMessage(updated[responseIdx]); + console.log('[ResumableSSE] SYNC complete, handlers synced'); + } else { + // Add new response message + const responseId = serverResponseId ?? `${userMsgId}_`; + setMessages([ + ...messages, + { + messageId: responseId, + parentMessageId: userMsgId, + conversationId: currentSubmission.conversation?.conversationId ?? 
'', + text: '', + content: data.resumeState.aggregatedContent, + isCreatedByUser: false, + } as TMessage, + ]); + } + } + + setShowStopButton(true); return; } @@ -278,11 +366,14 @@ export default function useResumableSSE( createdHandler, attachmentHandler, stepHandler, - syncHandler, contentHandler, + resetContentHandler, + syncStepMessage, messageHandler, errorHandler, setIsSubmitting, + getMessages, + setMessages, startupConfig?.balance?.enabled, balanceQuery, ], @@ -356,7 +447,7 @@ export default function useResumableSSE( // Resume: just subscribe to existing stream, don't start new generation console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId); setStreamId(resumeStreamId); - subscribeToStream(resumeStreamId, submission); + subscribeToStream(resumeStreamId, submission, true); // isResume=true } else { // New generation: start and then subscribe console.log('[ResumableSSE] Starting NEW generation'); diff --git a/client/src/hooks/SSE/useResumeOnLoad.ts b/client/src/hooks/SSE/useResumeOnLoad.ts index 4349c219d7..abf0c7eda8 100644 --- a/client/src/hooks/SSE/useResumeOnLoad.ts +++ b/client/src/hooks/SSE/useResumeOnLoad.ts @@ -51,18 +51,20 @@ function buildSubmissionFromResumeState( isCreatedByUser: true, } as TMessage))); - // Use existing response from DB if available (preserves already-saved content) - const initialResponse: TMessage = - existingResponseMessage ?? - ({ - messageId: responseMessageId, - parentMessageId: userMessage.messageId, - conversationId, - text: '', - content: (resumeState.aggregatedContent as TMessage['content']) ?? [], - isCreatedByUser: false, - role: 'assistant', - } as TMessage); + // ALWAYS use aggregatedContent from resumeState - it has the latest content from the running job. + // DB content may be stale (saved at disconnect, but generation continued). + const initialResponse: TMessage = { + messageId: existingResponseMessage?.messageId ?? 
responseMessageId, + parentMessageId: existingResponseMessage?.parentMessageId ?? userMessage.messageId, + conversationId, + text: '', + // aggregatedContent is authoritative - it reflects actual job state + content: (resumeState.aggregatedContent as TMessage['content']) ?? [], + isCreatedByUser: false, + role: 'assistant', + sender: existingResponseMessage?.sender, + model: existingResponseMessage?.model, + } as TMessage; const conversation: TConversation = { conversationId, @@ -91,11 +93,14 @@ function buildSubmissionFromResumeState( * 1. Uses useStreamStatus to check for active jobs on navigation * 2. If active job found, builds a submission with streamId and sets it * 3. useResumableSSE picks up the submission and subscribes to the stream + * + * @param messagesLoaded - Whether the messages query has finished loading (prevents race condition) */ export default function useResumeOnLoad( conversationId: string | undefined, getMessages: () => TMessage[] | undefined, runIndex = 0, + messagesLoaded = true, ) { const resumableEnabled = useRecoilValue(store.resumableStreams); const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex)); @@ -104,10 +109,14 @@ export default function useResumeOnLoad( const processedConvoRef = useRef(null); // Check for active stream when conversation changes - // Only check if resumable is enabled and no active submission + // Allow check if no submission OR submission is for a different conversation (stale) + const submissionConvoId = currentSubmission?.conversation?.conversationId; + const hasActiveSubmissionForThisConvo = currentSubmission && submissionConvoId === conversationId; + const shouldCheck = resumableEnabled && - !currentSubmission && + messagesLoaded && // Wait for messages to load before checking + !hasActiveSubmissionForThisConvo && // Allow if no submission or stale submission !!conversationId && conversationId !== Constants.NEW_CONVO && processedConvoRef.current !== conversationId; // Don't re-check 
processed convos @@ -118,6 +127,7 @@ export default function useResumeOnLoad( console.log('[ResumeOnLoad] Effect check', { resumableEnabled, conversationId, + messagesLoaded, hasCurrentSubmission: !!currentSubmission, currentSubmissionConvoId: currentSubmission?.conversation?.conversationId, isSuccess, @@ -131,14 +141,32 @@ export default function useResumeOnLoad( return; } - // Don't resume if we already have an active submission (we started it ourselves) - if (currentSubmission) { - console.log('[ResumeOnLoad] Skipping - already have active submission, marking as processed'); + // Wait for messages to load to avoid race condition where sync overwrites then DB overwrites + if (!messagesLoaded) { + console.log('[ResumeOnLoad] Waiting for messages to load'); + return; + } + + // Don't resume if we already have an active submission FOR THIS CONVERSATION + // A stale submission with undefined/different conversationId should not block us + if (hasActiveSubmissionForThisConvo) { + console.log('[ResumeOnLoad] Skipping - already have active submission for this conversation'); // Mark as processed so we don't try again processedConvoRef.current = conversationId; return; } + // If there's a stale submission for a different conversation, log it but continue + if (currentSubmission && submissionConvoId !== conversationId) { + console.log( + '[ResumeOnLoad] Found stale submission for different conversation, will check for resume', + { + staleConvoId: submissionConvoId, + currentConvoId: conversationId, + }, + ); + } + // Wait for stream status query to complete if (!isSuccess || !streamStatus) { console.log('[ResumeOnLoad] Waiting for stream status query'); @@ -151,15 +179,17 @@ export default function useResumeOnLoad( return; } - // Mark as processed immediately to prevent race conditions - processedConvoRef.current = conversationId; - // Check if there's an active job to resume + // DON'T mark as processed here - only mark when we actually create a submission + // This 
prevents stale cache data from blocking subsequent resume attempts if (!streamStatus.active || !streamStatus.streamId) { console.log('[ResumeOnLoad] No active job to resume for:', conversationId); return; } + // Mark as processed NOW - we verified there's an active job and will create submission + processedConvoRef.current = conversationId; + console.log('[ResumeOnLoad] Found active job, creating submission...', { streamId: streamStatus.streamId, status: streamStatus.status, @@ -202,6 +232,9 @@ export default function useResumeOnLoad( }, [ conversationId, resumableEnabled, + messagesLoaded, + hasActiveSubmissionForThisConvo, + submissionConvoId, currentSubmission, isSuccess, streamStatus, @@ -209,11 +242,14 @@ export default function useResumeOnLoad( setSubmission, ]); - // Reset processedConvoRef when conversation changes to a different one + // Reset processedConvoRef when conversation changes to allow re-checking useEffect(() => { - if (conversationId && conversationId !== processedConvoRef.current) { - // Only reset if we're navigating to a DIFFERENT conversation - // This allows re-checking when navigating back + // Always reset when conversation changes - this allows resuming when navigating back + if (conversationId !== processedConvoRef.current) { + console.log('[ResumeOnLoad] Resetting processedConvoRef for new conversation:', { + old: processedConvoRef.current, + new: conversationId, + }); processedConvoRef.current = null; } }, [conversationId]); diff --git a/client/src/hooks/SSE/useStepHandler.ts b/client/src/hooks/SSE/useStepHandler.ts index 87786ab444..bf9a0d024b 100644 --- a/client/src/hooks/SSE/useStepHandler.ts +++ b/client/src/hooks/SSE/useStepHandler.ts @@ -51,12 +51,9 @@ type AllContentTypes = | ContentTypes.IMAGE_URL | ContentTypes.ERROR; -const noop = () => {}; - export default function useStepHandler({ setMessages, getMessages, - setIsSubmitting = noop, announcePolite, lastAnnouncementTimeRef, }: TUseStepHandler) { @@ -468,7 +465,7 @@ export 
default function useStepHandler({ stepMap.current.clear(); }; }, - [getMessages, setIsSubmitting, lastAnnouncementTimeRef, announcePolite, setMessages], + [getMessages, lastAnnouncementTimeRef, announcePolite, setMessages], ); const clearStepMaps = useCallback(() => { @@ -476,5 +473,17 @@ export default function useStepHandler({ messageMap.current.clear(); stepMap.current.clear(); }, []); - return { stepHandler, clearStepMaps }; + + /** + * Sync a message into the step handler's messageMap. + * Call this after receiving sync event to ensure subsequent deltas + * build on the synced content, not stale content. + */ + const syncStepMessage = useCallback((message: TMessage) => { + if (message?.messageId) { + messageMap.current.set(message.messageId, { ...message }); + } + }, []); + + return { stepHandler, clearStepMaps, syncStepMessage }; } diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index e6af5f8161..38a76e3625 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -1,18 +1,8 @@ import { EventEmitter } from 'events'; import { logger } from '@librechat/data-schemas'; import type { Agents } from 'librechat-data-provider'; -import type { ServerSentEvent } from '~/types'; -import type { - GenerationJob, - GenerationJobStatus, - ChunkHandler, - DoneHandler, - ErrorHandler, - UnsubscribeFn, - ContentPart, - ResumeState, - GenerationJobMetadata, -} from './types'; +import type { StandardGraph } from '@librechat/agents'; +import type * as t from '~/types'; /** * Manages generation jobs for resumable LLM streams. @@ -20,7 +10,7 @@ import type { * Clients can subscribe/unsubscribe to job events without affecting generation. 
*/ class GenerationJobManagerClass { - private jobs = new Map(); + private jobs = new Map(); private cleanupInterval: NodeJS.Timeout | null = null; /** Time to keep completed jobs before cleanup (1 hour) */ private ttlAfterComplete = 3600000; @@ -53,7 +43,7 @@ class GenerationJobManagerClass { * @param conversationId - Optional conversation ID * @returns The created job */ - createJob(streamId: string, userId: string, conversationId?: string): GenerationJob { + createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob { if (this.jobs.size >= this.maxJobs) { this.evictOldest(); } @@ -63,7 +53,7 @@ class GenerationJobManagerClass { resolveReady = resolve; }); - const job: GenerationJob = { + const job: t.GenerationJob = { streamId, emitter: new EventEmitter(), status: 'running', @@ -73,8 +63,6 @@ class GenerationJobManagerClass { readyPromise, resolveReady: resolveReady!, chunks: [], - aggregatedContent: [], - runSteps: new Map(), }; job.emitter.setMaxListeners(100); @@ -90,7 +78,7 @@ class GenerationJobManagerClass { * @param streamId - The stream identifier * @returns The job if found, undefined otherwise */ - getJob(streamId: string): GenerationJob | undefined { + getJob(streamId: string): t.GenerationJob | undefined { return this.jobs.get(streamId); } @@ -101,7 +89,7 @@ class GenerationJobManagerClass { * @param conversationId - The conversation identifier * @returns The job if found, undefined otherwise */ - getJobByConversation(conversationId: string): GenerationJob | undefined { + getJobByConversation(conversationId: string): t.GenerationJob | undefined { const directMatch = this.jobs.get(conversationId); if (directMatch && directMatch.status === 'running') { return directMatch; @@ -130,7 +118,7 @@ class GenerationJobManagerClass { * @param streamId - The stream identifier * @returns The job status or undefined if not found */ - getJobStatus(streamId: string): GenerationJobStatus | undefined { + getJobStatus(streamId: string): 
t.GenerationJobStatus | undefined { return this.jobs.get(streamId)?.status; } @@ -197,7 +185,7 @@ class GenerationJobManagerClass { messageId: job.metadata.responseMessageId ?? `${userMessageId ?? 'aborted'}_`, parentMessageId: userMessageId, // Link response to user message conversationId: job.metadata.conversationId, - content: job.aggregatedContent ?? [], + content: job.contentPartsRef ?? [], sender: job.metadata.sender ?? 'AI', unfinished: true, /** Not an error - the job was intentionally aborted */ @@ -205,7 +193,7 @@ class GenerationJobManagerClass { isCreatedByUser: false, }, aborted: true, - } as unknown as ServerSentEvent; + } as unknown as t.ServerSentEvent; job.finalEvent = abortFinalEvent; job.emitter.emit('done', abortFinalEvent); @@ -227,42 +215,25 @@ class GenerationJobManagerClass { */ subscribe( streamId: string, - onChunk: ChunkHandler, - onDone?: DoneHandler, - onError?: ErrorHandler, - ): { unsubscribe: UnsubscribeFn } | null { + onChunk: t.ChunkHandler, + onDone?: t.DoneHandler, + onError?: t.ErrorHandler, + ): { unsubscribe: t.UnsubscribeFn } | null { const job = this.jobs.get(streamId); if (!job) { return null; } - // Replay buffered chunks (only chunks missed during disconnect) - const chunksToReplay = [...job.chunks]; - const replayCount = chunksToReplay.length; - - if (replayCount > 0) { - logger.debug( - `[GenerationJobManager] Replaying ${replayCount} buffered chunks for ${streamId}`, - ); - } - - // Clear buffer after capturing for replay - subscriber is now connected - job.chunks = []; - // Use setImmediate to allow the caller to set up their connection first setImmediate(() => { - for (const chunk of chunksToReplay) { - onChunk(chunk); - } - // If job is already complete, send the final event if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) { onDone?.(job.finalEvent); } }); - const chunkHandler = (event: ServerSentEvent) => onChunk(event); - const doneHandler = (event: ServerSentEvent) => onDone?.(event); 
+ const chunkHandler = (event: t.ServerSentEvent) => onChunk(event); + const doneHandler = (event: t.ServerSentEvent) => onDone?.(event); const errorHandler = (error: string) => onError?.(error); job.emitter.on('chunk', chunkHandler); @@ -282,10 +253,13 @@ class GenerationJobManagerClass { currentJob.emitter.off('done', doneHandler); currentJob.emitter.off('error', errorHandler); - // Emit event when last subscriber leaves (for saving partial response) + // When last subscriber leaves if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') { - currentJob.emitter.emit('allSubscribersLeft', currentJob.aggregatedContent); - logger.debug(`[GenerationJobManager] All subscribers left ${streamId}`); + // Reset syncSent so reconnecting clients get sync event again + currentJob.syncSent = false; + // Emit event for saving partial response - use graph's contentParts directly + currentJob.emitter.emit('allSubscribersLeft', currentJob.contentPartsRef ?? []); + logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`); } } }; @@ -300,53 +274,31 @@ class GenerationJobManagerClass { * @param streamId - The stream identifier * @param event - The event data to emit */ - emitChunk(streamId: string, event: ServerSentEvent): void { + emitChunk(streamId: string, event: t.ServerSentEvent): void { const job = this.jobs.get(streamId); if (!job || job.status !== 'running') { return; } - // Only buffer if no one is listening (for reconnect replay) - const hasSubscribers = job.emitter.listenerCount('chunk') > 0; - if (!hasSubscribers) { - job.chunks.push(event); - } - - // Track run steps for reconnection - this.trackRunStep(job, event); + // // Only buffer if no one is listening (for reconnect replay) + // const hasSubscribers = job.emitter.listenerCount('chunk') > 0; + // if (!hasSubscribers) { + // job.chunks.push(event); + // } // Track user message from created event this.trackUserMessage(job, event); - // Always aggregate 
content (for partial response saving) - this.aggregateContent(job, event); + // Run steps and content are tracked via graphRef and contentPartsRef + // No need to aggregate separately - these reference the graph's data directly job.emitter.emit('chunk', event); } - /** - * Track run step events for reconnection state. - * This allows reconnecting clients to rebuild their stepMap. - */ - private trackRunStep(job: GenerationJob, event: ServerSentEvent): void { - const data = event as Record; - if (data.event !== 'on_run_step') { - return; - } - - const runStep = data.data as Agents.RunStep; - if (!runStep?.id) { - return; - } - - job.runSteps.set(runStep.id, runStep); - logger.debug(`[GenerationJobManager] Tracked run step: ${runStep.id} for ${job.streamId}`); - } - /** * Track user message from created event for reconnection. */ - private trackUserMessage(job: GenerationJob, event: ServerSentEvent): void { + private trackUserMessage(job: t.GenerationJob, event: t.ServerSentEvent): void { const data = event as Record; if (!data.created || !data.message) { return; @@ -374,7 +326,7 @@ class GenerationJobManagerClass { * @param streamId - The stream identifier * @param metadata - Partial metadata to merge */ - updateMetadata(streamId: string, metadata: Partial): void { + updateMetadata(streamId: string, metadata: Partial): void { const job = this.jobs.get(streamId); if (!job) { return; @@ -383,21 +335,69 @@ class GenerationJobManagerClass { logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`); } + /** + * Set reference to the graph's contentParts array. + * This is the authoritative content source - no need to aggregate separately. 
+ * @param streamId - The stream identifier + * @param contentParts - Reference to graph's contentParts array + */ + setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void { + const job = this.jobs.get(streamId); + if (!job) { + return; + } + job.contentPartsRef = contentParts; + logger.debug(`[GenerationJobManager] Set contentParts reference for ${streamId}`, { + initialLength: contentParts?.length ?? 0, + isArray: Array.isArray(contentParts), + }); + } + + /** + * Set reference to the graph instance. + * This provides access to run steps (contentData) - no need to track separately. + * @param streamId - The stream identifier + * @param graph - Reference to the graph instance (must have contentData property) + */ + setGraph(streamId: string, graph: StandardGraph): void { + const job = this.jobs.get(streamId); + if (!job) { + return; + } + job.graphRef = graph; + logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`); + } + /** * Get resume state for reconnecting clients. * Includes run steps, aggregated content, and user message data. * @param streamId - The stream identifier * @returns Resume state or null if job not found */ - getResumeState(streamId: string): ResumeState | null { + getResumeState(streamId: string): t.ResumeState | null { const job = this.jobs.get(streamId); if (!job) { return null; } + // Use graph's contentParts directly - it's always current and complete + // No conversion needed - send as-is + const aggregatedContent = job.contentPartsRef ?? []; + + // Use graph's contentData for run steps - it's the authoritative source + const runSteps = job.graphRef?.contentData ?? 
[]; + + logger.debug(`[GenerationJobManager] getResumeState:`, { + streamId, + aggregatedContentLength: aggregatedContent.length, + runStepsLength: runSteps.length, + hasGraphRef: !!job.graphRef, + hasContentPartsRef: !!job.contentPartsRef, + }); + return { - runSteps: Array.from(job.runSteps.values()), - aggregatedContent: job.aggregatedContent, + runSteps, + aggregatedContent, userMessage: job.metadata.userMessage, responseMessageId: job.metadata.responseMessageId, conversationId: job.metadata.conversationId, @@ -423,41 +423,13 @@ class GenerationJobManagerClass { return this.jobs.get(streamId)?.syncSent ?? false; } - /** - * Aggregate content parts from message delta events. - * Used to save partial response when subscribers disconnect. - * Uses flat format: { type: 'text', text: 'content' } - */ - private aggregateContent(job: GenerationJob, event: ServerSentEvent): void { - // Check for on_message_delta events which contain content - const data = event as Record; - if (data.event === 'on_message_delta' && data.data) { - const eventData = data.data as Record; - const delta = eventData.delta as Record | undefined; - if (delta?.content && Array.isArray(delta.content)) { - for (const part of delta.content) { - if (part.type === 'text' && part.text) { - // Find or create text content part in flat format - let textPart = job.aggregatedContent?.find((p) => p.type === 'text'); - if (!textPart) { - textPart = { type: 'text', text: '' }; - job.aggregatedContent = job.aggregatedContent || []; - job.aggregatedContent.push(textPart); - } - textPart.text = (textPart.text || '') + part.text; - } - } - } - } - } - /** * Emit a done event to all subscribers. * Stores the final event for replay on reconnect. 
* @param streamId - The stream identifier * @param event - The final event data */ - emitDone(streamId: string, event: ServerSentEvent): void { + emitDone(streamId: string, event: t.ServerSentEvent): void { const job = this.jobs.get(streamId); if (!job) { return; @@ -508,7 +480,6 @@ class GenerationJobManagerClass { const job = this.jobs.get(streamId); if (job) { job.emitter.removeAllListeners(); - job.runSteps.clear(); this.jobs.delete(streamId); } } @@ -539,10 +510,10 @@ class GenerationJobManagerClass { */ getStreamInfo(streamId: string): { active: boolean; - status: GenerationJobStatus; + status: t.GenerationJobStatus; chunkCount: number; runStepCount: number; - aggregatedContent?: ContentPart[]; + aggregatedContent?: Agents.MessageContentComplex[]; createdAt: number; } | null { const job = this.jobs.get(streamId); @@ -554,8 +525,8 @@ class GenerationJobManagerClass { active: job.status === 'running', status: job.status, chunkCount: job.chunks.length, - runStepCount: job.runSteps.size, - aggregatedContent: job.aggregatedContent, + runStepCount: job.graphRef?.contentData?.length ?? 0, + aggregatedContent: job.contentPartsRef ?? [], createdAt: job.createdAt, }; } @@ -570,8 +541,8 @@ class GenerationJobManagerClass { /** * Get count of jobs by status. 
*/ - getJobCountByStatus(): Record { - const counts: Record = { + getJobCountByStatus(): Record { + const counts: Record = { running: 0, complete: 0, error: 0, diff --git a/packages/api/src/stream/index.ts b/packages/api/src/stream/index.ts index ac7131e8ce..42db007151 100644 --- a/packages/api/src/stream/index.ts +++ b/packages/api/src/stream/index.ts @@ -1,2 +1 @@ export { GenerationJobManager, GenerationJobManagerClass } from './GenerationJobManager'; -export type * from './types'; diff --git a/packages/api/src/types/index.ts b/packages/api/src/types/index.ts index a874a09ff6..31adc3b9bb 100644 --- a/packages/api/src/types/index.ts +++ b/packages/api/src/types/index.ts @@ -13,3 +13,4 @@ export type * from './openai'; export * from './prompts'; export * from './run'; export * from './tokens'; +export * from './stream'; diff --git a/packages/api/src/stream/types.ts b/packages/api/src/types/stream.ts similarity index 83% rename from packages/api/src/stream/types.ts rename to packages/api/src/types/stream.ts index ac5e49087f..592ec40081 100644 --- a/packages/api/src/stream/types.ts +++ b/packages/api/src/types/stream.ts @@ -1,5 +1,6 @@ import type { EventEmitter } from 'events'; import type { Agents } from 'librechat-data-provider'; +import type { StandardGraph } from '@librechat/agents'; import type { ServerSentEvent } from '~/types'; export interface GenerationJobMetadata { @@ -30,10 +31,10 @@ export interface GenerationJob { chunks: ServerSentEvent[]; /** Final event when job completes */ finalEvent?: ServerSentEvent; - /** Aggregated content parts for saving partial response */ - aggregatedContent?: ContentPart[]; - /** Tracked run steps for reconnection - maps step ID to step data */ - runSteps: Map; + /** Reference to graph's contentParts - the authoritative content source */ + contentPartsRef?: Agents.MessageContentComplex[]; + /** Reference to the graph instance for accessing run steps (contentData) */ + graphRef?: StandardGraph; /** Flag to indicate if a 
sync event was already sent (prevent duplicate replays) */ syncSent?: boolean; } diff --git a/packages/data-provider/src/types/agents.ts b/packages/data-provider/src/types/agents.ts index 3c822cee8b..43ba6cfeb1 100644 --- a/packages/data-provider/src/types/agents.ts +++ b/packages/data-provider/src/types/agents.ts @@ -190,7 +190,8 @@ export namespace Agents { /** State data sent to reconnecting clients */ export interface ResumeState { runSteps: RunStep[]; - aggregatedContent?: ContentPart[]; + /** Aggregated content parts in the graph's native MessageContentComplex format (sent as-is, without conversion) */ + aggregatedContent?: MessageContentComplex[]; userMessage?: UserMessageMeta; responseMessageId?: string; conversationId?: string;