feat: Add active job management for user and show progress in conversation list

- Implemented a new endpoint to retrieve active generation job IDs for the current user, enhancing user experience by allowing visibility of ongoing tasks.
- Integrated active job tracking in the Conversations component, displaying generation indicators based on active jobs.
- Optimized job management in the GenerationJobManager and InMemoryJobStore to support user-specific job queries, ensuring efficient resource handling and cleanup.
- Updated relevant components and hooks to utilize the new active jobs feature, improving overall application responsiveness and user feedback.
This commit is contained in:
Danny Avila 2025-12-17 12:22:51 -05:00
parent c69bb0b14d
commit 5a2aafb7d0
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
11 changed files with 279 additions and 20 deletions

View file

@ -2,6 +2,7 @@ import { useEffect, useState, useRef, useCallback } from 'react';
import { v4 } from 'uuid';
import { SSE } from 'sse.js';
import { useSetRecoilState } from 'recoil';
import { useQueryClient } from '@tanstack/react-query';
import {
request,
Constants,
@ -12,10 +13,16 @@ import {
import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider';
import type { EventHandlerParams } from './useEventHandlers';
import { useGenTitleMutation, useGetStartupConfig, useGetUserBalance } from '~/data-provider';
import { activeJobsQueryKey } from '~/data-provider/SSE/queries';
import { useAuthContext } from '~/hooks/AuthContext';
import useEventHandlers from './useEventHandlers';
import store from '~/store';
/** Response type for active jobs query */
interface ActiveJobsResponse {
activeJobIds: string[];
}
const clearDraft = (conversationId?: string | null) => {
if (conversationId) {
localStorage.removeItem(`${LocalStorageKeys.TEXT_DRAFT}${conversationId}`);
@ -55,9 +62,36 @@ export default function useResumableSSE(
runIndex = 0,
) {
const genTitle = useGenTitleMutation();
const queryClient = useQueryClient();
const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
const { token, isAuthenticated } = useAuthContext();
/**
* Optimistically add a job ID to the active jobs cache.
* Called when generation starts.
*/
const addActiveJob = useCallback(
(jobId: string) => {
queryClient.setQueryData<ActiveJobsResponse>(activeJobsQueryKey, (old) => ({
activeJobIds: [...new Set([...(old?.activeJobIds ?? []), jobId])],
}));
},
[queryClient],
);
/**
* Optimistically remove a job ID from the active jobs cache.
* Called when generation completes, aborts, or errors.
*/
const removeActiveJob = useCallback(
(jobId: string) => {
queryClient.setQueryData<ActiveJobsResponse>(activeJobsQueryKey, (old) => ({
activeJobIds: (old?.activeJobIds ?? []).filter((id) => id !== jobId),
}));
},
[queryClient],
);
const [_completed, setCompleted] = useState(new Set());
const [streamId, setStreamId] = useState<string | null>(null);
const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
@ -155,6 +189,8 @@ export default function useResumableSSE(
}
// Clear handler maps on stream completion to prevent memory leaks
clearStepMaps();
// Optimistically remove from active jobs
removeActiveJob(currentStreamId);
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
sse.close();
setStreamId(null);
@ -303,15 +339,31 @@ export default function useResumableSSE(
/**
* Error event - fired on actual network failures (non-200, connection lost, etc.)
* This should trigger reconnection with exponential backoff.
* This should trigger reconnection with exponential backoff, except for 404 errors.
*/
sse.addEventListener('error', async (e: MessageEvent) => {
console.log('[ResumableSSE] Stream error (network failure) - will attempt reconnect');
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
/* @ts-ignore - sse.js types don't expose responseCode */
const responseCode = e.responseCode;
// 404 means job doesn't exist (completed/deleted) - don't retry
if (responseCode === 404) {
console.log('[ResumableSSE] Stream not found (404) - job completed or expired');
sse.close();
// Optimistically remove from active jobs since job is gone
removeActiveJob(currentStreamId);
setIsSubmitting(false);
setShowStopButton(false);
setStreamId(null);
reconnectAttemptRef.current = 0;
return;
}
console.log('[ResumableSSE] Stream error (network failure) - will attempt reconnect');
// Check for 401 and try to refresh token (same pattern as useSSE)
/* @ts-ignore */
if (e.responseCode === 401) {
if (responseCode === 401) {
try {
const refreshResponse = await request.refreshToken();
const newToken = refreshResponse?.token ?? '';
@ -330,9 +382,8 @@ export default function useResumableSSE(
}
}
sse.close();
if (reconnectAttemptRef.current < MAX_RETRIES) {
// Increment counter BEFORE close() so abort handler knows we're reconnecting
reconnectAttemptRef.current++;
const delay = Math.min(1000 * Math.pow(2, reconnectAttemptRef.current - 1), 30000);
@ -340,6 +391,8 @@ export default function useResumableSSE(
`[ResumableSSE] Reconnecting in ${delay}ms (attempt ${reconnectAttemptRef.current}/${MAX_RETRIES})`,
);
sse.close();
reconnectTimeoutRef.current = setTimeout(() => {
if (submissionRef.current) {
// Reconnect with isResume=true to get sync event with any missed content
@ -353,7 +406,10 @@ export default function useResumableSSE(
setShowStopButton(true);
} else {
console.error('[ResumableSSE] Max reconnect attempts reached');
sse.close();
errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
// Optimistically remove from active jobs on max retries
removeActiveJob(currentStreamId);
setIsSubmitting(false);
setShowStopButton(false);
setStreamId(null);
@ -362,17 +418,23 @@ export default function useResumableSSE(
/**
* Abort event - fired when sse.close() is called (intentional close).
* This happens on cleanup/navigation. Do NOT reconnect, just reset UI.
* The backend stream continues running - useResumeOnLoad will restore if user returns.
* This happens on cleanup/navigation OR when error handler closes to reconnect.
* Only reset state if we're NOT in a reconnection cycle.
*/
sse.addEventListener('abort', () => {
// If we're in a reconnection cycle, don't reset state
// (error handler will set up the reconnect timeout)
if (reconnectAttemptRef.current > 0) {
console.log('[ResumableSSE] Stream closed for reconnect - preserving state');
return;
}
console.log('[ResumableSSE] Stream aborted (intentional close) - no reconnect');
// Clear any pending reconnect attempts
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
reconnectAttemptRef.current = 0;
// Reset UI state - useResumeOnLoad will restore if user returns to this conversation
setIsSubmitting(false);
setShowStopButton(false);
@ -425,6 +487,7 @@ export default function useResumableSSE(
setMessages,
startupConfig?.balance?.enabled,
balanceQuery,
removeActiveJob,
],
);
@ -522,6 +585,8 @@ export default function useResumableSSE(
// Resume: just subscribe to existing stream, don't start new generation
console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
setStreamId(resumeStreamId);
// Optimistically add to active jobs (in case it's not already there)
addActiveJob(resumeStreamId);
subscribeToStream(resumeStreamId, submission, true); // isResume=true
} else {
// New generation: start and then subscribe
@ -529,6 +594,8 @@ export default function useResumableSSE(
const newStreamId = await startGeneration(submission);
if (newStreamId) {
setStreamId(newStreamId);
// Optimistically add to active jobs
addActiveJob(newStreamId);
subscribeToStream(newStreamId, submission);
} else {
console.error('[ResumableSSE] Failed to get streamId from startGeneration');
@ -547,6 +614,8 @@ export default function useResumableSSE(
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
// Reset reconnect counter before closing (so abort handler doesn't think we're reconnecting)
reconnectAttemptRef.current = 0;
if (sseRef.current) {
sseRef.current.close();
sseRef.current = null;