diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js
index ddce168962..70570d6e38 100644
--- a/api/server/routes/agents/index.js
+++ b/api/server/routes/agents/index.js
@@ -113,6 +113,23 @@ router.get('/chat/stream/:streamId', async (req, res) => {
   });
 });
 
+/**
+ * @route GET /chat/active
+ * @desc Get all active generation job IDs for the current user
+ * @access Private
+ * @returns { activeJobIds: string[] } on success, or 500 { error: string } if the lookup fails
+ */
+router.get('/chat/active', async (req, res) => {
+  try {
+    const activeJobIds = await GenerationJobManager.getActiveJobIdsForUser(req.user.id);
+    res.json({ activeJobIds });
+  } catch (error) {
+    // Answer explicitly: on Express 4 a rejected async handler is not caught, leaving the request hanging
+    console.error('[GET /chat/active] Failed to get active job IDs:', error);
+    res.status(500).json({ error: 'Failed to get active jobs' });
+  }
+});
+
 /**
  * @route GET /chat/status/:conversationId
  * @desc Check if there's an active generation job for a conversation
diff --git a/client/src/components/Conversations/Conversations.tsx b/client/src/components/Conversations/Conversations.tsx
index 64b804b2d6..f0b05a5a00 100644
--- a/client/src/components/Conversations/Conversations.tsx
+++ b/client/src/components/Conversations/Conversations.tsx
@@ -7,6 +7,7 @@ import { List, AutoSizer, CellMeasurer, CellMeasurerCache } from 'react-virtuali
 import type { TConversation } from 'librechat-data-provider';
 import { useLocalize, TranslationKeys, useFavorites, useShowMarketplace } from '~/hooks';
 import FavoritesList from '~/components/Nav/Favorites/FavoritesList';
+import { useActiveJobs } from '~/data-provider';
 import { groupConversationsByDate, cn } from '~/utils';
 import Convo from './Convo';
 import store from '~/store';
@@ -120,18 +121,28 @@ const MemoizedConvo = memo(
     conversation,
     retainView,
     toggleNav,
+    isGenerating,
   }: {
     conversation: TConversation;
     retainView: () => void;
     toggleNav: () => void;
+    isGenerating: boolean;
   }) => {
-    return ;
+    return (
+
+    );
   },
   (prevProps, nextProps) => {
     return (
       prevProps.conversation.conversationId === nextProps.conversation.conversationId &&
       prevProps.conversation.title === nextProps.conversation.title &&
-      prevProps.conversation.endpoint === nextProps.conversation.endpoint
+      prevProps.conversation.endpoint === nextProps.conversation.endpoint &&
+      prevProps.isGenerating === nextProps.isGenerating
     );
   },
 );
@@ -149,11 +160,19 @@ const Conversations: FC = ({
 }) => {
   const localize = useLocalize();
   const search = useRecoilValue(store.search);
+  const resumableEnabled = useRecoilValue(store.resumableStreams);
   const { favorites, isLoading: isFavoritesLoading } = useFavorites();
   const isSmallScreen = useMediaQuery('(max-width: 768px)');
   const convoHeight = isSmallScreen ? 44 : 34;
   const showAgentMarketplace = useShowMarketplace();
 
+  // Fetch active job IDs for showing generation indicators (gated on the resumableStreams setting)
+  const { data: activeJobsData } = useActiveJobs(resumableEnabled);
+  const activeJobIds = useMemo(
+    () => new Set(activeJobsData?.activeJobIds ?? []),
+    [activeJobsData?.activeJobIds],
+  );
+
   // Determine if FavoritesList will render content
   const shouldShowFavorites =
     !search.query && (isFavoritesLoading || favorites.length > 0 || showAgentMarketplace);
@@ -292,9 +311,15 @@ const Conversations: FC = ({
       }
 
       if (item.type === 'convo') {
+        const isGenerating = activeJobIds.has(item.convo.conversationId ?? '');
         return (
-
+
         );
       }
 
@@ -311,6 +336,7 @@ const Conversations: FC = ({
       isChatsExpanded,
       setIsChatsExpanded,
       shouldShowFavorites,
+      activeJobIds,
     ],
   );
 
diff --git a/client/src/components/Conversations/Convo.tsx b/client/src/components/Conversations/Convo.tsx
index e85b341d5e..518d0e4a86 100644
--- a/client/src/components/Conversations/Convo.tsx
+++ b/client/src/components/Conversations/Convo.tsx
@@ -19,9 +19,15 @@ interface ConversationProps {
   conversation: TConversation;
   retainView: () => void;
   toggleNav: () => void;
+  isGenerating?: boolean;
 }
 
-export default function Conversation({ conversation, retainView, toggleNav }: ConversationProps) {
+export default function Conversation({
+  conversation,
+  retainView,
+  toggleNav,
+  isGenerating = false,
+}: ConversationProps) {
   const params = useParams();
   const localize = useLocalize();
   const { showToast } = useToastContext();
@@ -182,12 +188,35 @@ export default function Conversation({ conversation, retainView, toggleNav }: Co
           isSmallScreen={isSmallScreen}
           localize={localize}
         >
-
+          {isGenerating ? (
+
+
+
+
+          ) : (
+
+          )}
 
       )}
dataService.getActiveJobs(),
+    enabled,
+    staleTime: 5_000, // 5s - short to catch completions quickly
+    refetchOnMount: true,
+    refetchOnWindowFocus: true, // Catch up on tab switch (multi-tab scenario)
+    // Poll every 5s while there are active jobs to catch completions when navigated away
+    refetchInterval: (data) => {
+      const hasActiveJobs = (data?.activeJobIds?.length ?? 0) > 0;
+      return hasActiveJobs ? 5_000 : false;
+    },
+    retry: false,
+  });
+}
diff --git a/client/src/hooks/SSE/useResumableSSE.ts b/client/src/hooks/SSE/useResumableSSE.ts
index 1faa9aa947..17764372a9 100644
--- a/client/src/hooks/SSE/useResumableSSE.ts
+++ b/client/src/hooks/SSE/useResumableSSE.ts
@@ -2,6 +2,7 @@ import { useEffect, useState, useRef, useCallback } from 'react';
 import { v4 } from 'uuid';
 import { SSE } from 'sse.js';
 import { useSetRecoilState } from 'recoil';
+import { useQueryClient } from '@tanstack/react-query';
 import {
   request,
   Constants,
@@ -12,10 +13,16 @@ import {
 import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider';
 import type { EventHandlerParams } from './useEventHandlers';
 import { useGenTitleMutation, useGetStartupConfig, useGetUserBalance } from '~/data-provider';
+import { activeJobsQueryKey } from '~/data-provider/SSE/queries';
 import { useAuthContext } from '~/hooks/AuthContext';
 import useEventHandlers from './useEventHandlers';
 import store from '~/store';
 
+/** Shape of the active jobs query data that this hook updates in the query cache */
+interface ActiveJobsResponse {
+  activeJobIds: string[];
+}
+
 const clearDraft = (conversationId?: string | null) => {
   if (conversationId) {
     localStorage.removeItem(`${LocalStorageKeys.TEXT_DRAFT}${conversationId}`);
@@ -55,9 +62,36 @@ export default function useResumableSSE(
   runIndex = 0,
 ) {
   const genTitle = useGenTitleMutation();
+  const queryClient = useQueryClient();
   const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
 
   const { token, isAuthenticated } = useAuthContext();
+
+  /**
+   * Optimistically add a job ID to the active jobs cache (duplicates are collapsed via a Set).
+   * Called when a new generation starts or an existing stream is resumed.
+   */
+  const addActiveJob = useCallback(
+    (jobId: string) => {
+      queryClient.setQueryData(activeJobsQueryKey, (old) => ({
+        activeJobIds: [...new Set([...(old?.activeJobIds ?? []), jobId])],
+      }));
+    },
+    [queryClient],
+  );
+
+  /**
+   * Optimistically remove a job ID from the active jobs cache.
+   * Called when the stream completes, returns 404, or exhausts its reconnect attempts.
+   */
+  const removeActiveJob = useCallback(
+    (jobId: string) => {
+      queryClient.setQueryData(activeJobsQueryKey, (old) => ({
+        activeJobIds: (old?.activeJobIds ?? []).filter((id) => id !== jobId),
+      }));
+    },
+    [queryClient],
+  );
   const [_completed, setCompleted] = useState(new Set());
   const [streamId, setStreamId] = useState(null);
   const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
@@ -155,6 +189,8 @@ export default function useResumableSSE(
         }
         // Clear handler maps on stream completion to prevent memory leaks
         clearStepMaps();
+        // Optimistically remove from active jobs
+        removeActiveJob(currentStreamId);
         (startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
         sse.close();
         setStreamId(null);
@@ -303,15 +339,31 @@ export default function useResumableSSE(
 
       /**
        * Error event - fired on actual network failures (non-200, connection lost, etc.)
-       * This should trigger reconnection with exponential backoff.
+       * This should trigger reconnection with exponential backoff, except for 404 errors.
        */
       sse.addEventListener('error', async (e: MessageEvent) => {
-        console.log('[ResumableSSE] Stream error (network failure) - will attempt reconnect');
         (startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
 
+        /* @ts-ignore - sse.js types don't expose responseCode */
+        const responseCode = e.responseCode;
+
+        // 404 means job doesn't exist (completed/deleted) - don't retry
+        if (responseCode === 404) {
+          console.log('[ResumableSSE] Stream not found (404) - job completed or expired');
+          sse.close();
+          // Optimistically remove from active jobs since job is gone
+          removeActiveJob(currentStreamId);
+          setIsSubmitting(false);
+          setShowStopButton(false);
+          setStreamId(null);
+          reconnectAttemptRef.current = 0;
+          return;
+        }
+
+        console.log('[ResumableSSE] Stream error (network failure) - will attempt reconnect');
+
         // Check for 401 and try to refresh token (same pattern as useSSE)
-        /* @ts-ignore */
-        if (e.responseCode === 401) {
+        if (responseCode === 401) {
           try {
             const refreshResponse = await request.refreshToken();
             const newToken = refreshResponse?.token ?? '';
@@ -330,9 +382,8 @@ export default function useResumableSSE(
           }
         }
 
-        sse.close();
-
         if (reconnectAttemptRef.current < MAX_RETRIES) {
+          // Increment counter BEFORE close() so abort handler knows we're reconnecting
           reconnectAttemptRef.current++;
           const delay = Math.min(1000 * Math.pow(2, reconnectAttemptRef.current - 1), 30000);
 
@@ -340,6 +391,8 @@ export default function useResumableSSE(
             `[ResumableSSE] Reconnecting in ${delay}ms (attempt ${reconnectAttemptRef.current}/${MAX_RETRIES})`,
           );
 
+          sse.close();
+
           reconnectTimeoutRef.current = setTimeout(() => {
             if (submissionRef.current) {
               // Reconnect with isResume=true to get sync event with any missed content
@@ -353,7 +406,10 @@ export default function useResumableSSE(
           setShowStopButton(true);
         } else {
           console.error('[ResumableSSE] Max reconnect attempts reached');
+          sse.close();
           errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
+          // Optimistically remove from active jobs on max retries
+          removeActiveJob(currentStreamId);
           setIsSubmitting(false);
           setShowStopButton(false);
           setStreamId(null);
@@ -362,17 +418,23 @@ export default function useResumableSSE(
 
       /**
        * Abort event - fired when sse.close() is called (intentional close).
-       * This happens on cleanup/navigation. Do NOT reconnect, just reset UI.
-       * The backend stream continues running - useResumeOnLoad will restore if user returns.
+       * This happens on cleanup/navigation OR when error handler closes to reconnect.
+       * Only reset state if we're NOT in a reconnection cycle.
        */
       sse.addEventListener('abort', () => {
+        // If we're in a reconnection cycle (reconnect counter > 0), don't reset state
+        // (error handler will set up the reconnect timeout)
+        if (reconnectAttemptRef.current > 0) {
+          console.log('[ResumableSSE] Stream closed for reconnect - preserving state');
+          return;
+        }
+
         console.log('[ResumableSSE] Stream aborted (intentional close) - no reconnect');
         // Clear any pending reconnect attempts
         if (reconnectTimeoutRef.current) {
           clearTimeout(reconnectTimeoutRef.current);
           reconnectTimeoutRef.current = null;
         }
-        reconnectAttemptRef.current = 0;
         // Reset UI state - useResumeOnLoad will restore if user returns to this conversation
         setIsSubmitting(false);
         setShowStopButton(false);
@@ -425,6 +487,7 @@ export default function useResumableSSE(
       setMessages,
       startupConfig?.balance?.enabled,
       balanceQuery,
+      removeActiveJob,
     ],
   );
 
@@ -522,6 +585,8 @@ export default function useResumableSSE(
         // Resume: just subscribe to existing stream, don't start new generation
         console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
         setStreamId(resumeStreamId);
+        // Optimistically add to active jobs (in case it's not already there)
+        addActiveJob(resumeStreamId);
         subscribeToStream(resumeStreamId, submission, true); // isResume=true
       } else {
         // New generation: start and then subscribe
@@ -529,6 +594,8 @@ export default function useResumableSSE(
         const newStreamId = await startGeneration(submission);
         if (newStreamId) {
           setStreamId(newStreamId);
+          // Optimistically add to active jobs
+          addActiveJob(newStreamId);
           subscribeToStream(newStreamId, submission);
         } else {
           console.error('[ResumableSSE] Failed to get streamId from startGeneration');
@@ -547,6 +614,8 @@ export default function useResumableSSE(
         clearTimeout(reconnectTimeoutRef.current);
         reconnectTimeoutRef.current = null;
       }
+      // Reset reconnect counter before closing (so abort handler doesn't think we're reconnecting)
+      reconnectAttemptRef.current = 0;
       if (sseRef.current) {
         sseRef.current.close();
         sseRef.current = null;
diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts
index b3cf9adc46..41b2acacf0 100644
--- a/packages/api/src/stream/GenerationJobManager.ts
+++ b/packages/api/src/stream/GenerationJobManager.ts
@@ -903,6 +903,18 @@ class GenerationJobManagerClass {
     return { running, complete, error, aborted };
   }
 
+  /**
+   * Get active job IDs for a user.
+   * Returns conversation IDs of running jobs belonging to the user.
+   * Delegates to the job store's getActiveJobIdsByUser, documented as also cleaning up stale entries.
+   *
+   * @param userId - The user ID to query
+   * @returns Array of conversation IDs with active jobs
+   */
+  async getActiveJobIdsForUser(userId: string): Promise {
+    return this.jobStore.getActiveJobIdsByUser(userId);
+  }
+
   /**
    * Destroy the manager.
    * Cleans up all resources including runtime state, buffers, and stores.
diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts
index 6161c58c33..1f7b4ac901 100644
--- a/packages/api/src/stream/implementations/InMemoryJobStore.ts
+++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts
@@ -26,6 +26,9 @@ export class InMemoryJobStore implements IJobStore {
   private contentState = new Map();
   private cleanupInterval: NodeJS.Timeout | null = null;
 
+  /** Maps userId -> Set of streamIds (conversationIds); filled on job creation, pruned on delete and on lookup */
+  private userJobMap = new Map>();
+
   /** Time to keep completed jobs before cleanup (0 = immediate) */
   private ttlAfterComplete = 0;
 
@@ -76,6 +79,15 @@ export class InMemoryJobStore implements IJobStore {
     };
 
     this.jobs.set(streamId, job);
+
+    // Track job by userId for efficient user-scoped queries
+    let userJobs = this.userJobMap.get(userId);
+    if (!userJobs) {
+      userJobs = new Set();
+      this.userJobMap.set(userId, userJobs);
+    }
+    userJobs.add(streamId);
+
     logger.debug(`[InMemoryJobStore] Created job: ${streamId}`);
 
     return job;
@@ -94,6 +106,18 @@ export class InMemoryJobStore implements IJobStore {
   }
 
   async deleteJob(streamId: string): Promise {
+    // Remove from user's job set first: the owning userId is read from the job record
+    const job = this.jobs.get(streamId);
+    if (job) {
+      const userJobs = this.userJobMap.get(job.userId);
+      if (userJobs) {
+        userJobs.delete(streamId);
+        if (userJobs.size === 0) {
+          this.userJobMap.delete(job.userId);
+        }
+      }
+    }
+
     this.jobs.delete(streamId);
     this.contentState.delete(streamId);
     logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
@@ -178,9 +202,42 @@ export class InMemoryJobStore implements IJobStore {
     }
     this.jobs.clear();
     this.contentState.clear();
+    this.userJobMap.clear();
     logger.debug('[InMemoryJobStore] Destroyed');
   }
 
+  /**
+   * Get active job IDs for a user.
+   * Returns the tracked stream IDs (conversation IDs) whose job exists with status 'running'.
+   * Self-healing: tracked IDs whose job is missing or no longer 'running' are dropped from the user's set.
+   */
+  async getActiveJobIdsByUser(userId: string): Promise {
+    const trackedIds = this.userJobMap.get(userId);
+    if (!trackedIds || trackedIds.size === 0) {
+      return [];
+    }
+
+    const activeIds: string[] = [];
+
+    for (const streamId of trackedIds) {
+      const job = this.jobs.get(streamId);
+      // Only include if job exists AND is still running
+      if (job && job.status === 'running') {
+        activeIds.push(streamId);
+      } else {
+        // Self-healing: job is missing or not running but is still tracked - untrack it now
+        trackedIds.delete(streamId);
+      }
+    }
+
+    // Clean up empty set
+    if (trackedIds.size === 0) {
+      this.userJobMap.delete(userId);
+    }
+
+    return activeIds;
+  }
+
   // ===== Content State Methods =====
 
   /**
diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts
index b1670a57ed..b2c5c038f4 100644
--- a/packages/api/src/stream/interfaces/IJobStore.ts
+++ b/packages/api/src/stream/interfaces/IJobStore.ts
@@ -122,6 +122,16 @@ export interface IJobStore {
   /** Destroy the store and release resources */
   destroy(): Promise;
 
+  /**
+   * Get active job IDs for a user.
+   * Returns conversation IDs of running jobs belonging to the user.
+   * Also performs self-healing cleanup of stale entries.
+   *
+   * @param userId - The user ID to query
+   * @returns Array of conversation IDs with active jobs
+   */
+  getActiveJobIdsByUser(userId: string): Promise;
+
   // ===== Content State Methods =====
   // These methods manage volatile content state tied to each job.
   // In-memory: Uses WeakRef to graph for live access
diff --git a/packages/data-provider/src/api-endpoints.ts b/packages/data-provider/src/api-endpoints.ts
index aa97a75303..4d8a7198be 100644
--- a/packages/data-provider/src/api-endpoints.ts
+++ b/packages/data-provider/src/api-endpoints.ts
@@ -226,6 +226,8 @@ export const agents = ({ path = '', options }: { path?: string; options?: object
   return url;
 };
 
+export const activeJobs = () => `${BASE_URL}/api/agents/chat/active`;
+
 export const mcp = {
   tools: `${BASE_URL}/api/mcp/tools`,
   servers: `${BASE_URL}/api/mcp/servers`,
diff --git a/packages/data-provider/src/data-service.ts b/packages/data-provider/src/data-service.ts
index 21d5251388..518f20c7dd 100644
--- a/packages/data-provider/src/data-service.ts
+++ b/packages/data-provider/src/data-service.ts
@@ -1037,3 +1037,12 @@ export function getGraphApiToken(params: q.GraphTokenParams): Promise => {
+  return request.get(endpoints.activeJobs());
+};
diff --git a/packages/data-provider/src/keys.ts b/packages/data-provider/src/keys.ts
index 879435d411..235baf4ebe 100644
--- a/packages/data-provider/src/keys.ts
+++ b/packages/data-provider/src/keys.ts
@@ -60,6 +60,8 @@ export enum QueryKeys {
   /* MCP Servers */
   mcpServers = 'mcpServers',
   mcpServer = 'mcpServer',
+  /* Active Jobs */
+  activeJobs = 'activeJobs',
 }
 
 // Dynamic query keys that require parameters