mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-01-02 16:48:50 +01:00
WIP: resumable stream
This commit is contained in:
parent
db9b050b86
commit
9428148e1d
12 changed files with 498 additions and 209 deletions
|
|
@ -55,7 +55,7 @@ function ChatView({ index = 0 }: { index?: number }) {
|
|||
useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1);
|
||||
|
||||
// Auto-resume if navigating back to conversation with active job
|
||||
useResumeOnLoad(conversationId, chatHelpers, index);
|
||||
useResumeOnLoad(conversationId, chatHelpers.getMessages, index);
|
||||
|
||||
const methods = useForm<ChatFormValues>({
|
||||
defaultValues: { text: '' },
|
||||
|
|
|
|||
|
|
@ -283,14 +283,25 @@ export default function useChatFunctions({
|
|||
}
|
||||
}
|
||||
} else {
|
||||
initialResponse.content = [
|
||||
{
|
||||
type: ContentTypes.TEXT,
|
||||
[ContentTypes.TEXT]: {
|
||||
value: '',
|
||||
// Assistants endpoint uses nested format: { type: 'text', text: { value: 'content' } }
|
||||
// Agents and other endpoints use flat format: { type: 'text', text: 'content' }
|
||||
if (isAssistantsEndpoint(endpoint)) {
|
||||
initialResponse.content = [
|
||||
{
|
||||
type: ContentTypes.TEXT,
|
||||
[ContentTypes.TEXT]: {
|
||||
value: '',
|
||||
},
|
||||
},
|
||||
},
|
||||
];
|
||||
];
|
||||
} else {
|
||||
initialResponse.content = [
|
||||
{
|
||||
type: ContentTypes.TEXT,
|
||||
text: '',
|
||||
},
|
||||
];
|
||||
}
|
||||
}
|
||||
setShowStopButton(true);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,9 +56,7 @@ export default function useTextarea({
|
|||
});
|
||||
const entityName = entity?.name ?? '';
|
||||
|
||||
const isNotAppendable =
|
||||
(((latestMessage?.unfinished ?? false) && !isSubmitting) || (latestMessage?.error ?? false)) &&
|
||||
!isAssistant;
|
||||
const isNotAppendable = latestMessage?.error === true && !isAssistant;
|
||||
// && (conversationId?.length ?? 0) > 6; // also ensures that we don't show the wrong placeholder
|
||||
|
||||
useEffect(() => {
|
||||
|
|
|
|||
|
|
@ -1,182 +1,163 @@
|
|||
import { useEffect, useState, useRef } from 'react';
|
||||
import { SSE } from 'sse.js';
|
||||
import { useEffect, useRef } from 'react';
|
||||
import { useSetRecoilState, useRecoilValue } from 'recoil';
|
||||
import { request } from 'librechat-data-provider';
|
||||
import type { TMessage, EventSubmission } from 'librechat-data-provider';
|
||||
import type { EventHandlerParams } from './useEventHandlers';
|
||||
import { useAuthContext } from '~/hooks/AuthContext';
|
||||
import { useGetStartupConfig, useGetUserBalance } from '~/data-provider';
|
||||
import useEventHandlers from './useEventHandlers';
|
||||
import { Constants, tMessageSchema } from 'librechat-data-provider';
|
||||
import type { TMessage, TConversation, TSubmission, Agents } from 'librechat-data-provider';
|
||||
import store from '~/store';
|
||||
|
||||
type ChatHelpers = Pick<
|
||||
EventHandlerParams,
|
||||
| 'setMessages'
|
||||
| 'getMessages'
|
||||
| 'setConversation'
|
||||
| 'setIsSubmitting'
|
||||
| 'newConversation'
|
||||
| 'resetLatestMessage'
|
||||
>;
|
||||
/**
|
||||
* Build a submission object from resume state for reconnected streams.
|
||||
* This provides the minimum data needed for useResumableSSE to subscribe.
|
||||
*/
|
||||
function buildSubmissionFromResumeState(
|
||||
resumeState: Agents.ResumeState,
|
||||
streamId: string,
|
||||
messages: TMessage[],
|
||||
conversationId: string,
|
||||
): TSubmission {
|
||||
const userMessageData = resumeState.userMessage;
|
||||
const responseMessageId =
|
||||
resumeState.responseMessageId ?? `${userMessageData?.messageId ?? 'resume'}_`;
|
||||
|
||||
// Try to find existing user message in the messages array (from database)
|
||||
const existingUserMessage = messages.find(
|
||||
(m) => m.isCreatedByUser && m.messageId === userMessageData?.messageId,
|
||||
);
|
||||
|
||||
// Try to find existing response message in the messages array (from database)
|
||||
const existingResponseMessage = messages.find(
|
||||
(m) =>
|
||||
!m.isCreatedByUser &&
|
||||
(m.messageId === responseMessageId || m.parentMessageId === userMessageData?.messageId),
|
||||
);
|
||||
|
||||
// Create or use existing user message
|
||||
const userMessage: TMessage =
|
||||
existingUserMessage ??
|
||||
(userMessageData
|
||||
? (tMessageSchema.parse({
|
||||
messageId: userMessageData.messageId,
|
||||
parentMessageId: userMessageData.parentMessageId ?? Constants.NO_PARENT,
|
||||
conversationId: userMessageData.conversationId ?? conversationId,
|
||||
text: userMessageData.text ?? '',
|
||||
isCreatedByUser: true,
|
||||
role: 'user',
|
||||
}) as TMessage)
|
||||
: (messages[messages.length - 2] ??
|
||||
({
|
||||
messageId: 'resume_user_msg',
|
||||
conversationId,
|
||||
text: '',
|
||||
isCreatedByUser: true,
|
||||
} as TMessage)));
|
||||
|
||||
// Use existing response from DB if available (preserves already-saved content)
|
||||
const initialResponse: TMessage =
|
||||
existingResponseMessage ??
|
||||
({
|
||||
messageId: responseMessageId,
|
||||
parentMessageId: userMessage.messageId,
|
||||
conversationId,
|
||||
text: '',
|
||||
content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
|
||||
isCreatedByUser: false,
|
||||
role: 'assistant',
|
||||
} as TMessage);
|
||||
|
||||
const conversation: TConversation = {
|
||||
conversationId,
|
||||
title: 'Resumed Chat',
|
||||
endpoint: null,
|
||||
} as TConversation;
|
||||
|
||||
return {
|
||||
messages,
|
||||
userMessage,
|
||||
initialResponse,
|
||||
conversation,
|
||||
isRegenerate: false,
|
||||
isTemporary: false,
|
||||
endpointOption: {},
|
||||
} as TSubmission;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook to resume streaming if navigating back to a conversation with active generation.
|
||||
* Checks for active jobs on mount and auto-subscribes if found.
|
||||
* Hook to resume streaming if navigating to a conversation with active generation.
|
||||
* Checks stream status via React Query and sets submission if active job found.
|
||||
*
|
||||
* This hook:
|
||||
* 1. Uses useStreamStatus to check for active jobs on navigation
|
||||
* 2. If active job found, builds a submission with streamId and sets it
|
||||
* 3. useResumableSSE picks up the submission and subscribes to the stream
|
||||
*/
|
||||
export default function useResumeOnLoad(
|
||||
conversationId: string | undefined,
|
||||
chatHelpers: ChatHelpers,
|
||||
getMessages: () => TMessage[] | undefined,
|
||||
runIndex = 0,
|
||||
) {
|
||||
const resumableEnabled = useRecoilValue(store.resumableStreams);
|
||||
const { token, isAuthenticated } = useAuthContext();
|
||||
const sseRef = useRef<SSE | null>(null);
|
||||
const checkedConvoRef = useRef<string | null>(null);
|
||||
const [completed, setCompleted] = useState(new Set());
|
||||
const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
|
||||
const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));
|
||||
const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex));
|
||||
const currentSubmission = useRecoilValue(store.submissionByIndex(runIndex));
|
||||
const hasResumedRef = useRef<string | null>(null);
|
||||
|
||||
const { getMessages, setIsSubmitting } = chatHelpers;
|
||||
// Check for active stream when conversation changes
|
||||
// const { data: streamStatus, isSuccess } = useStreamStatus(
|
||||
// conversationId,
|
||||
// resumableEnabled && !currentSubmission, // Only check if no active submission
|
||||
// );
|
||||
|
||||
const { stepHandler, finalHandler, contentHandler } = useEventHandlers({
|
||||
...chatHelpers,
|
||||
setCompleted,
|
||||
setShowStopButton,
|
||||
});
|
||||
|
||||
const { data: startupConfig } = useGetStartupConfig();
|
||||
const balanceQuery = useGetUserBalance({
|
||||
enabled: !!isAuthenticated && startupConfig?.balance?.enabled,
|
||||
});
|
||||
|
||||
/**
|
||||
* Check for active job when conversation loads
|
||||
*/
|
||||
useEffect(() => {
|
||||
if (!resumableEnabled || !conversationId || !token) {
|
||||
checkedConvoRef.current = null;
|
||||
// if (!resumableEnabled || !conversationId || !isSuccess || !streamStatus) {
|
||||
if (!resumableEnabled || !conversationId) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Only check once per conversationId to prevent loops
|
||||
if (checkedConvoRef.current === conversationId) {
|
||||
// Don't resume if we already have an active submission
|
||||
if (currentSubmission) {
|
||||
return;
|
||||
}
|
||||
|
||||
checkedConvoRef.current = conversationId;
|
||||
// Don't resume the same conversation twice
|
||||
if (hasResumedRef.current === conversationId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const checkAndResume = async () => {
|
||||
try {
|
||||
const response = await fetch(`/api/agents/chat/status/${conversationId}`, {
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
});
|
||||
// Check if there's an active job to resume
|
||||
// if (!streamStatus.active || !streamStatus.streamId) {
|
||||
// return;
|
||||
// }
|
||||
|
||||
if (!response.ok) {
|
||||
return;
|
||||
}
|
||||
// console.log('[ResumeOnLoad] Found active job, creating submission...', {
|
||||
// streamId: streamStatus.streamId,
|
||||
// status: streamStatus.status,
|
||||
// });
|
||||
|
||||
const { active, streamId } = await response.json();
|
||||
hasResumedRef.current = conversationId;
|
||||
|
||||
if (!active || !streamId) {
|
||||
return;
|
||||
}
|
||||
const messages = getMessages() || [];
|
||||
|
||||
console.log('[ResumeOnLoad] Found active job, resuming...', { streamId });
|
||||
// Minimal submission without resume state
|
||||
const lastMessage = messages[messages.length - 1];
|
||||
const submission: TSubmission = {
|
||||
messages,
|
||||
userMessage: lastMessage ?? ({ messageId: 'resume', conversationId, text: '' } as TMessage),
|
||||
initialResponse: {
|
||||
messageId: 'resume_',
|
||||
conversationId,
|
||||
text: '',
|
||||
content: [{ type: 'text', text: '' }],
|
||||
} as TMessage,
|
||||
conversation: { conversationId, title: 'Resumed Chat' } as TConversation,
|
||||
isRegenerate: false,
|
||||
isTemporary: false,
|
||||
endpointOption: {},
|
||||
} as TSubmission;
|
||||
setSubmission(submission);
|
||||
}, [conversationId, resumableEnabled, currentSubmission, getMessages, setSubmission]);
|
||||
|
||||
const messages = getMessages() || [];
|
||||
const lastMessage = messages[messages.length - 1];
|
||||
let textIndex: number | null = null;
|
||||
|
||||
const url = `/api/agents/chat/stream/${encodeURIComponent(streamId)}`;
|
||||
|
||||
const sse = new SSE(url, {
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
method: 'GET',
|
||||
});
|
||||
sseRef.current = sse;
|
||||
|
||||
sse.addEventListener('open', () => {
|
||||
console.log('[ResumeOnLoad] Reconnected to stream');
|
||||
setAbortScroll(false);
|
||||
setShowStopButton(true);
|
||||
setIsSubmitting(true);
|
||||
});
|
||||
|
||||
sse.addEventListener('message', (e: MessageEvent) => {
|
||||
try {
|
||||
const data = JSON.parse(e.data);
|
||||
|
||||
if (data.final != null) {
|
||||
try {
|
||||
finalHandler(data, { messages } as unknown as EventSubmission);
|
||||
} catch (error) {
|
||||
console.error('[ResumeOnLoad] Error in finalHandler:', error);
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
}
|
||||
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
|
||||
sse.close();
|
||||
sseRef.current = null;
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.event != null) {
|
||||
stepHandler(data, {
|
||||
messages,
|
||||
userMessage: lastMessage,
|
||||
} as unknown as EventSubmission);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.type != null) {
|
||||
const { text, index } = data;
|
||||
if (text != null && index !== textIndex) {
|
||||
textIndex = index;
|
||||
}
|
||||
contentHandler({ data, submission: { messages } as unknown as EventSubmission });
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[ResumeOnLoad] Error processing message:', error);
|
||||
}
|
||||
});
|
||||
|
||||
sse.addEventListener('error', async (e: MessageEvent) => {
|
||||
console.log('[ResumeOnLoad] Stream error');
|
||||
sse.close();
|
||||
sseRef.current = null;
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
|
||||
/* @ts-ignore */
|
||||
if (e.responseCode === 401) {
|
||||
try {
|
||||
const refreshResponse = await request.refreshToken();
|
||||
const newToken = refreshResponse?.token ?? '';
|
||||
if (newToken) {
|
||||
request.dispatchTokenUpdatedEvent(newToken);
|
||||
}
|
||||
} catch (error) {
|
||||
console.log('[ResumeOnLoad] Token refresh failed:', error);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
sse.stream();
|
||||
} catch (error) {
|
||||
console.error('[ResumeOnLoad] Error checking job status:', error);
|
||||
}
|
||||
};
|
||||
|
||||
checkAndResume();
|
||||
|
||||
return () => {
|
||||
if (sseRef.current) {
|
||||
sseRef.current.close();
|
||||
sseRef.current = null;
|
||||
}
|
||||
};
|
||||
// Only re-run when conversationId changes
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
// Reset hasResumedRef when conversation changes
|
||||
useEffect(() => {
|
||||
if (conversationId !== hasResumedRef.current) {
|
||||
hasResumedRef.current = null;
|
||||
}
|
||||
}, [conversationId]);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,8 @@ type TUseStepHandler = {
|
|||
announcePolite: (options: AnnounceOptions) => void;
|
||||
setMessages: (messages: TMessage[]) => void;
|
||||
getMessages: () => TMessage[] | undefined;
|
||||
setIsSubmitting: SetterOrUpdater<boolean>;
|
||||
/** @deprecated - isSubmitting should be derived from submission state */
|
||||
setIsSubmitting?: SetterOrUpdater<boolean>;
|
||||
lastAnnouncementTimeRef: React.MutableRefObject<number>;
|
||||
};
|
||||
|
||||
|
|
@ -50,10 +51,12 @@ type AllContentTypes =
|
|||
| ContentTypes.IMAGE_URL
|
||||
| ContentTypes.ERROR;
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
export default function useStepHandler({
|
||||
setMessages,
|
||||
getMessages,
|
||||
setIsSubmitting,
|
||||
setIsSubmitting = noop,
|
||||
announcePolite,
|
||||
lastAnnouncementTimeRef,
|
||||
}: TUseStepHandler) {
|
||||
|
|
@ -198,7 +201,6 @@ export default function useStepHandler({
|
|||
({ event, data }: TStepEvent, submission: EventSubmission) => {
|
||||
const messages = getMessages() || [];
|
||||
const { userMessage } = submission;
|
||||
setIsSubmitting(true);
|
||||
let parentMessageId = userMessage.messageId;
|
||||
|
||||
const currentTime = Date.now();
|
||||
|
|
@ -230,12 +232,17 @@ export default function useStepHandler({
|
|||
if (!response) {
|
||||
const responseMessage = messages[messages.length - 1] as TMessage;
|
||||
|
||||
// Preserve existing content from DB (partial response) and prepend initialContent if provided
|
||||
const existingContent = responseMessage?.content ?? [];
|
||||
const mergedContent =
|
||||
initialContent.length > 0 ? [...initialContent, ...existingContent] : existingContent;
|
||||
|
||||
response = {
|
||||
...responseMessage,
|
||||
parentMessageId,
|
||||
conversationId: userMessage.conversationId,
|
||||
messageId: responseMessageId,
|
||||
content: initialContent,
|
||||
content: mergedContent,
|
||||
};
|
||||
|
||||
messageMap.current.set(responseMessageId, response);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue