fix: improve syncing when switching conversations

Danny Avila 2025-12-12 01:10:08 -05:00
parent 8018762f11
commit 1b2d3f30ef
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
14 changed files with 314 additions and 176 deletions
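For orientation, here is a minimal client-side sketch of the resume flow these changes wire up, assuming only the endpoint and sync-event shape visible in the diff below. The real client subscribes with the sse.js SSE class (so it can pass an Authorization header) and dispatches into stepHandler/contentHandler; EventSource and the onSync callback here are illustrative simplifications, not the actual LibreChat implementation.

// Minimal resume-flow sketch (assumptions noted above).
type ResumeState = {
  runSteps: unknown[];
  aggregatedContent?: Array<{ type: string; text?: string }>;
  responseMessageId?: string;
  conversationId?: string;
};

function resumeStream(streamId: string, onSync: (state: ResumeState) => void): () => void {
  // ?resume=true asks the server to emit a one-time sync event carrying the
  // job's current run steps and aggregated content before live deltas arrive.
  const url = `/api/agents/chat/stream/${encodeURIComponent(streamId)}?resume=true`;
  const source = new EventSource(url);

  source.onmessage = (event: MessageEvent) => {
    const data = JSON.parse(event.data);
    if (data.sync != null && data.resumeState) {
      // Seed local message state from the job's authoritative content,
      // then let subsequent deltas build on top of it.
      onSync(data.resumeState as ResumeState);
      return;
    }
    // ...handle created / on_run_step / on_message_delta / final events here
  };

  return () => source.close();
}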

View file

@@ -14,6 +14,7 @@ const {
   getBalanceConfig,
   getProviderConfig,
   memoryInstructions,
+  GenerationJobManager,
   getTransactionsConfig,
   createMemoryProcessor,
   filterMalformedContentParts,
@@ -953,6 +954,12 @@ class AgentClient extends BaseClient {
     }
     this.run = run;
+
+    const streamId = this.options.req?._resumableStreamId;
+    if (streamId && run.Graph) {
+      GenerationJobManager.setGraph(streamId, run.Graph);
+    }
+
     if (userMCPAuthMap != null) {
       config.configurable.userMCPAuthMap = userMCPAuthMap;
     }

View file

@@ -144,6 +144,11 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
     GenerationJobManager.updateMetadata(streamId, { sender: client.sender });
   }
 
+  // Store reference to client's contentParts - graph will be set when run is created
+  if (client?.contentParts) {
+    GenerationJobManager.setContentParts(streamId, client.contentParts);
+  }
+
   res.json({ streamId, status: 'started' });
 
   let conversationId = reqConversationId;

View file

@@ -56,17 +56,20 @@ router.get('/chat/stream/:streamId', (req, res) => {
   logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`);
 
-  // Send sync event with resume state for reconnecting clients
-  if (isResume && !GenerationJobManager.wasSyncSent(streamId)) {
+  // Send sync event with resume state for ALL reconnecting clients
+  // This supports multi-tab scenarios where each tab needs run step data
+  if (isResume) {
     const resumeState = GenerationJobManager.getResumeState(streamId);
     if (resumeState && !res.writableEnded) {
+      // Send sync event with run steps AND aggregatedContent
+      // Client will use aggregatedContent to initialize message state
      res.write(`event: message\ndata: ${JSON.stringify({ sync: true, resumeState })}\n\n`);
      if (typeof res.flush === 'function') {
        res.flush();
      }
-      GenerationJobManager.markSyncSent(streamId);
+      const textPart = resumeState.aggregatedContent?.find((p) => p.type === 'text');
      logger.debug(
-        `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps`,
+        `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps, content length: ${textPart?.text?.length ?? 0}`,
      );
     }
   }
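Because the wasSyncSent guard is removed, every tab that reconnects with ?resume=true now receives its own sync frame; applying it is safe to repeat since the client replaces the response message's content wholesale (see the useResumableSSE hunk further down). On the wire, the frame written above looks roughly like the following sketch; the identifiers and step contents are invented, only the shape follows getResumeState later in this diff.

// Illustrative frame only - field values are made up.
const exampleSyncFrame =
  'event: message\n' +
  'data: ' +
  JSON.stringify({
    sync: true,
    resumeState: {
      runSteps: [{ id: 'step_abc' /* run step data from graph.contentData */ }],
      aggregatedContent: [{ type: 'text', text: 'partial answer generated so far' }],
      userMessage: { messageId: 'user-message-id' },
      responseMessageId: 'response-message-id',
      conversationId: 'conversation-id',
    },
  }) +
  '\n\n';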

View file

@@ -55,7 +55,8 @@ function ChatView({ index = 0 }: { index?: number }) {
   useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1);
 
   // Auto-resume if navigating back to conversation with active job
-  useResumeOnLoad(conversationId, chatHelpers.getMessages, index);
+  // Wait for messages to load before resuming to avoid race condition
+  useResumeOnLoad(conversationId, chatHelpers.getMessages, index, !isLoading);
 
   const methods = useForm<ChatFormValues>({
     defaultValues: { text: '' },

View file

@@ -27,7 +27,13 @@ type TContentHandler = {
 export default function useContentHandler({ setMessages, getMessages }: TUseContentHandler) {
   const queryClient = useQueryClient();
   const messageMap = useMemo(() => new Map<string, TMessage>(), []);
 
-  return useCallback(
+  /** Reset the message map - call this after sync to prevent stale state from overwriting synced content */
+  const resetMessageMap = useCallback(() => {
+    messageMap.clear();
+  }, [messageMap]);
+
+  const handler = useCallback(
     ({ data, submission }: TContentHandler) => {
       const { type, messageId, thread_id, conversationId, index } = data;
@@ -41,8 +47,11 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
       let response = messageMap.get(messageId);
       if (!response) {
+        // Check if message already exists in current messages (e.g., after sync)
+        // Use that as base instead of stale initialResponse
+        const existingMessage = _messages?.find((m) => m.messageId === messageId);
        response = {
-          ...(initialResponse as TMessage),
+          ...(existingMessage ?? (initialResponse as TMessage)),
          parentMessageId: userMessage?.messageId ?? '',
          conversationId,
          messageId,
@@ -82,4 +91,6 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
     },
     [queryClient, getMessages, messageMap, setMessages],
   );
+
+  return { contentHandler: handler, resetContentHandler: resetMessageMap };
 }

View file

@@ -189,8 +189,8 @@ export default function useEventHandlers({
   const { conversationId: paramId } = useParams();
   const { token } = useAuthContext();
 
-  const contentHandler = useContentHandler({ setMessages, getMessages });
-  const { stepHandler, clearStepMaps } = useStepHandler({
+  const { contentHandler, resetContentHandler } = useContentHandler({ setMessages, getMessages });
+  const { stepHandler, clearStepMaps, syncStepMessage } = useStepHandler({
     setMessages,
     getMessages,
     announcePolite,
@@ -827,15 +827,17 @@
   );
 
   return {
+    clearStepMaps,
     stepHandler,
     syncHandler,
     finalHandler,
     errorHandler,
-    clearStepMaps,
     messageHandler,
     contentHandler,
     createdHandler,
+    syncStepMessage,
     attachmentHandler,
     abortConversation,
+    resetContentHandler,
   };
 }

View file

@@ -58,7 +58,7 @@ export default function useResumableSSE(
   const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
   const { token, isAuthenticated } = useAuthContext();
-  const [completed, setCompleted] = useState(new Set());
+  const [_completed, setCompleted] = useState(new Set());
   const [streamId, setStreamId] = useState<string | null>(null);
   const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
   const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));
@@ -78,15 +78,16 @@
   } = chatHelpers;
 
   const {
+    clearStepMaps,
     stepHandler,
-    syncHandler,
     finalHandler,
     errorHandler,
-    clearStepMaps,
     messageHandler,
     contentHandler,
     createdHandler,
+    syncStepMessage,
     attachmentHandler,
+    resetContentHandler,
   } = useEventHandlers({
     genTitle,
     setMessages,
@@ -108,14 +109,16 @@
   /**
    * Subscribe to stream via SSE library (supports custom headers)
    * Follows same auth pattern as useSSE
+   * @param isResume - If true, adds ?resume=true to trigger sync event from server
    */
   const subscribeToStream = useCallback(
-    (currentStreamId: string, currentSubmission: TSubmission) => {
+    (currentStreamId: string, currentSubmission: TSubmission, isResume = false) => {
      let { userMessage } = currentSubmission;
      let textIndex: number | null = null;
 
-      const url = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`;
-      console.log('[ResumableSSE] Subscribing to stream:', url);
+      const baseUrl = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`;
+      const url = isResume ? `${baseUrl}?resume=true` : baseUrl;
+      console.log('[ResumableSSE] Subscribing to stream:', url, { isResume });
 
      const sse = new SSE(url, {
        headers: { Authorization: `Bearer ${token}` },
@@ -184,13 +187,98 @@
        }
 
        if (data.sync != null) {
-          console.log('[ResumableSSE] Received SYNC event', {
-            conversationId: data.conversationId,
-            hasResumeState: !!data.resumeState,
+          const textPart = data.resumeState?.aggregatedContent?.find(
+            (p: { type: string }) => p.type === 'text',
+          );
+          console.log('[ResumableSSE] SYNC received', {
+            runSteps: data.resumeState?.runSteps?.length ?? 0,
+            contentLength: textPart?.text?.length ?? 0,
          });
          const runId = v4();
          setActiveRunId(runId);
-          syncHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
+
+          // Replay run steps
+          if (data.resumeState?.runSteps) {
+            for (const runStep of data.resumeState.runSteps) {
+              stepHandler({ event: 'on_run_step', data: runStep }, {
+                ...currentSubmission,
+                userMessage,
+              } as EventSubmission);
+            }
+          }
+
+          // Set message content from aggregatedContent
+          if (data.resumeState?.aggregatedContent && userMessage?.messageId) {
+            const messages = getMessages() ?? [];
+            const userMsgId = userMessage.messageId;
+            const serverResponseId = data.resumeState.responseMessageId;
+
+            // Find the EXACT response message - prioritize responseMessageId from server
+            // This is critical when there are multiple responses to the same user message
+            let responseIdx = -1;
+            if (serverResponseId) {
+              responseIdx = messages.findIndex((m) => m.messageId === serverResponseId);
+            }
+            // Fallback: find by parentMessageId pattern (for new messages)
+            if (responseIdx < 0) {
+              responseIdx = messages.findIndex(
+                (m) =>
+                  !m.isCreatedByUser &&
+                  (m.messageId === `${userMsgId}_` || m.parentMessageId === userMsgId),
+              );
+            }
+
+            const textPart = data.resumeState.aggregatedContent?.find(
+              (p: { type: string }) => p.type === 'text',
+            );
+            console.log('[ResumableSSE] SYNC update', {
+              userMsgId,
+              serverResponseId,
+              responseIdx,
+              foundMessageId: responseIdx >= 0 ? messages[responseIdx]?.messageId : null,
+              messagesCount: messages.length,
+              aggregatedContentLength: data.resumeState.aggregatedContent?.length,
+              textContentLength: textPart?.text?.length ?? 0,
+            });
+
+            if (responseIdx >= 0) {
+              // Update existing response message with aggregatedContent
+              const updated = [...messages];
+              const oldContent = updated[responseIdx]?.content;
+              updated[responseIdx] = {
+                ...updated[responseIdx],
+                content: data.resumeState.aggregatedContent,
+              };
+              console.log('[ResumableSSE] SYNC updating message', {
+                messageId: updated[responseIdx]?.messageId,
+                oldContentLength: Array.isArray(oldContent) ? oldContent.length : 0,
+                newContentLength: data.resumeState.aggregatedContent?.length,
+              });
+              setMessages(updated);
+              // Sync both content handler and step handler with the updated message
+              // so subsequent deltas build on synced content, not stale content
+              resetContentHandler();
+              syncStepMessage(updated[responseIdx]);
+              console.log('[ResumableSSE] SYNC complete, handlers synced');
+            } else {
+              // Add new response message
+              const responseId = serverResponseId ?? `${userMsgId}_`;
+              setMessages([
+                ...messages,
+                {
+                  messageId: responseId,
+                  parentMessageId: userMsgId,
+                  conversationId: currentSubmission.conversation?.conversationId ?? '',
+                  text: '',
+                  content: data.resumeState.aggregatedContent,
+                  isCreatedByUser: false,
+                } as TMessage,
+              ]);
+            }
+          }
+
+          setShowStopButton(true);
          return;
        }
@@ -278,11 +366,14 @@
      createdHandler,
      attachmentHandler,
      stepHandler,
-      syncHandler,
      contentHandler,
+      resetContentHandler,
+      syncStepMessage,
      messageHandler,
      errorHandler,
      setIsSubmitting,
+      getMessages,
+      setMessages,
      startupConfig?.balance?.enabled,
      balanceQuery,
    ],
@@ -356,7 +447,7 @@
      // Resume: just subscribe to existing stream, don't start new generation
      console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
      setStreamId(resumeStreamId);
-      subscribeToStream(resumeStreamId, submission);
+      subscribeToStream(resumeStreamId, submission, true); // isResume=true
    } else {
      // New generation: start and then subscribe
      console.log('[ResumableSSE] Starting NEW generation');

View file

@@ -51,18 +51,20 @@ function buildSubmissionFromResumeState(
      isCreatedByUser: true,
    } as TMessage)));
 
-  // Use existing response from DB if available (preserves already-saved content)
-  const initialResponse: TMessage =
-    existingResponseMessage ??
-    ({
-      messageId: responseMessageId,
-      parentMessageId: userMessage.messageId,
-      conversationId,
-      text: '',
+  // ALWAYS use aggregatedContent from resumeState - it has the latest content from the running job.
+  // DB content may be stale (saved at disconnect, but generation continued).
+  const initialResponse: TMessage = {
+    messageId: existingResponseMessage?.messageId ?? responseMessageId,
+    parentMessageId: existingResponseMessage?.parentMessageId ?? userMessage.messageId,
+    conversationId,
+    text: '',
+    // aggregatedContent is authoritative - it reflects actual job state
    content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
    isCreatedByUser: false,
    role: 'assistant',
-    } as TMessage);
+    sender: existingResponseMessage?.sender,
+    model: existingResponseMessage?.model,
+  } as TMessage;
 
   const conversation: TConversation = {
     conversationId,
@@ -91,11 +93,14 @@
  * 1. Uses useStreamStatus to check for active jobs on navigation
  * 2. If active job found, builds a submission with streamId and sets it
  * 3. useResumableSSE picks up the submission and subscribes to the stream
+ *
+ * @param messagesLoaded - Whether the messages query has finished loading (prevents race condition)
  */
 export default function useResumeOnLoad(
   conversationId: string | undefined,
   getMessages: () => TMessage[] | undefined,
   runIndex = 0,
+  messagesLoaded = true,
 ) {
   const resumableEnabled = useRecoilValue(store.resumableStreams);
   const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex));
@@ -104,10 +109,14 @@
   const processedConvoRef = useRef<string | null>(null);
 
   // Check for active stream when conversation changes
-  // Only check if resumable is enabled and no active submission
+  // Allow check if no submission OR submission is for a different conversation (stale)
+  const submissionConvoId = currentSubmission?.conversation?.conversationId;
+  const hasActiveSubmissionForThisConvo = currentSubmission && submissionConvoId === conversationId;
   const shouldCheck =
     resumableEnabled &&
-    !currentSubmission &&
+    messagesLoaded && // Wait for messages to load before checking
+    !hasActiveSubmissionForThisConvo && // Allow if no submission or stale submission
     !!conversationId &&
     conversationId !== Constants.NEW_CONVO &&
     processedConvoRef.current !== conversationId; // Don't re-check processed convos
@@ -118,6 +127,7 @@
    console.log('[ResumeOnLoad] Effect check', {
      resumableEnabled,
      conversationId,
+      messagesLoaded,
      hasCurrentSubmission: !!currentSubmission,
      currentSubmissionConvoId: currentSubmission?.conversation?.conversationId,
      isSuccess,
@@ -131,14 +141,32 @@
      return;
    }
 
-    // Don't resume if we already have an active submission (we started it ourselves)
-    if (currentSubmission) {
-      console.log('[ResumeOnLoad] Skipping - already have active submission, marking as processed');
+    // Wait for messages to load to avoid race condition where sync overwrites then DB overwrites
+    if (!messagesLoaded) {
+      console.log('[ResumeOnLoad] Waiting for messages to load');
+      return;
+    }
+
+    // Don't resume if we already have an active submission FOR THIS CONVERSATION
+    // A stale submission with undefined/different conversationId should not block us
+    if (hasActiveSubmissionForThisConvo) {
+      console.log('[ResumeOnLoad] Skipping - already have active submission for this conversation');
      // Mark as processed so we don't try again
      processedConvoRef.current = conversationId;
      return;
    }
 
+    // If there's a stale submission for a different conversation, log it but continue
+    if (currentSubmission && submissionConvoId !== conversationId) {
+      console.log(
+        '[ResumeOnLoad] Found stale submission for different conversation, will check for resume',
+        {
+          staleConvoId: submissionConvoId,
+          currentConvoId: conversationId,
+        },
+      );
+    }
+
    // Wait for stream status query to complete
    if (!isSuccess || !streamStatus) {
      console.log('[ResumeOnLoad] Waiting for stream status query');
@@ -151,15 +179,17 @@
      return;
    }
 
-    // Mark as processed immediately to prevent race conditions
-    processedConvoRef.current = conversationId;
-
    // Check if there's an active job to resume
+    // DON'T mark as processed here - only mark when we actually create a submission
+    // This prevents stale cache data from blocking subsequent resume attempts
    if (!streamStatus.active || !streamStatus.streamId) {
      console.log('[ResumeOnLoad] No active job to resume for:', conversationId);
      return;
    }
 
+    // Mark as processed NOW - we verified there's an active job and will create submission
+    processedConvoRef.current = conversationId;
+
    console.log('[ResumeOnLoad] Found active job, creating submission...', {
      streamId: streamStatus.streamId,
      status: streamStatus.status,
@@ -202,6 +232,9 @@
  }, [
    conversationId,
    resumableEnabled,
+    messagesLoaded,
+    hasActiveSubmissionForThisConvo,
+    submissionConvoId,
    currentSubmission,
    isSuccess,
    streamStatus,
@@ -209,11 +242,14 @@
    setSubmission,
  ]);
 
-  // Reset processedConvoRef when conversation changes to a different one
+  // Reset processedConvoRef when conversation changes to allow re-checking
  useEffect(() => {
-    if (conversationId && conversationId !== processedConvoRef.current) {
-      // Only reset if we're navigating to a DIFFERENT conversation
-      // This allows re-checking when navigating back
+    // Always reset when conversation changes - this allows resuming when navigating back
+    if (conversationId !== processedConvoRef.current) {
+      console.log('[ResumeOnLoad] Resetting processedConvoRef for new conversation:', {
+        old: processedConvoRef.current,
+        new: conversationId,
+      });
      processedConvoRef.current = null;
    }
  }, [conversationId]);

View file

@@ -51,12 +51,9 @@ type AllContentTypes =
   | ContentTypes.IMAGE_URL
   | ContentTypes.ERROR;
 
-const noop = () => {};
-
 export default function useStepHandler({
   setMessages,
   getMessages,
-  setIsSubmitting = noop,
   announcePolite,
   lastAnnouncementTimeRef,
 }: TUseStepHandler) {
@@ -468,7 +465,7 @@
      stepMap.current.clear();
    };
   },
-    [getMessages, setIsSubmitting, lastAnnouncementTimeRef, announcePolite, setMessages],
+    [getMessages, lastAnnouncementTimeRef, announcePolite, setMessages],
   );
 
   const clearStepMaps = useCallback(() => {
@@ -476,5 +473,17 @@
    messageMap.current.clear();
    stepMap.current.clear();
   }, []);
-  return { stepHandler, clearStepMaps };
+
+  /**
+   * Sync a message into the step handler's messageMap.
+   * Call this after receiving sync event to ensure subsequent deltas
+   * build on the synced content, not stale content.
+   */
+  const syncStepMessage = useCallback((message: TMessage) => {
+    if (message?.messageId) {
+      messageMap.current.set(message.messageId, { ...message });
+    }
+  }, []);
+
+  return { stepHandler, clearStepMaps, syncStepMessage };
 }

View file

@@ -1,18 +1,8 @@
 import { EventEmitter } from 'events';
 import { logger } from '@librechat/data-schemas';
 import type { Agents } from 'librechat-data-provider';
-import type { ServerSentEvent } from '~/types';
-import type {
-  GenerationJob,
-  GenerationJobStatus,
-  ChunkHandler,
-  DoneHandler,
-  ErrorHandler,
-  UnsubscribeFn,
-  ContentPart,
-  ResumeState,
-  GenerationJobMetadata,
-} from './types';
+import type { StandardGraph } from '@librechat/agents';
+import type * as t from '~/types';
 
 /**
  * Manages generation jobs for resumable LLM streams.
@@ -20,7 +10,7 @@ import type {
  * Clients can subscribe/unsubscribe to job events without affecting generation.
  */
 class GenerationJobManagerClass {
-  private jobs = new Map<string, GenerationJob>();
+  private jobs = new Map<string, t.GenerationJob>();
   private cleanupInterval: NodeJS.Timeout | null = null;
   /** Time to keep completed jobs before cleanup (1 hour) */
   private ttlAfterComplete = 3600000;
@@ -53,7 +43,7 @@
   * @param conversationId - Optional conversation ID
   * @returns The created job
   */
-  createJob(streamId: string, userId: string, conversationId?: string): GenerationJob {
+  createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
    if (this.jobs.size >= this.maxJobs) {
      this.evictOldest();
    }
@@ -63,7 +53,7 @@
      resolveReady = resolve;
    });
 
-    const job: GenerationJob = {
+    const job: t.GenerationJob = {
      streamId,
      emitter: new EventEmitter(),
      status: 'running',
@@ -73,8 +63,6 @@
      readyPromise,
      resolveReady: resolveReady!,
      chunks: [],
-      aggregatedContent: [],
-      runSteps: new Map(),
    };
 
    job.emitter.setMaxListeners(100);
@@ -90,7 +78,7 @@
   * @param streamId - The stream identifier
   * @returns The job if found, undefined otherwise
   */
-  getJob(streamId: string): GenerationJob | undefined {
+  getJob(streamId: string): t.GenerationJob | undefined {
    return this.jobs.get(streamId);
  }
@@ -101,7 +89,7 @@
   * @param conversationId - The conversation identifier
   * @returns The job if found, undefined otherwise
   */
-  getJobByConversation(conversationId: string): GenerationJob | undefined {
+  getJobByConversation(conversationId: string): t.GenerationJob | undefined {
    const directMatch = this.jobs.get(conversationId);
    if (directMatch && directMatch.status === 'running') {
      return directMatch;
@@ -130,7 +118,7 @@
   * @param streamId - The stream identifier
   * @returns The job status or undefined if not found
   */
-  getJobStatus(streamId: string): GenerationJobStatus | undefined {
+  getJobStatus(streamId: string): t.GenerationJobStatus | undefined {
    return this.jobs.get(streamId)?.status;
  }
@@ -197,7 +185,7 @@
        messageId: job.metadata.responseMessageId ?? `${userMessageId ?? 'aborted'}_`,
        parentMessageId: userMessageId, // Link response to user message
        conversationId: job.metadata.conversationId,
-        content: job.aggregatedContent ?? [],
+        content: job.contentPartsRef ?? [],
        sender: job.metadata.sender ?? 'AI',
        unfinished: true,
        /** Not an error - the job was intentionally aborted */
@@ -205,7 +193,7 @@
        isCreatedByUser: false,
      },
      aborted: true,
-    } as unknown as ServerSentEvent;
+    } as unknown as t.ServerSentEvent;
 
    job.finalEvent = abortFinalEvent;
    job.emitter.emit('done', abortFinalEvent);
@@ -227,42 +215,25 @@
   */
  subscribe(
    streamId: string,
-    onChunk: ChunkHandler,
-    onDone?: DoneHandler,
-    onError?: ErrorHandler,
-  ): { unsubscribe: UnsubscribeFn } | null {
+    onChunk: t.ChunkHandler,
+    onDone?: t.DoneHandler,
+    onError?: t.ErrorHandler,
+  ): { unsubscribe: t.UnsubscribeFn } | null {
    const job = this.jobs.get(streamId);
    if (!job) {
      return null;
    }
 
-    // Replay buffered chunks (only chunks missed during disconnect)
-    const chunksToReplay = [...job.chunks];
-    const replayCount = chunksToReplay.length;
-    if (replayCount > 0) {
-      logger.debug(
-        `[GenerationJobManager] Replaying ${replayCount} buffered chunks for ${streamId}`,
-      );
-    }
-
-    // Clear buffer after capturing for replay - subscriber is now connected
-    job.chunks = [];
-
    // Use setImmediate to allow the caller to set up their connection first
    setImmediate(() => {
-      for (const chunk of chunksToReplay) {
-        onChunk(chunk);
-      }
      // If job is already complete, send the final event
      if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) {
        onDone?.(job.finalEvent);
      }
    });
 
-    const chunkHandler = (event: ServerSentEvent) => onChunk(event);
-    const doneHandler = (event: ServerSentEvent) => onDone?.(event);
+    const chunkHandler = (event: t.ServerSentEvent) => onChunk(event);
+    const doneHandler = (event: t.ServerSentEvent) => onDone?.(event);
    const errorHandler = (error: string) => onError?.(error);
 
    job.emitter.on('chunk', chunkHandler);
@@ -282,10 +253,13 @@
        currentJob.emitter.off('done', doneHandler);
        currentJob.emitter.off('error', errorHandler);
 
-        // Emit event when last subscriber leaves (for saving partial response)
+        // When last subscriber leaves
        if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') {
-          currentJob.emitter.emit('allSubscribersLeft', currentJob.aggregatedContent);
-          logger.debug(`[GenerationJobManager] All subscribers left ${streamId}`);
+          // Reset syncSent so reconnecting clients get sync event again
+          currentJob.syncSent = false;
+          // Emit event for saving partial response - use graph's contentParts directly
+          currentJob.emitter.emit('allSubscribersLeft', currentJob.contentPartsRef ?? []);
+          logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
        }
      }
    };
@@ -300,53 +274,31 @@
   * @param streamId - The stream identifier
   * @param event - The event data to emit
   */
-  emitChunk(streamId: string, event: ServerSentEvent): void {
+  emitChunk(streamId: string, event: t.ServerSentEvent): void {
    const job = this.jobs.get(streamId);
    if (!job || job.status !== 'running') {
      return;
    }
 
-    // Only buffer if no one is listening (for reconnect replay)
-    const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
-    if (!hasSubscribers) {
-      job.chunks.push(event);
-    }
-
-    // Track run steps for reconnection
-    this.trackRunStep(job, event);
+    // // Only buffer if no one is listening (for reconnect replay)
+    // const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
+    // if (!hasSubscribers) {
+    //   job.chunks.push(event);
+    // }
 
    // Track user message from created event
    this.trackUserMessage(job, event);
 
-    // Always aggregate content (for partial response saving)
-    this.aggregateContent(job, event);
+    // Run steps and content are tracked via graphRef and contentPartsRef
+    // No need to aggregate separately - these reference the graph's data directly
 
    job.emitter.emit('chunk', event);
  }
 
-  /**
-   * Track run step events for reconnection state.
-   * This allows reconnecting clients to rebuild their stepMap.
-   */
-  private trackRunStep(job: GenerationJob, event: ServerSentEvent): void {
-    const data = event as Record<string, unknown>;
-    if (data.event !== 'on_run_step') {
-      return;
-    }
-
-    const runStep = data.data as Agents.RunStep;
-    if (!runStep?.id) {
-      return;
-    }
-
-    job.runSteps.set(runStep.id, runStep);
-    logger.debug(`[GenerationJobManager] Tracked run step: ${runStep.id} for ${job.streamId}`);
-  }
-
  /**
   * Track user message from created event for reconnection.
   */
-  private trackUserMessage(job: GenerationJob, event: ServerSentEvent): void {
+  private trackUserMessage(job: t.GenerationJob, event: t.ServerSentEvent): void {
    const data = event as Record<string, unknown>;
    if (!data.created || !data.message) {
      return;
@@ -374,7 +326,7 @@
   * @param streamId - The stream identifier
   * @param metadata - Partial metadata to merge
   */
-  updateMetadata(streamId: string, metadata: Partial<GenerationJobMetadata>): void {
+  updateMetadata(streamId: string, metadata: Partial<t.GenerationJobMetadata>): void {
    const job = this.jobs.get(streamId);
    if (!job) {
      return;
@@ -383,21 +335,69 @@
    logger.debug(`[GenerationJobManager] Updated metadata for ${streamId}`);
  }
 
+  /**
+   * Set reference to the graph's contentParts array.
+   * This is the authoritative content source - no need to aggregate separately.
+   * @param streamId - The stream identifier
+   * @param contentParts - Reference to graph's contentParts array
+   */
+  setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
+    const job = this.jobs.get(streamId);
+    if (!job) {
+      return;
+    }
+    job.contentPartsRef = contentParts;
+    logger.debug(`[GenerationJobManager] Set contentParts reference for ${streamId}`, {
+      initialLength: contentParts?.length ?? 0,
+      isArray: Array.isArray(contentParts),
+    });
+  }
+
+  /**
+   * Set reference to the graph instance.
+   * This provides access to run steps (contentData) - no need to track separately.
+   * @param streamId - The stream identifier
+   * @param graph - Reference to the graph instance (must have contentData property)
+   */
+  setGraph(streamId: string, graph: StandardGraph): void {
+    const job = this.jobs.get(streamId);
+    if (!job) {
+      return;
+    }
+    job.graphRef = graph;
+    logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`);
+  }
+
  /**
   * Get resume state for reconnecting clients.
   * Includes run steps, aggregated content, and user message data.
   * @param streamId - The stream identifier
   * @returns Resume state or null if job not found
   */
-  getResumeState(streamId: string): ResumeState | null {
+  getResumeState(streamId: string): t.ResumeState | null {
    const job = this.jobs.get(streamId);
    if (!job) {
      return null;
    }
+
+    // Use graph's contentParts directly - it's always current and complete
+    // No conversion needed - send as-is
+    const aggregatedContent = job.contentPartsRef ?? [];
+    // Use graph's contentData for run steps - it's the authoritative source
+    const runSteps = job.graphRef?.contentData ?? [];
+
+    logger.debug(`[GenerationJobManager] getResumeState:`, {
+      streamId,
+      aggregatedContentLength: aggregatedContent.length,
+      runStepsLength: runSteps.length,
+      hasGraphRef: !!job.graphRef,
+      hasContentPartsRef: !!job.contentPartsRef,
+    });
+
    return {
-      runSteps: Array.from(job.runSteps.values()),
-      aggregatedContent: job.aggregatedContent,
+      runSteps,
+      aggregatedContent,
      userMessage: job.metadata.userMessage,
      responseMessageId: job.metadata.responseMessageId,
      conversationId: job.metadata.conversationId,
@@ -423,41 +423,13 @@
    return this.jobs.get(streamId)?.syncSent ?? false;
  }
 
-  /**
-   * Aggregate content parts from message delta events.
-   * Used to save partial response when subscribers disconnect.
-   * Uses flat format: { type: 'text', text: 'content' }
-   */
-  private aggregateContent(job: GenerationJob, event: ServerSentEvent): void {
-    // Check for on_message_delta events which contain content
-    const data = event as Record<string, unknown>;
-    if (data.event === 'on_message_delta' && data.data) {
-      const eventData = data.data as Record<string, unknown>;
-      const delta = eventData.delta as Record<string, unknown> | undefined;
-
-      if (delta?.content && Array.isArray(delta.content)) {
-        for (const part of delta.content) {
-          if (part.type === 'text' && part.text) {
-            // Find or create text content part in flat format
-            let textPart = job.aggregatedContent?.find((p) => p.type === 'text');
-            if (!textPart) {
-              textPart = { type: 'text', text: '' };
-              job.aggregatedContent = job.aggregatedContent || [];
-              job.aggregatedContent.push(textPart);
-            }
-            textPart.text = (textPart.text || '') + part.text;
-          }
-        }
-      }
-    }
-  }
-
  /**
   * Emit a done event to all subscribers.
   * Stores the final event for replay on reconnect.
   * @param streamId - The stream identifier
   * @param event - The final event data
   */
-  emitDone(streamId: string, event: ServerSentEvent): void {
+  emitDone(streamId: string, event: t.ServerSentEvent): void {
    const job = this.jobs.get(streamId);
    if (!job) {
      return;
@@ -508,7 +480,6 @@
    const job = this.jobs.get(streamId);
    if (job) {
      job.emitter.removeAllListeners();
-      job.runSteps.clear();
      this.jobs.delete(streamId);
    }
  }
@@ -539,10 +510,10 @@
   */
  getStreamInfo(streamId: string): {
    active: boolean;
-    status: GenerationJobStatus;
+    status: t.GenerationJobStatus;
    chunkCount: number;
    runStepCount: number;
-    aggregatedContent?: ContentPart[];
+    aggregatedContent?: Agents.MessageContentComplex[];
    createdAt: number;
  } | null {
    const job = this.jobs.get(streamId);
@@ -554,8 +525,8 @@
      active: job.status === 'running',
      status: job.status,
      chunkCount: job.chunks.length,
-      runStepCount: job.runSteps.size,
-      aggregatedContent: job.aggregatedContent,
+      runStepCount: job.graphRef?.contentData?.length ?? 0,
+      aggregatedContent: job.contentPartsRef ?? [],
      createdAt: job.createdAt,
    };
  }
@@ -570,8 +541,8 @@
  /**
   * Get count of jobs by status.
   */
-  getJobCountByStatus(): Record<GenerationJobStatus, number> {
-    const counts: Record<GenerationJobStatus, number> = {
+  getJobCountByStatus(): Record<t.GenerationJobStatus, number> {
+    const counts: Record<t.GenerationJobStatus, number> = {
      running: 0,
      complete: 0,
      error: 0,
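Taken together with the controller and client hunks above, the server-side wiring reduces to roughly the following sketch. Only the GenerationJobManager methods are from this diff; the function name and call site are placeholders (the real calls live in the resumable controller and the agent client).

import type { Agents } from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
import { GenerationJobManager } from './GenerationJobManager';

// Placeholder wiring function - illustrative only.
function wireResumableJob(
  streamId: string,
  userId: string,
  graph: StandardGraph,
  contentParts: Agents.MessageContentComplex[],
): void {
  GenerationJobManager.createJob(streamId, userId);
  // Store references, not copies: getResumeState() later reads the graph's live
  // contentData and contentParts, so reconnecting clients see current state
  // without any separate aggregation step.
  GenerationJobManager.setContentParts(streamId, contentParts);
  GenerationJobManager.setGraph(streamId, graph);
}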

View file

@@ -1,2 +1 @@
 export { GenerationJobManager, GenerationJobManagerClass } from './GenerationJobManager';
-export type * from './types';

View file

@@ -13,3 +13,4 @@ export type * from './openai';
 export * from './prompts';
 export * from './run';
 export * from './tokens';
+export * from './stream';

View file

@@ -1,5 +1,6 @@
 import type { EventEmitter } from 'events';
 import type { Agents } from 'librechat-data-provider';
+import type { StandardGraph } from '@librechat/agents';
 import type { ServerSentEvent } from '~/types';
 
 export interface GenerationJobMetadata {
@@ -30,10 +31,10 @@ export interface GenerationJob {
   chunks: ServerSentEvent[];
   /** Final event when job completes */
   finalEvent?: ServerSentEvent;
-  /** Aggregated content parts for saving partial response */
-  aggregatedContent?: ContentPart[];
-  /** Tracked run steps for reconnection - maps step ID to step data */
-  runSteps: Map<string, Agents.RunStep>;
+  /** Reference to graph's contentParts - the authoritative content source */
+  contentPartsRef?: Agents.MessageContentComplex[];
+  /** Reference to the graph instance for accessing run steps (contentData) */
+  graphRef?: StandardGraph;
   /** Flag to indicate if a sync event was already sent (prevent duplicate replays) */
   syncSent?: boolean;
 }

View file

@@ -190,7 +190,8 @@ export namespace Agents {
   /** State data sent to reconnecting clients */
   export interface ResumeState {
     runSteps: RunStep[];
-    aggregatedContent?: ContentPart[];
+    /** Aggregated content parts - can be MessageContentComplex[] or ContentPart[] */
+    aggregatedContent?: MessageContentComplex[];
     userMessage?: UserMessageMeta;
     responseMessageId?: string;
     conversationId?: string;