feat: Enhance Stream Management with Abort Functionality

- Updated the abort endpoint to support aborting ongoing generation streams using either streamId or conversationId.
- Introduced a new mutation hook `useAbortStreamMutation` for client-side integration.
- Added `useStreamStatus` query to monitor stream status and facilitate resuming conversations.
- Enhanced `useChatHelpers` to incorporate abort functionality when stopping generation.
- Improved `useResumableSSE` to handle stream errors and token refresh seamlessly.
- Updated `useResumeOnLoad` to check for active streams and resume conversations appropriately.
This commit is contained in:
Danny Avila 2025-12-11 21:19:43 -05:00
parent ff14cd3b44
commit 1853b4a189
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
11 changed files with 295 additions and 136 deletions

View file

@ -2,6 +2,7 @@ import { useEffect, useRef } from 'react';
import { useSetRecoilState, useRecoilValue } from 'recoil';
import { Constants, tMessageSchema } from 'librechat-data-provider';
import type { TMessage, TConversation, TSubmission, Agents } from 'librechat-data-provider';
import { useStreamStatus } from '~/data-provider';
import store from '~/store';
/**
@ -77,7 +78,9 @@ function buildSubmissionFromResumeState(
isRegenerate: false,
isTemporary: false,
endpointOption: {},
} as TSubmission;
// Signal to useResumableSSE to subscribe to existing stream instead of starting new
resumeStreamId: streamId,
} as TSubmission & { resumeStreamId: string };
}
/**
@ -97,67 +100,121 @@ export default function useResumeOnLoad(
const resumableEnabled = useRecoilValue(store.resumableStreams);
const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex));
const currentSubmission = useRecoilValue(store.submissionByIndex(runIndex));
const hasResumedRef = useRef<string | null>(null);
// Track conversations we've already processed (either resumed or skipped)
const processedConvoRef = useRef<string | null>(null);
// Check for active stream when conversation changes
// const { data: streamStatus, isSuccess } = useStreamStatus(
// conversationId,
// resumableEnabled && !currentSubmission, // Only check if no active submission
// );
// Only check if resumable is enabled and no active submission
const shouldCheck =
resumableEnabled &&
!currentSubmission &&
!!conversationId &&
conversationId !== Constants.NEW_CONVO &&
processedConvoRef.current !== conversationId; // Don't re-check processed convos
const { data: streamStatus, isSuccess } = useStreamStatus(conversationId, shouldCheck);
useEffect(() => {
// if (!resumableEnabled || !conversationId || !isSuccess || !streamStatus) {
if (!resumableEnabled || !conversationId) {
console.log('[ResumeOnLoad] Effect check', {
resumableEnabled,
conversationId,
hasCurrentSubmission: !!currentSubmission,
currentSubmissionConvoId: currentSubmission?.conversation?.conversationId,
isSuccess,
streamStatusActive: streamStatus?.active,
streamStatusStreamId: streamStatus?.streamId,
processedConvoRef: processedConvoRef.current,
});
if (!resumableEnabled || !conversationId || conversationId === Constants.NEW_CONVO) {
console.log('[ResumeOnLoad] Skipping - not enabled or new convo');
return;
}
// Don't resume if we already have an active submission
// Don't resume if we already have an active submission (we started it ourselves)
if (currentSubmission) {
console.log('[ResumeOnLoad] Skipping - already have active submission, marking as processed');
// Mark as processed so we don't try again
processedConvoRef.current = conversationId;
return;
}
// Don't resume the same conversation twice
if (hasResumedRef.current === conversationId) {
// Wait for stream status query to complete
if (!isSuccess || !streamStatus) {
console.log('[ResumeOnLoad] Waiting for stream status query');
return;
}
// Don't process the same conversation twice
if (processedConvoRef.current === conversationId) {
console.log('[ResumeOnLoad] Skipping - already processed this conversation');
return;
}
// Mark as processed immediately to prevent race conditions
processedConvoRef.current = conversationId;
// Check if there's an active job to resume
// if (!streamStatus.active || !streamStatus.streamId) {
// return;
// }
if (!streamStatus.active || !streamStatus.streamId) {
console.log('[ResumeOnLoad] No active job to resume for:', conversationId);
return;
}
// console.log('[ResumeOnLoad] Found active job, creating submission...', {
// streamId: streamStatus.streamId,
// status: streamStatus.status,
// });
hasResumedRef.current = conversationId;
console.log('[ResumeOnLoad] Found active job, creating submission...', {
streamId: streamStatus.streamId,
status: streamStatus.status,
resumeState: streamStatus.resumeState,
});
const messages = getMessages() || [];
// Minimal submission without resume state
const lastMessage = messages[messages.length - 1];
const submission: TSubmission = {
messages,
userMessage: lastMessage ?? ({ messageId: 'resume', conversationId, text: '' } as TMessage),
initialResponse: {
messageId: 'resume_',
// Build submission from resume state if available
if (streamStatus.resumeState) {
const submission = buildSubmissionFromResumeState(
streamStatus.resumeState,
streamStatus.streamId,
messages,
conversationId,
text: '',
content: [{ type: 'text', text: '' }],
} as TMessage,
conversation: { conversationId, title: 'Resumed Chat' } as TConversation,
isRegenerate: false,
isTemporary: false,
endpointOption: {},
} as TSubmission;
setSubmission(submission);
}, [conversationId, resumableEnabled, currentSubmission, getMessages, setSubmission]);
);
setSubmission(submission);
} else {
// Minimal submission without resume state
const lastUserMessage = [...messages].reverse().find((m) => m.isCreatedByUser);
const submission = {
messages,
userMessage:
lastUserMessage ?? ({ messageId: 'resume', conversationId, text: '' } as TMessage),
initialResponse: {
messageId: 'resume_',
conversationId,
text: '',
content: streamStatus.aggregatedContent ?? [{ type: 'text', text: '' }],
} as TMessage,
conversation: { conversationId, title: 'Resumed Chat' } as TConversation,
isRegenerate: false,
isTemporary: false,
endpointOption: {},
// Signal to useResumableSSE to subscribe to existing stream instead of starting new
resumeStreamId: streamStatus.streamId,
} as TSubmission & { resumeStreamId: string };
setSubmission(submission);
}
}, [
conversationId,
resumableEnabled,
currentSubmission,
isSuccess,
streamStatus,
getMessages,
setSubmission,
]);
// Reset hasResumedRef when conversation changes
// Reset processedConvoRef when conversation changes to a different one
useEffect(() => {
if (conversationId !== hasResumedRef.current) {
hasResumedRef.current = null;
if (conversationId && conversationId !== processedConvoRef.current) {
// Only reset if we're navigating to a DIFFERENT conversation
// This allows re-checking when navigating back
processedConvoRef.current = null;
}
}, [conversationId]);
}