import { useEffect, useState, useRef, useCallback } from 'react';
import { v4 } from 'uuid';
import { SSE } from 'sse.js';
import { useSetRecoilState } from 'recoil';
import {
  request,
  Constants,
  createPayload,
  LocalStorageKeys,
  removeNullishValues,
} from 'librechat-data-provider';
import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider';
import type { EventHandlerParams } from './useEventHandlers';
import { useGenTitleMutation, useGetStartupConfig, useGetUserBalance } from '~/data-provider';
import { useAuthContext } from '~/hooks/AuthContext';
import useEventHandlers from './useEventHandlers';
import store from '~/store';

/**
 * Removes the persisted text draft and files draft of a conversation from localStorage.
 *
 * @param conversationId - Conversation whose drafts are removed. When it is falsy
 * (null, undefined or an empty string) the `Constants.NEW_CONVO` key suffix is used instead.
 */
const clearDraft = (conversationId?: string | null) => {
  const draftSuffix = conversationId ? conversationId : Constants.NEW_CONVO;
  // Same removal order as before: text draft first, then files draft.
  for (const draftPrefix of [LocalStorageKeys.TEXT_DRAFT, LocalStorageKeys.FILES_DRAFT]) {
    localStorage.removeItem(`${draftPrefix}${draftSuffix}`);
  }
};

/** Subset of the event-handler parameters that the caller of the hook must supply. */
type ChatHelpers = Pick<
  EventHandlerParams,
  | 'setMessages'
  | 'getMessages'
  | 'setConversation'
  | 'setIsSubmitting'
  | 'newConversation'
  | 'resetLatestMessage'
>;

/** Upper bound on consecutive SSE reconnect attempts before giving up. */
const MAX_RETRIES = 5;

/**
 * Hook for resumable SSE streams.
 * Separates generation start (POST) from stream subscription (GET EventSource).
 * Supports auto-reconnection with exponential backoff.
*
 * Key behavior:
 * - Navigation away does NOT abort the generation (just closes SSE)
 * - Only explicit abort (via stop button → backend abort endpoint) stops generation
 * - Backend emits `done` event with `aborted: true` on abort, handled via finalHandler
 *
 * @param submission - Submission to run. `null` or an empty object closes any open SSE
 * connection and clears the stream/reconnect state. A `resumeStreamId` property on the
 * submission makes the hook subscribe to that existing stream instead of starting a new one.
 * @param chatHelpers - Message/conversation getters and setters, forwarded to `useEventHandlers`.
 * @param isAddedRequest - Forwarded unchanged to `useEventHandlers`.
 * @param runIndex - Key used for the `activeRunFamily`, `abortScrollFamily` and
 * `showStopButtonByIndex` recoil state.
 * @returns `{ streamId }` - id of the stream currently tracked by the hook, or `null`.
 */
export default function useResumableSSE(
  submission: TSubmission | null,
  chatHelpers: ChatHelpers,
  isAddedRequest = false,
  runIndex = 0,
) {
  const genTitle = useGenTitleMutation();
  const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));

  const { token, isAuthenticated } = useAuthContext();
  const [_completed, setCompleted] = useState(new Set());
  const [streamId, setStreamId] = useState<string | null>(null);
  const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
  const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));

  /** Currently open SSE connection, if any */
  const sseRef = useRef<SSE | null>(null);
  /** Number of reconnect attempts since the last successful `open` */
  const reconnectAttemptRef = useRef(0);
  /** Pending reconnect timer, if any */
  const reconnectTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  /** Latest non-empty submission; read by the reconnect timer */
  const submissionRef = useRef<TSubmission | null>(null);

  const {
    setMessages,
    getMessages,
    setConversation,
    setIsSubmitting,
    newConversation,
    resetLatestMessage,
  } = chatHelpers;

  const {
    stepHandler,
    finalHandler,
    errorHandler,
    clearStepMaps,
    messageHandler,
    contentHandler,
    createdHandler,
    syncStepMessage,
    attachmentHandler,
    resetContentHandler,
  } = useEventHandlers({
    genTitle,
    setMessages,
    getMessages,
    setCompleted,
    isAddedRequest,
    setConversation,
    setIsSubmitting,
    newConversation,
    setShowStopButton,
    resetLatestMessage,
  });

  const { data: startupConfig } = useGetStartupConfig();
  const balanceQuery = useGetUserBalance({
    enabled: !!isAuthenticated && startupConfig?.balance?.enabled,
  });

  /**
   * Subscribe to stream via SSE library (supports custom headers)
   * Follows same auth pattern as useSSE
   * @param currentStreamId - Stream to subscribe to (URL-encoded into the request path)
   * @param currentSubmission - Submission passed along to every event handler
   * @param isResume - If true, adds ?resume=true to trigger sync event from server
   */
  const subscribeToStream = useCallback(
    (currentStreamId: string, currentSubmission: TSubmission, isResume = false) => {
      // Reassigned when the `created` event delivers the server-side user message
      let { userMessage } = currentSubmission;

      const baseUrl = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`;
      const url = isResume ? `${baseUrl}?resume=true` : baseUrl;
      console.log('[ResumableSSE] Subscribing to stream:', url, { isResume });

      const sse = new SSE(url, {
        headers: { Authorization: `Bearer ${token}` },
        method: 'GET',
      });
      sseRef.current = sse;

      sse.addEventListener('open', () => {
        console.log('[ResumableSSE] Stream connected');
        setAbortScroll(false);
        // Restore UI state on successful connection (including reconnection)
        setIsSubmitting(true);
        setShowStopButton(true);
        reconnectAttemptRef.current = 0;
      });

      sse.addEventListener('message', (e: MessageEvent) => {
        try {
          const data = JSON.parse(e.data);

          if (data.final != null) {
            console.log('[ResumableSSE] Received FINAL event', {
              aborted: data.aborted,
              conversationId: data.conversation?.conversationId,
              hasResponseMessage: !!data.responseMessage,
            });
            clearDraft(currentSubmission.conversation?.conversationId);
            try {
              finalHandler(data, currentSubmission as EventSubmission);
            } catch (error) {
              console.error('[ResumableSSE] Error in finalHandler:', error);
              setIsSubmitting(false);
              setShowStopButton(false);
            }
            // Clear handler maps on stream completion to prevent memory leaks
            clearStepMaps();
            if (startupConfig?.balance?.enabled ?? false) {
              balanceQuery.refetch();
            }
            sse.close();
            setStreamId(null);
            return;
          }

          if (data.created != null) {
            console.log('[ResumableSSE] Received CREATED event', {
              messageId: data.message?.messageId,
              conversationId: data.message?.conversationId,
            });
            const runId = v4();
            setActiveRunId(runId);
            userMessage = {
              ...userMessage,
              ...data.message,
              overrideParentMessageId: userMessage.overrideParentMessageId,
            };
            createdHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
            return;
          }

          if (data.event === 'attachment' && data.data) {
            attachmentHandler({
              data: data.data,
              submission: currentSubmission as EventSubmission,
            });
            return;
          }

          if (data.event != null) {
            stepHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
            return;
          }

          if (data.sync != null) {
            console.log('[ResumableSSE] SYNC received', {
              runSteps: data.resumeState?.runSteps?.length ?? 0,
            });

            const runId = v4();
            setActiveRunId(runId);

            // Replay run steps
            if (data.resumeState?.runSteps) {
              for (const runStep of data.resumeState.runSteps) {
                stepHandler({ event: 'on_run_step', data: runStep }, {
                  ...currentSubmission,
                  userMessage,
                } as EventSubmission);
              }
            }

            // Set message content from aggregatedContent
            if (data.resumeState?.aggregatedContent && userMessage?.messageId) {
              const messages = getMessages() ?? [];
              const userMsgId = userMessage.messageId;
              const serverResponseId = data.resumeState.responseMessageId;

              // Find the EXACT response message - prioritize responseMessageId from server
              // This is critical when there are multiple responses to the same user message
              let responseIdx = -1;
              if (serverResponseId) {
                responseIdx = messages.findIndex((m) => m.messageId === serverResponseId);
              }
              // Fallback: find by parentMessageId pattern (for new messages)
              if (responseIdx < 0) {
                responseIdx = messages.findIndex(
                  (m) =>
                    !m.isCreatedByUser &&
                    (m.messageId === `${userMsgId}_` || m.parentMessageId === userMsgId),
                );
              }

              console.log('[ResumableSSE] SYNC update', {
                userMsgId,
                serverResponseId,
                responseIdx,
                foundMessageId: responseIdx >= 0 ? messages[responseIdx]?.messageId : null,
                messagesCount: messages.length,
                aggregatedContentLength: data.resumeState.aggregatedContent?.length,
              });

              if (responseIdx >= 0) {
                // Update existing response message with aggregatedContent
                const updated = [...messages];
                const oldContent = updated[responseIdx]?.content;
                updated[responseIdx] = {
                  ...updated[responseIdx],
                  content: data.resumeState.aggregatedContent,
                };
                console.log('[ResumableSSE] SYNC updating message', {
                  messageId: updated[responseIdx]?.messageId,
                  oldContentLength: Array.isArray(oldContent) ? oldContent.length : 0,
                  newContentLength: data.resumeState.aggregatedContent?.length,
                });
                setMessages(updated);
                // Sync both content handler and step handler with the updated message
                // so subsequent deltas build on synced content, not stale content
                resetContentHandler();
                syncStepMessage(updated[responseIdx]);
                console.log('[ResumableSSE] SYNC complete, handlers synced');
              } else {
                // Add new response message
                const responseId = serverResponseId ?? `${userMsgId}_`;
                setMessages([
                  ...messages,
                  {
                    messageId: responseId,
                    parentMessageId: userMsgId,
                    conversationId: currentSubmission.conversation?.conversationId ?? '',
                    text: '',
                    content: data.resumeState.aggregatedContent,
                    isCreatedByUser: false,
                  } as TMessage,
                ]);
              }
            }

            setShowStopButton(true);
            return;
          }

          if (data.type != null) {
            contentHandler({ data, submission: currentSubmission as EventSubmission });
            return;
          }

          if (data.message != null) {
            const text = data.text ?? data.response;
            const initialResponse = {
              ...(currentSubmission.initialResponse as TMessage),
              parentMessageId: data.parentMessageId,
              messageId: data.messageId,
            };
            messageHandler(text, { ...currentSubmission, userMessage, initialResponse });
          }
        } catch (error) {
          // Covers both malformed JSON and exceptions thrown by the handlers above
          console.error('[ResumableSSE] Error processing message:', error);
        }
      });

      /**
       * Error event - fired on actual network failures (non-200, connection lost, etc.)
       * This should trigger reconnection with exponential backoff.
       */
      sse.addEventListener('error', async (e: MessageEvent) => {
        console.log('[ResumableSSE] Stream error (network failure) - will attempt reconnect');
        if (startupConfig?.balance?.enabled ?? false) {
          balanceQuery.refetch();
        }

        // Check for 401 and try to refresh token (same pattern as useSSE).
        // `responseCode` is read from the error event but is not part of the MessageEvent type.
        const { responseCode } = e as MessageEvent & { responseCode?: number };
        if (responseCode === 401) {
          try {
            const refreshResponse = await request.refreshToken();
            const newToken = refreshResponse?.token ?? '';
            if (!newToken) {
              throw new Error('Token refresh failed.');
            }
            // Update headers on same SSE instance and retry (like useSSE)
            sse.headers = {
              Authorization: `Bearer ${newToken}`,
            };
            request.dispatchTokenUpdatedEvent(newToken);
            sse.stream();
            return;
          } catch (error) {
            // Fall through to the regular reconnect logic below
            console.log('[ResumableSSE] Token refresh failed:', error);
          }
        }

        sse.close();

        if (reconnectAttemptRef.current < MAX_RETRIES) {
          reconnectAttemptRef.current++;
          // 1s, 2s, 4s, ... capped at 30s
          const delay = Math.min(1000 * Math.pow(2, reconnectAttemptRef.current - 1), 30000);

          console.log(
            `[ResumableSSE] Reconnecting in ${delay}ms (attempt ${reconnectAttemptRef.current}/${MAX_RETRIES})`,
          );

          reconnectTimeoutRef.current = setTimeout(() => {
            if (submissionRef.current) {
              // Reconnect with isResume=true to get sync event with any missed content
              subscribeToStream(currentStreamId, submissionRef.current, true);
            }
          }, delay);

          // Keep UI in "submitting" state during reconnection attempts
          // so user knows we're still trying (abort handler may have reset these)
          setIsSubmitting(true);
          setShowStopButton(true);
        } else {
          console.error('[ResumableSSE] Max reconnect attempts reached');
          errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
          setIsSubmitting(false);
          setShowStopButton(false);
          setStreamId(null);
        }
      });

      /**
       * Abort event - fired when sse.close() is called (intentional close).
       * This happens on cleanup/navigation. Do NOT reconnect, just reset UI.
       * The backend stream continues running - useResumeOnLoad will restore if user returns.
       */
      sse.addEventListener('abort', () => {
        console.log('[ResumableSSE] Stream aborted (intentional close) - no reconnect');
        // Clear any pending reconnect attempts
        if (reconnectTimeoutRef.current) {
          clearTimeout(reconnectTimeoutRef.current);
          reconnectTimeoutRef.current = null;
        }
        reconnectAttemptRef.current = 0;
        // Reset UI state - useResumeOnLoad will restore if user returns to this conversation
        setIsSubmitting(false);
        setShowStopButton(false);
        setStreamId(null);
      });

      // Start the SSE connection
      sse.stream();

      // Debug hooks for testing reconnection vs clean close behavior (dev only)
      if (import.meta.env.DEV) {
        const debugWindow = window as Window & {
          __sse?: SSE;
          __killNetwork?: () => void;
          __closeClean?: () => void;
        };
        debugWindow.__sse = sse;

        /** Simulate network drop - triggers error event → reconnection */
        debugWindow.__killNetwork = () => {
          console.log('[Debug] Simulating network drop...');
          // @ts-ignore - sse.js types are incorrect, dispatchEvent actually takes Event
          sse.dispatchEvent(new Event('error'));
        };

        /** Simulate clean close (navigation away) - triggers abort event → no reconnection */
        debugWindow.__closeClean = () => {
          console.log('[Debug] Simulating clean close (navigation away)...');
          sse.close();
        };
      }
    },
    [
      token,
      setAbortScroll,
      setActiveRunId,
      setShowStopButton,
      finalHandler,
      createdHandler,
      attachmentHandler,
      stepHandler,
      contentHandler,
      resetContentHandler,
      syncStepMessage,
      clearStepMaps,
      messageHandler,
      errorHandler,
      setIsSubmitting,
      getMessages,
      setMessages,
      startupConfig?.balance?.enabled,
      balanceQuery,
    ],
  );

  /**
   * Start generation (POST request that returns streamId)
   * Uses request.post which has axios interceptors for automatic token refresh.
   * Retries up to 3 times on network errors with exponential backoff.
   * @param currentSubmission - Submission converted to the request payload via createPayload
   * @returns The streamId from the response, or `null` after calling errorHandler on failure
   */
  const startGeneration = useCallback(
    async (currentSubmission: TSubmission): Promise<string | null> => {
      const payloadData = createPayload(currentSubmission);
      let { payload } = payloadData;
      payload = removeNullishValues(payload) as TPayload;

      clearStepMaps();

      // Append the resumable flag without clobbering an existing query string
      const url = payloadData.server.includes('?')
        ? `${payloadData.server}&resumable=true`
        : `${payloadData.server}?resumable=true`;

      const maxRetries = 3;
      let lastError: unknown = null;

      for (let attempt = 1; attempt <= maxRetries; attempt++) {
        try {
          // Use request.post which handles auth token refresh via axios interceptors
          const data = (await request.post(url, payload)) as { streamId: string };
          console.log('[ResumableSSE] Generation started:', { streamId: data.streamId });
          return data.streamId;
        } catch (error) {
          lastError = error;
          // Check if it's a network error (retry) vs server error (don't retry)
          const isNetworkError =
            error instanceof Error &&
            'code' in error &&
            (error.code === 'ERR_NETWORK' || error.code === 'ERR_INTERNET_DISCONNECTED');

          if (isNetworkError && attempt < maxRetries) {
            // 1s, 2s, ... capped at 8s
            const delay = Math.min(1000 * Math.pow(2, attempt - 1), 8000);
            console.log(
              `[ResumableSSE] Network error starting generation, retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`,
            );
            await new Promise((resolve) => setTimeout(resolve, delay));
            continue;
          }

          // Don't retry: either not a network error or max retries reached
          break;
        }
      }

      // All retries failed or non-network error
      console.error('[ResumableSSE] Error starting generation:', lastError);
      errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
      setIsSubmitting(false);
      return null;
    },
    [clearStepMaps, errorHandler, setIsSubmitting],
  );

  useEffect(() => {
    if (!submission || Object.keys(submission).length === 0) {
      console.log('[ResumableSSE] No submission, cleaning up');
      // Clear reconnect timeout if submission is cleared
      if (reconnectTimeoutRef.current) {
        clearTimeout(reconnectTimeoutRef.current);
        reconnectTimeoutRef.current = null;
      }
      // Close SSE but do NOT dispatch cancel - navigation should not abort
      if (sseRef.current) {
        sseRef.current.close();
        sseRef.current = null;
      }
      setStreamId(null);
      reconnectAttemptRef.current = 0;
      submissionRef.current = null;
      return;
    }

    const resumeStreamId = (submission as TSubmission & { resumeStreamId?: string }).resumeStreamId;
    console.log('[ResumableSSE] Effect triggered', {
      conversationId: submission.conversation?.conversationId,
      hasResumeStreamId: !!resumeStreamId,
      resumeStreamId,
      userMessageId: submission.userMessage?.messageId,
    });

    submissionRef.current = submission;

    // Set by the cleanup below. startGeneration is awaited, so the effect can be torn down
    // (navigation, new submission) before it resolves; subscribing afterwards would open an
    // SSE connection that this cleanup can no longer close.
    let isCancelled = false;

    const initStream = async () => {
      setIsSubmitting(true);
      setShowStopButton(true);

      if (resumeStreamId) {
        // Resume: just subscribe to existing stream, don't start new generation
        console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
        setStreamId(resumeStreamId);
        subscribeToStream(resumeStreamId, submission, true); // isResume=true
      } else {
        // New generation: start and then subscribe
        console.log('[ResumableSSE] Starting NEW generation');
        const newStreamId = await startGeneration(submission);
        if (isCancelled) {
          // The backend generation is left running on purpose; only the subscription is skipped
          console.log('[ResumableSSE] Effect cleaned up before generation started - not subscribing');
          return;
        }
        if (newStreamId) {
          setStreamId(newStreamId);
          subscribeToStream(newStreamId, submission);
        } else {
          console.error('[ResumableSSE] Failed to get streamId from startGeneration');
        }
      }
    };

    initStream();

    return () => {
      console.log('[ResumableSSE] Cleanup - closing SSE, resetting UI state');
      isCancelled = true;
      // Cleanup on unmount/navigation - close connection but DO NOT abort backend
      // Reset UI state so it doesn't leak to other conversations
      // If user returns to this conversation, useResumeOnLoad will restore the state
      if (reconnectTimeoutRef.current) {
        clearTimeout(reconnectTimeoutRef.current);
        reconnectTimeoutRef.current = null;
      }
      if (sseRef.current) {
        sseRef.current.close();
        sseRef.current = null;
      }
      // Clear handler maps to prevent memory leaks and stale state
      clearStepMaps();
      // Reset UI state on cleanup - useResumeOnLoad will restore if needed
      setIsSubmitting(false);
      setShowStopButton(false);
    };
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [submission]);

  return { streamId };
}