diff --git a/api/server/routes/agents/chat.js b/api/server/routes/agents/chat.js index bf88713527..d05dd5baf0 100644 --- a/api/server/routes/agents/chat.js +++ b/api/server/routes/agents/chat.js @@ -18,8 +18,6 @@ const { getRoleByName } = require('~/models/Role'); const router = express.Router(); -router.use(moderateText); - const checkAgentAccess = generateCheckAccess({ permissionType: PermissionTypes.AGENTS, permissions: [Permissions.USE], @@ -30,77 +28,12 @@ const checkAgentResourceAccess = canAccessAgentFromBody({ requiredPermission: PermissionBits.VIEW, }); -/** - * @route GET /stream/:streamId - * @desc Subscribe to an ongoing generation job's SSE stream - * @access Private - */ -router.get('/stream/:streamId', requireJwtAuth, (req, res) => { - const { streamId } = req.params; - - const job = GenerationJobManager.getJob(streamId); - if (!job) { - return res.status(404).json({ - error: 'Stream not found', - message: 'The generation job does not exist or has expired.', - }); - } - - // Disable compression for SSE - res.setHeader('Content-Encoding', 'identity'); - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache, no-transform'); - res.setHeader('Connection', 'keep-alive'); - res.setHeader('X-Accel-Buffering', 'no'); - res.flushHeaders(); - - logger.debug(`[AgentStream] Client subscribed to ${streamId}`); - - const unsubscribe = GenerationJobManager.subscribe( - streamId, - (event) => { - if (!res.writableEnded) { - res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); - if (typeof res.flush === 'function') { - res.flush(); - } - } - }, - (event) => { - if (!res.writableEnded) { - res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); - if (typeof res.flush === 'function') { - res.flush(); - } - res.end(); - } - }, - (error) => { - if (!res.writableEnded) { - res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`); - if (typeof res.flush === 'function') { - res.flush(); - } - res.end(); - } - }, - ); - - if (!unsubscribe) { - return res.status(404).json({ error: 'Failed to subscribe to stream' }); - } - - if (job.status === 'complete' || job.status === 'error' || job.status === 'aborted') { - res.write(`event: message\ndata: ${JSON.stringify({ final: true, status: job.status })}\n\n`); - res.end(); - return; - } - - req.on('close', () => { - logger.debug(`[AgentStream] Client disconnected from ${streamId}`); - unsubscribe(); - }); -}); +router.use(moderateText); +router.use(checkAgentAccess); +router.use(checkAgentResourceAccess); +router.use(validateConvoAccess); +router.use(buildEndpointOption); +router.use(setHeaders); /** * @route POST /abort @@ -121,12 +54,6 @@ router.post('/abort', (req, res) => { res.status(404).json({ error: 'Job not found' }); }); -router.use(checkAgentAccess); -router.use(checkAgentResourceAccess); -router.use(validateConvoAccess); -router.use(buildEndpointOption); -router.use(setHeaders); - const controller = async (req, res, next) => { await AgentController(req, res, next, initializeClient, addTitle); }; diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index b5e249b059..1f501d75bb 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -1,5 +1,6 @@ const express = require('express'); -const { isEnabled } = require('@librechat/api'); +const { isEnabled, GenerationJobManager } = require('@librechat/api'); +const { logger } = require('@librechat/data-schemas'); const { uaParser, checkBan, @@ -22,6 +23,108 @@ router.use(uaParser); router.use('/', v1); +/** + * Stream endpoints - mounted before chatRouter to bypass rate limiters + * These are GET requests and don't need message body validation or rate limiting + */ + +/** + * @route GET /chat/stream/:streamId + * @desc Subscribe to an ongoing generation job's SSE stream with replay support + * @access Private + * @description Replays any chunks missed during disconnect, then streams live + */ +router.get('/chat/stream/:streamId', (req, res) => { + const { streamId } = req.params; + + const job = GenerationJobManager.getJob(streamId); + if (!job) { + return res.status(404).json({ + error: 'Stream not found', + message: 'The generation job does not exist or has expired.', + }); + } + + res.setHeader('Content-Encoding', 'identity'); + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache, no-transform'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); + res.flushHeaders(); + + logger.debug(`[AgentStream] Client subscribed to ${streamId}`); + + const result = GenerationJobManager.subscribe( + streamId, + (event) => { + if (!res.writableEnded) { + res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); + if (typeof res.flush === 'function') { + res.flush(); + } + } + }, + (event) => { + if (!res.writableEnded) { + res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`); + if (typeof res.flush === 'function') { + res.flush(); + } + res.end(); + } + }, + (error) => { + if (!res.writableEnded) { + res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`); + if (typeof res.flush === 'function') { + res.flush(); + } + res.end(); + } + }, + ); + + if (!result) { + return res.status(404).json({ error: 'Failed to subscribe to stream' }); + } + + req.on('close', () => { + logger.debug(`[AgentStream] Client disconnected from ${streamId}`); + result.unsubscribe(); + }); +}); + +/** + * @route GET /chat/status/:conversationId + * @desc Check if there's an active generation job for a conversation + * @access Private + * @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt } + */ +router.get('/chat/status/:conversationId', (req, res) => { + const { conversationId } = req.params; + + const job = GenerationJobManager.getJobByConversation(conversationId); + + if (!job) { + return res.json({ active: false }); + } + + if (job.metadata.userId !== req.user.id) { + return res.status(403).json({ error: 'Unauthorized' }); + } + + const info = GenerationJobManager.getStreamInfo(job.streamId); + + res.json({ + active: info?.active ?? false, + streamId: job.streamId, + status: info?.status ?? job.status, + chunkCount: info?.chunkCount ?? 0, + aggregatedContent: info?.aggregatedContent, + createdAt: info?.createdAt ?? job.createdAt, + }); +}); + const chatRouter = express.Router(); chatRouter.use(configMiddleware); diff --git a/client/src/components/Chat/ChatView.tsx b/client/src/components/Chat/ChatView.tsx index 6f0f556c9b..b40c7003c8 100644 --- a/client/src/components/Chat/ChatView.tsx +++ b/client/src/components/Chat/ChatView.tsx @@ -7,7 +7,7 @@ import { Constants, buildTree } from 'librechat-data-provider'; import type { TMessage } from 'librechat-data-provider'; import type { ChatFormValues } from '~/common'; import { ChatContext, AddedChatContext, useFileMapContext, ChatFormProvider } from '~/Providers'; -import { useChatHelpers, useAddedResponse, useAdaptiveSSE } from '~/hooks'; +import { useChatHelpers, useAddedResponse, useAdaptiveSSE, useResumeOnLoad } from '~/hooks'; import ConversationStarters from './Input/ConversationStarters'; import { useGetMessagesByConvoId } from '~/data-provider'; import MessagesView from './Messages/MessagesView'; @@ -54,6 +54,9 @@ function ChatView({ index = 0 }: { index?: number }) { useAdaptiveSSE(rootSubmission, chatHelpers, false, index); useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1); + // Auto-resume if navigating back to conversation with active job + useResumeOnLoad(conversationId, chatHelpers, index); + const methods = useForm({ defaultValues: { text: '' }, }); diff --git a/client/src/data-provider/queries/streamStatus.ts b/client/src/data-provider/queries/streamStatus.ts new file mode 100644 index 0000000000..4b34290ba6 --- /dev/null +++ b/client/src/data-provider/queries/streamStatus.ts @@ -0,0 +1,40 @@ +import { useQuery } from '@tanstack/react-query'; +import { request } from 'librechat-data-provider'; + +export interface StreamStatusResponse { + active: boolean; + streamId?: string; + status?: 'running' | 'complete' | 'error' | 'aborted'; + chunkCount?: number; + aggregatedContent?: Array<{ type: string; text?: string }>; + createdAt?: number; +} + +/** + * Query key for stream status + */ +export const streamStatusQueryKey = (conversationId: string) => ['streamStatus', conversationId]; + +/** + * Fetch stream status for a conversation + */ +export const fetchStreamStatus = async (conversationId: string): Promise => { + const response = await request.get(`/api/agents/chat/status/${conversationId}`); + return response.data; +}; + +/** + * React Query hook for checking if a conversation has an active generation stream. + * Only fetches when conversationId is provided and resumable streams are enabled. + */ +export function useStreamStatus(conversationId: string | undefined, enabled = true) { + return useQuery({ + queryKey: streamStatusQueryKey(conversationId || ''), + queryFn: () => fetchStreamStatus(conversationId!), + enabled: !!conversationId && enabled, + staleTime: 1000, // Consider stale after 1 second + refetchOnMount: true, + refetchOnWindowFocus: true, + retry: false, + }); +} diff --git a/client/src/hooks/SSE/index.ts b/client/src/hooks/SSE/index.ts index bf31f2b038..2829db76f6 100644 --- a/client/src/hooks/SSE/index.ts +++ b/client/src/hooks/SSE/index.ts @@ -1,6 +1,7 @@ export { default as useSSE } from './useSSE'; export { default as useResumableSSE } from './useResumableSSE'; export { default as useAdaptiveSSE } from './useAdaptiveSSE'; +export { default as useResumeOnLoad } from './useResumeOnLoad'; export { default as useStepHandler } from './useStepHandler'; export { default as useContentHandler } from './useContentHandler'; export { default as useAttachmentHandler } from './useAttachmentHandler'; diff --git a/client/src/hooks/SSE/useResumeOnLoad.ts b/client/src/hooks/SSE/useResumeOnLoad.ts new file mode 100644 index 0000000000..ba980c5dc2 --- /dev/null +++ b/client/src/hooks/SSE/useResumeOnLoad.ts @@ -0,0 +1,182 @@ +import { useEffect, useState, useRef } from 'react'; +import { SSE } from 'sse.js'; +import { useSetRecoilState, useRecoilValue } from 'recoil'; +import { request } from 'librechat-data-provider'; +import type { TMessage, EventSubmission } from 'librechat-data-provider'; +import type { EventHandlerParams } from './useEventHandlers'; +import { useAuthContext } from '~/hooks/AuthContext'; +import { useGetStartupConfig, useGetUserBalance } from '~/data-provider'; +import useEventHandlers from './useEventHandlers'; +import store from '~/store'; + +type ChatHelpers = Pick< + EventHandlerParams, + | 'setMessages' + | 'getMessages' + | 'setConversation' + | 'setIsSubmitting' + | 'newConversation' + | 'resetLatestMessage' +>; + +/** + * Hook to resume streaming if navigating back to a conversation with active generation. + * Checks for active jobs on mount and auto-subscribes if found. + */ +export default function useResumeOnLoad( + conversationId: string | undefined, + chatHelpers: ChatHelpers, + runIndex = 0, +) { + const resumableEnabled = useRecoilValue(store.resumableStreams); + const { token, isAuthenticated } = useAuthContext(); + const sseRef = useRef(null); + const checkedConvoRef = useRef(null); + const [completed, setCompleted] = useState(new Set()); + const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex)); + const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex)); + + const { getMessages, setIsSubmitting } = chatHelpers; + + const { stepHandler, finalHandler, contentHandler } = useEventHandlers({ + ...chatHelpers, + setCompleted, + setShowStopButton, + }); + + const { data: startupConfig } = useGetStartupConfig(); + const balanceQuery = useGetUserBalance({ + enabled: !!isAuthenticated && startupConfig?.balance?.enabled, + }); + + /** + * Check for active job when conversation loads + */ + useEffect(() => { + if (!resumableEnabled || !conversationId || !token) { + checkedConvoRef.current = null; + return; + } + + // Only check once per conversationId to prevent loops + if (checkedConvoRef.current === conversationId) { + return; + } + + checkedConvoRef.current = conversationId; + + const checkAndResume = async () => { + try { + const response = await fetch(`/api/agents/chat/status/${conversationId}`, { + headers: { Authorization: `Bearer ${token}` }, + }); + + if (!response.ok) { + return; + } + + const { active, streamId } = await response.json(); + + if (!active || !streamId) { + return; + } + + console.log('[ResumeOnLoad] Found active job, resuming...', { streamId }); + + const messages = getMessages() || []; + const lastMessage = messages[messages.length - 1]; + let textIndex: number | null = null; + + const url = `/api/agents/chat/stream/${encodeURIComponent(streamId)}`; + + const sse = new SSE(url, { + headers: { Authorization: `Bearer ${token}` }, + method: 'GET', + }); + sseRef.current = sse; + + sse.addEventListener('open', () => { + console.log('[ResumeOnLoad] Reconnected to stream'); + setAbortScroll(false); + setShowStopButton(true); + setIsSubmitting(true); + }); + + sse.addEventListener('message', (e: MessageEvent) => { + try { + const data = JSON.parse(e.data); + + if (data.final != null) { + try { + finalHandler(data, { messages } as unknown as EventSubmission); + } catch (error) { + console.error('[ResumeOnLoad] Error in finalHandler:', error); + setIsSubmitting(false); + setShowStopButton(false); + } + (startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch(); + sse.close(); + sseRef.current = null; + return; + } + + if (data.event != null) { + stepHandler(data, { + messages, + userMessage: lastMessage, + } as unknown as EventSubmission); + return; + } + + if (data.type != null) { + const { text, index } = data; + if (text != null && index !== textIndex) { + textIndex = index; + } + contentHandler({ data, submission: { messages } as unknown as EventSubmission }); + return; + } + } catch (error) { + console.error('[ResumeOnLoad] Error processing message:', error); + } + }); + + sse.addEventListener('error', async (e: MessageEvent) => { + console.log('[ResumeOnLoad] Stream error'); + sse.close(); + sseRef.current = null; + setIsSubmitting(false); + setShowStopButton(false); + + /* @ts-ignore */ + if (e.responseCode === 401) { + try { + const refreshResponse = await request.refreshToken(); + const newToken = refreshResponse?.token ?? ''; + if (newToken) { + request.dispatchTokenUpdatedEvent(newToken); + } + } catch (error) { + console.log('[ResumeOnLoad] Token refresh failed:', error); + } + } + }); + + sse.stream(); + } catch (error) { + console.error('[ResumeOnLoad] Error checking job status:', error); + } + }; + + checkAndResume(); + + return () => { + if (sseRef.current) { + sseRef.current.close(); + sseRef.current = null; + } + }; + // Only re-run when conversationId changes + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [conversationId]); +} diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 7646a8f6e0..0597c66f5b 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -8,6 +8,7 @@ import type { DoneHandler, ErrorHandler, UnsubscribeFn, + ContentPart, } from './types'; /** @@ -68,6 +69,8 @@ class GenerationJobManagerClass { metadata: { userId, conversationId }, readyPromise, resolveReady: resolveReady!, + chunks: [], + aggregatedContent: [], }; job.emitter.setMaxListeners(100); @@ -87,6 +90,28 @@ class GenerationJobManagerClass { return this.jobs.get(streamId); } + /** + * Find an active job by conversationId. + * Since streamId === conversationId for existing conversations, + * we first check by streamId, then search metadata. + * @param conversationId - The conversation identifier + * @returns The job if found, undefined otherwise + */ + getJobByConversation(conversationId: string): GenerationJob | undefined { + const directMatch = this.jobs.get(conversationId); + if (directMatch && directMatch.status === 'running') { + return directMatch; + } + + for (const job of this.jobs.values()) { + if (job.metadata.conversationId === conversationId && job.status === 'running') { + return job; + } + } + + return undefined; + } + /** * Check if a job exists. * @param streamId - The stream identifier @@ -144,24 +169,51 @@ class GenerationJobManagerClass { } /** - * Subscribe to a job's event stream. + * Subscribe to a job's event stream with replay support. + * Replays any chunks buffered during disconnect, then continues with live events. + * Buffer is cleared after replay (only holds chunks missed during disconnect). * @param streamId - The stream identifier * @param onChunk - Handler for chunk events * @param onDone - Optional handler for completion * @param onError - Optional handler for errors - * @returns Unsubscribe function, or null if job not found + * @returns Object with unsubscribe function, or null if job not found */ subscribe( streamId: string, onChunk: ChunkHandler, onDone?: DoneHandler, onError?: ErrorHandler, - ): UnsubscribeFn | null { + ): { unsubscribe: UnsubscribeFn } | null { const job = this.jobs.get(streamId); if (!job) { return null; } + // Replay buffered chunks (only chunks missed during disconnect) + const chunksToReplay = [...job.chunks]; + const replayCount = chunksToReplay.length; + + if (replayCount > 0) { + logger.debug( + `[GenerationJobManager] Replaying ${replayCount} buffered chunks for ${streamId}`, + ); + } + + // Clear buffer after capturing for replay - subscriber is now connected + job.chunks = []; + + // Use setImmediate to allow the caller to set up their connection first + setImmediate(() => { + for (const chunk of chunksToReplay) { + onChunk(chunk); + } + + // If job is already complete, send the final event + if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) { + onDone?.(job.finalEvent); + } + }); + const chunkHandler = (event: ServerSentEvent) => onChunk(event); const doneHandler = (event: ServerSentEvent) => onDone?.(event); const errorHandler = (error: string) => onError?.(error); @@ -176,18 +228,27 @@ class GenerationJobManagerClass { logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`); } - return () => { + const unsubscribe = () => { const currentJob = this.jobs.get(streamId); if (currentJob) { currentJob.emitter.off('chunk', chunkHandler); currentJob.emitter.off('done', doneHandler); currentJob.emitter.off('error', errorHandler); + + // Emit event when last subscriber leaves (for saving partial response) + if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') { + currentJob.emitter.emit('allSubscribersLeft', currentJob.aggregatedContent); + logger.debug(`[GenerationJobManager] All subscribers left ${streamId}`); + } } }; + + return { unsubscribe }; } /** * Emit a chunk event to all subscribers. + * Only buffers chunks when no subscribers are listening (for reconnect replay). * @param streamId - The stream identifier * @param event - The event data to emit */ @@ -196,11 +257,49 @@ class GenerationJobManagerClass { if (!job || job.status !== 'running') { return; } + + // Only buffer if no one is listening (for reconnect replay) + const hasSubscribers = job.emitter.listenerCount('chunk') > 0; + if (!hasSubscribers) { + job.chunks.push(event); + } + + // Always aggregate content (for partial response saving) + this.aggregateContent(job, event); + job.emitter.emit('chunk', event); } + /** + * Aggregate content parts from message delta events. + * Used to save partial response when subscribers disconnect. + */ + private aggregateContent(job: GenerationJob, event: ServerSentEvent): void { + // Check for on_message_delta events which contain content + const data = event as Record; + if (data.event === 'on_message_delta' && data.data) { + const eventData = data.data as Record; + const delta = eventData.delta as Record | undefined; + if (delta?.content && Array.isArray(delta.content)) { + for (const part of delta.content) { + if (part.type === 'text' && part.text) { + // Find or create text content part + let textPart = job.aggregatedContent?.find((p) => p.type === 'text'); + if (!textPart) { + textPart = { type: 'text', text: '' }; + job.aggregatedContent = job.aggregatedContent || []; + job.aggregatedContent.push(textPart); + } + textPart.text = (textPart.text || '') + part.text; + } + } + } + } + } + /** * Emit a done event to all subscribers. + * Stores the final event for replay on reconnect. * @param streamId - The stream identifier * @param event - The final event data */ @@ -209,6 +308,7 @@ class GenerationJobManagerClass { if (!job) { return; } + job.finalEvent = event; job.emitter.emit('done', event); } @@ -278,6 +378,31 @@ class GenerationJobManagerClass { } } + /** + * Get stream info for status endpoint. + * Returns chunk count, status, and aggregated content. + */ + getStreamInfo(streamId: string): { + active: boolean; + status: GenerationJobStatus; + chunkCount: number; + aggregatedContent?: ContentPart[]; + createdAt: number; + } | null { + const job = this.jobs.get(streamId); + if (!job) { + return null; + } + + return { + active: job.status === 'running', + status: job.status, + chunkCount: job.chunks.length, + aggregatedContent: job.aggregatedContent, + createdAt: job.createdAt, + }; + } + /** * Get total number of active jobs. */ diff --git a/packages/api/src/stream/types.ts b/packages/api/src/stream/types.ts index 5b3d43ad16..e65c29157f 100644 --- a/packages/api/src/stream/types.ts +++ b/packages/api/src/stream/types.ts @@ -19,6 +19,18 @@ export interface GenerationJob { metadata: GenerationJobMetadata; readyPromise: Promise; resolveReady: () => void; + /** Buffered chunks for replay on reconnect */ + chunks: ServerSentEvent[]; + /** Final event when job completes */ + finalEvent?: ServerSentEvent; + /** Aggregated content parts for saving partial response */ + aggregatedContent?: ContentPart[]; +} + +export interface ContentPart { + type: string; + text?: string; + [key: string]: unknown; } export type ChunkHandler = (event: ServerSentEvent) => void;