mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-04-03 06:17:21 +02:00
🏁 fix: Invalidate Message Cache on Stream 404 Instead of Showing Error (#12411)
* fix: Invalidate message cache on STREAM_EXPIRED instead of showing error When a 404 (stream expired) is received during SSE resume, the generation has already completed and messages are persisted in the database. Instead of injecting an error message into the cache, invalidate the messages query so react-query refetches from the DB. Also clear stale stream status cache and step maps to prevent retries and memory leaks. * fix: Mark conversation as processed when no active job found Prevents useResumeOnLoad from repeatedly re-checking the same conversation when the stream status returns inactive. The ref still resets on conversation change, so navigating away and back will correctly re-check. Also wait for background refetches to settle (isFetching) before acting on inactive status, preventing stale cached active:false from suppressing a valid resume. * test: Update useResumableSSE spec for cache invalidation on 404 Verify message cache invalidation, stream status removal, clearStepMaps, and setIsSubmitting(false) on the 404 path. * fix: Resolve lint warnings from CI Remove unused ErrorTypes import in test, add queryClient to useCallback dependency array in useResumableSSE. * Reorder import statements in useResumableSSE.ts
This commit is contained in:
parent
4b6d68b3b5
commit
df82f2e9b2
3 changed files with 53 additions and 30 deletions
|
|
@ -1,5 +1,5 @@
|
|||
import { renderHook, act } from '@testing-library/react';
|
||||
import { Constants, ErrorTypes, LocalStorageKeys } from 'librechat-data-provider';
|
||||
import { Constants, LocalStorageKeys } from 'librechat-data-provider';
|
||||
import type { TSubmission } from 'librechat-data-provider';
|
||||
|
||||
type SSEEventListener = (e: Partial<MessageEvent> & { responseCode?: number }) => void;
|
||||
|
|
@ -34,7 +34,13 @@ jest.mock('sse.js', () => ({
|
|||
}));
|
||||
|
||||
const mockSetQueryData = jest.fn();
|
||||
const mockQueryClient = { setQueryData: mockSetQueryData };
|
||||
const mockInvalidateQueries = jest.fn();
|
||||
const mockRemoveQueries = jest.fn();
|
||||
const mockQueryClient = {
|
||||
setQueryData: mockSetQueryData,
|
||||
invalidateQueries: mockInvalidateQueries,
|
||||
removeQueries: mockRemoveQueries,
|
||||
};
|
||||
|
||||
jest.mock('@tanstack/react-query', () => ({
|
||||
...jest.requireActual('@tanstack/react-query'),
|
||||
|
|
@ -63,6 +69,7 @@ jest.mock('~/data-provider', () => ({
|
|||
useGetStartupConfig: () => ({ data: { balance: { enabled: false } } }),
|
||||
useGetUserBalance: () => ({ refetch: jest.fn() }),
|
||||
queueTitleGeneration: jest.fn(),
|
||||
streamStatusQueryKey: (conversationId: string) => ['streamStatus', conversationId],
|
||||
}));
|
||||
|
||||
const mockErrorHandler = jest.fn();
|
||||
|
|
@ -162,6 +169,11 @@ describe('useResumableSSE - 404 error path', () => {
|
|||
beforeEach(() => {
|
||||
mockSSEInstances.length = 0;
|
||||
localStorage.clear();
|
||||
mockErrorHandler.mockClear();
|
||||
mockClearStepMaps.mockClear();
|
||||
mockSetIsSubmitting.mockClear();
|
||||
mockInvalidateQueries.mockClear();
|
||||
mockRemoveQueries.mockClear();
|
||||
});
|
||||
|
||||
const seedDraft = (conversationId: string) => {
|
||||
|
|
@ -200,19 +212,18 @@ describe('useResumableSSE - 404 error path', () => {
|
|||
unmount();
|
||||
});
|
||||
|
||||
it('calls errorHandler with STREAM_EXPIRED error type on 404', async () => {
|
||||
it('invalidates message cache and clears stream status on 404 instead of showing error', async () => {
|
||||
const { unmount } = await render404Scenario(CONV_ID);
|
||||
|
||||
expect(mockErrorHandler).toHaveBeenCalledTimes(1);
|
||||
const call = mockErrorHandler.mock.calls[0][0];
|
||||
expect(call.data).toBeDefined();
|
||||
const parsed = JSON.parse(call.data.text);
|
||||
expect(parsed.type).toBe(ErrorTypes.STREAM_EXPIRED);
|
||||
expect(call.submission).toEqual(
|
||||
expect.objectContaining({
|
||||
conversation: expect.objectContaining({ conversationId: CONV_ID }),
|
||||
}),
|
||||
);
|
||||
expect(mockErrorHandler).not.toHaveBeenCalled();
|
||||
expect(mockInvalidateQueries).toHaveBeenCalledWith({
|
||||
queryKey: ['messages', CONV_ID],
|
||||
});
|
||||
expect(mockRemoveQueries).toHaveBeenCalledWith({
|
||||
queryKey: ['streamStatus', CONV_ID],
|
||||
});
|
||||
expect(mockClearStepMaps).toHaveBeenCalled();
|
||||
expect(mockSetIsSubmitting).toHaveBeenCalledWith(false);
|
||||
unmount();
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,12 @@ import {
|
|||
} from 'librechat-data-provider';
|
||||
import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider';
|
||||
import type { EventHandlerParams } from './useEventHandlers';
|
||||
import { useGetStartupConfig, useGetUserBalance, queueTitleGeneration } from '~/data-provider';
|
||||
import {
|
||||
useGetUserBalance,
|
||||
useGetStartupConfig,
|
||||
queueTitleGeneration,
|
||||
streamStatusQueryKey,
|
||||
} from '~/data-provider';
|
||||
import type { ActiveJobsResponse } from '~/data-provider';
|
||||
import { useAuthContext } from '~/hooks/AuthContext';
|
||||
import useEventHandlers from './useEventHandlers';
|
||||
|
|
@ -343,18 +348,20 @@ export default function useResumableSSE(
|
|||
/* @ts-ignore - sse.js types don't expose responseCode */
|
||||
const responseCode = e.responseCode;
|
||||
|
||||
// 404 means job doesn't exist (completed/deleted) - don't retry
|
||||
// 404 → job completed & was cleaned up; messages are persisted in DB.
|
||||
// Invalidate cache once so react-query refetches instead of showing an error.
|
||||
if (responseCode === 404) {
|
||||
console.log('[ResumableSSE] Stream not found (404) - job completed or expired');
|
||||
const convoId = currentSubmission.conversation?.conversationId;
|
||||
console.log('[ResumableSSE] Stream 404, invalidating messages for:', convoId);
|
||||
sse.close();
|
||||
removeActiveJob(currentStreamId);
|
||||
clearAllDrafts(currentSubmission.conversation?.conversationId);
|
||||
errorHandler({
|
||||
data: {
|
||||
text: JSON.stringify({ type: ErrorTypes.STREAM_EXPIRED }),
|
||||
} as unknown as Parameters<typeof errorHandler>[0]['data'],
|
||||
submission: currentSubmission as EventSubmission,
|
||||
});
|
||||
clearAllDrafts(convoId);
|
||||
clearStepMaps();
|
||||
if (convoId) {
|
||||
queryClient.invalidateQueries({ queryKey: [QueryKeys.messages, convoId] });
|
||||
queryClient.removeQueries({ queryKey: streamStatusQueryKey(convoId) });
|
||||
}
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
setStreamId(null);
|
||||
reconnectAttemptRef.current = 0;
|
||||
|
|
@ -544,6 +551,7 @@ export default function useResumableSSE(
|
|||
startupConfig?.balance?.enabled,
|
||||
balanceQuery,
|
||||
removeActiveJob,
|
||||
queryClient,
|
||||
],
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -125,7 +125,11 @@ export default function useResumeOnLoad(
|
|||
conversationId !== Constants.NEW_CONVO &&
|
||||
processedConvoRef.current !== conversationId; // Don't re-check processed convos
|
||||
|
||||
const { data: streamStatus, isSuccess } = useStreamStatus(conversationId, shouldCheck);
|
||||
const {
|
||||
data: streamStatus,
|
||||
isSuccess,
|
||||
isFetching,
|
||||
} = useStreamStatus(conversationId, shouldCheck);
|
||||
|
||||
useEffect(() => {
|
||||
console.log('[ResumeOnLoad] Effect check', {
|
||||
|
|
@ -135,6 +139,7 @@ export default function useResumeOnLoad(
|
|||
hasCurrentSubmission: !!currentSubmission,
|
||||
currentSubmissionConvoId: currentSubmission?.conversation?.conversationId,
|
||||
isSuccess,
|
||||
isFetching,
|
||||
streamStatusActive: streamStatus?.active,
|
||||
streamStatusStreamId: streamStatus?.streamId,
|
||||
processedConvoRef: processedConvoRef.current,
|
||||
|
|
@ -171,8 +176,9 @@ export default function useResumeOnLoad(
|
|||
);
|
||||
}
|
||||
|
||||
// Wait for stream status query to complete
|
||||
if (!isSuccess || !streamStatus) {
|
||||
// Wait for stream status query to complete (including background refetches
|
||||
// that may replace a stale cached result with fresh data)
|
||||
if (!isSuccess || !streamStatus || isFetching) {
|
||||
console.log('[ResumeOnLoad] Waiting for stream status query');
|
||||
return;
|
||||
}
|
||||
|
|
@ -183,15 +189,12 @@ export default function useResumeOnLoad(
|
|||
return;
|
||||
}
|
||||
|
||||
// Check if there's an active job to resume
|
||||
// DON'T mark as processed here - only mark when we actually create a submission
|
||||
// This prevents stale cache data from blocking subsequent resume attempts
|
||||
if (!streamStatus.active || !streamStatus.streamId) {
|
||||
console.log('[ResumeOnLoad] No active job to resume for:', conversationId);
|
||||
processedConvoRef.current = conversationId;
|
||||
return;
|
||||
}
|
||||
|
||||
// Mark as processed NOW - we verified there's an active job and will create submission
|
||||
processedConvoRef.current = conversationId;
|
||||
|
||||
console.log('[ResumeOnLoad] Found active job, creating submission...', {
|
||||
|
|
@ -241,6 +244,7 @@ export default function useResumeOnLoad(
|
|||
submissionConvoId,
|
||||
currentSubmission,
|
||||
isSuccess,
|
||||
isFetching,
|
||||
streamStatus,
|
||||
getMessages,
|
||||
setSubmission,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue