mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-02-24 03:14:08 +01:00
✨ feat: Implement Resumable Generation Jobs with SSE Support
- Introduced GenerationJobManager to handle resumable LLM generation jobs independently of HTTP connections. - Added support for subscribing to ongoing generation jobs via SSE, allowing clients to reconnect and receive updates without losing progress. - Enhanced existing agent controllers and routes to integrate resumable functionality, including job creation, completion, and error handling. - Updated client-side hooks to manage adaptive SSE streams, switching between standard and resumable modes based on user settings. - Added UI components and settings for enabling/disabling resumable streams, improving user experience during unstable connections.
This commit is contained in:
parent
9dda857a59
commit
9b9d6be622
17 changed files with 1212 additions and 37 deletions
|
|
@ -7,7 +7,7 @@ import { Constants, buildTree } from 'librechat-data-provider';
|
|||
import type { TMessage } from 'librechat-data-provider';
|
||||
import type { ChatFormValues } from '~/common';
|
||||
import { ChatContext, AddedChatContext, useFileMapContext, ChatFormProvider } from '~/Providers';
|
||||
import { useChatHelpers, useAddedResponse, useSSE } from '~/hooks';
|
||||
import { useChatHelpers, useAddedResponse, useAdaptiveSSE } from '~/hooks';
|
||||
import ConversationStarters from './Input/ConversationStarters';
|
||||
import { useGetMessagesByConvoId } from '~/data-provider';
|
||||
import MessagesView from './Messages/MessagesView';
|
||||
|
|
@ -51,8 +51,8 @@ function ChatView({ index = 0 }: { index?: number }) {
|
|||
const chatHelpers = useChatHelpers(index, conversationId);
|
||||
const addedChatHelpers = useAddedResponse({ rootIndex: index });
|
||||
|
||||
useSSE(rootSubmission, chatHelpers, false);
|
||||
useSSE(addedSubmission, addedChatHelpers, true);
|
||||
useAdaptiveSSE(rootSubmission, chatHelpers, false, index);
|
||||
useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1);
|
||||
|
||||
const methods = useForm<ChatFormValues>({
|
||||
defaultValues: { text: '' },
|
||||
|
|
|
|||
|
|
@ -84,6 +84,13 @@ const toggleSwitchConfigs = [
|
|||
hoverCardText: 'com_nav_info_default_temporary_chat',
|
||||
key: 'defaultTemporaryChat',
|
||||
},
|
||||
{
|
||||
stateAtom: store.resumableStreams,
|
||||
localizationKey: 'com_nav_resumable_streams',
|
||||
switchId: 'resumableStreams',
|
||||
hoverCardText: 'com_nav_info_resumable_streams',
|
||||
key: 'resumableStreams',
|
||||
},
|
||||
];
|
||||
|
||||
function Chat() {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
export { default as useSSE } from './useSSE';
|
||||
export { default as useResumableSSE } from './useResumableSSE';
|
||||
export { default as useAdaptiveSSE } from './useAdaptiveSSE';
|
||||
export { default as useStepHandler } from './useStepHandler';
|
||||
export { default as useContentHandler } from './useContentHandler';
|
||||
export { default as useAttachmentHandler } from './useAttachmentHandler';
|
||||
|
|
|
|||
43
client/src/hooks/SSE/useAdaptiveSSE.ts
Normal file
43
client/src/hooks/SSE/useAdaptiveSSE.ts
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
import { useRecoilValue } from 'recoil';
|
||||
import type { TSubmission } from 'librechat-data-provider';
|
||||
import type { EventHandlerParams } from './useEventHandlers';
|
||||
import useSSE from './useSSE';
|
||||
import useResumableSSE from './useResumableSSE';
|
||||
import store from '~/store';
|
||||
|
||||
type ChatHelpers = Pick<
|
||||
EventHandlerParams,
|
||||
| 'setMessages'
|
||||
| 'getMessages'
|
||||
| 'setConversation'
|
||||
| 'setIsSubmitting'
|
||||
| 'newConversation'
|
||||
| 'resetLatestMessage'
|
||||
>;
|
||||
|
||||
/**
|
||||
* Adaptive SSE hook that switches between standard and resumable modes.
|
||||
* Uses Recoil state to determine which mode to use.
|
||||
*
|
||||
* Note: Both hooks are always called to comply with React's Rules of Hooks.
|
||||
* We pass null submission to the inactive one.
|
||||
*/
|
||||
export default function useAdaptiveSSE(
|
||||
submission: TSubmission | null,
|
||||
chatHelpers: ChatHelpers,
|
||||
isAddedRequest = false,
|
||||
runIndex = 0,
|
||||
) {
|
||||
const resumableEnabled = useRecoilValue(store.resumableStreams);
|
||||
|
||||
useSSE(resumableEnabled ? null : submission, chatHelpers, isAddedRequest, runIndex);
|
||||
|
||||
const { streamId } = useResumableSSE(
|
||||
resumableEnabled ? submission : null,
|
||||
chatHelpers,
|
||||
isAddedRequest,
|
||||
runIndex,
|
||||
);
|
||||
|
||||
return { streamId, resumableEnabled };
|
||||
}
|
||||
406
client/src/hooks/SSE/useResumableSSE.ts
Normal file
406
client/src/hooks/SSE/useResumableSSE.ts
Normal file
|
|
@ -0,0 +1,406 @@
|
|||
import { useEffect, useState, useRef, useCallback } from 'react';
|
||||
import { v4 } from 'uuid';
|
||||
import { SSE } from 'sse.js';
|
||||
import { useSetRecoilState } from 'recoil';
|
||||
import {
|
||||
request,
|
||||
Constants,
|
||||
createPayload,
|
||||
LocalStorageKeys,
|
||||
removeNullishValues,
|
||||
} from 'librechat-data-provider';
|
||||
import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider';
|
||||
import type { EventHandlerParams } from './useEventHandlers';
|
||||
import type { TResData } from '~/common';
|
||||
import { useGenTitleMutation, useGetStartupConfig, useGetUserBalance } from '~/data-provider';
|
||||
import { useAuthContext } from '~/hooks/AuthContext';
|
||||
import useEventHandlers from './useEventHandlers';
|
||||
import store from '~/store';
|
||||
|
||||
const clearDraft = (conversationId?: string | null) => {
|
||||
if (conversationId) {
|
||||
localStorage.removeItem(`${LocalStorageKeys.TEXT_DRAFT}${conversationId}`);
|
||||
localStorage.removeItem(`${LocalStorageKeys.FILES_DRAFT}${conversationId}`);
|
||||
} else {
|
||||
localStorage.removeItem(`${LocalStorageKeys.TEXT_DRAFT}${Constants.NEW_CONVO}`);
|
||||
localStorage.removeItem(`${LocalStorageKeys.FILES_DRAFT}${Constants.NEW_CONVO}`);
|
||||
}
|
||||
};
|
||||
|
||||
type ChatHelpers = Pick<
|
||||
EventHandlerParams,
|
||||
| 'setMessages'
|
||||
| 'getMessages'
|
||||
| 'setConversation'
|
||||
| 'setIsSubmitting'
|
||||
| 'newConversation'
|
||||
| 'resetLatestMessage'
|
||||
>;
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
|
||||
/**
|
||||
* Hook for resumable SSE streams.
|
||||
* Separates generation start (POST) from stream subscription (GET EventSource).
|
||||
* Supports auto-reconnection with exponential backoff.
|
||||
*/
|
||||
export default function useResumableSSE(
|
||||
submission: TSubmission | null,
|
||||
chatHelpers: ChatHelpers,
|
||||
isAddedRequest = false,
|
||||
runIndex = 0,
|
||||
) {
|
||||
const genTitle = useGenTitleMutation();
|
||||
const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
|
||||
|
||||
const { token, isAuthenticated } = useAuthContext();
|
||||
const [completed, setCompleted] = useState(new Set());
|
||||
const [streamId, setStreamId] = useState<string | null>(null);
|
||||
const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
|
||||
const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));
|
||||
|
||||
const sseRef = useRef<SSE | null>(null);
|
||||
const reconnectAttemptRef = useRef(0);
|
||||
const reconnectTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||
const submissionRef = useRef<TSubmission | null>(null);
|
||||
|
||||
const {
|
||||
setMessages,
|
||||
getMessages,
|
||||
setConversation,
|
||||
setIsSubmitting,
|
||||
newConversation,
|
||||
resetLatestMessage,
|
||||
} = chatHelpers;
|
||||
|
||||
const {
|
||||
clearStepMaps,
|
||||
stepHandler,
|
||||
syncHandler,
|
||||
finalHandler,
|
||||
errorHandler,
|
||||
messageHandler,
|
||||
contentHandler,
|
||||
createdHandler,
|
||||
attachmentHandler,
|
||||
abortConversation,
|
||||
} = useEventHandlers({
|
||||
genTitle,
|
||||
setMessages,
|
||||
getMessages,
|
||||
setCompleted,
|
||||
isAddedRequest,
|
||||
setConversation,
|
||||
setIsSubmitting,
|
||||
newConversation,
|
||||
setShowStopButton,
|
||||
resetLatestMessage,
|
||||
});
|
||||
|
||||
const { data: startupConfig } = useGetStartupConfig();
|
||||
const balanceQuery = useGetUserBalance({
|
||||
enabled: !!isAuthenticated && startupConfig?.balance?.enabled,
|
||||
});
|
||||
|
||||
/**
|
||||
* Subscribe to stream via SSE library (supports custom headers)
|
||||
*/
|
||||
const subscribeToStream = useCallback(
|
||||
(currentStreamId: string, currentSubmission: TSubmission) => {
|
||||
let { userMessage } = currentSubmission;
|
||||
let textIndex: number | null = null;
|
||||
|
||||
const url = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`;
|
||||
console.log('[ResumableSSE] Subscribing to stream:', url);
|
||||
|
||||
const sse = new SSE(url, {
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
method: 'GET',
|
||||
});
|
||||
sseRef.current = sse;
|
||||
|
||||
sse.addEventListener('open', () => {
|
||||
console.log('[ResumableSSE] Stream connected');
|
||||
setAbortScroll(false);
|
||||
setShowStopButton(true);
|
||||
reconnectAttemptRef.current = 0;
|
||||
});
|
||||
|
||||
sse.addEventListener('message', (e: MessageEvent) => {
|
||||
try {
|
||||
const data = JSON.parse(e.data);
|
||||
|
||||
if (data.final != null) {
|
||||
clearDraft(currentSubmission.conversation?.conversationId);
|
||||
try {
|
||||
finalHandler(data, currentSubmission as EventSubmission);
|
||||
} catch (error) {
|
||||
console.error('[ResumableSSE] Error in finalHandler:', error);
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
}
|
||||
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
|
||||
sse.close();
|
||||
setStreamId(null);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.created != null) {
|
||||
const runId = v4();
|
||||
setActiveRunId(runId);
|
||||
userMessage = {
|
||||
...userMessage,
|
||||
...data.message,
|
||||
overrideParentMessageId: userMessage.overrideParentMessageId,
|
||||
};
|
||||
createdHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.event === 'attachment' && data.data) {
|
||||
attachmentHandler({
|
||||
data: data.data,
|
||||
submission: currentSubmission as EventSubmission,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.event != null) {
|
||||
stepHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.sync != null) {
|
||||
const runId = v4();
|
||||
setActiveRunId(runId);
|
||||
syncHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.type != null) {
|
||||
const { text, index } = data;
|
||||
if (text != null && index !== textIndex) {
|
||||
textIndex = index;
|
||||
}
|
||||
contentHandler({ data, submission: currentSubmission as EventSubmission });
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.message != null) {
|
||||
const text = data.text ?? data.response;
|
||||
const initialResponse = {
|
||||
...(currentSubmission.initialResponse as TMessage),
|
||||
parentMessageId: data.parentMessageId,
|
||||
messageId: data.messageId,
|
||||
};
|
||||
messageHandler(text, { ...currentSubmission, userMessage, initialResponse });
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[ResumableSSE] Error processing message:', error);
|
||||
}
|
||||
});
|
||||
|
||||
// Handle cancel event (triggered when stop button is clicked)
|
||||
sse.addEventListener('cancel', async () => {
|
||||
console.log('[ResumableSSE] Cancel requested, aborting job');
|
||||
sse.close();
|
||||
|
||||
// Call abort endpoint to stop backend generation
|
||||
try {
|
||||
await fetch('/api/agents/chat/abort', {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${token}`,
|
||||
},
|
||||
body: JSON.stringify({ streamId: currentStreamId }),
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('[ResumableSSE] Error aborting job:', error);
|
||||
}
|
||||
|
||||
// Handle UI cleanup via abortConversation
|
||||
const latestMessages = getMessages();
|
||||
const conversationId = latestMessages?.[latestMessages.length - 1]?.conversationId;
|
||||
try {
|
||||
await abortConversation(
|
||||
conversationId ??
|
||||
userMessage.conversationId ??
|
||||
currentSubmission.conversation?.conversationId ??
|
||||
'',
|
||||
currentSubmission as EventSubmission,
|
||||
latestMessages,
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('[ResumableSSE] Error during abort:', error);
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
}
|
||||
setStreamId(null);
|
||||
});
|
||||
|
||||
sse.addEventListener('error', async (e: MessageEvent) => {
|
||||
console.log('[ResumableSSE] Stream error, connection closed');
|
||||
sse.close();
|
||||
|
||||
// Check for 401 and try to refresh token
|
||||
/* @ts-ignore */
|
||||
if (e.responseCode === 401) {
|
||||
try {
|
||||
const refreshResponse = await request.refreshToken();
|
||||
const newToken = refreshResponse?.token ?? '';
|
||||
if (newToken) {
|
||||
request.dispatchTokenUpdatedEvent(newToken);
|
||||
// Retry with new token
|
||||
if (submissionRef.current) {
|
||||
subscribeToStream(currentStreamId, submissionRef.current);
|
||||
}
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
console.log('[ResumableSSE] Token refresh failed:', error);
|
||||
}
|
||||
}
|
||||
|
||||
if (reconnectAttemptRef.current < MAX_RETRIES) {
|
||||
reconnectAttemptRef.current++;
|
||||
const delay = Math.min(1000 * Math.pow(2, reconnectAttemptRef.current - 1), 30000);
|
||||
|
||||
console.log(
|
||||
`[ResumableSSE] Reconnecting in ${delay}ms (attempt ${reconnectAttemptRef.current}/${MAX_RETRIES})`,
|
||||
);
|
||||
|
||||
reconnectTimeoutRef.current = setTimeout(() => {
|
||||
if (submissionRef.current) {
|
||||
subscribeToStream(currentStreamId, submissionRef.current);
|
||||
}
|
||||
}, delay);
|
||||
} else {
|
||||
console.error('[ResumableSSE] Max reconnect attempts reached');
|
||||
errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
setStreamId(null);
|
||||
}
|
||||
});
|
||||
|
||||
// Start the SSE connection
|
||||
sse.stream();
|
||||
},
|
||||
[
|
||||
token,
|
||||
setAbortScroll,
|
||||
setActiveRunId,
|
||||
setShowStopButton,
|
||||
finalHandler,
|
||||
createdHandler,
|
||||
attachmentHandler,
|
||||
stepHandler,
|
||||
syncHandler,
|
||||
contentHandler,
|
||||
messageHandler,
|
||||
errorHandler,
|
||||
setIsSubmitting,
|
||||
startupConfig?.balance?.enabled,
|
||||
balanceQuery,
|
||||
abortConversation,
|
||||
getMessages,
|
||||
],
|
||||
);
|
||||
|
||||
/**
|
||||
* Start generation (POST request that returns streamId)
|
||||
*/
|
||||
const startGeneration = useCallback(
|
||||
async (currentSubmission: TSubmission): Promise<string | null> => {
|
||||
const payloadData = createPayload(currentSubmission);
|
||||
let { payload } = payloadData;
|
||||
payload = removeNullishValues(payload) as TPayload;
|
||||
|
||||
clearStepMaps();
|
||||
|
||||
const url = payloadData.server.includes('?')
|
||||
? `${payloadData.server}&resumable=true`
|
||||
: `${payloadData.server}?resumable=true`;
|
||||
|
||||
try {
|
||||
const response = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: `Bearer ${token}`,
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
const errorData = await response.json().catch(() => ({}));
|
||||
throw new Error(errorData.error || `Failed to start generation: ${response.statusText}`);
|
||||
}
|
||||
|
||||
const { streamId: newStreamId } = await response.json();
|
||||
console.log('[ResumableSSE] Generation started:', { streamId: newStreamId });
|
||||
|
||||
return newStreamId;
|
||||
} catch (error) {
|
||||
console.error('[ResumableSSE] Error starting generation:', error);
|
||||
errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
|
||||
setIsSubmitting(false);
|
||||
return null;
|
||||
}
|
||||
},
|
||||
[token, clearStepMaps, errorHandler, setIsSubmitting],
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
if (!submission || Object.keys(submission).length === 0) {
|
||||
if (reconnectTimeoutRef.current) {
|
||||
clearTimeout(reconnectTimeoutRef.current);
|
||||
reconnectTimeoutRef.current = null;
|
||||
}
|
||||
if (sseRef.current) {
|
||||
sseRef.current.close();
|
||||
sseRef.current = null;
|
||||
}
|
||||
setStreamId(null);
|
||||
reconnectAttemptRef.current = 0;
|
||||
submissionRef.current = null;
|
||||
return;
|
||||
}
|
||||
|
||||
submissionRef.current = submission;
|
||||
|
||||
const initStream = async () => {
|
||||
setIsSubmitting(true);
|
||||
|
||||
const newStreamId = await startGeneration(submission);
|
||||
if (newStreamId) {
|
||||
setStreamId(newStreamId);
|
||||
subscribeToStream(newStreamId, submission);
|
||||
}
|
||||
};
|
||||
|
||||
initStream();
|
||||
|
||||
return () => {
|
||||
if (reconnectTimeoutRef.current) {
|
||||
clearTimeout(reconnectTimeoutRef.current);
|
||||
reconnectTimeoutRef.current = null;
|
||||
}
|
||||
if (sseRef.current) {
|
||||
const isCancelled = sseRef.current.readyState <= 1;
|
||||
sseRef.current.close();
|
||||
if (isCancelled) {
|
||||
// Dispatch cancel event to trigger abort
|
||||
const e = new Event('cancel');
|
||||
/* @ts-ignore */
|
||||
sseRef.current.dispatchEvent(e);
|
||||
}
|
||||
sseRef.current = null;
|
||||
}
|
||||
};
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [submission]);
|
||||
|
||||
return { streamId };
|
||||
}
|
||||
|
|
@ -490,6 +490,7 @@
|
|||
"com_nav_info_save_draft": "When enabled, the text and attachments you enter in the chat form will be automatically saved locally as drafts. These drafts will be available even if you reload the page or switch to a different conversation. Drafts are stored locally on your device and are deleted once the message is sent.",
|
||||
"com_nav_info_show_thinking": "When enabled, the chat will display the thinking dropdowns open by default, allowing you to view the AI's reasoning in real-time. When disabled, the thinking dropdowns will remain closed by default for a cleaner and more streamlined interface",
|
||||
"com_nav_info_user_name_display": "When enabled, the username of the sender will be shown above each message you send. When disabled, you will only see \"You\" above your messages.",
|
||||
"com_nav_info_resumable_streams": "When enabled, LLM generation continues in the background even if your connection drops. You can reconnect and resume receiving the response without losing progress. This is useful for unstable connections or long responses.",
|
||||
"com_nav_keep_screen_awake": "Keep screen awake during response generation",
|
||||
"com_nav_lang_arabic": "العربية",
|
||||
"com_nav_lang_armenian": "Հայերեն",
|
||||
|
|
@ -548,6 +549,7 @@
|
|||
"com_nav_plus_command": "+-Command",
|
||||
"com_nav_plus_command_description": "Toggle command \"+\" for adding a multi-response setting",
|
||||
"com_nav_profile_picture": "Profile Picture",
|
||||
"com_nav_resumable_streams": "Resumable Streams (Beta)",
|
||||
"com_nav_save_badges_state": "Save badges state",
|
||||
"com_nav_save_drafts": "Save drafts locally",
|
||||
"com_nav_scroll_button": "Scroll to the end button",
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ const localStorageAtoms = {
|
|||
LaTeXParsing: atomWithLocalStorage('LaTeXParsing', true),
|
||||
centerFormOnLanding: atomWithLocalStorage('centerFormOnLanding', true),
|
||||
showFooter: atomWithLocalStorage('showFooter', true),
|
||||
resumableStreams: atomWithLocalStorage('resumableStreams', true),
|
||||
|
||||
// Commands settings
|
||||
atCommand: atomWithLocalStorage('atCommand', true),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue