mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-01-21 17:56:13 +01:00
feat: Enhance Stream Management with Abort Functionality
- Updated the abort endpoint to support aborting ongoing generation streams using either streamId or conversationId. - Introduced a new mutation hook `useAbortStreamMutation` for client-side integration. - Added `useStreamStatus` query to monitor stream status and facilitate resuming conversations. - Enhanced `useChatHelpers` to incorporate abort functionality when stopping generation. - Improved `useResumableSSE` to handle stream errors and token refresh seamlessly. - Updated `useResumeOnLoad` to check for active streams and resume conversations appropriately.
This commit is contained in:
parent
0a517a2b8f
commit
1985e53b80
11 changed files with 295 additions and 136 deletions
2
client/src/data-provider/SSE/index.ts
Normal file
2
client/src/data-provider/SSE/index.ts
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
export * from './queries';
|
||||
export * from './mutations';
|
||||
39
client/src/data-provider/SSE/mutations.ts
Normal file
39
client/src/data-provider/SSE/mutations.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
import { useMutation } from '@tanstack/react-query';
|
||||
import { request } from 'librechat-data-provider';
|
||||
|
||||
export interface AbortStreamParams {
|
||||
/** The stream ID to abort (if known) */
|
||||
streamId?: string;
|
||||
/** The conversation ID to abort (backend will look up the job) */
|
||||
conversationId?: string;
|
||||
}
|
||||
|
||||
export interface AbortStreamResponse {
|
||||
success: boolean;
|
||||
aborted?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort an ongoing generation stream.
|
||||
* The backend will emit a `done` event with `aborted: true` to the SSE stream,
|
||||
* allowing the client to handle cleanup via the normal event flow.
|
||||
*
|
||||
* Can pass either streamId or conversationId - backend will find the job.
|
||||
*/
|
||||
export const abortStream = async (params: AbortStreamParams): Promise<AbortStreamResponse> => {
|
||||
console.log('[abortStream] Calling abort endpoint with params:', params);
|
||||
const result = (await request.post('/api/agents/chat/abort', params)) as AbortStreamResponse;
|
||||
console.log('[abortStream] Abort response:', result);
|
||||
return result;
|
||||
};
|
||||
|
||||
/**
|
||||
* React Query mutation hook for aborting a generation stream.
|
||||
* Use this when the user explicitly clicks the stop button.
|
||||
*/
|
||||
export function useAbortStreamMutation() {
|
||||
return useMutation({
|
||||
mutationFn: abortStream,
|
||||
});
|
||||
}
|
||||
46
client/src/data-provider/SSE/queries.ts
Normal file
46
client/src/data-provider/SSE/queries.ts
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
import { useQuery } from '@tanstack/react-query';
|
||||
import { request } from 'librechat-data-provider';
|
||||
import type { Agents } from 'librechat-data-provider';
|
||||
|
||||
export interface StreamStatusResponse {
|
||||
active: boolean;
|
||||
streamId?: string;
|
||||
status?: 'running' | 'complete' | 'error' | 'aborted';
|
||||
chunkCount?: number;
|
||||
aggregatedContent?: Array<{ type: string; text?: string }>;
|
||||
createdAt?: number;
|
||||
resumeState?: Agents.ResumeState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query key for stream status
|
||||
*/
|
||||
export const streamStatusQueryKey = (conversationId: string) => ['streamStatus', conversationId];
|
||||
|
||||
/**
|
||||
* Fetch stream status for a conversation
|
||||
*/
|
||||
export const fetchStreamStatus = async (conversationId: string): Promise<StreamStatusResponse> => {
|
||||
console.log('[fetchStreamStatus] Fetching status for:', conversationId);
|
||||
const result = await request.get<StreamStatusResponse>(
|
||||
`/api/agents/chat/status/${conversationId}`,
|
||||
);
|
||||
console.log('[fetchStreamStatus] Result:', result);
|
||||
return result;
|
||||
};
|
||||
|
||||
/**
|
||||
* React Query hook for checking if a conversation has an active generation stream.
|
||||
* Only fetches when conversationId is provided and resumable streams are enabled.
|
||||
*/
|
||||
export function useStreamStatus(conversationId: string | undefined, enabled = true) {
|
||||
return useQuery({
|
||||
queryKey: streamStatusQueryKey(conversationId || ''),
|
||||
queryFn: () => fetchStreamStatus(conversationId!),
|
||||
enabled: !!conversationId && enabled,
|
||||
staleTime: 1000, // Consider stale after 1 second
|
||||
refetchOnMount: true,
|
||||
refetchOnWindowFocus: true,
|
||||
retry: false,
|
||||
});
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue