From df82f2e9b221e21ad8a02de52019927c445c780b Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Thu, 26 Mar 2026 12:27:31 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=8F=81=20fix:=20Invalidate=20Message=20Ca?= =?UTF-8?q?che=20on=20Stream=20404=20Instead=20of=20Showing=20Error=20(#12?= =?UTF-8?q?411)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: Invalidate message cache on STREAM_EXPIRED instead of showing error When a 404 (stream expired) is received during SSE resume, the generation has already completed and messages are persisted in the database. Instead of injecting an error message into the cache, invalidate the messages query so react-query refetches from the DB. Also clear stale stream status cache and step maps to prevent retries and memory leaks. * fix: Mark conversation as processed when no active job found Prevents useResumeOnLoad from repeatedly re-checking the same conversation when the stream status returns inactive. The ref still resets on conversation change, so navigating away and back will correctly re-check. Also wait for background refetches to settle (isFetching) before acting on inactive status, preventing stale cached active:false from suppressing a valid resume. * test: Update useResumableSSE spec for cache invalidation on 404 Verify message cache invalidation, stream status removal, clearStepMaps, and setIsSubmitting(false) on the 404 path. * fix: Resolve lint warnings from CI Remove unused ErrorTypes import in test, add queryClient to useCallback dependency array in useResumableSSE. * Reorder import statements in useResumableSSE.ts --- .../SSE/__tests__/useResumableSSE.spec.ts | 37 ++++++++++++------- client/src/hooks/SSE/useResumableSSE.ts | 28 +++++++++----- client/src/hooks/SSE/useResumeOnLoad.ts | 18 +++++---- 3 files changed, 53 insertions(+), 30 deletions(-) diff --git a/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts b/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts index 9100f39858..1717d27c22 100644 --- a/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts +++ b/client/src/hooks/SSE/__tests__/useResumableSSE.spec.ts @@ -1,5 +1,5 @@ import { renderHook, act } from '@testing-library/react'; -import { Constants, ErrorTypes, LocalStorageKeys } from 'librechat-data-provider'; +import { Constants, LocalStorageKeys } from 'librechat-data-provider'; import type { TSubmission } from 'librechat-data-provider'; type SSEEventListener = (e: Partial & { responseCode?: number }) => void; @@ -34,7 +34,13 @@ jest.mock('sse.js', () => ({ })); const mockSetQueryData = jest.fn(); -const mockQueryClient = { setQueryData: mockSetQueryData }; +const mockInvalidateQueries = jest.fn(); +const mockRemoveQueries = jest.fn(); +const mockQueryClient = { + setQueryData: mockSetQueryData, + invalidateQueries: mockInvalidateQueries, + removeQueries: mockRemoveQueries, +}; jest.mock('@tanstack/react-query', () => ({ ...jest.requireActual('@tanstack/react-query'), @@ -63,6 +69,7 @@ jest.mock('~/data-provider', () => ({ useGetStartupConfig: () => ({ data: { balance: { enabled: false } } }), useGetUserBalance: () => ({ refetch: jest.fn() }), queueTitleGeneration: jest.fn(), + streamStatusQueryKey: (conversationId: string) => ['streamStatus', conversationId], })); const mockErrorHandler = jest.fn(); @@ -162,6 +169,11 @@ describe('useResumableSSE - 404 error path', () => { beforeEach(() => { mockSSEInstances.length = 0; localStorage.clear(); + mockErrorHandler.mockClear(); + mockClearStepMaps.mockClear(); + mockSetIsSubmitting.mockClear(); + mockInvalidateQueries.mockClear(); + mockRemoveQueries.mockClear(); }); const seedDraft = (conversationId: string) => { @@ -200,19 +212,18 @@ describe('useResumableSSE - 404 error path', () => { unmount(); }); - it('calls errorHandler with STREAM_EXPIRED error type on 404', async () => { + it('invalidates message cache and clears stream status on 404 instead of showing error', async () => { const { unmount } = await render404Scenario(CONV_ID); - expect(mockErrorHandler).toHaveBeenCalledTimes(1); - const call = mockErrorHandler.mock.calls[0][0]; - expect(call.data).toBeDefined(); - const parsed = JSON.parse(call.data.text); - expect(parsed.type).toBe(ErrorTypes.STREAM_EXPIRED); - expect(call.submission).toEqual( - expect.objectContaining({ - conversation: expect.objectContaining({ conversationId: CONV_ID }), - }), - ); + expect(mockErrorHandler).not.toHaveBeenCalled(); + expect(mockInvalidateQueries).toHaveBeenCalledWith({ + queryKey: ['messages', CONV_ID], + }); + expect(mockRemoveQueries).toHaveBeenCalledWith({ + queryKey: ['streamStatus', CONV_ID], + }); + expect(mockClearStepMaps).toHaveBeenCalled(); + expect(mockSetIsSubmitting).toHaveBeenCalledWith(false); unmount(); }); diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts index 32820f8392..39dc610dae 100644 --- a/client/src/hooks/SSE/useResumableSSE.ts +++ b/client/src/hooks/SSE/useResumableSSE.ts @@ -16,7 +16,12 @@ import { } from 'librechat-data-provider'; import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider'; import type { EventHandlerParams } from './useEventHandlers'; -import { useGetStartupConfig, useGetUserBalance, queueTitleGeneration } from '~/data-provider'; +import { + useGetUserBalance, + useGetStartupConfig, + queueTitleGeneration, + streamStatusQueryKey, +} from '~/data-provider'; import type { ActiveJobsResponse } from '~/data-provider'; import { useAuthContext } from '~/hooks/AuthContext'; import useEventHandlers from './useEventHandlers'; @@ -343,18 +348,20 @@ export default function useResumableSSE( /* @ts-ignore - sse.js types don't expose responseCode */ const responseCode = e.responseCode; - // 404 means job doesn't exist (completed/deleted) - don't retry + // 404 → job completed & was cleaned up; messages are persisted in DB. + // Invalidate cache once so react-query refetches instead of showing an error. if (responseCode === 404) { - console.log('[ResumableSSE] Stream not found (404) - job completed or expired'); + const convoId = currentSubmission.conversation?.conversationId; + console.log('[ResumableSSE] Stream 404, invalidating messages for:', convoId); sse.close(); removeActiveJob(currentStreamId); - clearAllDrafts(currentSubmission.conversation?.conversationId); - errorHandler({ - data: { - text: JSON.stringify({ type: ErrorTypes.STREAM_EXPIRED }), - } as unknown as Parameters[0]['data'], - submission: currentSubmission as EventSubmission, - }); + clearAllDrafts(convoId); + clearStepMaps(); + if (convoId) { + queryClient.invalidateQueries({ queryKey: [QueryKeys.messages, convoId] }); + queryClient.removeQueries({ queryKey: streamStatusQueryKey(convoId) }); + } + setIsSubmitting(false); setShowStopButton(false); setStreamId(null); reconnectAttemptRef.current = 0; @@ -544,6 +551,7 @@ export default function useResumableSSE( startupConfig?.balance?.enabled, balanceQuery, removeActiveJob, + queryClient, ], ); diff --git a/client/src/hooks/SSE/useResumeOnLoad.ts b/client/src/hooks/SSE/useResumeOnLoad.ts index f09751db0e..5f0f691787 100644 --- a/client/src/hooks/SSE/useResumeOnLoad.ts +++ b/client/src/hooks/SSE/useResumeOnLoad.ts @@ -125,7 +125,11 @@ export default function useResumeOnLoad( conversationId !== Constants.NEW_CONVO && processedConvoRef.current !== conversationId; // Don't re-check processed convos - const { data: streamStatus, isSuccess } = useStreamStatus(conversationId, shouldCheck); + const { + data: streamStatus, + isSuccess, + isFetching, + } = useStreamStatus(conversationId, shouldCheck); useEffect(() => { console.log('[ResumeOnLoad] Effect check', { @@ -135,6 +139,7 @@ export default function useResumeOnLoad( hasCurrentSubmission: !!currentSubmission, currentSubmissionConvoId: currentSubmission?.conversation?.conversationId, isSuccess, + isFetching, streamStatusActive: streamStatus?.active, streamStatusStreamId: streamStatus?.streamId, processedConvoRef: processedConvoRef.current, @@ -171,8 +176,9 @@ export default function useResumeOnLoad( ); } - // Wait for stream status query to complete - if (!isSuccess || !streamStatus) { + // Wait for stream status query to complete (including background refetches + // that may replace a stale cached result with fresh data) + if (!isSuccess || !streamStatus || isFetching) { console.log('[ResumeOnLoad] Waiting for stream status query'); return; } @@ -183,15 +189,12 @@ export default function useResumeOnLoad( return; } - // Check if there's an active job to resume - // DON'T mark as processed here - only mark when we actually create a submission - // This prevents stale cache data from blocking subsequent resume attempts if (!streamStatus.active || !streamStatus.streamId) { console.log('[ResumeOnLoad] No active job to resume for:', conversationId); + processedConvoRef.current = conversationId; return; } - // Mark as processed NOW - we verified there's an active job and will create submission processedConvoRef.current = conversationId; console.log('[ResumeOnLoad] Found active job, creating submission...', { @@ -241,6 +244,7 @@ export default function useResumeOnLoad( submissionConvoId, currentSubmission, isSuccess, + isFetching, streamStatus, getMessages, setSubmission,