mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-26 13:18:51 +01:00
fix: improve syncing when switching conversations
This commit is contained in:
parent
e2c41c68d2
commit
d0c7566599
14 changed files with 314 additions and 176 deletions
|
|
@ -55,7 +55,8 @@ function ChatView({ index = 0 }: { index?: number }) {
|
|||
useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1);
|
||||
|
||||
// Auto-resume if navigating back to conversation with active job
|
||||
useResumeOnLoad(conversationId, chatHelpers.getMessages, index);
|
||||
// Wait for messages to load before resuming to avoid race condition
|
||||
useResumeOnLoad(conversationId, chatHelpers.getMessages, index, !isLoading);
|
||||
|
||||
const methods = useForm<ChatFormValues>({
|
||||
defaultValues: { text: '' },
|
||||
|
|
|
|||
|
|
@ -27,7 +27,13 @@ type TContentHandler = {
|
|||
export default function useContentHandler({ setMessages, getMessages }: TUseContentHandler) {
|
||||
const queryClient = useQueryClient();
|
||||
const messageMap = useMemo(() => new Map<string, TMessage>(), []);
|
||||
return useCallback(
|
||||
|
||||
/** Reset the message map - call this after sync to prevent stale state from overwriting synced content */
|
||||
const resetMessageMap = useCallback(() => {
|
||||
messageMap.clear();
|
||||
}, [messageMap]);
|
||||
|
||||
const handler = useCallback(
|
||||
({ data, submission }: TContentHandler) => {
|
||||
const { type, messageId, thread_id, conversationId, index } = data;
|
||||
|
||||
|
|
@ -41,8 +47,11 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
|
|||
|
||||
let response = messageMap.get(messageId);
|
||||
if (!response) {
|
||||
// Check if message already exists in current messages (e.g., after sync)
|
||||
// Use that as base instead of stale initialResponse
|
||||
const existingMessage = _messages?.find((m) => m.messageId === messageId);
|
||||
response = {
|
||||
...(initialResponse as TMessage),
|
||||
...(existingMessage ?? (initialResponse as TMessage)),
|
||||
parentMessageId: userMessage?.messageId ?? '',
|
||||
conversationId,
|
||||
messageId,
|
||||
|
|
@ -82,4 +91,6 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
|
|||
},
|
||||
[queryClient, getMessages, messageMap, setMessages],
|
||||
);
|
||||
|
||||
return { contentHandler: handler, resetContentHandler: resetMessageMap };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -189,8 +189,8 @@ export default function useEventHandlers({
|
|||
const { conversationId: paramId } = useParams();
|
||||
const { token } = useAuthContext();
|
||||
|
||||
const contentHandler = useContentHandler({ setMessages, getMessages });
|
||||
const { stepHandler, clearStepMaps } = useStepHandler({
|
||||
const { contentHandler, resetContentHandler } = useContentHandler({ setMessages, getMessages });
|
||||
const { stepHandler, clearStepMaps, syncStepMessage } = useStepHandler({
|
||||
setMessages,
|
||||
getMessages,
|
||||
announcePolite,
|
||||
|
|
@ -827,15 +827,17 @@ export default function useEventHandlers({
|
|||
);
|
||||
|
||||
return {
|
||||
clearStepMaps,
|
||||
stepHandler,
|
||||
syncHandler,
|
||||
finalHandler,
|
||||
errorHandler,
|
||||
clearStepMaps,
|
||||
messageHandler,
|
||||
contentHandler,
|
||||
createdHandler,
|
||||
syncStepMessage,
|
||||
attachmentHandler,
|
||||
abortConversation,
|
||||
resetContentHandler,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ export default function useResumableSSE(
|
|||
const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
|
||||
|
||||
const { token, isAuthenticated } = useAuthContext();
|
||||
const [completed, setCompleted] = useState(new Set());
|
||||
const [_completed, setCompleted] = useState(new Set());
|
||||
const [streamId, setStreamId] = useState<string | null>(null);
|
||||
const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
|
||||
const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));
|
||||
|
|
@ -78,15 +78,16 @@ export default function useResumableSSE(
|
|||
} = chatHelpers;
|
||||
|
||||
const {
|
||||
clearStepMaps,
|
||||
stepHandler,
|
||||
syncHandler,
|
||||
finalHandler,
|
||||
errorHandler,
|
||||
clearStepMaps,
|
||||
messageHandler,
|
||||
contentHandler,
|
||||
createdHandler,
|
||||
syncStepMessage,
|
||||
attachmentHandler,
|
||||
resetContentHandler,
|
||||
} = useEventHandlers({
|
||||
genTitle,
|
||||
setMessages,
|
||||
|
|
@ -108,14 +109,16 @@ export default function useResumableSSE(
|
|||
/**
|
||||
* Subscribe to stream via SSE library (supports custom headers)
|
||||
* Follows same auth pattern as useSSE
|
||||
* @param isResume - If true, adds ?resume=true to trigger sync event from server
|
||||
*/
|
||||
const subscribeToStream = useCallback(
|
||||
(currentStreamId: string, currentSubmission: TSubmission) => {
|
||||
(currentStreamId: string, currentSubmission: TSubmission, isResume = false) => {
|
||||
let { userMessage } = currentSubmission;
|
||||
let textIndex: number | null = null;
|
||||
|
||||
const url = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`;
|
||||
console.log('[ResumableSSE] Subscribing to stream:', url);
|
||||
const baseUrl = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`;
|
||||
const url = isResume ? `${baseUrl}?resume=true` : baseUrl;
|
||||
console.log('[ResumableSSE] Subscribing to stream:', url, { isResume });
|
||||
|
||||
const sse = new SSE(url, {
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
|
|
@ -184,13 +187,98 @@ export default function useResumableSSE(
|
|||
}
|
||||
|
||||
if (data.sync != null) {
|
||||
console.log('[ResumableSSE] Received SYNC event', {
|
||||
conversationId: data.conversationId,
|
||||
hasResumeState: !!data.resumeState,
|
||||
const textPart = data.resumeState?.aggregatedContent?.find(
|
||||
(p: { type: string }) => p.type === 'text',
|
||||
);
|
||||
console.log('[ResumableSSE] SYNC received', {
|
||||
runSteps: data.resumeState?.runSteps?.length ?? 0,
|
||||
contentLength: textPart?.text?.length ?? 0,
|
||||
});
|
||||
|
||||
const runId = v4();
|
||||
setActiveRunId(runId);
|
||||
syncHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
|
||||
|
||||
// Replay run steps
|
||||
if (data.resumeState?.runSteps) {
|
||||
for (const runStep of data.resumeState.runSteps) {
|
||||
stepHandler({ event: 'on_run_step', data: runStep }, {
|
||||
...currentSubmission,
|
||||
userMessage,
|
||||
} as EventSubmission);
|
||||
}
|
||||
}
|
||||
|
||||
// Set message content from aggregatedContent
|
||||
if (data.resumeState?.aggregatedContent && userMessage?.messageId) {
|
||||
const messages = getMessages() ?? [];
|
||||
const userMsgId = userMessage.messageId;
|
||||
const serverResponseId = data.resumeState.responseMessageId;
|
||||
|
||||
// Find the EXACT response message - prioritize responseMessageId from server
|
||||
// This is critical when there are multiple responses to the same user message
|
||||
let responseIdx = -1;
|
||||
if (serverResponseId) {
|
||||
responseIdx = messages.findIndex((m) => m.messageId === serverResponseId);
|
||||
}
|
||||
// Fallback: find by parentMessageId pattern (for new messages)
|
||||
if (responseIdx < 0) {
|
||||
responseIdx = messages.findIndex(
|
||||
(m) =>
|
||||
!m.isCreatedByUser &&
|
||||
(m.messageId === `${userMsgId}_` || m.parentMessageId === userMsgId),
|
||||
);
|
||||
}
|
||||
|
||||
const textPart = data.resumeState.aggregatedContent?.find(
|
||||
(p: { type: string }) => p.type === 'text',
|
||||
);
|
||||
console.log('[ResumableSSE] SYNC update', {
|
||||
userMsgId,
|
||||
serverResponseId,
|
||||
responseIdx,
|
||||
foundMessageId: responseIdx >= 0 ? messages[responseIdx]?.messageId : null,
|
||||
messagesCount: messages.length,
|
||||
aggregatedContentLength: data.resumeState.aggregatedContent?.length,
|
||||
textContentLength: textPart?.text?.length ?? 0,
|
||||
});
|
||||
|
||||
if (responseIdx >= 0) {
|
||||
// Update existing response message with aggregatedContent
|
||||
const updated = [...messages];
|
||||
const oldContent = updated[responseIdx]?.content;
|
||||
updated[responseIdx] = {
|
||||
...updated[responseIdx],
|
||||
content: data.resumeState.aggregatedContent,
|
||||
};
|
||||
console.log('[ResumableSSE] SYNC updating message', {
|
||||
messageId: updated[responseIdx]?.messageId,
|
||||
oldContentLength: Array.isArray(oldContent) ? oldContent.length : 0,
|
||||
newContentLength: data.resumeState.aggregatedContent?.length,
|
||||
});
|
||||
setMessages(updated);
|
||||
// Sync both content handler and step handler with the updated message
|
||||
// so subsequent deltas build on synced content, not stale content
|
||||
resetContentHandler();
|
||||
syncStepMessage(updated[responseIdx]);
|
||||
console.log('[ResumableSSE] SYNC complete, handlers synced');
|
||||
} else {
|
||||
// Add new response message
|
||||
const responseId = serverResponseId ?? `${userMsgId}_`;
|
||||
setMessages([
|
||||
...messages,
|
||||
{
|
||||
messageId: responseId,
|
||||
parentMessageId: userMsgId,
|
||||
conversationId: currentSubmission.conversation?.conversationId ?? '',
|
||||
text: '',
|
||||
content: data.resumeState.aggregatedContent,
|
||||
isCreatedByUser: false,
|
||||
} as TMessage,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
setShowStopButton(true);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -278,11 +366,14 @@ export default function useResumableSSE(
|
|||
createdHandler,
|
||||
attachmentHandler,
|
||||
stepHandler,
|
||||
syncHandler,
|
||||
contentHandler,
|
||||
resetContentHandler,
|
||||
syncStepMessage,
|
||||
messageHandler,
|
||||
errorHandler,
|
||||
setIsSubmitting,
|
||||
getMessages,
|
||||
setMessages,
|
||||
startupConfig?.balance?.enabled,
|
||||
balanceQuery,
|
||||
],
|
||||
|
|
@ -356,7 +447,7 @@ export default function useResumableSSE(
|
|||
// Resume: just subscribe to existing stream, don't start new generation
|
||||
console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
|
||||
setStreamId(resumeStreamId);
|
||||
subscribeToStream(resumeStreamId, submission);
|
||||
subscribeToStream(resumeStreamId, submission, true); // isResume=true
|
||||
} else {
|
||||
// New generation: start and then subscribe
|
||||
console.log('[ResumableSSE] Starting NEW generation');
|
||||
|
|
|
|||
|
|
@ -51,18 +51,20 @@ function buildSubmissionFromResumeState(
|
|||
isCreatedByUser: true,
|
||||
} as TMessage)));
|
||||
|
||||
// Use existing response from DB if available (preserves already-saved content)
|
||||
const initialResponse: TMessage =
|
||||
existingResponseMessage ??
|
||||
({
|
||||
messageId: responseMessageId,
|
||||
parentMessageId: userMessage.messageId,
|
||||
conversationId,
|
||||
text: '',
|
||||
content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
|
||||
isCreatedByUser: false,
|
||||
role: 'assistant',
|
||||
} as TMessage);
|
||||
// ALWAYS use aggregatedContent from resumeState - it has the latest content from the running job.
|
||||
// DB content may be stale (saved at disconnect, but generation continued).
|
||||
const initialResponse: TMessage = {
|
||||
messageId: existingResponseMessage?.messageId ?? responseMessageId,
|
||||
parentMessageId: existingResponseMessage?.parentMessageId ?? userMessage.messageId,
|
||||
conversationId,
|
||||
text: '',
|
||||
// aggregatedContent is authoritative - it reflects actual job state
|
||||
content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
|
||||
isCreatedByUser: false,
|
||||
role: 'assistant',
|
||||
sender: existingResponseMessage?.sender,
|
||||
model: existingResponseMessage?.model,
|
||||
} as TMessage;
|
||||
|
||||
const conversation: TConversation = {
|
||||
conversationId,
|
||||
|
|
@ -91,11 +93,14 @@ function buildSubmissionFromResumeState(
|
|||
* 1. Uses useStreamStatus to check for active jobs on navigation
|
||||
* 2. If active job found, builds a submission with streamId and sets it
|
||||
* 3. useResumableSSE picks up the submission and subscribes to the stream
|
||||
*
|
||||
* @param messagesLoaded - Whether the messages query has finished loading (prevents race condition)
|
||||
*/
|
||||
export default function useResumeOnLoad(
|
||||
conversationId: string | undefined,
|
||||
getMessages: () => TMessage[] | undefined,
|
||||
runIndex = 0,
|
||||
messagesLoaded = true,
|
||||
) {
|
||||
const resumableEnabled = useRecoilValue(store.resumableStreams);
|
||||
const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex));
|
||||
|
|
@ -104,10 +109,14 @@ export default function useResumeOnLoad(
|
|||
const processedConvoRef = useRef<string | null>(null);
|
||||
|
||||
// Check for active stream when conversation changes
|
||||
// Only check if resumable is enabled and no active submission
|
||||
// Allow check if no submission OR submission is for a different conversation (stale)
|
||||
const submissionConvoId = currentSubmission?.conversation?.conversationId;
|
||||
const hasActiveSubmissionForThisConvo = currentSubmission && submissionConvoId === conversationId;
|
||||
|
||||
const shouldCheck =
|
||||
resumableEnabled &&
|
||||
!currentSubmission &&
|
||||
messagesLoaded && // Wait for messages to load before checking
|
||||
!hasActiveSubmissionForThisConvo && // Allow if no submission or stale submission
|
||||
!!conversationId &&
|
||||
conversationId !== Constants.NEW_CONVO &&
|
||||
processedConvoRef.current !== conversationId; // Don't re-check processed convos
|
||||
|
|
@ -118,6 +127,7 @@ export default function useResumeOnLoad(
|
|||
console.log('[ResumeOnLoad] Effect check', {
|
||||
resumableEnabled,
|
||||
conversationId,
|
||||
messagesLoaded,
|
||||
hasCurrentSubmission: !!currentSubmission,
|
||||
currentSubmissionConvoId: currentSubmission?.conversation?.conversationId,
|
||||
isSuccess,
|
||||
|
|
@ -131,14 +141,32 @@ export default function useResumeOnLoad(
|
|||
return;
|
||||
}
|
||||
|
||||
// Don't resume if we already have an active submission (we started it ourselves)
|
||||
if (currentSubmission) {
|
||||
console.log('[ResumeOnLoad] Skipping - already have active submission, marking as processed');
|
||||
// Wait for messages to load to avoid race condition where sync overwrites then DB overwrites
|
||||
if (!messagesLoaded) {
|
||||
console.log('[ResumeOnLoad] Waiting for messages to load');
|
||||
return;
|
||||
}
|
||||
|
||||
// Don't resume if we already have an active submission FOR THIS CONVERSATION
|
||||
// A stale submission with undefined/different conversationId should not block us
|
||||
if (hasActiveSubmissionForThisConvo) {
|
||||
console.log('[ResumeOnLoad] Skipping - already have active submission for this conversation');
|
||||
// Mark as processed so we don't try again
|
||||
processedConvoRef.current = conversationId;
|
||||
return;
|
||||
}
|
||||
|
||||
// If there's a stale submission for a different conversation, log it but continue
|
||||
if (currentSubmission && submissionConvoId !== conversationId) {
|
||||
console.log(
|
||||
'[ResumeOnLoad] Found stale submission for different conversation, will check for resume',
|
||||
{
|
||||
staleConvoId: submissionConvoId,
|
||||
currentConvoId: conversationId,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Wait for stream status query to complete
|
||||
if (!isSuccess || !streamStatus) {
|
||||
console.log('[ResumeOnLoad] Waiting for stream status query');
|
||||
|
|
@ -151,15 +179,17 @@ export default function useResumeOnLoad(
|
|||
return;
|
||||
}
|
||||
|
||||
// Mark as processed immediately to prevent race conditions
|
||||
processedConvoRef.current = conversationId;
|
||||
|
||||
// Check if there's an active job to resume
|
||||
// DON'T mark as processed here - only mark when we actually create a submission
|
||||
// This prevents stale cache data from blocking subsequent resume attempts
|
||||
if (!streamStatus.active || !streamStatus.streamId) {
|
||||
console.log('[ResumeOnLoad] No active job to resume for:', conversationId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Mark as processed NOW - we verified there's an active job and will create submission
|
||||
processedConvoRef.current = conversationId;
|
||||
|
||||
console.log('[ResumeOnLoad] Found active job, creating submission...', {
|
||||
streamId: streamStatus.streamId,
|
||||
status: streamStatus.status,
|
||||
|
|
@ -202,6 +232,9 @@ export default function useResumeOnLoad(
|
|||
}, [
|
||||
conversationId,
|
||||
resumableEnabled,
|
||||
messagesLoaded,
|
||||
hasActiveSubmissionForThisConvo,
|
||||
submissionConvoId,
|
||||
currentSubmission,
|
||||
isSuccess,
|
||||
streamStatus,
|
||||
|
|
@ -209,11 +242,14 @@ export default function useResumeOnLoad(
|
|||
setSubmission,
|
||||
]);
|
||||
|
||||
// Reset processedConvoRef when conversation changes to a different one
|
||||
// Reset processedConvoRef when conversation changes to allow re-checking
|
||||
useEffect(() => {
|
||||
if (conversationId && conversationId !== processedConvoRef.current) {
|
||||
// Only reset if we're navigating to a DIFFERENT conversation
|
||||
// This allows re-checking when navigating back
|
||||
// Always reset when conversation changes - this allows resuming when navigating back
|
||||
if (conversationId !== processedConvoRef.current) {
|
||||
console.log('[ResumeOnLoad] Resetting processedConvoRef for new conversation:', {
|
||||
old: processedConvoRef.current,
|
||||
new: conversationId,
|
||||
});
|
||||
processedConvoRef.current = null;
|
||||
}
|
||||
}, [conversationId]);
|
||||
|
|
|
|||
|
|
@ -51,12 +51,9 @@ type AllContentTypes =
|
|||
| ContentTypes.IMAGE_URL
|
||||
| ContentTypes.ERROR;
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
export default function useStepHandler({
|
||||
setMessages,
|
||||
getMessages,
|
||||
setIsSubmitting = noop,
|
||||
announcePolite,
|
||||
lastAnnouncementTimeRef,
|
||||
}: TUseStepHandler) {
|
||||
|
|
@ -468,7 +465,7 @@ export default function useStepHandler({
|
|||
stepMap.current.clear();
|
||||
};
|
||||
},
|
||||
[getMessages, setIsSubmitting, lastAnnouncementTimeRef, announcePolite, setMessages],
|
||||
[getMessages, lastAnnouncementTimeRef, announcePolite, setMessages],
|
||||
);
|
||||
|
||||
const clearStepMaps = useCallback(() => {
|
||||
|
|
@ -476,5 +473,17 @@ export default function useStepHandler({
|
|||
messageMap.current.clear();
|
||||
stepMap.current.clear();
|
||||
}, []);
|
||||
return { stepHandler, clearStepMaps };
|
||||
|
||||
/**
|
||||
* Sync a message into the step handler's messageMap.
|
||||
* Call this after receiving sync event to ensure subsequent deltas
|
||||
* build on the synced content, not stale content.
|
||||
*/
|
||||
const syncStepMessage = useCallback((message: TMessage) => {
|
||||
if (message?.messageId) {
|
||||
messageMap.current.set(message.messageId, { ...message });
|
||||
}
|
||||
}, []);
|
||||
|
||||
return { stepHandler, clearStepMaps, syncStepMessage };
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue