mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-01-14 22:48:52 +01:00
🌊 feat: Resumable LLM Streams with Horizontal Scaling (#10926)
* ✨ feat: Implement Resumable Generation Jobs with SSE Support
- Introduced GenerationJobManager to handle resumable LLM generation jobs independently of HTTP connections.
- Added support for subscribing to ongoing generation jobs via SSE, allowing clients to reconnect and receive updates without losing progress.
- Enhanced existing agent controllers and routes to integrate resumable functionality, including job creation, completion, and error handling.
- Updated client-side hooks to manage adaptive SSE streams, switching between standard and resumable modes based on user settings.
- Added UI components and settings for enabling/disabling resumable streams, improving user experience during unstable connections.
* WIP: resuming
* WIP: resumable stream
* feat: Enhance Stream Management with Abort Functionality
- Updated the abort endpoint to support aborting ongoing generation streams using either streamId or conversationId.
- Introduced a new mutation hook `useAbortStreamMutation` for client-side integration.
- Added `useStreamStatus` query to monitor stream status and facilitate resuming conversations.
- Enhanced `useChatHelpers` to incorporate abort functionality when stopping generation.
- Improved `useResumableSSE` to handle stream errors and token refresh seamlessly.
- Updated `useResumeOnLoad` to check for active streams and resume conversations appropriately.
* fix: Update query parameter handling in useChatHelpers
- Refactored the logic for determining the query parameter used in fetching messages to prioritize paramId from the URL, falling back to conversationId only if paramId is not available. This change ensures consistency with the ChatView component's expectations.
* fix: improve syncing when switching conversations
* fix: Prevent memory leaks in useResumableSSE by clearing handler maps on stream completion and cleanup
* fix: Improve content type mismatch handling in useStepHandler
- Enhanced the condition for detecting content type mismatches to include additional checks, ensuring more robust validation of content types before processing updates.
* fix: Allow dynamic content creation in useChatFunctions
- Updated the initial response handling to avoid pre-initializing content types, enabling dynamic creation of content parts based on incoming delta events. This change supports various content types such as think and text.
* fix: Refine response message handling in useStepHandler
- Updated logic to determine the appropriate response message based on the last message's origin, ensuring correct message replacement or appending based on user interaction. This change enhances the accuracy of message updates in the chat flow.
* refactor: Enhance GenerationJobManager with In-Memory Implementations
- Introduced InMemoryJobStore, InMemoryEventTransport, and InMemoryContentState for improved job management and event handling.
- Updated GenerationJobManager to utilize these new implementations, allowing for better separation of concerns and easier maintenance.
- Enhanced job metadata handling to support user messages and response IDs for resumable functionality.
- Improved cleanup and state management processes to prevent memory leaks and ensure efficient resource usage.
* refactor: Enhance GenerationJobManager with improved subscriber handling
- Updated RuntimeJobState to include allSubscribersLeftHandlers for managing client disconnections without affecting subscriber count.
- Refined createJob and subscribe methods to ensure generation starts only when the first real client connects.
- Added detailed documentation for methods and properties to clarify the synchronization of job generation with client readiness.
- Improved logging for subscriber checks and event handling to facilitate debugging and monitoring.
* chore: Adjust timeout for subscriber readiness in ResumableAgentController
- Reduced the timeout duration from 5000ms to 2500ms in the startGeneration function to improve responsiveness when waiting for subscriber readiness. This change aims to enhance the efficiency of the agent's background generation process.
* refactor: Update GenerationJobManager documentation and structure
- Enhanced the documentation for GenerationJobManager to clarify the architecture and pluggable service design.
- Updated comments to reflect the potential for Redis integration and the need for async refactoring.
- Improved the structure of the GenerationJob facade to emphasize the unified API while allowing for implementation swapping without affecting consumer code.
* refactor: Convert GenerationJobManager methods to async for improved performance
- Updated methods in GenerationJobManager and InMemoryJobStore to be asynchronous, enhancing the handling of job creation, retrieval, and management.
- Adjusted the ResumableAgentController and related routes to await job operations, ensuring proper flow and error handling.
- Increased timeout duration in ResumableAgentController's startGeneration function to 3500ms for better subscriber readiness management.
* refactor: Simplify initial response handling in useChatFunctions
- Removed unnecessary pre-initialization of content types in the initial response, allowing for dynamic content creation based on incoming delta events. This change enhances flexibility in handling various content types in the chat flow.
* refactor: Clarify content handling logic in useStepHandler
- Updated comments to better explain the handling of initialContent and existingContent in edit and resume scenarios.
- Simplified the logic for merging content, ensuring that initialContent is used directly when available, improving clarity and maintainability.
* refactor: Improve message handling logic in useStepHandler
- Enhanced the logic for managing messages in multi-tab scenarios, ensuring that the most up-to-date message history is utilized.
- Removed existing response placeholders and ensured user messages are included, improving the accuracy of message updates in the chat flow.
* fix: remove unnecessary content length logging in the chat stream response, simplifying the debug message while retaining essential information about run steps. This change enhances clarity in logging without losing critical context.
* refactor: Integrate streamId handling for improved resumable functionality for attachments
- Added streamId parameter to various functions to support resumable mode in tool loading and memory processing.
- Updated related methods to ensure proper handling of attachments and responses based on the presence of streamId, enhancing the overall streaming experience.
- Improved logging and attachment management to accommodate both standard and resumable modes.
* refactor: Streamline abort handling and integrate GenerationJobManager for improved job management
- Removed the abortControllers middleware and integrated abort handling directly into GenerationJobManager.
- Updated abortMessage function to utilize GenerationJobManager for aborting jobs by conversation ID, enhancing clarity and efficiency.
- Simplified cleanup processes and improved error handling during abort operations.
- Enhanced metadata management for jobs, including endpoint and model information, to facilitate better tracking and resource management.
* refactor: Unify streamId and conversationId handling for improved job management
- Updated ResumableAgentController and AgentController to generate conversationId upfront, ensuring it matches streamId for consistency.
- Simplified job creation and metadata management by removing redundant conversationId updates from callbacks.
- Refactored abortMiddleware and related methods to utilize the unified streamId/conversationId approach, enhancing clarity in job handling.
- Removed deprecated methods from GenerationJobManager and InMemoryJobStore, streamlining the codebase and improving maintainability.
* refactor: Enhance resumable SSE handling with improved UI state management and error recovery
- Added UI state restoration on successful SSE connection to indicate ongoing submission.
- Implemented detailed error handling for network failures, including retry logic with exponential backoff.
- Introduced abort event handling to reset UI state on intentional stream closure.
- Enhanced debugging capabilities for testing reconnection and clean close scenarios.
- Updated generation function to retry on network errors, improving resilience during submission processes.
* refactor: Consolidate content state management into IJobStore for improved job handling
- Removed InMemoryContentState and integrated its functionality into InMemoryJobStore, streamlining content state management.
- Updated GenerationJobManager to utilize jobStore for content state operations, enhancing clarity and reducing redundancy.
- Introduced RedisJobStore for horizontal scaling, allowing for efficient job management and content reconstruction from chunks.
- Updated IJobStore interface to reflect changes in content state handling, ensuring consistency across implementations.
* feat: Introduce Redis-backed stream services for enhanced job management
- Added createStreamServices function to configure job store and event transport, supporting both Redis and in-memory options.
- Updated GenerationJobManager to allow configuration with custom job stores and event transports, improving flexibility for different deployment scenarios.
- Refactored IJobStore interface to support asynchronous content retrieval, ensuring compatibility with Redis implementations.
- Implemented RedisEventTransport for real-time event delivery across instances, enhancing scalability and responsiveness.
- Updated InMemoryJobStore to align with new async patterns for content and run step retrieval, ensuring consistent behavior across storage options.
* refactor: Remove redundant debug logging in GenerationJobManager and RedisEventTransport
- Eliminated unnecessary debug statements in GenerationJobManager related to subscriber actions and job updates, enhancing log clarity.
- Removed debug logging in RedisEventTransport for subscription and subscriber disconnection events, streamlining the logging output.
- Cleaned up debug messages in RedisJobStore to focus on essential information, improving overall logging efficiency.
* refactor: Enhance job state management and TTL configuration in RedisJobStore
- Updated the RedisJobStore to allow customizable TTL values for job states, improving flexibility in job management.
- Refactored the handling of job expiration and cleanup processes to align with new TTL configurations.
- Simplified the response structure in the chat status endpoint by consolidating state retrieval, enhancing clarity and performance.
- Improved comments and documentation for better understanding of the changes made.
* refactor: cleanupOnComplete option to GenerationJobManager for flexible resource management
- Introduced a new configuration option, cleanupOnComplete, allowing immediate cleanup of event transport and job resources upon job completion.
- Updated completeJob and abortJob methods to respect the cleanupOnComplete setting, enhancing memory management.
- Improved cleanup logic in the cleanup method to handle orphaned resources effectively.
- Enhanced documentation and comments for better clarity on the new functionality.
* refactor: Update TTL configuration for completed jobs in InMemoryJobStore
- Changed the TTL for completed jobs from 5 minutes to 0, allowing for immediate cleanup.
- Enhanced cleanup logic to respect the new TTL setting, improving resource management.
- Updated comments for clarity on the behavior of the TTL configuration.
* refactor: Enhance RedisJobStore with local graph caching for improved performance
- Introduced a local cache for graph references using WeakRef to optimize reconnects for the same instance.
- Updated job deletion and cleanup methods to manage the local cache effectively, ensuring stale entries are removed.
- Enhanced content retrieval methods to prioritize local cache access, reducing Redis round-trips for same-instance reconnects.
- Improved documentation and comments for clarity on the caching mechanism and its benefits.
* feat: Add integration tests for GenerationJobManager, RedisEventTransport, and RedisJobStore, add Redis Cluster support
- Introduced comprehensive integration tests for GenerationJobManager, covering both in-memory and Redis modes to ensure consistent job management and event handling.
- Added tests for RedisEventTransport to validate pub/sub functionality, including cross-instance event delivery and error handling.
- Implemented integration tests for RedisJobStore, focusing on multi-instance job access, content reconstruction from chunks, and consumer group behavior.
- Enhanced test setup and teardown processes to ensure a clean environment for each test run, improving reliability and maintainability.
* fix: Improve error handling in GenerationJobManager for allSubscribersLeft handlers
- Enhanced the error handling logic when retrieving content parts for allSubscribersLeft handlers, ensuring that any failures are logged appropriately.
- Updated the promise chain to catch errors from getContentParts, improving robustness and clarity in error reporting.
* ci: Improve Redis client disconnection handling in integration tests
- Updated the afterAll cleanup logic in integration tests for GenerationJobManager, RedisEventTransport, and RedisJobStore to use `quit()` for graceful disconnection of the Redis client.
- Added fallback to `disconnect()` if `quit()` fails, enhancing robustness in resource management during test teardown.
- Improved comments for clarity on the disconnection process and error handling.
* refactor: Enhance GenerationJobManager and event transports for improved resource management
- Updated GenerationJobManager to prevent immediate cleanup of eventTransport upon job completion, allowing final events to transmit fully before cleanup.
- Added orphaned stream cleanup logic in GenerationJobManager to handle streams without corresponding jobs.
- Introduced getTrackedStreamIds method in both InMemoryEventTransport and RedisEventTransport for better management of orphaned streams.
- Improved comments for clarity on resource management and cleanup processes.
* refactor: Update GenerationJobManager and ResumableAgentController for improved event handling
- Modified GenerationJobManager to resolve readyPromise immediately, eliminating startup latency and allowing early event buffering for late subscribers.
- Enhanced event handling logic to replay buffered events when the first subscriber connects, ensuring no events are lost due to race conditions.
- Updated comments for clarity on the new event synchronization mechanism and its benefits in both Redis and in-memory modes.
* fix: Update cache integration test command for stream to ensure proper execution
- Modified the test command for cache integration related to streams by adding the --forceExit flag to prevent hanging tests.
- This change enhances the reliability of the test suite by ensuring all tests complete as expected.
* feat: Add active job management for user and show progress in conversation list
- Implemented a new endpoint to retrieve active generation job IDs for the current user, enhancing user experience by allowing visibility of ongoing tasks.
- Integrated active job tracking in the Conversations component, displaying generation indicators based on active jobs.
- Optimized job management in the GenerationJobManager and InMemoryJobStore to support user-specific job queries, ensuring efficient resource handling and cleanup.
- Updated relevant components and hooks to utilize the new active jobs feature, improving overall application responsiveness and user feedback.
* feat: Implement active job tracking by user in RedisJobStore
- Added functionality to retrieve active job IDs for a specific user, enhancing user experience by allowing visibility of ongoing tasks.
- Implemented self-healing cleanup for stale job entries, ensuring accurate tracking of active jobs.
- Updated job creation, update, and deletion methods to manage user-specific job sets effectively.
- Enhanced integration tests to validate the new user-specific job management features.
* refactor: Simplify job deletion logic by removing user job cleanup from InMemoryJobStore and RedisJobStore
* WIP: Add backend inspect script for easier debugging in production
* refactor: title generation logic
- Changed the title generation endpoint from POST to GET, allowing for more efficient retrieval of titles based on conversation ID.
- Implemented exponential backoff for title fetching retries, improving responsiveness and reducing server load.
- Introduced a queuing mechanism for title generation, ensuring titles are generated only after job completion.
- Updated relevant components and hooks to utilize the new title generation logic, enhancing user experience and application performance.
* feat: Enhance updateConvoInAllQueries to support moving conversations to the top
* chore: temp. remove added multi convo
* refactor: Update active jobs query integration for optimistic updates on abort
- Introduced a new interface for active jobs response to standardize data handling.
- Updated query keys for active jobs to ensure consistency across components.
- Enhanced job management logic in hooks to properly reflect active job states, improving overall application responsiveness.
* refactor: useResumableStreamToggle hook to manage resumable streams for legacy/assistants endpoints
- Introduced a new hook, useResumableStreamToggle, to automatically toggle resumable streams off for assistants endpoints and restore the previous value when switching away.
- Updated ChatView component to utilize the new hook, enhancing the handling of streaming behavior based on endpoint type.
- Refactored imports in ChatView for better organization.
* refactor: streamline conversation title generation handling
- Removed unused type definition for TGenTitleMutation in mutations.ts to clean up the codebase.
- Integrated queueTitleGeneration call in useEventHandlers to trigger title generation for new conversations, enhancing the responsiveness of the application.
* feat: Add USE_REDIS_STREAMS configuration for stream job storage
- Introduced USE_REDIS_STREAMS to control Redis usage for resumable stream job storage, defaulting to true if USE_REDIS is enabled but not explicitly set.
- Updated cacheConfig to include USE_REDIS_STREAMS and modified createStreamServices to utilize this new configuration.
- Enhanced unit tests to validate the behavior of USE_REDIS_STREAMS under various environment settings, ensuring correct defaults and overrides.
* fix: title generation queue management for assistants
- Introduced a queueListeners mechanism to notify changes in the title generation queue, improving responsiveness for non-resumable streams.
- Updated the useTitleGeneration hook to track queue changes with a queueVersion state, ensuring accurate updates when jobs complete.
- Refactored the queueTitleGeneration function to trigger listeners upon adding new conversation IDs, enhancing the overall title generation flow.
* refactor: streamline agent controller and remove legacy resumable handling
- Updated the AgentController to route all requests to ResumableAgentController, simplifying the logic.
- Deprecated the legacy non-resumable path, providing a clear migration path for future use.
- Adjusted setHeaders middleware to remove unnecessary checks for resumable mode.
- Cleaned up the useResumableSSE hook to eliminate redundant query parameters, enhancing clarity and performance.
* feat: Add USE_REDIS_STREAMS configuration to .env.example
- Updated .env.example to include USE_REDIS_STREAMS setting, allowing control over Redis usage for resumable LLM streams.
- Provided additional context on the behavior of USE_REDIS_STREAMS when not explicitly set, enhancing clarity for configuration management.
* refactor: remove unused setHeaders middleware from chat route
- Eliminated the setHeaders middleware from the chat route, streamlining the request handling process.
- This change contributes to cleaner code and improved performance by reducing unnecessary middleware checks.
* fix: Add streamId parameter for resumable stream handling across services (actions, mcp oauth)
* fix(flow): add immediate abort handling and fix intervalId initialization
- Add immediate abort handler that responds instantly to abort signal
- Declare intervalId before cleanup function to prevent 'Cannot access before initialization' error
- Consolidate cleanup logic into single function to avoid duplicate cleanup
- Properly remove abort event listener on cleanup
* fix(mcp): clean up OAuth flows on abort and simplify flow handling
- Add abort handler in reconnectServer to clean up mcp_oauth and mcp_get_tokens flows
- Update createAbortHandler to clean up both flow types on tool call abort
- Pass abort signal to createFlow in returnOnOAuth path
- Simplify handleOAuthRequired to always cancel existing flows and start fresh
- This ensures user always gets a new OAuth URL instead of waiting for stale flows
* fix(agents): handle 'new' conversationId and improve abort reliability
- Treat 'new' as placeholder that needs UUID in request controller
- Send JSON response immediately before tool loading for faster SSE connection
- Use job's abort controller instead of prelimAbortController
- Emit errors to stream if headers already sent
- Skip 'new' as valid ID in abort endpoint
- Add fallback to find active jobs by userId when conversationId is 'new'
* fix(stream): detect early abort and prevent navigation to non-existent conversation
- Abort controller on job completion to signal pending operations
- Detect early abort (no content, no responseMessageId) in abortJob
- Set conversation and responseMessage to null for early aborts
- Add earlyAbort flag to final event for frontend detection
- Remove unused text field from AbortResult interface
- Frontend handles earlyAbort by staying on/navigating to new chat
* test(mcp): update test to expect signal parameter in createFlow
This commit is contained in:
parent
25a0ebee85
commit
60a2eb4a1a
75 changed files with 7376 additions and 599 deletions
|
|
@ -7,7 +7,13 @@ import { Constants, buildTree } from 'librechat-data-provider';
|
|||
import type { TMessage } from 'librechat-data-provider';
|
||||
import type { ChatFormValues } from '~/common';
|
||||
import { ChatContext, AddedChatContext, useFileMapContext, ChatFormProvider } from '~/Providers';
|
||||
import { useChatHelpers, useAddedResponse, useSSE } from '~/hooks';
|
||||
import {
|
||||
useResumableStreamToggle,
|
||||
useAddedResponse,
|
||||
useResumeOnLoad,
|
||||
useAdaptiveSSE,
|
||||
useChatHelpers,
|
||||
} from '~/hooks';
|
||||
import ConversationStarters from './Input/ConversationStarters';
|
||||
import { useGetMessagesByConvoId } from '~/data-provider';
|
||||
import MessagesView from './Messages/MessagesView';
|
||||
|
|
@ -32,7 +38,6 @@ function LoadingSpinner() {
|
|||
function ChatView({ index = 0 }: { index?: number }) {
|
||||
const { conversationId } = useParams();
|
||||
const rootSubmission = useRecoilValue(store.submissionByIndex(index));
|
||||
const addedSubmission = useRecoilValue(store.submissionByIndex(index + 1));
|
||||
const centerFormOnLanding = useRecoilValue(store.centerFormOnLanding);
|
||||
|
||||
const fileMap = useFileMapContext();
|
||||
|
|
@ -51,8 +56,16 @@ function ChatView({ index = 0 }: { index?: number }) {
|
|||
const chatHelpers = useChatHelpers(index, conversationId);
|
||||
const addedChatHelpers = useAddedResponse({ rootIndex: index });
|
||||
|
||||
useSSE(rootSubmission, chatHelpers, false);
|
||||
useSSE(addedSubmission, addedChatHelpers, true);
|
||||
useResumableStreamToggle(
|
||||
chatHelpers.conversation?.endpoint,
|
||||
chatHelpers.conversation?.endpointType,
|
||||
);
|
||||
|
||||
useAdaptiveSSE(rootSubmission, chatHelpers, false, index);
|
||||
|
||||
// Auto-resume if navigating back to conversation with active job
|
||||
// Wait for messages to load before resuming to avoid race condition
|
||||
useResumeOnLoad(conversationId, chatHelpers.getMessages, index, !isLoading);
|
||||
|
||||
const methods = useForm<ChatFormValues>({
|
||||
defaultValues: { text: '' },
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import { useGetStartupConfig } from '~/data-provider';
|
|||
import ExportAndShareMenu from './ExportAndShareMenu';
|
||||
import BookmarkMenu from './Menus/BookmarkMenu';
|
||||
import { TemporaryChat } from './TemporaryChat';
|
||||
import AddMultiConvo from './AddMultiConvo';
|
||||
// import AddMultiConvo from './AddMultiConvo';
|
||||
import { useHasAccess } from '~/hooks';
|
||||
import { cn } from '~/utils';
|
||||
|
||||
|
|
@ -30,10 +30,10 @@ export default function Header() {
|
|||
permission: Permissions.USE,
|
||||
});
|
||||
|
||||
const hasAccessToMultiConvo = useHasAccess({
|
||||
permissionType: PermissionTypes.MULTI_CONVO,
|
||||
permission: Permissions.USE,
|
||||
});
|
||||
// const hasAccessToMultiConvo = useHasAccess({
|
||||
// permissionType: PermissionTypes.MULTI_CONVO,
|
||||
// permission: Permissions.USE,
|
||||
// });
|
||||
|
||||
const isSmallScreen = useMediaQuery('(max-width: 768px)');
|
||||
|
||||
|
|
@ -67,7 +67,7 @@ export default function Header() {
|
|||
<ModelSelector startupConfig={startupConfig} />
|
||||
{interfaceConfig.presets === true && interfaceConfig.modelSelect && <PresetsMenu />}
|
||||
{hasAccessToBookmarks === true && <BookmarkMenu />}
|
||||
{hasAccessToMultiConvo === true && <AddMultiConvo />}
|
||||
{/* {hasAccessToMultiConvo === true && <AddMultiConvo />} */}
|
||||
{isSmallScreen && (
|
||||
<>
|
||||
<ExportAndShareMenu
|
||||
|
|
|
|||
|
|
@ -7,10 +7,11 @@ import type {
|
|||
Agents,
|
||||
} from 'librechat-data-provider';
|
||||
import { MessageContext, SearchContext } from '~/Providers';
|
||||
import { EditTextPart, EmptyText } from './Parts';
|
||||
import MemoryArtifacts from './MemoryArtifacts';
|
||||
import Sources from '~/components/Web/Sources';
|
||||
import { mapAttachments } from '~/utils/map';
|
||||
import { EditTextPart } from './Parts';
|
||||
import Container from './Container';
|
||||
import Part from './Part';
|
||||
|
||||
type ContentPartsProps = {
|
||||
|
|
@ -95,11 +96,19 @@ const ContentParts = memo(
|
|||
);
|
||||
}
|
||||
|
||||
/** Show cursor placeholder when content is empty but actively submitting */
|
||||
const showEmptyCursor = content.length === 0 && effectiveIsSubmitting;
|
||||
|
||||
return (
|
||||
<>
|
||||
<SearchContext.Provider value={{ searchResults }}>
|
||||
<MemoryArtifacts attachments={attachments} />
|
||||
<Sources messageId={messageId} conversationId={conversationId || undefined} />
|
||||
{showEmptyCursor && (
|
||||
<Container>
|
||||
<EmptyText />
|
||||
</Container>
|
||||
)}
|
||||
{content.map((part, idx) => {
|
||||
if (!part) {
|
||||
return null;
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import { List, AutoSizer, CellMeasurer, CellMeasurerCache } from 'react-virtuali
|
|||
import type { TConversation } from 'librechat-data-provider';
|
||||
import { useLocalize, TranslationKeys, useFavorites, useShowMarketplace } from '~/hooks';
|
||||
import FavoritesList from '~/components/Nav/Favorites/FavoritesList';
|
||||
import { useActiveJobs } from '~/data-provider';
|
||||
import { groupConversationsByDate, cn } from '~/utils';
|
||||
import Convo from './Convo';
|
||||
import store from '~/store';
|
||||
|
|
@ -120,18 +121,28 @@ const MemoizedConvo = memo(
|
|||
conversation,
|
||||
retainView,
|
||||
toggleNav,
|
||||
isGenerating,
|
||||
}: {
|
||||
conversation: TConversation;
|
||||
retainView: () => void;
|
||||
toggleNav: () => void;
|
||||
isGenerating: boolean;
|
||||
}) => {
|
||||
return <Convo conversation={conversation} retainView={retainView} toggleNav={toggleNav} />;
|
||||
return (
|
||||
<Convo
|
||||
conversation={conversation}
|
||||
retainView={retainView}
|
||||
toggleNav={toggleNav}
|
||||
isGenerating={isGenerating}
|
||||
/>
|
||||
);
|
||||
},
|
||||
(prevProps, nextProps) => {
|
||||
return (
|
||||
prevProps.conversation.conversationId === nextProps.conversation.conversationId &&
|
||||
prevProps.conversation.title === nextProps.conversation.title &&
|
||||
prevProps.conversation.endpoint === nextProps.conversation.endpoint
|
||||
prevProps.conversation.endpoint === nextProps.conversation.endpoint &&
|
||||
prevProps.isGenerating === nextProps.isGenerating
|
||||
);
|
||||
},
|
||||
);
|
||||
|
|
@ -149,11 +160,19 @@ const Conversations: FC<ConversationsProps> = ({
|
|||
}) => {
|
||||
const localize = useLocalize();
|
||||
const search = useRecoilValue(store.search);
|
||||
const resumableEnabled = useRecoilValue(store.resumableStreams);
|
||||
const { favorites, isLoading: isFavoritesLoading } = useFavorites();
|
||||
const isSmallScreen = useMediaQuery('(max-width: 768px)');
|
||||
const convoHeight = isSmallScreen ? 44 : 34;
|
||||
const showAgentMarketplace = useShowMarketplace();
|
||||
|
||||
// Fetch active job IDs for showing generation indicators
|
||||
const { data: activeJobsData } = useActiveJobs(resumableEnabled);
|
||||
const activeJobIds = useMemo(
|
||||
() => new Set(activeJobsData?.activeJobIds ?? []),
|
||||
[activeJobsData?.activeJobIds],
|
||||
);
|
||||
|
||||
// Determine if FavoritesList will render content
|
||||
const shouldShowFavorites =
|
||||
!search.query && (isFavoritesLoading || favorites.length > 0 || showAgentMarketplace);
|
||||
|
|
@ -292,9 +311,15 @@ const Conversations: FC<ConversationsProps> = ({
|
|||
}
|
||||
|
||||
if (item.type === 'convo') {
|
||||
const isGenerating = activeJobIds.has(item.convo.conversationId ?? '');
|
||||
return (
|
||||
<MeasuredRow key={key} {...rowProps}>
|
||||
<MemoizedConvo conversation={item.convo} retainView={moveToTop} toggleNav={toggleNav} />
|
||||
<MemoizedConvo
|
||||
conversation={item.convo}
|
||||
retainView={moveToTop}
|
||||
toggleNav={toggleNav}
|
||||
isGenerating={isGenerating}
|
||||
/>
|
||||
</MeasuredRow>
|
||||
);
|
||||
}
|
||||
|
|
@ -311,6 +336,7 @@ const Conversations: FC<ConversationsProps> = ({
|
|||
isChatsExpanded,
|
||||
setIsChatsExpanded,
|
||||
shouldShowFavorites,
|
||||
activeJobIds,
|
||||
],
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -19,9 +19,15 @@ interface ConversationProps {
|
|||
conversation: TConversation;
|
||||
retainView: () => void;
|
||||
toggleNav: () => void;
|
||||
isGenerating?: boolean;
|
||||
}
|
||||
|
||||
export default function Conversation({ conversation, retainView, toggleNav }: ConversationProps) {
|
||||
export default function Conversation({
|
||||
conversation,
|
||||
retainView,
|
||||
toggleNav,
|
||||
isGenerating = false,
|
||||
}: ConversationProps) {
|
||||
const params = useParams();
|
||||
const localize = useLocalize();
|
||||
const { showToast } = useToastContext();
|
||||
|
|
@ -182,12 +188,35 @@ export default function Conversation({ conversation, retainView, toggleNav }: Co
|
|||
isSmallScreen={isSmallScreen}
|
||||
localize={localize}
|
||||
>
|
||||
<EndpointIcon
|
||||
conversation={conversation}
|
||||
endpointsConfig={endpointsConfig}
|
||||
size={20}
|
||||
context="menu-item"
|
||||
/>
|
||||
{isGenerating ? (
|
||||
<svg
|
||||
className="h-5 w-5 flex-shrink-0 animate-spin text-text-primary"
|
||||
viewBox="0 0 24 24"
|
||||
fill="none"
|
||||
aria-label={localize('com_ui_generating')}
|
||||
>
|
||||
<circle
|
||||
className="opacity-25"
|
||||
cx="12"
|
||||
cy="12"
|
||||
r="10"
|
||||
stroke="currentColor"
|
||||
strokeWidth="3"
|
||||
/>
|
||||
<path
|
||||
className="opacity-75"
|
||||
fill="currentColor"
|
||||
d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z"
|
||||
/>
|
||||
</svg>
|
||||
) : (
|
||||
<EndpointIcon
|
||||
conversation={conversation}
|
||||
endpointsConfig={endpointsConfig}
|
||||
size={20}
|
||||
context="menu-item"
|
||||
/>
|
||||
)}
|
||||
</ConvoLink>
|
||||
)}
|
||||
<div
|
||||
|
|
|
|||
|
|
@ -3,8 +3,9 @@ import { useRecoilValue } from 'recoil';
|
|||
import { AnimatePresence, motion } from 'framer-motion';
|
||||
import { Skeleton, useMediaQuery } from '@librechat/client';
|
||||
import { PermissionTypes, Permissions } from 'librechat-data-provider';
|
||||
import type { ConversationListResponse } from 'librechat-data-provider';
|
||||
import type { InfiniteQueryObserverResult } from '@tanstack/react-query';
|
||||
import type { ConversationListResponse } from 'librechat-data-provider';
|
||||
import type { List } from 'react-virtualized';
|
||||
import {
|
||||
useLocalize,
|
||||
useHasAccess,
|
||||
|
|
@ -12,7 +13,7 @@ import {
|
|||
useLocalStorage,
|
||||
useNavScrolling,
|
||||
} from '~/hooks';
|
||||
import { useConversationsInfiniteQuery } from '~/data-provider';
|
||||
import { useConversationsInfiniteQuery, useTitleGeneration } from '~/data-provider';
|
||||
import { Conversations } from '~/components/Conversations';
|
||||
import SearchBar from './SearchBar';
|
||||
import NewChat from './NewChat';
|
||||
|
|
@ -63,6 +64,7 @@ const Nav = memo(
|
|||
}) => {
|
||||
const localize = useLocalize();
|
||||
const { isAuthenticated } = useAuthContext();
|
||||
useTitleGeneration(isAuthenticated);
|
||||
|
||||
const [navWidth, setNavWidth] = useState(NAV_WIDTH_DESKTOP);
|
||||
const isSmallScreen = useMediaQuery('(max-width: 768px)');
|
||||
|
|
|
|||
|
|
@ -84,6 +84,13 @@ const toggleSwitchConfigs = [
|
|||
hoverCardText: 'com_nav_info_default_temporary_chat',
|
||||
key: 'defaultTemporaryChat',
|
||||
},
|
||||
{
|
||||
stateAtom: store.resumableStreams,
|
||||
localizationKey: 'com_nav_resumable_streams',
|
||||
switchId: 'resumableStreams',
|
||||
hoverCardText: 'com_nav_info_resumable_streams',
|
||||
key: 'resumableStreams',
|
||||
},
|
||||
];
|
||||
|
||||
function Chat() {
|
||||
|
|
|
|||
2
client/src/data-provider/SSE/index.ts
Normal file
2
client/src/data-provider/SSE/index.ts
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
export * from './queries';
|
||||
export * from './mutations';
|
||||
39
client/src/data-provider/SSE/mutations.ts
Normal file
39
client/src/data-provider/SSE/mutations.ts
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
import { useMutation } from '@tanstack/react-query';
|
||||
import { request } from 'librechat-data-provider';
|
||||
|
||||
export interface AbortStreamParams {
|
||||
/** The stream ID to abort (if known) */
|
||||
streamId?: string;
|
||||
/** The conversation ID to abort (backend will look up the job) */
|
||||
conversationId?: string;
|
||||
}
|
||||
|
||||
export interface AbortStreamResponse {
|
||||
success: boolean;
|
||||
aborted?: string;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort an ongoing generation stream.
|
||||
* The backend will emit a `done` event with `aborted: true` to the SSE stream,
|
||||
* allowing the client to handle cleanup via the normal event flow.
|
||||
*
|
||||
* Can pass either streamId or conversationId - backend will find the job.
|
||||
*/
|
||||
export const abortStream = async (params: AbortStreamParams): Promise<AbortStreamResponse> => {
|
||||
console.log('[abortStream] Calling abort endpoint with params:', params);
|
||||
const result = (await request.post('/api/agents/chat/abort', params)) as AbortStreamResponse;
|
||||
console.log('[abortStream] Abort response:', result);
|
||||
return result;
|
||||
};
|
||||
|
||||
/**
|
||||
* React Query mutation hook for aborting a generation stream.
|
||||
* Use this when the user explicitly clicks the stop button.
|
||||
*/
|
||||
export function useAbortStreamMutation() {
|
||||
return useMutation({
|
||||
mutationFn: abortStream,
|
||||
});
|
||||
}
|
||||
151
client/src/data-provider/SSE/queries.ts
Normal file
151
client/src/data-provider/SSE/queries.ts
Normal file
|
|
@ -0,0 +1,151 @@
|
|||
import { useEffect, useMemo, useState } from 'react';
|
||||
import { QueryKeys, request, dataService } from 'librechat-data-provider';
|
||||
import { useQuery, useQueries, useQueryClient } from '@tanstack/react-query';
|
||||
import type { Agents, TConversation } from 'librechat-data-provider';
|
||||
import { updateConvoInAllQueries } from '~/utils';
|
||||
|
||||
export interface StreamStatusResponse {
|
||||
active: boolean;
|
||||
streamId?: string;
|
||||
status?: 'running' | 'complete' | 'error' | 'aborted';
|
||||
aggregatedContent?: Array<{ type: string; text?: string }>;
|
||||
createdAt?: number;
|
||||
resumeState?: Agents.ResumeState;
|
||||
}
|
||||
|
||||
export const streamStatusQueryKey = (conversationId: string) => ['streamStatus', conversationId];
|
||||
|
||||
export const fetchStreamStatus = async (conversationId: string): Promise<StreamStatusResponse> => {
|
||||
return request.get<StreamStatusResponse>(`/api/agents/chat/status/${conversationId}`);
|
||||
};
|
||||
|
||||
export function useStreamStatus(conversationId: string | undefined, enabled = true) {
|
||||
return useQuery({
|
||||
queryKey: streamStatusQueryKey(conversationId || ''),
|
||||
queryFn: () => fetchStreamStatus(conversationId!),
|
||||
enabled: !!conversationId && enabled,
|
||||
staleTime: 1000,
|
||||
refetchOnMount: true,
|
||||
refetchOnWindowFocus: true,
|
||||
retry: false,
|
||||
});
|
||||
}
|
||||
|
||||
export const genTitleQueryKey = (conversationId: string) => ['genTitle', conversationId] as const;
|
||||
|
||||
/** Response type for active jobs query */
|
||||
export interface ActiveJobsResponse {
|
||||
activeJobIds: string[];
|
||||
}
|
||||
|
||||
/** Module-level queue for title generation (survives re-renders).
|
||||
* Stores conversationIds that need title generation once their job completes */
|
||||
const titleQueue = new Set<string>();
|
||||
const processedTitles = new Set<string>();
|
||||
|
||||
/** Listeners to notify when queue changes (for non-resumable streams like assistants) */
|
||||
const queueListeners = new Set<() => void>();
|
||||
|
||||
/** Queue a conversation for title generation (call when starting new conversation) */
|
||||
export function queueTitleGeneration(conversationId: string) {
|
||||
if (!processedTitles.has(conversationId)) {
|
||||
titleQueue.add(conversationId);
|
||||
queueListeners.forEach((listener) => listener());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook to process the title generation queue.
|
||||
* Only fetches titles AFTER the job completes (not in activeJobIds).
|
||||
* Place this high in the component tree (e.g., Nav.tsx).
|
||||
*/
|
||||
export function useTitleGeneration(enabled = true) {
|
||||
const queryClient = useQueryClient();
|
||||
const [queueVersion, setQueueVersion] = useState(0);
|
||||
const [readyToFetch, setReadyToFetch] = useState<string[]>([]);
|
||||
|
||||
const { data: activeJobsData } = useActiveJobs(enabled);
|
||||
const activeJobIds = useMemo(
|
||||
() => activeJobsData?.activeJobIds ?? [],
|
||||
[activeJobsData?.activeJobIds],
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
const listener = () => setQueueVersion((v) => v + 1);
|
||||
queueListeners.add(listener);
|
||||
return () => {
|
||||
queueListeners.delete(listener);
|
||||
};
|
||||
}, []);
|
||||
|
||||
useEffect(() => {
|
||||
const activeSet = new Set(activeJobIds);
|
||||
const completedJobs: string[] = [];
|
||||
|
||||
for (const conversationId of titleQueue) {
|
||||
if (!activeSet.has(conversationId) && !processedTitles.has(conversationId)) {
|
||||
completedJobs.push(conversationId);
|
||||
}
|
||||
}
|
||||
|
||||
if (completedJobs.length > 0) {
|
||||
setReadyToFetch((prev) => [...new Set([...prev, ...completedJobs])]);
|
||||
}
|
||||
}, [activeJobIds, queueVersion]);
|
||||
|
||||
// Fetch titles for ready conversations
|
||||
const titleQueries = useQueries({
|
||||
queries: readyToFetch.map((conversationId) => ({
|
||||
queryKey: genTitleQueryKey(conversationId),
|
||||
queryFn: () => dataService.genTitle({ conversationId }),
|
||||
staleTime: Infinity,
|
||||
retry: false,
|
||||
})),
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
titleQueries.forEach((titleQuery, index) => {
|
||||
const conversationId = readyToFetch[index];
|
||||
if (!conversationId || processedTitles.has(conversationId)) return;
|
||||
|
||||
if (titleQuery.isSuccess && titleQuery.data) {
|
||||
const { title } = titleQuery.data;
|
||||
queryClient.setQueryData(
|
||||
[QueryKeys.conversation, conversationId],
|
||||
(convo: TConversation | undefined) => (convo ? { ...convo, title } : convo),
|
||||
);
|
||||
updateConvoInAllQueries(queryClient, conversationId, (c) => ({ ...c, title }));
|
||||
// Only update document title if this conversation is currently active
|
||||
if (window.location.pathname.includes(conversationId)) {
|
||||
document.title = title;
|
||||
}
|
||||
processedTitles.add(conversationId);
|
||||
titleQueue.delete(conversationId);
|
||||
setReadyToFetch((prev) => prev.filter((id) => id !== conversationId));
|
||||
} else if (titleQuery.isError) {
|
||||
// Mark as processed even on error to avoid infinite retries
|
||||
processedTitles.add(conversationId);
|
||||
titleQueue.delete(conversationId);
|
||||
setReadyToFetch((prev) => prev.filter((id) => id !== conversationId));
|
||||
}
|
||||
});
|
||||
}, [titleQueries, readyToFetch, queryClient]);
|
||||
}
|
||||
|
||||
/**
|
||||
* React Query hook for active job IDs.
|
||||
* - Polls while jobs are active
|
||||
* - Shows generation indicators in conversation list
|
||||
*/
|
||||
export function useActiveJobs(enabled = true) {
|
||||
return useQuery({
|
||||
queryKey: [QueryKeys.activeJobs],
|
||||
queryFn: () => dataService.getActiveJobs(),
|
||||
enabled,
|
||||
staleTime: 5_000,
|
||||
refetchOnMount: true,
|
||||
refetchOnWindowFocus: true,
|
||||
refetchInterval: (data) => ((data?.activeJobIds?.length ?? 0) > 0 ? 5_000 : false),
|
||||
retry: false,
|
||||
});
|
||||
}
|
||||
|
|
@ -15,3 +15,4 @@ export * from './queries';
|
|||
export * from './roles';
|
||||
export * from './tags';
|
||||
export * from './MCP';
|
||||
export * from './SSE';
|
||||
|
|
|
|||
|
|
@ -18,31 +18,6 @@ import useUpdateTagsInConvo from '~/hooks/Conversations/useUpdateTagsInConvo';
|
|||
import { updateConversationTag } from '~/utils/conversationTags';
|
||||
import { useConversationTagsQuery } from './queries';
|
||||
|
||||
export type TGenTitleMutation = UseMutationResult<
|
||||
t.TGenTitleResponse,
|
||||
unknown,
|
||||
t.TGenTitleRequest,
|
||||
unknown
|
||||
>;
|
||||
|
||||
export const useGenTitleMutation = (): TGenTitleMutation => {
|
||||
const queryClient = useQueryClient();
|
||||
return useMutation((payload: t.TGenTitleRequest) => dataService.genTitle(payload), {
|
||||
onSuccess: (response, vars) => {
|
||||
queryClient.setQueryData(
|
||||
[QueryKeys.conversation, vars.conversationId],
|
||||
(convo: t.TConversation | undefined) =>
|
||||
convo ? { ...convo, title: response.title } : convo,
|
||||
);
|
||||
updateConvoInAllQueries(queryClient, vars.conversationId, (c) => ({
|
||||
...c,
|
||||
title: response.title,
|
||||
}));
|
||||
document.title = response.title;
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
export const useUpdateConversationMutation = (
|
||||
id: string,
|
||||
): UseMutationResult<
|
||||
|
|
|
|||
|
|
@ -283,14 +283,7 @@ export default function useChatFunctions({
|
|||
}
|
||||
}
|
||||
} else {
|
||||
initialResponse.content = [
|
||||
{
|
||||
type: ContentTypes.TEXT,
|
||||
[ContentTypes.TEXT]: {
|
||||
value: '',
|
||||
},
|
||||
},
|
||||
];
|
||||
initialResponse.content = [];
|
||||
}
|
||||
setShowStopButton(true);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
import { useCallback, useState } from 'react';
|
||||
import { QueryKeys } from 'librechat-data-provider';
|
||||
import { QueryKeys, isAssistantsEndpoint } from 'librechat-data-provider';
|
||||
import { useQueryClient } from '@tanstack/react-query';
|
||||
import { useRecoilState, useResetRecoilState, useSetRecoilState } from 'recoil';
|
||||
import type { TMessage } from 'librechat-data-provider';
|
||||
import type { ActiveJobsResponse } from '~/data-provider';
|
||||
import { useGetMessagesByConvoId, useAbortStreamMutation } from '~/data-provider';
|
||||
import useChatFunctions from '~/hooks/Chat/useChatFunctions';
|
||||
import { useGetMessagesByConvoId } from '~/data-provider';
|
||||
import { useAuthContext } from '~/hooks/AuthContext';
|
||||
import useNewConvo from '~/hooks/useNewConvo';
|
||||
import store from '~/store';
|
||||
|
|
@ -17,17 +18,20 @@ export default function useChatHelpers(index = 0, paramId?: string) {
|
|||
|
||||
const queryClient = useQueryClient();
|
||||
const { isAuthenticated } = useAuthContext();
|
||||
const abortMutation = useAbortStreamMutation();
|
||||
|
||||
const { newConversation } = useNewConvo(index);
|
||||
const { useCreateConversationAtom } = store;
|
||||
const { conversation, setConversation } = useCreateConversationAtom(index);
|
||||
const { conversationId } = conversation ?? {};
|
||||
const { conversationId, endpoint, endpointType } = conversation ?? {};
|
||||
|
||||
const queryParam = paramId === 'new' ? paramId : (conversationId ?? paramId ?? '');
|
||||
/** Use paramId (from URL) as primary source for query key - this must match what ChatView uses
|
||||
Falling back to conversationId (Recoil) only if paramId is not available */
|
||||
const queryParam = paramId === 'new' ? paramId : (paramId ?? conversationId ?? '');
|
||||
|
||||
/* Messages: here simply to fetch, don't export and use `getMessages()` instead */
|
||||
|
||||
const { data: _messages } = useGetMessagesByConvoId(conversationId ?? '', {
|
||||
const { data: _messages } = useGetMessagesByConvoId(queryParam, {
|
||||
enabled: isAuthenticated,
|
||||
});
|
||||
|
||||
|
|
@ -107,7 +111,47 @@ export default function useChatHelpers(index = 0, paramId?: string) {
|
|||
}
|
||||
};
|
||||
|
||||
const stopGenerating = () => clearAllSubmissions();
|
||||
/**
|
||||
* Stop generation - for non-assistants endpoints, calls abort endpoint first.
|
||||
* The abort endpoint will cause the backend to emit a `done` event with `aborted: true`,
|
||||
* which will be handled by the SSE event handler to clean up UI.
|
||||
* Assistants endpoint has its own abort mechanism via useEventHandlers.abortConversation.
|
||||
*/
|
||||
const stopGenerating = useCallback(async () => {
|
||||
const actualEndpoint = endpointType ?? endpoint;
|
||||
const isAssistants = isAssistantsEndpoint(actualEndpoint);
|
||||
console.log('[useChatHelpers] stopGenerating called', {
|
||||
conversationId,
|
||||
endpoint,
|
||||
endpointType,
|
||||
actualEndpoint,
|
||||
isAssistants,
|
||||
});
|
||||
|
||||
// For non-assistants endpoints (using resumable streams), call abort endpoint first
|
||||
if (conversationId && !isAssistants) {
|
||||
queryClient.setQueryData<ActiveJobsResponse>([QueryKeys.activeJobs], (old) => ({
|
||||
activeJobIds: (old?.activeJobIds ?? []).filter((id) => id !== conversationId),
|
||||
}));
|
||||
|
||||
try {
|
||||
console.log('[useChatHelpers] Calling abort mutation for:', conversationId);
|
||||
await abortMutation.mutateAsync({ conversationId });
|
||||
console.log('[useChatHelpers] Abort mutation succeeded');
|
||||
// The SSE will receive a `done` event with `aborted: true` and clean up
|
||||
// We still clear submissions as a fallback
|
||||
clearAllSubmissions();
|
||||
} catch (error) {
|
||||
console.error('[useChatHelpers] Abort failed:', error);
|
||||
// Fall back to clearing submissions
|
||||
clearAllSubmissions();
|
||||
}
|
||||
} else {
|
||||
// For assistants endpoints, just clear submissions (existing behavior)
|
||||
console.log('[useChatHelpers] Assistants endpoint, just clearing submissions');
|
||||
clearAllSubmissions();
|
||||
}
|
||||
}, [conversationId, endpoint, endpointType, abortMutation, clearAllSubmissions, queryClient]);
|
||||
|
||||
const handleStopGenerating = (e: React.MouseEvent<HTMLButtonElement>) => {
|
||||
e.preventDefault();
|
||||
|
|
|
|||
|
|
@ -56,9 +56,7 @@ export default function useTextarea({
|
|||
});
|
||||
const entityName = entity?.name ?? '';
|
||||
|
||||
const isNotAppendable =
|
||||
(((latestMessage?.unfinished ?? false) && !isSubmitting) || (latestMessage?.error ?? false)) &&
|
||||
!isAssistant;
|
||||
const isNotAppendable = latestMessage?.error === true && !isAssistant;
|
||||
// && (conversationId?.length ?? 0) > 6; // also ensures that we don't show the wrong placeholder
|
||||
|
||||
useEffect(() => {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,8 @@
|
|||
export { default as useSSE } from './useSSE';
|
||||
export { default as useResumableSSE } from './useResumableSSE';
|
||||
export { default as useAdaptiveSSE } from './useAdaptiveSSE';
|
||||
export { default as useResumeOnLoad } from './useResumeOnLoad';
|
||||
export { default as useStepHandler } from './useStepHandler';
|
||||
export { default as useContentHandler } from './useContentHandler';
|
||||
export { default as useAttachmentHandler } from './useAttachmentHandler';
|
||||
export { default as useResumableStreamToggle } from './useResumableStreamToggle';
|
||||
|
|
|
|||
43
client/src/hooks/SSE/useAdaptiveSSE.ts
Normal file
43
client/src/hooks/SSE/useAdaptiveSSE.ts
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
import { useRecoilValue } from 'recoil';
|
||||
import type { TSubmission } from 'librechat-data-provider';
|
||||
import type { EventHandlerParams } from './useEventHandlers';
|
||||
import useSSE from './useSSE';
|
||||
import useResumableSSE from './useResumableSSE';
|
||||
import store from '~/store';
|
||||
|
||||
type ChatHelpers = Pick<
|
||||
EventHandlerParams,
|
||||
| 'setMessages'
|
||||
| 'getMessages'
|
||||
| 'setConversation'
|
||||
| 'setIsSubmitting'
|
||||
| 'newConversation'
|
||||
| 'resetLatestMessage'
|
||||
>;
|
||||
|
||||
/**
|
||||
* Adaptive SSE hook that switches between standard and resumable modes.
|
||||
* Uses Recoil state to determine which mode to use.
|
||||
*
|
||||
* Note: Both hooks are always called to comply with React's Rules of Hooks.
|
||||
* We pass null submission to the inactive one.
|
||||
*/
|
||||
export default function useAdaptiveSSE(
|
||||
submission: TSubmission | null,
|
||||
chatHelpers: ChatHelpers,
|
||||
isAddedRequest = false,
|
||||
runIndex = 0,
|
||||
) {
|
||||
const resumableEnabled = useRecoilValue(store.resumableStreams);
|
||||
|
||||
useSSE(resumableEnabled ? null : submission, chatHelpers, isAddedRequest, runIndex);
|
||||
|
||||
const { streamId } = useResumableSSE(
|
||||
resumableEnabled ? submission : null,
|
||||
chatHelpers,
|
||||
isAddedRequest,
|
||||
runIndex,
|
||||
);
|
||||
|
||||
return { streamId, resumableEnabled };
|
||||
}
|
||||
|
|
@ -27,7 +27,13 @@ type TContentHandler = {
|
|||
export default function useContentHandler({ setMessages, getMessages }: TUseContentHandler) {
|
||||
const queryClient = useQueryClient();
|
||||
const messageMap = useMemo(() => new Map<string, TMessage>(), []);
|
||||
return useCallback(
|
||||
|
||||
/** Reset the message map - call this after sync to prevent stale state from overwriting synced content */
|
||||
const resetMessageMap = useCallback(() => {
|
||||
messageMap.clear();
|
||||
}, [messageMap]);
|
||||
|
||||
const handler = useCallback(
|
||||
({ data, submission }: TContentHandler) => {
|
||||
const { type, messageId, thread_id, conversationId, index } = data;
|
||||
|
||||
|
|
@ -41,8 +47,11 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
|
|||
|
||||
let response = messageMap.get(messageId);
|
||||
if (!response) {
|
||||
// Check if message already exists in current messages (e.g., after sync)
|
||||
// Use that as base instead of stale initialResponse
|
||||
const existingMessage = _messages?.find((m) => m.messageId === messageId);
|
||||
response = {
|
||||
...(initialResponse as TMessage),
|
||||
...(existingMessage ?? (initialResponse as TMessage)),
|
||||
parentMessageId: userMessage?.messageId ?? '',
|
||||
conversationId,
|
||||
messageId,
|
||||
|
|
@ -82,4 +91,6 @@ export default function useContentHandler({ setMessages, getMessages }: TUseCont
|
|||
},
|
||||
[queryClient, getMessages, messageMap, setMessages],
|
||||
);
|
||||
|
||||
return { contentHandler: handler, resetContentHandler: resetMessageMap };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ import type {
|
|||
} from 'librechat-data-provider';
|
||||
import type { TResData, TFinalResData, ConvoGenerator } from '~/common';
|
||||
import type { InfiniteData } from '@tanstack/react-query';
|
||||
import type { TGenTitleMutation } from '~/data-provider';
|
||||
import type { SetterOrUpdater, Resetter } from 'recoil';
|
||||
import type { ConversationCursorData } from '~/utils';
|
||||
import {
|
||||
|
|
@ -34,6 +33,7 @@ import {
|
|||
removeConvoFromAllQueries,
|
||||
findConversationInInfinite,
|
||||
} from '~/utils';
|
||||
import { queueTitleGeneration } from '~/data-provider/SSE/queries';
|
||||
import useAttachmentHandler from '~/hooks/SSE/useAttachmentHandler';
|
||||
import useContentHandler from '~/hooks/SSE/useContentHandler';
|
||||
import useStepHandler from '~/hooks/SSE/useStepHandler';
|
||||
|
|
@ -54,7 +54,6 @@ type TSyncData = {
|
|||
|
||||
export type EventHandlerParams = {
|
||||
isAddedRequest?: boolean;
|
||||
genTitle?: TGenTitleMutation;
|
||||
setCompleted: React.Dispatch<React.SetStateAction<Set<unknown>>>;
|
||||
setMessages: (messages: TMessage[]) => void;
|
||||
getMessages: () => TMessage[] | undefined;
|
||||
|
|
@ -167,7 +166,6 @@ export const getConvoTitle = ({
|
|||
};
|
||||
|
||||
export default function useEventHandlers({
|
||||
genTitle,
|
||||
setMessages,
|
||||
getMessages,
|
||||
setCompleted,
|
||||
|
|
@ -189,8 +187,8 @@ export default function useEventHandlers({
|
|||
const { conversationId: paramId } = useParams();
|
||||
const { token } = useAuthContext();
|
||||
|
||||
const contentHandler = useContentHandler({ setMessages, getMessages });
|
||||
const { stepHandler, clearStepMaps } = useStepHandler({
|
||||
const { contentHandler, resetContentHandler } = useContentHandler({ setMessages, getMessages });
|
||||
const { stepHandler, clearStepMaps, syncStepMessage } = useStepHandler({
|
||||
setMessages,
|
||||
getMessages,
|
||||
announcePolite,
|
||||
|
|
@ -258,13 +256,6 @@ export default function useEventHandlers({
|
|||
removeConvoFromAllQueries(queryClient, submission.conversation.conversationId as string);
|
||||
}
|
||||
|
||||
// refresh title
|
||||
if (genTitle && isNewConvo && requestMessage.parentMessageId === Constants.NO_PARENT) {
|
||||
setTimeout(() => {
|
||||
genTitle.mutate({ conversationId: convoUpdate.conversationId as string });
|
||||
}, 2500);
|
||||
}
|
||||
|
||||
if (setConversation && !isAddedRequest) {
|
||||
setConversation((prevState) => {
|
||||
const update = { ...prevState, ...convoUpdate };
|
||||
|
|
@ -274,7 +265,7 @@ export default function useEventHandlers({
|
|||
|
||||
setIsSubmitting(false);
|
||||
},
|
||||
[setMessages, setConversation, genTitle, isAddedRequest, queryClient, setIsSubmitting],
|
||||
[setMessages, setConversation, isAddedRequest, queryClient, setIsSubmitting],
|
||||
);
|
||||
|
||||
const syncHandler = useCallback(
|
||||
|
|
@ -320,7 +311,7 @@ export default function useEventHandlers({
|
|||
if (requestMessage.parentMessageId === Constants.NO_PARENT) {
|
||||
addConvoToAllQueries(queryClient, update);
|
||||
} else {
|
||||
updateConvoInAllQueries(queryClient, update.conversationId!, (_c) => update);
|
||||
updateConvoInAllQueries(queryClient, update.conversationId!, (_c) => update, true);
|
||||
}
|
||||
} else if (setConversation) {
|
||||
setConversation((prevState) => {
|
||||
|
|
@ -395,7 +386,7 @@ export default function useEventHandlers({
|
|||
if (parentMessageId === Constants.NO_PARENT) {
|
||||
addConvoToAllQueries(queryClient, update);
|
||||
} else {
|
||||
updateConvoInAllQueries(queryClient, update.conversationId!, (_c) => update);
|
||||
updateConvoInAllQueries(queryClient, update.conversationId!, (_c) => update, true);
|
||||
}
|
||||
}
|
||||
} else if (setConversation) {
|
||||
|
|
@ -443,10 +434,25 @@ export default function useEventHandlers({
|
|||
messages,
|
||||
conversation: submissionConvo,
|
||||
isRegenerate = false,
|
||||
isTemporary = false,
|
||||
isTemporary: _isTemporary = false,
|
||||
} = submission;
|
||||
|
||||
try {
|
||||
// Handle early abort - aborted during tool loading before any messages saved
|
||||
// Don't update conversation state, just reset UI and stay on new chat
|
||||
if ((data as Record<string, unknown>).earlyAbort) {
|
||||
console.log(
|
||||
'[finalHandler] Early abort detected - no messages saved, staying on new chat',
|
||||
);
|
||||
setShowStopButton(false);
|
||||
setIsSubmitting(false);
|
||||
// Navigate to new chat if not already there
|
||||
if (location.pathname !== `/c/${Constants.NEW_CONVO}`) {
|
||||
navigate(`/c/${Constants.NEW_CONVO}`, { replace: true });
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (responseMessage?.attachments && responseMessage.attachments.length > 0) {
|
||||
// Process each attachment through the attachmentHandler
|
||||
responseMessage.attachments.forEach((attachment) => {
|
||||
|
|
@ -476,6 +482,10 @@ export default function useEventHandlers({
|
|||
|
||||
const isNewConvo = conversation.conversationId !== submissionConvo.conversationId;
|
||||
|
||||
if (isNewConvo && conversation.conversationId) {
|
||||
queueTitleGeneration(conversation.conversationId);
|
||||
}
|
||||
|
||||
const setFinalMessages = (id: string | null, _messages: TMessage[]) => {
|
||||
setMessages(_messages);
|
||||
queryClient.setQueryData<TMessage[]>([QueryKeys.messages, id], _messages);
|
||||
|
|
@ -532,19 +542,6 @@ export default function useEventHandlers({
|
|||
removeConvoFromAllQueries(queryClient, submissionConvo.conversationId);
|
||||
}
|
||||
|
||||
/* Refresh title */
|
||||
if (
|
||||
genTitle &&
|
||||
isNewConvo &&
|
||||
!isTemporary &&
|
||||
requestMessage &&
|
||||
requestMessage.parentMessageId === Constants.NO_PARENT
|
||||
) {
|
||||
setTimeout(() => {
|
||||
genTitle.mutate({ conversationId: conversation.conversationId as string });
|
||||
}, 2500);
|
||||
}
|
||||
|
||||
if (setConversation && isAddedRequest !== true) {
|
||||
setConversation((prevState) => {
|
||||
const update = {
|
||||
|
|
@ -588,7 +585,6 @@ export default function useEventHandlers({
|
|||
},
|
||||
[
|
||||
navigate,
|
||||
genTitle,
|
||||
getMessages,
|
||||
setMessages,
|
||||
queryClient,
|
||||
|
|
@ -827,15 +823,17 @@ export default function useEventHandlers({
|
|||
);
|
||||
|
||||
return {
|
||||
clearStepMaps,
|
||||
stepHandler,
|
||||
syncHandler,
|
||||
finalHandler,
|
||||
errorHandler,
|
||||
clearStepMaps,
|
||||
messageHandler,
|
||||
contentHandler,
|
||||
createdHandler,
|
||||
syncStepMessage,
|
||||
attachmentHandler,
|
||||
abortConversation,
|
||||
resetContentHandler,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
630
client/src/hooks/SSE/useResumableSSE.ts
Normal file
630
client/src/hooks/SSE/useResumableSSE.ts
Normal file
|
|
@ -0,0 +1,630 @@
|
|||
import { useEffect, useState, useRef, useCallback } from 'react';
|
||||
import { v4 } from 'uuid';
|
||||
import { SSE } from 'sse.js';
|
||||
import { useSetRecoilState } from 'recoil';
|
||||
import { useQueryClient } from '@tanstack/react-query';
|
||||
import {
|
||||
request,
|
||||
Constants,
|
||||
QueryKeys,
|
||||
createPayload,
|
||||
LocalStorageKeys,
|
||||
removeNullishValues,
|
||||
} from 'librechat-data-provider';
|
||||
import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider';
|
||||
import type { EventHandlerParams } from './useEventHandlers';
|
||||
import { useGetStartupConfig, useGetUserBalance, queueTitleGeneration } from '~/data-provider';
|
||||
import type { ActiveJobsResponse } from '~/data-provider';
|
||||
import { useAuthContext } from '~/hooks/AuthContext';
|
||||
import useEventHandlers from './useEventHandlers';
|
||||
import store from '~/store';
|
||||
|
||||
const clearDraft = (conversationId?: string | null) => {
|
||||
if (conversationId) {
|
||||
localStorage.removeItem(`${LocalStorageKeys.TEXT_DRAFT}${conversationId}`);
|
||||
localStorage.removeItem(`${LocalStorageKeys.FILES_DRAFT}${conversationId}`);
|
||||
} else {
|
||||
localStorage.removeItem(`${LocalStorageKeys.TEXT_DRAFT}${Constants.NEW_CONVO}`);
|
||||
localStorage.removeItem(`${LocalStorageKeys.FILES_DRAFT}${Constants.NEW_CONVO}`);
|
||||
}
|
||||
};
|
||||
|
||||
type ChatHelpers = Pick<
|
||||
EventHandlerParams,
|
||||
| 'setMessages'
|
||||
| 'getMessages'
|
||||
| 'setConversation'
|
||||
| 'setIsSubmitting'
|
||||
| 'newConversation'
|
||||
| 'resetLatestMessage'
|
||||
>;
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
|
||||
/**
|
||||
* Hook for resumable SSE streams.
|
||||
* Separates generation start (POST) from stream subscription (GET EventSource).
|
||||
* Supports auto-reconnection with exponential backoff.
|
||||
*
|
||||
* Key behavior:
|
||||
* - Navigation away does NOT abort the generation (just closes SSE)
|
||||
* - Only explicit abort (via stop button → backend abort endpoint) stops generation
|
||||
* - Backend emits `done` event with `aborted: true` on abort, handled via finalHandler
|
||||
*/
|
||||
export default function useResumableSSE(
|
||||
submission: TSubmission | null,
|
||||
chatHelpers: ChatHelpers,
|
||||
isAddedRequest = false,
|
||||
runIndex = 0,
|
||||
) {
|
||||
const queryClient = useQueryClient();
|
||||
const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
|
||||
|
||||
const { token, isAuthenticated } = useAuthContext();
|
||||
|
||||
/**
|
||||
* Optimistically add a job ID to the active jobs cache.
|
||||
* Called when generation starts.
|
||||
*/
|
||||
const addActiveJob = useCallback(
|
||||
(jobId: string) => {
|
||||
queryClient.setQueryData<ActiveJobsResponse>([QueryKeys.activeJobs], (old) => ({
|
||||
activeJobIds: [...new Set([...(old?.activeJobIds ?? []), jobId])],
|
||||
}));
|
||||
},
|
||||
[queryClient],
|
||||
);
|
||||
|
||||
/**
|
||||
* Optimistically remove a job ID from the active jobs cache.
|
||||
* Called when generation completes, aborts, or errors.
|
||||
*/
|
||||
const removeActiveJob = useCallback(
|
||||
(jobId: string) => {
|
||||
queryClient.setQueryData<ActiveJobsResponse>([QueryKeys.activeJobs], (old) => ({
|
||||
activeJobIds: (old?.activeJobIds ?? []).filter((id) => id !== jobId),
|
||||
}));
|
||||
},
|
||||
[queryClient],
|
||||
);
|
||||
const [_completed, setCompleted] = useState(new Set());
|
||||
const [streamId, setStreamId] = useState<string | null>(null);
|
||||
const setAbortScroll = useSetRecoilState(store.abortScrollFamily(runIndex));
|
||||
const setShowStopButton = useSetRecoilState(store.showStopButtonByIndex(runIndex));
|
||||
|
||||
const sseRef = useRef<SSE | null>(null);
|
||||
const reconnectAttemptRef = useRef(0);
|
||||
const reconnectTimeoutRef = useRef<NodeJS.Timeout | null>(null);
|
||||
const submissionRef = useRef<TSubmission | null>(null);
|
||||
|
||||
const {
|
||||
setMessages,
|
||||
getMessages,
|
||||
setConversation,
|
||||
setIsSubmitting,
|
||||
newConversation,
|
||||
resetLatestMessage,
|
||||
} = chatHelpers;
|
||||
|
||||
const {
|
||||
stepHandler,
|
||||
finalHandler,
|
||||
errorHandler,
|
||||
clearStepMaps,
|
||||
messageHandler,
|
||||
contentHandler,
|
||||
createdHandler,
|
||||
syncStepMessage,
|
||||
attachmentHandler,
|
||||
resetContentHandler,
|
||||
} = useEventHandlers({
|
||||
setMessages,
|
||||
getMessages,
|
||||
setCompleted,
|
||||
isAddedRequest,
|
||||
setConversation,
|
||||
setIsSubmitting,
|
||||
newConversation,
|
||||
setShowStopButton,
|
||||
resetLatestMessage,
|
||||
});
|
||||
|
||||
const { data: startupConfig } = useGetStartupConfig();
|
||||
const balanceQuery = useGetUserBalance({
|
||||
enabled: !!isAuthenticated && startupConfig?.balance?.enabled,
|
||||
});
|
||||
|
||||
/**
|
||||
* Subscribe to stream via SSE library (supports custom headers)
|
||||
* Follows same auth pattern as useSSE
|
||||
* @param isResume - If true, adds ?resume=true to trigger sync event from server
|
||||
*/
|
||||
const subscribeToStream = useCallback(
|
||||
(currentStreamId: string, currentSubmission: TSubmission, isResume = false) => {
|
||||
let { userMessage } = currentSubmission;
|
||||
let textIndex: number | null = null;
|
||||
|
||||
const baseUrl = `/api/agents/chat/stream/${encodeURIComponent(currentStreamId)}`;
|
||||
const url = isResume ? `${baseUrl}?resume=true` : baseUrl;
|
||||
console.log('[ResumableSSE] Subscribing to stream:', url, { isResume });
|
||||
|
||||
const sse = new SSE(url, {
|
||||
headers: { Authorization: `Bearer ${token}` },
|
||||
method: 'GET',
|
||||
});
|
||||
sseRef.current = sse;
|
||||
|
||||
sse.addEventListener('open', () => {
|
||||
console.log('[ResumableSSE] Stream connected');
|
||||
setAbortScroll(false);
|
||||
// Restore UI state on successful connection (including reconnection)
|
||||
setIsSubmitting(true);
|
||||
setShowStopButton(true);
|
||||
reconnectAttemptRef.current = 0;
|
||||
});
|
||||
|
||||
sse.addEventListener('message', (e: MessageEvent) => {
|
||||
try {
|
||||
const data = JSON.parse(e.data);
|
||||
|
||||
if (data.final != null) {
|
||||
console.log('[ResumableSSE] Received FINAL event', {
|
||||
aborted: data.aborted,
|
||||
conversationId: data.conversation?.conversationId,
|
||||
hasResponseMessage: !!data.responseMessage,
|
||||
});
|
||||
clearDraft(currentSubmission.conversation?.conversationId);
|
||||
try {
|
||||
finalHandler(data, currentSubmission as EventSubmission);
|
||||
} catch (error) {
|
||||
console.error('[ResumableSSE] Error in finalHandler:', error);
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
}
|
||||
// Clear handler maps on stream completion to prevent memory leaks
|
||||
clearStepMaps();
|
||||
// Optimistically remove from active jobs
|
||||
removeActiveJob(currentStreamId);
|
||||
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
|
||||
sse.close();
|
||||
setStreamId(null);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.created != null) {
|
||||
console.log('[ResumableSSE] Received CREATED event', {
|
||||
messageId: data.message?.messageId,
|
||||
conversationId: data.message?.conversationId,
|
||||
});
|
||||
const runId = v4();
|
||||
setActiveRunId(runId);
|
||||
userMessage = {
|
||||
...userMessage,
|
||||
...data.message,
|
||||
overrideParentMessageId: userMessage.overrideParentMessageId,
|
||||
};
|
||||
createdHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.event === 'attachment' && data.data) {
|
||||
attachmentHandler({
|
||||
data: data.data,
|
||||
submission: currentSubmission as EventSubmission,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.event != null) {
|
||||
stepHandler(data, { ...currentSubmission, userMessage } as EventSubmission);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.sync != null) {
|
||||
console.log('[ResumableSSE] SYNC received', {
|
||||
runSteps: data.resumeState?.runSteps?.length ?? 0,
|
||||
});
|
||||
|
||||
const runId = v4();
|
||||
setActiveRunId(runId);
|
||||
|
||||
// Replay run steps
|
||||
if (data.resumeState?.runSteps) {
|
||||
for (const runStep of data.resumeState.runSteps) {
|
||||
stepHandler({ event: 'on_run_step', data: runStep }, {
|
||||
...currentSubmission,
|
||||
userMessage,
|
||||
} as EventSubmission);
|
||||
}
|
||||
}
|
||||
|
||||
// Set message content from aggregatedContent
|
||||
if (data.resumeState?.aggregatedContent && userMessage?.messageId) {
|
||||
const messages = getMessages() ?? [];
|
||||
const userMsgId = userMessage.messageId;
|
||||
const serverResponseId = data.resumeState.responseMessageId;
|
||||
|
||||
// Find the EXACT response message - prioritize responseMessageId from server
|
||||
// This is critical when there are multiple responses to the same user message
|
||||
let responseIdx = -1;
|
||||
if (serverResponseId) {
|
||||
responseIdx = messages.findIndex((m) => m.messageId === serverResponseId);
|
||||
}
|
||||
// Fallback: find by parentMessageId pattern (for new messages)
|
||||
if (responseIdx < 0) {
|
||||
responseIdx = messages.findIndex(
|
||||
(m) =>
|
||||
!m.isCreatedByUser &&
|
||||
(m.messageId === `${userMsgId}_` || m.parentMessageId === userMsgId),
|
||||
);
|
||||
}
|
||||
|
||||
console.log('[ResumableSSE] SYNC update', {
|
||||
userMsgId,
|
||||
serverResponseId,
|
||||
responseIdx,
|
||||
foundMessageId: responseIdx >= 0 ? messages[responseIdx]?.messageId : null,
|
||||
messagesCount: messages.length,
|
||||
aggregatedContentLength: data.resumeState.aggregatedContent?.length,
|
||||
});
|
||||
|
||||
if (responseIdx >= 0) {
|
||||
// Update existing response message with aggregatedContent
|
||||
const updated = [...messages];
|
||||
const oldContent = updated[responseIdx]?.content;
|
||||
updated[responseIdx] = {
|
||||
...updated[responseIdx],
|
||||
content: data.resumeState.aggregatedContent,
|
||||
};
|
||||
console.log('[ResumableSSE] SYNC updating message', {
|
||||
messageId: updated[responseIdx]?.messageId,
|
||||
oldContentLength: Array.isArray(oldContent) ? oldContent.length : 0,
|
||||
newContentLength: data.resumeState.aggregatedContent?.length,
|
||||
});
|
||||
setMessages(updated);
|
||||
// Sync both content handler and step handler with the updated message
|
||||
// so subsequent deltas build on synced content, not stale content
|
||||
resetContentHandler();
|
||||
syncStepMessage(updated[responseIdx]);
|
||||
console.log('[ResumableSSE] SYNC complete, handlers synced');
|
||||
} else {
|
||||
// Add new response message
|
||||
const responseId = serverResponseId ?? `${userMsgId}_`;
|
||||
setMessages([
|
||||
...messages,
|
||||
{
|
||||
messageId: responseId,
|
||||
parentMessageId: userMsgId,
|
||||
conversationId: currentSubmission.conversation?.conversationId ?? '',
|
||||
text: '',
|
||||
content: data.resumeState.aggregatedContent,
|
||||
isCreatedByUser: false,
|
||||
} as TMessage,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
setShowStopButton(true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.type != null) {
|
||||
const { text, index } = data;
|
||||
if (text != null && index !== textIndex) {
|
||||
textIndex = index;
|
||||
}
|
||||
contentHandler({ data, submission: currentSubmission as EventSubmission });
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.message != null) {
|
||||
const text = data.text ?? data.response;
|
||||
const initialResponse = {
|
||||
...(currentSubmission.initialResponse as TMessage),
|
||||
parentMessageId: data.parentMessageId,
|
||||
messageId: data.messageId,
|
||||
};
|
||||
messageHandler(text, { ...currentSubmission, userMessage, initialResponse });
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[ResumableSSE] Error processing message:', error);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Error event - fired on actual network failures (non-200, connection lost, etc.)
|
||||
* This should trigger reconnection with exponential backoff, except for 404 errors.
|
||||
*/
|
||||
sse.addEventListener('error', async (e: MessageEvent) => {
|
||||
(startupConfig?.balance?.enabled ?? false) && balanceQuery.refetch();
|
||||
|
||||
/* @ts-ignore - sse.js types don't expose responseCode */
|
||||
const responseCode = e.responseCode;
|
||||
|
||||
// 404 means job doesn't exist (completed/deleted) - don't retry
|
||||
if (responseCode === 404) {
|
||||
console.log('[ResumableSSE] Stream not found (404) - job completed or expired');
|
||||
sse.close();
|
||||
// Optimistically remove from active jobs since job is gone
|
||||
removeActiveJob(currentStreamId);
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
setStreamId(null);
|
||||
reconnectAttemptRef.current = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('[ResumableSSE] Stream error (network failure) - will attempt reconnect');
|
||||
|
||||
// Check for 401 and try to refresh token (same pattern as useSSE)
|
||||
if (responseCode === 401) {
|
||||
try {
|
||||
const refreshResponse = await request.refreshToken();
|
||||
const newToken = refreshResponse?.token ?? '';
|
||||
if (!newToken) {
|
||||
throw new Error('Token refresh failed.');
|
||||
}
|
||||
// Update headers on same SSE instance and retry (like useSSE)
|
||||
sse.headers = {
|
||||
Authorization: `Bearer ${newToken}`,
|
||||
};
|
||||
request.dispatchTokenUpdatedEvent(newToken);
|
||||
sse.stream();
|
||||
return;
|
||||
} catch (error) {
|
||||
console.log('[ResumableSSE] Token refresh failed:', error);
|
||||
}
|
||||
}
|
||||
|
||||
if (reconnectAttemptRef.current < MAX_RETRIES) {
|
||||
// Increment counter BEFORE close() so abort handler knows we're reconnecting
|
||||
reconnectAttemptRef.current++;
|
||||
const delay = Math.min(1000 * Math.pow(2, reconnectAttemptRef.current - 1), 30000);
|
||||
|
||||
console.log(
|
||||
`[ResumableSSE] Reconnecting in ${delay}ms (attempt ${reconnectAttemptRef.current}/${MAX_RETRIES})`,
|
||||
);
|
||||
|
||||
sse.close();
|
||||
|
||||
reconnectTimeoutRef.current = setTimeout(() => {
|
||||
if (submissionRef.current) {
|
||||
// Reconnect with isResume=true to get sync event with any missed content
|
||||
subscribeToStream(currentStreamId, submissionRef.current, true);
|
||||
}
|
||||
}, delay);
|
||||
|
||||
// Keep UI in "submitting" state during reconnection attempts
|
||||
// so user knows we're still trying (abort handler may have reset these)
|
||||
setIsSubmitting(true);
|
||||
setShowStopButton(true);
|
||||
} else {
|
||||
console.error('[ResumableSSE] Max reconnect attempts reached');
|
||||
sse.close();
|
||||
errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
|
||||
// Optimistically remove from active jobs on max retries
|
||||
removeActiveJob(currentStreamId);
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
setStreamId(null);
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Abort event - fired when sse.close() is called (intentional close).
|
||||
* This happens on cleanup/navigation OR when error handler closes to reconnect.
|
||||
* Only reset state if we're NOT in a reconnection cycle.
|
||||
*/
|
||||
sse.addEventListener('abort', () => {
|
||||
// If we're in a reconnection cycle, don't reset state
|
||||
// (error handler will set up the reconnect timeout)
|
||||
if (reconnectAttemptRef.current > 0) {
|
||||
console.log('[ResumableSSE] Stream closed for reconnect - preserving state');
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('[ResumableSSE] Stream aborted (intentional close) - no reconnect');
|
||||
// Clear any pending reconnect attempts
|
||||
if (reconnectTimeoutRef.current) {
|
||||
clearTimeout(reconnectTimeoutRef.current);
|
||||
reconnectTimeoutRef.current = null;
|
||||
}
|
||||
// Reset UI state - useResumeOnLoad will restore if user returns to this conversation
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
setStreamId(null);
|
||||
});
|
||||
|
||||
// Start the SSE connection
|
||||
sse.stream();
|
||||
|
||||
// Debug hooks for testing reconnection vs clean close behavior (dev only)
|
||||
if (import.meta.env.DEV) {
|
||||
const debugWindow = window as Window & {
|
||||
__sse?: SSE;
|
||||
__killNetwork?: () => void;
|
||||
__closeClean?: () => void;
|
||||
};
|
||||
debugWindow.__sse = sse;
|
||||
|
||||
/** Simulate network drop - triggers error event → reconnection */
|
||||
debugWindow.__killNetwork = () => {
|
||||
console.log('[Debug] Simulating network drop...');
|
||||
// @ts-ignore - sse.js types are incorrect, dispatchEvent actually takes Event
|
||||
sse.dispatchEvent(new Event('error'));
|
||||
};
|
||||
|
||||
/** Simulate clean close (navigation away) - triggers abort event → no reconnection */
|
||||
debugWindow.__closeClean = () => {
|
||||
console.log('[Debug] Simulating clean close (navigation away)...');
|
||||
sse.close();
|
||||
};
|
||||
}
|
||||
},
|
||||
[
|
||||
token,
|
||||
setAbortScroll,
|
||||
setActiveRunId,
|
||||
setShowStopButton,
|
||||
finalHandler,
|
||||
createdHandler,
|
||||
attachmentHandler,
|
||||
stepHandler,
|
||||
contentHandler,
|
||||
resetContentHandler,
|
||||
syncStepMessage,
|
||||
clearStepMaps,
|
||||
messageHandler,
|
||||
errorHandler,
|
||||
setIsSubmitting,
|
||||
getMessages,
|
||||
setMessages,
|
||||
startupConfig?.balance?.enabled,
|
||||
balanceQuery,
|
||||
removeActiveJob,
|
||||
],
|
||||
);
|
||||
|
||||
/**
|
||||
* Start generation (POST request that returns streamId)
|
||||
* Uses request.post which has axios interceptors for automatic token refresh.
|
||||
* Retries up to 3 times on network errors with exponential backoff.
|
||||
*/
|
||||
const startGeneration = useCallback(
|
||||
async (currentSubmission: TSubmission): Promise<string | null> => {
|
||||
const payloadData = createPayload(currentSubmission);
|
||||
let { payload } = payloadData;
|
||||
payload = removeNullishValues(payload) as TPayload;
|
||||
|
||||
clearStepMaps();
|
||||
|
||||
const url = payloadData.server;
|
||||
|
||||
const maxRetries = 3;
|
||||
let lastError: unknown = null;
|
||||
|
||||
for (let attempt = 1; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
// Use request.post which handles auth token refresh via axios interceptors
|
||||
const data = (await request.post(url, payload)) as { streamId: string };
|
||||
console.log('[ResumableSSE] Generation started:', { streamId: data.streamId });
|
||||
return data.streamId;
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
// Check if it's a network error (retry) vs server error (don't retry)
|
||||
const isNetworkError =
|
||||
error instanceof Error &&
|
||||
'code' in error &&
|
||||
(error.code === 'ERR_NETWORK' || error.code === 'ERR_INTERNET_DISCONNECTED');
|
||||
|
||||
if (isNetworkError && attempt < maxRetries) {
|
||||
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 8000);
|
||||
console.log(
|
||||
`[ResumableSSE] Network error starting generation, retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`,
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Don't retry: either not a network error or max retries reached
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// All retries failed or non-network error
|
||||
console.error('[ResumableSSE] Error starting generation:', lastError);
|
||||
errorHandler({ data: undefined, submission: currentSubmission as EventSubmission });
|
||||
setIsSubmitting(false);
|
||||
return null;
|
||||
},
|
||||
[clearStepMaps, errorHandler, setIsSubmitting],
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
if (!submission || Object.keys(submission).length === 0) {
|
||||
console.log('[ResumableSSE] No submission, cleaning up');
|
||||
// Clear reconnect timeout if submission is cleared
|
||||
if (reconnectTimeoutRef.current) {
|
||||
clearTimeout(reconnectTimeoutRef.current);
|
||||
reconnectTimeoutRef.current = null;
|
||||
}
|
||||
// Close SSE but do NOT dispatch cancel - navigation should not abort
|
||||
if (sseRef.current) {
|
||||
sseRef.current.close();
|
||||
sseRef.current = null;
|
||||
}
|
||||
setStreamId(null);
|
||||
reconnectAttemptRef.current = 0;
|
||||
submissionRef.current = null;
|
||||
return;
|
||||
}
|
||||
|
||||
const resumeStreamId = (submission as TSubmission & { resumeStreamId?: string }).resumeStreamId;
|
||||
console.log('[ResumableSSE] Effect triggered', {
|
||||
conversationId: submission.conversation?.conversationId,
|
||||
hasResumeStreamId: !!resumeStreamId,
|
||||
resumeStreamId,
|
||||
userMessageId: submission.userMessage?.messageId,
|
||||
});
|
||||
|
||||
submissionRef.current = submission;
|
||||
|
||||
const initStream = async () => {
|
||||
setIsSubmitting(true);
|
||||
setShowStopButton(true);
|
||||
|
||||
if (resumeStreamId) {
|
||||
// Resume: just subscribe to existing stream, don't start new generation
|
||||
console.log('[ResumableSSE] Resuming existing stream:', resumeStreamId);
|
||||
setStreamId(resumeStreamId);
|
||||
// Optimistically add to active jobs (in case it's not already there)
|
||||
addActiveJob(resumeStreamId);
|
||||
subscribeToStream(resumeStreamId, submission, true); // isResume=true
|
||||
} else {
|
||||
// New generation: start and then subscribe
|
||||
console.log('[ResumableSSE] Starting NEW generation');
|
||||
const newStreamId = await startGeneration(submission);
|
||||
if (newStreamId) {
|
||||
setStreamId(newStreamId);
|
||||
// Optimistically add to active jobs
|
||||
addActiveJob(newStreamId);
|
||||
// Queue title generation if this is a new conversation (first message)
|
||||
const isNewConvo = submission.userMessage?.parentMessageId === Constants.NO_PARENT;
|
||||
if (isNewConvo) {
|
||||
queueTitleGeneration(newStreamId);
|
||||
}
|
||||
subscribeToStream(newStreamId, submission);
|
||||
} else {
|
||||
console.error('[ResumableSSE] Failed to get streamId from startGeneration');
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
initStream();
|
||||
|
||||
return () => {
|
||||
console.log('[ResumableSSE] Cleanup - closing SSE, resetting UI state');
|
||||
// Cleanup on unmount/navigation - close connection but DO NOT abort backend
|
||||
// Reset UI state so it doesn't leak to other conversations
|
||||
// If user returns to this conversation, useResumeOnLoad will restore the state
|
||||
if (reconnectTimeoutRef.current) {
|
||||
clearTimeout(reconnectTimeoutRef.current);
|
||||
reconnectTimeoutRef.current = null;
|
||||
}
|
||||
// Reset reconnect counter before closing (so abort handler doesn't think we're reconnecting)
|
||||
reconnectAttemptRef.current = 0;
|
||||
if (sseRef.current) {
|
||||
sseRef.current.close();
|
||||
sseRef.current = null;
|
||||
}
|
||||
// Clear handler maps to prevent memory leaks and stale state
|
||||
clearStepMaps();
|
||||
// Reset UI state on cleanup - useResumeOnLoad will restore if needed
|
||||
setIsSubmitting(false);
|
||||
setShowStopButton(false);
|
||||
};
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [submission]);
|
||||
|
||||
return { streamId };
|
||||
}
|
||||
41
client/src/hooks/SSE/useResumableStreamToggle.ts
Normal file
41
client/src/hooks/SSE/useResumableStreamToggle.ts
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
import { useEffect, useRef } from 'react';
|
||||
import { useRecoilState } from 'recoil';
|
||||
import { isAssistantsEndpoint } from 'librechat-data-provider';
|
||||
import type { EModelEndpoint } from 'librechat-data-provider';
|
||||
import store from '~/store';
|
||||
|
||||
/**
|
||||
* Automatically toggles resumable streams off for assistants endpoints
|
||||
* and restores the previous value when switching away.
|
||||
*
|
||||
* Assistants endpoints have their own streaming mechanism and don't support resumable streams.
|
||||
*/
|
||||
export default function useResumableStreamToggle(
|
||||
endpoint: EModelEndpoint | string | null | undefined,
|
||||
endpointType?: EModelEndpoint | string | null,
|
||||
) {
|
||||
const [resumableStreams, setResumableStreams] = useRecoilState(store.resumableStreams);
|
||||
const savedValueRef = useRef<boolean | null>(null);
|
||||
const wasAssistantsRef = useRef(false);
|
||||
|
||||
useEffect(() => {
|
||||
const actualEndpoint = endpointType ?? endpoint;
|
||||
const isAssistants = isAssistantsEndpoint(actualEndpoint);
|
||||
|
||||
if (isAssistants && !wasAssistantsRef.current) {
|
||||
// Switching TO assistants: save current value and disable
|
||||
savedValueRef.current = resumableStreams;
|
||||
if (resumableStreams) {
|
||||
setResumableStreams(false);
|
||||
}
|
||||
wasAssistantsRef.current = true;
|
||||
} else if (!isAssistants && wasAssistantsRef.current) {
|
||||
// Switching AWAY from assistants: restore saved value
|
||||
if (savedValueRef.current !== null) {
|
||||
setResumableStreams(savedValueRef.current);
|
||||
savedValueRef.current = null;
|
||||
}
|
||||
wasAssistantsRef.current = false;
|
||||
}
|
||||
}, [endpoint, endpointType, resumableStreams, setResumableStreams]);
|
||||
}
|
||||
256
client/src/hooks/SSE/useResumeOnLoad.ts
Normal file
256
client/src/hooks/SSE/useResumeOnLoad.ts
Normal file
|
|
@ -0,0 +1,256 @@
|
|||
import { useEffect, useRef } from 'react';
|
||||
import { useSetRecoilState, useRecoilValue } from 'recoil';
|
||||
import { Constants, tMessageSchema } from 'librechat-data-provider';
|
||||
import type { TMessage, TConversation, TSubmission, Agents } from 'librechat-data-provider';
|
||||
import { useStreamStatus } from '~/data-provider';
|
||||
import store from '~/store';
|
||||
|
||||
/**
|
||||
* Build a submission object from resume state for reconnected streams.
|
||||
* This provides the minimum data needed for useResumableSSE to subscribe.
|
||||
*/
|
||||
function buildSubmissionFromResumeState(
|
||||
resumeState: Agents.ResumeState,
|
||||
streamId: string,
|
||||
messages: TMessage[],
|
||||
conversationId: string,
|
||||
): TSubmission {
|
||||
const userMessageData = resumeState.userMessage;
|
||||
const responseMessageId =
|
||||
resumeState.responseMessageId ?? `${userMessageData?.messageId ?? 'resume'}_`;
|
||||
|
||||
// Try to find existing user message in the messages array (from database)
|
||||
const existingUserMessage = messages.find(
|
||||
(m) => m.isCreatedByUser && m.messageId === userMessageData?.messageId,
|
||||
);
|
||||
|
||||
// Try to find existing response message in the messages array (from database)
|
||||
const existingResponseMessage = messages.find(
|
||||
(m) =>
|
||||
!m.isCreatedByUser &&
|
||||
(m.messageId === responseMessageId || m.parentMessageId === userMessageData?.messageId),
|
||||
);
|
||||
|
||||
// Create or use existing user message
|
||||
const userMessage: TMessage =
|
||||
existingUserMessage ??
|
||||
(userMessageData
|
||||
? (tMessageSchema.parse({
|
||||
messageId: userMessageData.messageId,
|
||||
parentMessageId: userMessageData.parentMessageId ?? Constants.NO_PARENT,
|
||||
conversationId: userMessageData.conversationId ?? conversationId,
|
||||
text: userMessageData.text ?? '',
|
||||
isCreatedByUser: true,
|
||||
role: 'user',
|
||||
}) as TMessage)
|
||||
: (messages[messages.length - 2] ??
|
||||
({
|
||||
messageId: 'resume_user_msg',
|
||||
conversationId,
|
||||
text: '',
|
||||
isCreatedByUser: true,
|
||||
} as TMessage)));
|
||||
|
||||
// ALWAYS use aggregatedContent from resumeState - it has the latest content from the running job.
|
||||
// DB content may be stale (saved at disconnect, but generation continued).
|
||||
const initialResponse: TMessage = {
|
||||
messageId: existingResponseMessage?.messageId ?? responseMessageId,
|
||||
parentMessageId: existingResponseMessage?.parentMessageId ?? userMessage.messageId,
|
||||
conversationId,
|
||||
text: '',
|
||||
// aggregatedContent is authoritative - it reflects actual job state
|
||||
content: (resumeState.aggregatedContent as TMessage['content']) ?? [],
|
||||
isCreatedByUser: false,
|
||||
role: 'assistant',
|
||||
sender: existingResponseMessage?.sender ?? resumeState.sender,
|
||||
model: existingResponseMessage?.model,
|
||||
} as TMessage;
|
||||
|
||||
const conversation: TConversation = {
|
||||
conversationId,
|
||||
title: 'Resumed Chat',
|
||||
endpoint: null,
|
||||
} as TConversation;
|
||||
|
||||
return {
|
||||
messages,
|
||||
userMessage,
|
||||
initialResponse,
|
||||
conversation,
|
||||
isRegenerate: false,
|
||||
isTemporary: false,
|
||||
endpointOption: {},
|
||||
// Signal to useResumableSSE to subscribe to existing stream instead of starting new
|
||||
resumeStreamId: streamId,
|
||||
} as TSubmission & { resumeStreamId: string };
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook to resume streaming if navigating to a conversation with active generation.
|
||||
* Checks stream status via React Query and sets submission if active job found.
|
||||
*
|
||||
* This hook:
|
||||
* 1. Uses useStreamStatus to check for active jobs on navigation
|
||||
* 2. If active job found, builds a submission with streamId and sets it
|
||||
* 3. useResumableSSE picks up the submission and subscribes to the stream
|
||||
*
|
||||
* @param messagesLoaded - Whether the messages query has finished loading (prevents race condition)
|
||||
*/
|
||||
export default function useResumeOnLoad(
|
||||
conversationId: string | undefined,
|
||||
getMessages: () => TMessage[] | undefined,
|
||||
runIndex = 0,
|
||||
messagesLoaded = true,
|
||||
) {
|
||||
const resumableEnabled = useRecoilValue(store.resumableStreams);
|
||||
const setSubmission = useSetRecoilState(store.submissionByIndex(runIndex));
|
||||
const currentSubmission = useRecoilValue(store.submissionByIndex(runIndex));
|
||||
// Track conversations we've already processed (either resumed or skipped)
|
||||
const processedConvoRef = useRef<string | null>(null);
|
||||
|
||||
// Check for active stream when conversation changes
|
||||
// Allow check if no submission OR submission is for a different conversation (stale)
|
||||
const submissionConvoId = currentSubmission?.conversation?.conversationId;
|
||||
const hasActiveSubmissionForThisConvo = currentSubmission && submissionConvoId === conversationId;
|
||||
|
||||
const shouldCheck =
|
||||
resumableEnabled &&
|
||||
messagesLoaded && // Wait for messages to load before checking
|
||||
!hasActiveSubmissionForThisConvo && // Allow if no submission or stale submission
|
||||
!!conversationId &&
|
||||
conversationId !== Constants.NEW_CONVO &&
|
||||
processedConvoRef.current !== conversationId; // Don't re-check processed convos
|
||||
|
||||
const { data: streamStatus, isSuccess } = useStreamStatus(conversationId, shouldCheck);
|
||||
|
||||
useEffect(() => {
|
||||
console.log('[ResumeOnLoad] Effect check', {
|
||||
resumableEnabled,
|
||||
conversationId,
|
||||
messagesLoaded,
|
||||
hasCurrentSubmission: !!currentSubmission,
|
||||
currentSubmissionConvoId: currentSubmission?.conversation?.conversationId,
|
||||
isSuccess,
|
||||
streamStatusActive: streamStatus?.active,
|
||||
streamStatusStreamId: streamStatus?.streamId,
|
||||
processedConvoRef: processedConvoRef.current,
|
||||
});
|
||||
|
||||
if (!resumableEnabled || !conversationId || conversationId === Constants.NEW_CONVO) {
|
||||
console.log('[ResumeOnLoad] Skipping - not enabled or new convo');
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for messages to load to avoid race condition where sync overwrites then DB overwrites
|
||||
if (!messagesLoaded) {
|
||||
console.log('[ResumeOnLoad] Waiting for messages to load');
|
||||
return;
|
||||
}
|
||||
|
||||
// Don't resume if we already have an active submission FOR THIS CONVERSATION
|
||||
// A stale submission with undefined/different conversationId should not block us
|
||||
if (hasActiveSubmissionForThisConvo) {
|
||||
console.log('[ResumeOnLoad] Skipping - already have active submission for this conversation');
|
||||
// Mark as processed so we don't try again
|
||||
processedConvoRef.current = conversationId;
|
||||
return;
|
||||
}
|
||||
|
||||
// If there's a stale submission for a different conversation, log it but continue
|
||||
if (currentSubmission && submissionConvoId !== conversationId) {
|
||||
console.log(
|
||||
'[ResumeOnLoad] Found stale submission for different conversation, will check for resume',
|
||||
{
|
||||
staleConvoId: submissionConvoId,
|
||||
currentConvoId: conversationId,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Wait for stream status query to complete
|
||||
if (!isSuccess || !streamStatus) {
|
||||
console.log('[ResumeOnLoad] Waiting for stream status query');
|
||||
return;
|
||||
}
|
||||
|
||||
// Don't process the same conversation twice
|
||||
if (processedConvoRef.current === conversationId) {
|
||||
console.log('[ResumeOnLoad] Skipping - already processed this conversation');
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if there's an active job to resume
|
||||
// DON'T mark as processed here - only mark when we actually create a submission
|
||||
// This prevents stale cache data from blocking subsequent resume attempts
|
||||
if (!streamStatus.active || !streamStatus.streamId) {
|
||||
console.log('[ResumeOnLoad] No active job to resume for:', conversationId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Mark as processed NOW - we verified there's an active job and will create submission
|
||||
processedConvoRef.current = conversationId;
|
||||
|
||||
console.log('[ResumeOnLoad] Found active job, creating submission...', {
|
||||
streamId: streamStatus.streamId,
|
||||
status: streamStatus.status,
|
||||
resumeState: streamStatus.resumeState,
|
||||
});
|
||||
|
||||
const messages = getMessages() || [];
|
||||
|
||||
// Build submission from resume state if available
|
||||
if (streamStatus.resumeState) {
|
||||
const submission = buildSubmissionFromResumeState(
|
||||
streamStatus.resumeState,
|
||||
streamStatus.streamId,
|
||||
messages,
|
||||
conversationId,
|
||||
);
|
||||
setSubmission(submission);
|
||||
} else {
|
||||
// Minimal submission without resume state
|
||||
const lastUserMessage = [...messages].reverse().find((m) => m.isCreatedByUser);
|
||||
const submission = {
|
||||
messages,
|
||||
userMessage:
|
||||
lastUserMessage ?? ({ messageId: 'resume', conversationId, text: '' } as TMessage),
|
||||
initialResponse: {
|
||||
messageId: 'resume_',
|
||||
conversationId,
|
||||
text: '',
|
||||
content: streamStatus.aggregatedContent ?? [{ type: 'text', text: '' }],
|
||||
} as TMessage,
|
||||
conversation: { conversationId, title: 'Resumed Chat' } as TConversation,
|
||||
isRegenerate: false,
|
||||
isTemporary: false,
|
||||
endpointOption: {},
|
||||
// Signal to useResumableSSE to subscribe to existing stream instead of starting new
|
||||
resumeStreamId: streamStatus.streamId,
|
||||
} as TSubmission & { resumeStreamId: string };
|
||||
setSubmission(submission);
|
||||
}
|
||||
}, [
|
||||
conversationId,
|
||||
resumableEnabled,
|
||||
messagesLoaded,
|
||||
hasActiveSubmissionForThisConvo,
|
||||
submissionConvoId,
|
||||
currentSubmission,
|
||||
isSuccess,
|
||||
streamStatus,
|
||||
getMessages,
|
||||
setSubmission,
|
||||
]);
|
||||
|
||||
// Reset processedConvoRef when conversation changes to allow re-checking
|
||||
useEffect(() => {
|
||||
// Always reset when conversation changes - this allows resuming when navigating back
|
||||
if (conversationId !== processedConvoRef.current) {
|
||||
console.log('[ResumeOnLoad] Resetting processedConvoRef for new conversation:', {
|
||||
old: processedConvoRef.current,
|
||||
new: conversationId,
|
||||
});
|
||||
processedConvoRef.current = null;
|
||||
}
|
||||
}, [conversationId]);
|
||||
}
|
||||
|
|
@ -13,7 +13,7 @@ import {
|
|||
import type { TMessage, TPayload, TSubmission, EventSubmission } from 'librechat-data-provider';
|
||||
import type { EventHandlerParams } from './useEventHandlers';
|
||||
import type { TResData } from '~/common';
|
||||
import { useGenTitleMutation, useGetStartupConfig, useGetUserBalance } from '~/data-provider';
|
||||
import { useGetStartupConfig, useGetUserBalance } from '~/data-provider';
|
||||
import { useAuthContext } from '~/hooks/AuthContext';
|
||||
import useEventHandlers from './useEventHandlers';
|
||||
import store from '~/store';
|
||||
|
|
@ -44,7 +44,6 @@ export default function useSSE(
|
|||
isAddedRequest = false,
|
||||
runIndex = 0,
|
||||
) {
|
||||
const genTitle = useGenTitleMutation();
|
||||
const setActiveRunId = useSetRecoilState(store.activeRunFamily(runIndex));
|
||||
|
||||
const { token, isAuthenticated } = useAuthContext();
|
||||
|
|
@ -73,7 +72,6 @@ export default function useSSE(
|
|||
attachmentHandler,
|
||||
abortConversation,
|
||||
} = useEventHandlers({
|
||||
genTitle,
|
||||
setMessages,
|
||||
getMessages,
|
||||
setCompleted,
|
||||
|
|
|
|||
|
|
@ -21,7 +21,8 @@ type TUseStepHandler = {
|
|||
announcePolite: (options: AnnounceOptions) => void;
|
||||
setMessages: (messages: TMessage[]) => void;
|
||||
getMessages: () => TMessage[] | undefined;
|
||||
setIsSubmitting: SetterOrUpdater<boolean>;
|
||||
/** @deprecated - isSubmitting should be derived from submission state */
|
||||
setIsSubmitting?: SetterOrUpdater<boolean>;
|
||||
lastAnnouncementTimeRef: React.MutableRefObject<number>;
|
||||
};
|
||||
|
||||
|
|
@ -53,7 +54,6 @@ type AllContentTypes =
|
|||
export default function useStepHandler({
|
||||
setMessages,
|
||||
getMessages,
|
||||
setIsSubmitting,
|
||||
announcePolite,
|
||||
lastAnnouncementTimeRef,
|
||||
}: TUseStepHandler) {
|
||||
|
|
@ -101,8 +101,13 @@ export default function useStepHandler({
|
|||
}
|
||||
/** Prevent overwriting an existing content part with a different type */
|
||||
const existingType = (updatedContent[index]?.type as string | undefined) ?? '';
|
||||
if (existingType && !contentType.startsWith(existingType)) {
|
||||
console.warn('Content type mismatch');
|
||||
if (
|
||||
existingType &&
|
||||
existingType !== contentType &&
|
||||
!contentType.startsWith(existingType) &&
|
||||
!existingType.startsWith(contentType)
|
||||
) {
|
||||
console.warn('Content type mismatch', { existingType, contentType, index });
|
||||
return message;
|
||||
}
|
||||
|
||||
|
|
@ -198,7 +203,6 @@ export default function useStepHandler({
|
|||
({ event, data }: TStepEvent, submission: EventSubmission) => {
|
||||
const messages = getMessages() || [];
|
||||
const { userMessage } = submission;
|
||||
setIsSubmitting(true);
|
||||
let parentMessageId = userMessage.messageId;
|
||||
|
||||
const currentTime = Date.now();
|
||||
|
|
@ -228,18 +232,42 @@ export default function useStepHandler({
|
|||
let response = messageMap.current.get(responseMessageId);
|
||||
|
||||
if (!response) {
|
||||
const responseMessage = messages[messages.length - 1] as TMessage;
|
||||
// Find the actual response message - check if last message is a response, otherwise use initialResponse
|
||||
const lastMessage = messages[messages.length - 1] as TMessage;
|
||||
const responseMessage =
|
||||
lastMessage && !lastMessage.isCreatedByUser
|
||||
? lastMessage
|
||||
: (submission?.initialResponse as TMessage);
|
||||
|
||||
// For edit scenarios, initialContent IS the complete starting content (not to be merged)
|
||||
// For resume scenarios (no editedContent), initialContent is empty and we use existingContent
|
||||
const existingContent = responseMessage?.content ?? [];
|
||||
const mergedContent = initialContent.length > 0 ? initialContent : existingContent;
|
||||
|
||||
response = {
|
||||
...responseMessage,
|
||||
parentMessageId,
|
||||
conversationId: userMessage.conversationId,
|
||||
messageId: responseMessageId,
|
||||
content: initialContent,
|
||||
content: mergedContent,
|
||||
};
|
||||
|
||||
messageMap.current.set(responseMessageId, response);
|
||||
setMessages([...messages.slice(0, -1), response]);
|
||||
|
||||
// Get fresh messages to handle multi-tab scenarios where messages may have loaded
|
||||
// after this handler started (Tab 2 may have more complete history now)
|
||||
const freshMessages = getMessages() || [];
|
||||
const currentMessages = freshMessages.length > messages.length ? freshMessages : messages;
|
||||
|
||||
// Remove any existing response placeholder
|
||||
let updatedMessages = currentMessages.filter((m) => m.messageId !== responseMessageId);
|
||||
|
||||
// Ensure userMessage is present (multi-tab: Tab 2 may not have it yet)
|
||||
if (!updatedMessages.some((m) => m.messageId === userMessage.messageId)) {
|
||||
updatedMessages = [...updatedMessages, userMessage as TMessage];
|
||||
}
|
||||
|
||||
setMessages([...updatedMessages, response]);
|
||||
}
|
||||
|
||||
// Store tool call IDs if present
|
||||
|
|
@ -461,7 +489,7 @@ export default function useStepHandler({
|
|||
stepMap.current.clear();
|
||||
};
|
||||
},
|
||||
[getMessages, setIsSubmitting, lastAnnouncementTimeRef, announcePolite, setMessages],
|
||||
[getMessages, lastAnnouncementTimeRef, announcePolite, setMessages],
|
||||
);
|
||||
|
||||
const clearStepMaps = useCallback(() => {
|
||||
|
|
@ -469,5 +497,17 @@ export default function useStepHandler({
|
|||
messageMap.current.clear();
|
||||
stepMap.current.clear();
|
||||
}, []);
|
||||
return { stepHandler, clearStepMaps };
|
||||
|
||||
/**
|
||||
* Sync a message into the step handler's messageMap.
|
||||
* Call this after receiving sync event to ensure subsequent deltas
|
||||
* build on the synced content, not stale content.
|
||||
*/
|
||||
const syncStepMessage = useCallback((message: TMessage) => {
|
||||
if (message?.messageId) {
|
||||
messageMap.current.set(message.messageId, { ...message });
|
||||
}
|
||||
}, []);
|
||||
|
||||
return { stepHandler, clearStepMaps, syncStepMessage };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -490,6 +490,7 @@
|
|||
"com_nav_info_save_draft": "When enabled, the text and attachments you enter in the chat form will be automatically saved locally as drafts. These drafts will be available even if you reload the page or switch to a different conversation. Drafts are stored locally on your device and are deleted once the message is sent.",
|
||||
"com_nav_info_show_thinking": "When enabled, the chat will display the thinking dropdowns open by default, allowing you to view the AI's reasoning in real-time. When disabled, the thinking dropdowns will remain closed by default for a cleaner and more streamlined interface",
|
||||
"com_nav_info_user_name_display": "When enabled, the username of the sender will be shown above each message you send. When disabled, you will only see \"You\" above your messages.",
|
||||
"com_nav_info_resumable_streams": "When enabled, LLM generation continues in the background even if your connection drops. You can reconnect and resume receiving the response without losing progress. This is useful for unstable connections or long responses.",
|
||||
"com_nav_keep_screen_awake": "Keep screen awake during response generation",
|
||||
"com_nav_lang_arabic": "العربية",
|
||||
"com_nav_lang_armenian": "Հայերեն",
|
||||
|
|
@ -548,6 +549,7 @@
|
|||
"com_nav_plus_command": "+-Command",
|
||||
"com_nav_plus_command_description": "Toggle command \"+\" for adding a multi-response setting",
|
||||
"com_nav_profile_picture": "Profile Picture",
|
||||
"com_nav_resumable_streams": "Resumable Streams (Beta)",
|
||||
"com_nav_save_badges_state": "Save badges state",
|
||||
"com_nav_save_drafts": "Save drafts locally",
|
||||
"com_nav_scroll_button": "Scroll to the end button",
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ const localStorageAtoms = {
|
|||
LaTeXParsing: atomWithLocalStorage('LaTeXParsing', true),
|
||||
centerFormOnLanding: atomWithLocalStorage('centerFormOnLanding', true),
|
||||
showFooter: atomWithLocalStorage('showFooter', true),
|
||||
resumableStreams: atomWithLocalStorage('resumableStreams', true),
|
||||
|
||||
// Commands settings
|
||||
atCommand: atomWithLocalStorage('atCommand', true),
|
||||
|
|
|
|||
|
|
@ -596,6 +596,77 @@ describe('Conversation Utilities', () => {
|
|||
expect(data!.pages[0].conversations[0].model).toBe('gpt-4');
|
||||
});
|
||||
|
||||
it('updateConvoInAllQueries with moveToTop moves convo to front and updates updatedAt', () => {
|
||||
// Add more conversations so 'a' is not at position 0
|
||||
const convoC = { conversationId: 'c', updatedAt: '2024-01-03T12:00:00Z' } as TConversation;
|
||||
queryClient.setQueryData(['allConversations'], {
|
||||
pages: [{ conversations: [convoC, convoA], nextCursor: null }],
|
||||
pageParams: [],
|
||||
});
|
||||
|
||||
const before = new Date().toISOString();
|
||||
updateConvoInAllQueries(queryClient, 'a', (c) => ({ ...c, model: 'gpt-4' }), true);
|
||||
const data = queryClient.getQueryData<InfiniteData<any>>(['allConversations']);
|
||||
|
||||
// 'a' should now be at position 0
|
||||
expect(data!.pages[0].conversations[0].conversationId).toBe('a');
|
||||
expect(data!.pages[0].conversations[0].model).toBe('gpt-4');
|
||||
// updatedAt should be updated
|
||||
expect(
|
||||
new Date(data!.pages[0].conversations[0].updatedAt).getTime(),
|
||||
).toBeGreaterThanOrEqual(new Date(before).getTime());
|
||||
// 'c' should now be at position 1
|
||||
expect(data!.pages[0].conversations[1].conversationId).toBe('c');
|
||||
});
|
||||
|
||||
it('updateConvoInAllQueries with moveToTop from second page', () => {
|
||||
const convoC = { conversationId: 'c', updatedAt: '2024-01-03T12:00:00Z' } as TConversation;
|
||||
const convoD = { conversationId: 'd', updatedAt: '2024-01-04T12:00:00Z' } as TConversation;
|
||||
queryClient.setQueryData(['allConversations'], {
|
||||
pages: [
|
||||
{ conversations: [convoC, convoD], nextCursor: 'cursor1' },
|
||||
{ conversations: [convoA, convoB], nextCursor: null },
|
||||
],
|
||||
pageParams: [],
|
||||
});
|
||||
|
||||
updateConvoInAllQueries(queryClient, 'a', (c) => ({ ...c, title: 'Updated' }), true);
|
||||
const data = queryClient.getQueryData<InfiniteData<any>>(['allConversations']);
|
||||
|
||||
// 'a' should now be at front of page 0
|
||||
expect(data!.pages[0].conversations[0].conversationId).toBe('a');
|
||||
expect(data!.pages[0].conversations[0].title).toBe('Updated');
|
||||
// Page 0 should have 3 conversations now
|
||||
expect(data!.pages[0].conversations.length).toBe(3);
|
||||
// Page 1 should have 1 conversation (only 'b' remains)
|
||||
expect(data!.pages[1].conversations.length).toBe(1);
|
||||
expect(data!.pages[1].conversations[0].conversationId).toBe('b');
|
||||
});
|
||||
|
||||
it('updateConvoInAllQueries with moveToTop when already at position 0 updates in place', () => {
|
||||
const originalUpdatedAt = convoA.updatedAt;
|
||||
updateConvoInAllQueries(queryClient, 'a', (c) => ({ ...c, model: 'gpt-4' }), true);
|
||||
const data = queryClient.getQueryData<InfiniteData<any>>(['allConversations']);
|
||||
|
||||
expect(data!.pages[0].conversations[0].conversationId).toBe('a');
|
||||
expect(data!.pages[0].conversations[0].model).toBe('gpt-4');
|
||||
// updatedAt should still be updated even when already at top
|
||||
expect(data!.pages[0].conversations[0].updatedAt).not.toBe(originalUpdatedAt);
|
||||
});
|
||||
|
||||
it('updateConvoInAllQueries with moveToTop returns original data if convo not found', () => {
|
||||
const dataBefore = queryClient.getQueryData<InfiniteData<any>>(['allConversations']);
|
||||
updateConvoInAllQueries(
|
||||
queryClient,
|
||||
'nonexistent',
|
||||
(c) => ({ ...c, model: 'gpt-4' }),
|
||||
true,
|
||||
);
|
||||
const dataAfter = queryClient.getQueryData<InfiniteData<any>>(['allConversations']);
|
||||
|
||||
expect(dataAfter).toEqual(dataBefore);
|
||||
});
|
||||
|
||||
it('removeConvoFromAllQueries deletes conversation', () => {
|
||||
removeConvoFromAllQueries(queryClient, 'a');
|
||||
const data = queryClient.getQueryData<InfiniteData<any>>(['allConversations']);
|
||||
|
|
|
|||
|
|
@ -352,6 +352,7 @@ export function updateConvoInAllQueries(
|
|||
queryClient: QueryClient,
|
||||
conversationId: string,
|
||||
updater: (c: TConversation) => TConversation,
|
||||
moveToTop = false,
|
||||
) {
|
||||
const queries = queryClient
|
||||
.getQueryCache()
|
||||
|
|
@ -362,15 +363,67 @@ export function updateConvoInAllQueries(
|
|||
if (!oldData) {
|
||||
return oldData;
|
||||
}
|
||||
return {
|
||||
...oldData,
|
||||
pages: oldData.pages.map((page) => ({
|
||||
...page,
|
||||
conversations: page.conversations.map((c) =>
|
||||
c.conversationId === conversationId ? updater(c) : c,
|
||||
|
||||
// Find conversation location (single pass with early exit)
|
||||
let pageIdx = -1;
|
||||
let convoIdx = -1;
|
||||
for (let pi = 0; pi < oldData.pages.length; pi++) {
|
||||
const ci = oldData.pages[pi].conversations.findIndex(
|
||||
(c) => c.conversationId === conversationId,
|
||||
);
|
||||
if (ci !== -1) {
|
||||
pageIdx = pi;
|
||||
convoIdx = ci;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pageIdx === -1) {
|
||||
return oldData;
|
||||
}
|
||||
|
||||
const found = oldData.pages[pageIdx].conversations[convoIdx];
|
||||
const updated = moveToTop
|
||||
? { ...updater(found), updatedAt: new Date().toISOString() }
|
||||
: updater(found);
|
||||
|
||||
// If not moving to top, or already at top of page 0, update in place
|
||||
if (!moveToTop || (pageIdx === 0 && convoIdx === 0)) {
|
||||
return {
|
||||
...oldData,
|
||||
pages: oldData.pages.map((page, pi) =>
|
||||
pi === pageIdx
|
||||
? {
|
||||
...page,
|
||||
conversations: page.conversations.map((c, ci) => (ci === convoIdx ? updated : c)),
|
||||
}
|
||||
: page,
|
||||
),
|
||||
})),
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
// Move to top: only modify affected pages
|
||||
const newPages = oldData.pages.map((page, pi) => {
|
||||
if (pi === 0 && pageIdx === 0) {
|
||||
// Source is page 0: remove from current position, add to front
|
||||
const convos = page.conversations.filter((_, ci) => ci !== convoIdx);
|
||||
return { ...page, conversations: [updated, ...convos] };
|
||||
}
|
||||
if (pi === 0) {
|
||||
// Add to front of page 0
|
||||
return { ...page, conversations: [updated, ...page.conversations] };
|
||||
}
|
||||
if (pi === pageIdx) {
|
||||
// Remove from source page
|
||||
return {
|
||||
...page,
|
||||
conversations: page.conversations.filter((_, ci) => ci !== convoIdx),
|
||||
};
|
||||
}
|
||||
return page;
|
||||
});
|
||||
|
||||
return { ...oldData, pages: newPages };
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue