Mirror of https://github.com/danny-avila/LibreChat.git (synced 2025-12-16 16:30:15 +01:00)

WIP: resuming
parent 0e850a5d5f
commit 2522cf760f
8 changed files with 478 additions and 85 deletions

@@ -18,8 +18,6 @@ const { getRoleByName } = require('~/models/Role');
const router = express.Router();

router.use(moderateText);

const checkAgentAccess = generateCheckAccess({
  permissionType: PermissionTypes.AGENTS,
  permissions: [Permissions.USE],

@@ -30,77 +28,12 @@ const checkAgentResourceAccess = canAccessAgentFromBody({
  requiredPermission: PermissionBits.VIEW,
});

/**
 * @route GET /stream/:streamId
 * @desc Subscribe to an ongoing generation job's SSE stream
 * @access Private
 */
router.get('/stream/:streamId', requireJwtAuth, (req, res) => {
  const { streamId } = req.params;

  const job = GenerationJobManager.getJob(streamId);
  if (!job) {
    return res.status(404).json({
      error: 'Stream not found',
      message: 'The generation job does not exist or has expired.',
    });
  }

  // Disable compression for SSE
  res.setHeader('Content-Encoding', 'identity');
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache, no-transform');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');
  res.flushHeaders();

  logger.debug(`[AgentStream] Client subscribed to ${streamId}`);

  const unsubscribe = GenerationJobManager.subscribe(
    streamId,
    (event) => {
      if (!res.writableEnded) {
        res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
        if (typeof res.flush === 'function') {
          res.flush();
        }
      }
    },
    (event) => {
      if (!res.writableEnded) {
        res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
        if (typeof res.flush === 'function') {
          res.flush();
        }
        res.end();
      }
    },
    (error) => {
      if (!res.writableEnded) {
        res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`);
        if (typeof res.flush === 'function') {
          res.flush();
        }
        res.end();
      }
    },
  );

  if (!unsubscribe) {
    return res.status(404).json({ error: 'Failed to subscribe to stream' });
  }

  if (job.status === 'complete' || job.status === 'error' || job.status === 'aborted') {
    res.write(`event: message\ndata: ${JSON.stringify({ final: true, status: job.status })}\n\n`);
    res.end();
    return;
  }

  req.on('close', () => {
    logger.debug(`[AgentStream] Client disconnected from ${streamId}`);
    unsubscribe();
  });
});
router.use(moderateText);
router.use(checkAgentAccess);
router.use(checkAgentResourceAccess);
router.use(validateConvoAccess);
router.use(buildEndpointOption);
router.use(setHeaders);

/**
 * @route POST /abort

@@ -121,12 +54,6 @@ router.post('/abort', (req, res) => {
  res.status(404).json({ error: 'Job not found' });
});

router.use(checkAgentAccess);
router.use(checkAgentResourceAccess);
router.use(validateConvoAccess);
router.use(buildEndpointOption);
router.use(setHeaders);

const controller = async (req, res, next) => {
  await AgentController(req, res, next, initializeClient, addTitle);
};
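
Note on the stream handler above: all three subscriber callbacks repeat the same write-and-flush pattern. A minimal sketch of a helper that could factor this out (hypothetical `writeSSE`, not part of this commit; `res.flush` is assumed to be provided by the compression middleware when it is mounted):

import type { Response } from 'express';

/** Hypothetical helper: write one SSE frame and flush when the middleware exposes flush(). */
function writeSSE(res: Response, eventName: 'message' | 'error', payload: unknown): void {
  if (res.writableEnded) {
    return;
  }
  res.write(`event: ${eventName}\ndata: ${JSON.stringify(payload)}\n\n`);
  const flush = (res as Response & { flush?: () => void }).flush;
  if (typeof flush === 'function') {
    flush.call(res);
  }
}

// Usage sketch mirroring the three callbacks above:
//   writeSSE(res, 'message', event);                  // chunk
//   writeSSE(res, 'message', finalEvent); res.end();  // done
//   writeSSE(res, 'error', { error }); res.end();     // error
export { writeSSE };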

@@ -1,5 +1,6 @@
const express = require('express');
const { isEnabled } = require('@librechat/api');
const { isEnabled, GenerationJobManager } = require('@librechat/api');
const { logger } = require('@librechat/data-schemas');
const {
  uaParser,
  checkBan,

@@ -22,6 +23,108 @@ router.use(uaParser);
router.use('/', v1);

/**
 * Stream endpoints - mounted before chatRouter to bypass rate limiters
 * These are GET requests and don't need message body validation or rate limiting
 */

/**
 * @route GET /chat/stream/:streamId
 * @desc Subscribe to an ongoing generation job's SSE stream with replay support
 * @access Private
 * @description Replays any chunks missed during disconnect, then streams live
 */
router.get('/chat/stream/:streamId', (req, res) => {
  const { streamId } = req.params;

  const job = GenerationJobManager.getJob(streamId);
  if (!job) {
    return res.status(404).json({
      error: 'Stream not found',
      message: 'The generation job does not exist or has expired.',
    });
  }

  res.setHeader('Content-Encoding', 'identity');
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache, no-transform');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');
  res.flushHeaders();

  logger.debug(`[AgentStream] Client subscribed to ${streamId}`);

  const result = GenerationJobManager.subscribe(
    streamId,
    (event) => {
      if (!res.writableEnded) {
        res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
        if (typeof res.flush === 'function') {
          res.flush();
        }
      }
    },
    (event) => {
      if (!res.writableEnded) {
        res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
        if (typeof res.flush === 'function') {
          res.flush();
        }
        res.end();
      }
    },
    (error) => {
      if (!res.writableEnded) {
        res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`);
        if (typeof res.flush === 'function') {
          res.flush();
        }
        res.end();
      }
    },
  );

  if (!result) {
    return res.status(404).json({ error: 'Failed to subscribe to stream' });
  }

  req.on('close', () => {
    logger.debug(`[AgentStream] Client disconnected from ${streamId}`);
    result.unsubscribe();
  });
});

/**
 * @route GET /chat/status/:conversationId
 * @desc Check if there's an active generation job for a conversation
 * @access Private
 * @returns { active, streamId, status, chunkCount, aggregatedContent, createdAt }
 */
router.get('/chat/status/:conversationId', (req, res) => {
  const { conversationId } = req.params;

  const job = GenerationJobManager.getJobByConversation(conversationId);

  if (!job) {
    return res.json({ active: false });
  }

  if (job.metadata.userId !== req.user.id) {
    return res.status(403).json({ error: 'Unauthorized' });
  }

  const info = GenerationJobManager.getStreamInfo(job.streamId);

  res.json({
    active: info?.active ?? false,
    streamId: job.streamId,
    status: info?.status ?? job.status,
    chunkCount: info?.chunkCount ?? 0,
    aggregatedContent: info?.aggregatedContent,
    createdAt: info?.createdAt ?? job.createdAt,
  });
});

const chatRouter = express.Router();
chatRouter.use(configMiddleware);
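
The mount order above is load-bearing: Express dispatches in registration order, so the stream and status routes registered directly on the parent router are matched before chatRouter (and its rate limiters) is reached. A minimal sketch of that ordering, with hypothetical `rateLimiter` and `chatHandler` stand-ins that are not part of this commit:

import express, { type RequestHandler } from 'express';

// Hypothetical stand-ins, for illustration only.
const rateLimiter: RequestHandler = (_req, _res, next) => next();
const chatHandler: RequestHandler = (_req, res) => res.json({ ok: true });

const router = express.Router();

// Registered first: a GET to /chat/stream/:streamId terminates here
// and never reaches the middleware mounted below.
router.get('/chat/stream/:streamId', (_req, res) => {
  res.json({ note: 'handled before any rate limiter runs' });
});

const chatRouter = express.Router();
chatRouter.use(rateLimiter);
chatRouter.post('/chat/:endpoint?', chatHandler);

// Registered second: only sees requests the earlier routes did not handle.
router.use('/', chatRouter);

export default router;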

@@ -7,7 +7,7 @@ import { Constants, buildTree } from 'librechat-data-provider';
import type { TMessage } from 'librechat-data-provider';
import type { ChatFormValues } from '~/common';
import { ChatContext, AddedChatContext, useFileMapContext, ChatFormProvider } from '~/Providers';
import { useChatHelpers, useAddedResponse, useAdaptiveSSE } from '~/hooks';
import { useChatHelpers, useAddedResponse, useAdaptiveSSE, useResumeOnLoad } from '~/hooks';
import ConversationStarters from './Input/ConversationStarters';
import { useGetMessagesByConvoId } from '~/data-provider';
import MessagesView from './Messages/MessagesView';

@@ -54,6 +54,9 @@ function ChatView({ index = 0 }: { index?: number }) {
  useAdaptiveSSE(rootSubmission, chatHelpers, false, index);
  useAdaptiveSSE(addedSubmission, addedChatHelpers, true, index + 1);

  // Auto-resume if navigating back to conversation with active job
  useResumeOnLoad(conversationId, chatHelpers, index);

  const methods = useForm<ChatFormValues>({
    defaultValues: { text: '' },
  });

client/src/data-provider/queries/streamStatus.ts (new file, 40 lines)

@@ -0,0 +1,40 @@
import { useQuery } from '@tanstack/react-query';
import { request } from 'librechat-data-provider';

export interface StreamStatusResponse {
  active: boolean;
  streamId?: string;
  status?: 'running' | 'complete' | 'error' | 'aborted';
  chunkCount?: number;
  aggregatedContent?: Array<{ type: string; text?: string }>;
  createdAt?: number;
}

/**
 * Query key for stream status
 */
export const streamStatusQueryKey = (conversationId: string) => ['streamStatus', conversationId];

/**
 * Fetch stream status for a conversation
 */
export const fetchStreamStatus = async (conversationId: string): Promise<StreamStatusResponse> => {
  const response = await request.get(`/api/agents/chat/status/${conversationId}`);
  return response.data;
};

/**
 * React Query hook for checking if a conversation has an active generation stream.
 * Only fetches when conversationId is provided and resumable streams are enabled.
 */
export function useStreamStatus(conversationId: string | undefined, enabled = true) {
  return useQuery({
    queryKey: streamStatusQueryKey(conversationId || ''),
    queryFn: () => fetchStreamStatus(conversationId!),
    enabled: !!conversationId && enabled,
    staleTime: 1000, // Consider stale after 1 second
    refetchOnMount: true,
    refetchOnWindowFocus: true,
    retry: false,
  });
}
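
A possible consumer of `useStreamStatus` (hypothetical component, not part of this commit; the import path is assumed from the file's location), showing the intended read side of the status endpoint:

import React from 'react';
import { useStreamStatus } from '~/data-provider/queries/streamStatus';

/** Hypothetical banner: hints that a server-side generation is still running for this conversation. */
export function ActiveGenerationBanner({ conversationId }: { conversationId?: string }) {
  const { data } = useStreamStatus(conversationId);

  if (!data?.active) {
    return null;
  }

  return (
    <div role="status">
      A response is still being generated ({data.chunkCount ?? 0} chunks buffered); it will resume on reconnect.
    </div>
  );
}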

@@ -1,6 +1,7 @@
export { default as useSSE } from './useSSE';
export { default as useResumableSSE } from './useResumableSSE';
export { default as useAdaptiveSSE } from './useAdaptiveSSE';
export { default as useResumeOnLoad } from './useResumeOnLoad';
export { default as useStepHandler } from './useStepHandler';
export { default as useContentHandler } from './useContentHandler';
export { default as useAttachmentHandler } from './useAttachmentHandler';

client/src/hooks/SSE/useResumeOnLoad.ts (new file, 182 lines)

@@ -0,0 +1,182 @@
import { useEffect, useState, useRef } from 'react';
import { SSE } from 'sse.js';
import { useSetRecoilState, useRecoilValue } from 'recoil';
import { request } from 'librechat-data-provider';
import type { TMessage, EventSubmission } from 'librechat-data-provider';
import type { EventHandlerParams } from './useEventHandlers';
import { useAuthContext } from '~/hooks/AuthContext';
import { useGetStartupConfig, useGetUserBalance } from '~/data-provider';
import useEventHandlers from './useEventHandlers';
import store from '~/store';

type ChatHelpers = Pick<
  EventHandlerParams,
  | 'setMessages'
  | 'getMessages'
  | 'setConversation'
  | 'setIsSubmitting'
  | 'newConversation'
  | 'resetLatestMessage'
>;

/**
 * Hook to resume streaming if navigating back to a conversation with active generation.
 * Checks for active jobs on mount and auto-subscribes if found.
 */
export default function useResumeOnLoad(
  conversationId: string | undefined,
  chatHelpers: ChatHelpers,
  runIndex = 0,
) {
  const resumableEnabled = useRecoilValue(store.resumableStreams);
  const { token, isAuthenticated } = useAuthContext();
  const sseRef = useRef<SSE | null>(null);
  const checkedConvoRef = useRef<string | null>(null);
  const [completed, setCompleted] = useState(new Set());
  const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
  const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));

  const { getMessages, setIsSubmitting } = chatHelpers;

  const { stepHandler, finalHandler, contentHandler } = useEventHandlers({
    ...chatHelpers,
    setCompleted,
    setShowStopButton,
  });

  const { data: startupConfig } = useGetStartupConfig();
  const balanceQuery = useGetUserBalance({
    enabled: !!isAuthenticated && startupConfig?.balance?.enabled,
  });

  /**
   * Check for active job when conversation loads
   */
  useEffect(() => {
    if (!resumableEnabled || !conversationId || !token) {
      checkedConvoRef.current = null;
      return;
    }

    // Only check once per conversationId to prevent loops
    if (checkedConvoRef.current === conversationId) {
      return;
    }

    checkedConvoRef.current = conversationId;

    const checkAndResume = async () => {
      try {
        const response = await fetch(`/api/agents/chat/status/${conversationId}`, {
          headers: { Authorization: `Bearer ${token}` },
        });

        if (!response.ok) {
          return;
        }

        const { active, streamId } = await response.json();

        if (!active || !streamId) {
          return;
        }

        console.log('[ResumeOnLoad] Found active job, resuming...', { streamId });

        const messages = getMessages() || [];
        const lastMessage = messages[messages.length - 1];
        let textIndex: number | null = null;

        const url = `/api/agents/chat/stream/${encodeURIComponent(streamId)}`;

        const sse = new SSE(url, {
          headers: { Authorization: `Bearer ${token}` },
          method: 'GET',
        });
        sseRef.current = sse;

        sse.addEventListener('open', () => {
          console.log('[ResumeOnLoad] Reconnected to stream');
          setAbortScroll(false);
          setShowStopButton(true);
          setIsSubmitting(true);
        });

        sse.addEventListener('message', (e: MessageEvent) => {
          try {
            const data = JSON.parse(e.data);

            if (data.final != null) {
              try {
                finalHandler(data, { messages } as unknown as EventSubmission);
              } catch (error) {
                console.error('[ResumeOnLoad] Error in finalHandler:', error);
                setIsSubmitting(false);
                setShowStopButton(false);
              }
              (startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
              sse.close();
              sseRef.current = null;
              return;
            }

            if (data.event != null) {
              stepHandler(data, {
                messages,
                userMessage: lastMessage,
              } as unknown as EventSubmission);
              return;
            }

            if (data.type != null) {
              const { text, index } = data;
              if (text != null && index !== textIndex) {
                textIndex = index;
              }
              contentHandler({ data, submission: { messages } as unknown as EventSubmission });
              return;
            }
          } catch (error) {
            console.error('[ResumeOnLoad] Error processing message:', error);
          }
        });

        sse.addEventListener('error', async (e: MessageEvent) => {
          console.log('[ResumeOnLoad] Stream error');
          sse.close();
          sseRef.current = null;
          setIsSubmitting(false);
          setShowStopButton(false);

          /* @ts-ignore */
          if (e.responseCode === 401) {
            try {
              const refreshResponse = await request.refreshToken();
              const newToken = refreshResponse?.token ?? '';
              if (newToken) {
                request.dispatchTokenUpdatedEvent(newToken);
              }
            } catch (error) {
              console.log('[ResumeOnLoad] Token refresh failed:', error);
            }
          }
        });

        sse.stream();
      } catch (error) {
        console.error('[ResumeOnLoad] Error checking job status:', error);
      }
    };

    checkAndResume();

    return () => {
      if (sseRef.current) {
        sseRef.current.close();
        sseRef.current = null;
      }
    };
    // Only re-run when conversationId changes
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [conversationId]);
}
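
For reference, the message listener above distinguishes three rough payload shapes. The examples below are inferred from the handlers in this diff and are not an exhaustive schema:

// 1) Final event: closes the stream (the server also sends this immediately
//    when the job already finished, as `{ final: true, status: job.status }`).
const finalExample = { final: true, status: 'complete' };

// 2) Step event: routed to stepHandler; aggregated text arrives via
//    `on_message_delta` events (the same shape aggregateContent inspects server-side).
const stepExample = {
  event: 'on_message_delta',
  data: { delta: { content: [{ type: 'text', text: 'Hel' }] } },
};

// 3) Content part: routed to contentHandler; `index` identifies the content part.
const contentExample = { type: 'text', text: 'lo', index: 0 };

export { finalExample, stepExample, contentExample };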

@@ -8,6 +8,7 @@ import type {
  DoneHandler,
  ErrorHandler,
  UnsubscribeFn,
  ContentPart,
} from './types';

/**

@@ -68,6 +69,8 @@ class GenerationJobManagerClass {
      metadata: { userId, conversationId },
      readyPromise,
      resolveReady: resolveReady!,
      chunks: [],
      aggregatedContent: [],
    };

    job.emitter.setMaxListeners(100);

@@ -87,6 +90,28 @@ class GenerationJobManagerClass {
    return this.jobs.get(streamId);
  }

  /**
   * Find an active job by conversationId.
   * Since streamId === conversationId for existing conversations,
   * we first check by streamId, then search metadata.
   * @param conversationId - The conversation identifier
   * @returns The job if found, undefined otherwise
   */
  getJobByConversation(conversationId: string): GenerationJob | undefined {
    const directMatch = this.jobs.get(conversationId);
    if (directMatch && directMatch.status === 'running') {
      return directMatch;
    }

    for (const job of this.jobs.values()) {
      if (job.metadata.conversationId === conversationId && job.status === 'running') {
        return job;
      }
    }

    return undefined;
  }

  /**
   * Check if a job exists.
   * @param streamId - The stream identifier

@@ -144,24 +169,51 @@ class GenerationJobManagerClass {
  }

  /**
   * Subscribe to a job's event stream.
   * Subscribe to a job's event stream with replay support.
   * Replays any chunks buffered during disconnect, then continues with live events.
   * Buffer is cleared after replay (only holds chunks missed during disconnect).
   * @param streamId - The stream identifier
   * @param onChunk - Handler for chunk events
   * @param onDone - Optional handler for completion
   * @param onError - Optional handler for errors
   * @returns Unsubscribe function, or null if job not found
   * @returns Object with unsubscribe function, or null if job not found
   */
  subscribe(
    streamId: string,
    onChunk: ChunkHandler,
    onDone?: DoneHandler,
    onError?: ErrorHandler,
  ): UnsubscribeFn | null {
  ): { unsubscribe: UnsubscribeFn } | null {
    const job = this.jobs.get(streamId);
    if (!job) {
      return null;
    }

    // Replay buffered chunks (only chunks missed during disconnect)
    const chunksToReplay = [...job.chunks];
    const replayCount = chunksToReplay.length;

    if (replayCount > 0) {
      logger.debug(
        `[GenerationJobManager] Replaying ${replayCount} buffered chunks for ${streamId}`,
      );
    }

    // Clear buffer after capturing for replay - subscriber is now connected
    job.chunks = [];

    // Use setImmediate to allow the caller to set up their connection first
    setImmediate(() => {
      for (const chunk of chunksToReplay) {
        onChunk(chunk);
      }

      // If job is already complete, send the final event
      if (job.finalEvent && ['complete', 'error', 'aborted'].includes(job.status)) {
        onDone?.(job.finalEvent);
      }
    });

    const chunkHandler = (event: ServerSentEvent) => onChunk(event);
    const doneHandler = (event: ServerSentEvent) => onDone?.(event);
    const errorHandler = (error: string) => onError?.(error);

@@ -176,18 +228,27 @@ class GenerationJobManagerClass {
      logger.debug(`[GenerationJobManager] First subscriber ready for ${streamId}`);
    }

    return () => {
    const unsubscribe = () => {
      const currentJob = this.jobs.get(streamId);
      if (currentJob) {
        currentJob.emitter.off('chunk', chunkHandler);
        currentJob.emitter.off('done', doneHandler);
        currentJob.emitter.off('error', errorHandler);

        // Emit event when last subscriber leaves (for saving partial response)
        if (currentJob.emitter.listenerCount('chunk') === 0 && currentJob.status === 'running') {
          currentJob.emitter.emit('allSubscribersLeft', currentJob.aggregatedContent);
          logger.debug(`[GenerationJobManager] All subscribers left ${streamId}`);
        }
      }
    };

    return { unsubscribe };
  }

  /**
   * Emit a chunk event to all subscribers.
   * Only buffers chunks when no subscribers are listening (for reconnect replay).
   * @param streamId - The stream identifier
   * @param event - The event data to emit
   */

@@ -196,11 +257,49 @@ class GenerationJobManagerClass {
    if (!job || job.status !== 'running') {
      return;
    }

    // Only buffer if no one is listening (for reconnect replay)
    const hasSubscribers = job.emitter.listenerCount('chunk') > 0;
    if (!hasSubscribers) {
      job.chunks.push(event);
    }

    // Always aggregate content (for partial response saving)
    this.aggregateContent(job, event);

    job.emitter.emit('chunk', event);
  }

  /**
   * Aggregate content parts from message delta events.
   * Used to save partial response when subscribers disconnect.
   */
  private aggregateContent(job: GenerationJob, event: ServerSentEvent): void {
    // Check for on_message_delta events which contain content
    const data = event as Record<string, unknown>;
    if (data.event === 'on_message_delta' && data.data) {
      const eventData = data.data as Record<string, unknown>;
      const delta = eventData.delta as Record<string, unknown> | undefined;
      if (delta?.content && Array.isArray(delta.content)) {
        for (const part of delta.content) {
          if (part.type === 'text' && part.text) {
            // Find or create text content part
            let textPart = job.aggregatedContent?.find((p) => p.type === 'text');
            if (!textPart) {
              textPart = { type: 'text', text: '' };
              job.aggregatedContent = job.aggregatedContent || [];
              job.aggregatedContent.push(textPart);
            }
            textPart.text = (textPart.text || '') + part.text;
          }
        }
      }
    }
  }

  /**
   * Emit a done event to all subscribers.
   * Stores the final event for replay on reconnect.
   * @param streamId - The stream identifier
   * @param event - The final event data
   */

@@ -209,6 +308,7 @@ class GenerationJobManagerClass {
    if (!job) {
      return;
    }
    job.finalEvent = event;
    job.emitter.emit('done', event);
  }

@@ -278,6 +378,31 @@ class GenerationJobManagerClass {
    }
  }

  /**
   * Get stream info for status endpoint.
   * Returns chunk count, status, and aggregated content.
   */
  getStreamInfo(streamId: string): {
    active: boolean;
    status: GenerationJobStatus;
    chunkCount: number;
    aggregatedContent?: ContentPart[];
    createdAt: number;
  } | null {
    const job = this.jobs.get(streamId);
    if (!job) {
      return null;
    }

    return {
      active: job.status === 'running',
      status: job.status,
      chunkCount: job.chunks.length,
      aggregatedContent: job.aggregatedContent,
      createdAt: job.createdAt,
    };
  }

  /**
   * Get total number of active jobs.
   */

@@ -19,6 +19,18 @@ export interface GenerationJob {
  metadata: GenerationJobMetadata;
  readyPromise: Promise<void>;
  resolveReady: () => void;
  /** Buffered chunks for replay on reconnect */
  chunks: ServerSentEvent[];
  /** Final event when job completes */
  finalEvent?: ServerSentEvent;
  /** Aggregated content parts for saving partial response */
  aggregatedContent?: ContentPart[];
}

export interface ContentPart {
  type: string;
  text?: string;
  [key: string]: unknown;
}

export type ChunkHandler = (event: ServerSentEvent) => void;
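
Putting the manager API together, a consumer-side sketch of the resume contract (only methods shown in this diff are used; handler bodies are placeholders, not part of this commit):

import { GenerationJobManager } from '@librechat/api';

declare const streamId: string;

// Subscribing replays any chunks buffered while no one was listening
// (delivered on the next tick), then continues with live events.
const result = GenerationJobManager.subscribe(
  streamId,
  (event) => console.log('chunk', event),
  (finalEvent) => console.log('done', finalEvent),
  (error) => console.error('error', error),
);

if (result) {
  // Unsubscribing re-enables buffering; when the last subscriber of a running
  // job leaves, 'allSubscribersLeft' fires with the aggregated partial content.
  result.unsubscribe();
}

// Lightweight probe backing the GET /chat/status/:conversationId route.
const info = GenerationJobManager.getStreamInfo(streamId);
console.log(info?.active, info?.status, info?.chunkCount);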