LibreChat/packages/api/src/stream/interfaces/IJobStore.ts
Danny Avila e646a3615e
🌊 fix: Prevent Truncations When Redis Resumable Streams Are Enabled (#11710)
* fix: prevent truncated responses when Redis resumable streams are enabled

Race condition in RedisEventTransport.subscribe() caused early events
(seq 0+) to be lost. The Redis SUBSCRIBE command was issued
fire-and-forget, but GenerationJobManager immediately set
hasSubscriber=true, disabling the earlyEventBuffer. Events published
during the gap between subscribe() returning and the Redis subscription
actually taking effect were neither buffered nor received — they were
silently dropped by Pub/Sub.

This manifested as "timeout waiting for seq 0, force-flushing N messages"
warnings followed by truncated or missing response text in the UI.
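
In code, the pre-fix shape looked roughly like this (a minimal sketch; `Handlers`, `channelFor`, and the client shape are assumptions, not the actual implementation):

```typescript
type Handlers = { onChunk: (event: unknown) => void };
declare const subscriber: { subscribe(channel: string): Promise<void> };
const channelFor = (streamId: string) => `stream:${streamId}`; // hypothetical key scheme

// Pre-fix: SUBSCRIBE is not awaited, so there is a window where the caller
// believes it is subscribed while Redis is not yet listening.
function subscribeFireAndForget(streamId: string, _handlers: Handlers) {
  void subscriber.subscribe(channelFor(streamId)); // fire-and-forget
  return { unsubscribe: () => {} };
}
```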

The fix (sketched after this list):

- IEventTransport.subscribe() now returns an optional `ready` promise
  that resolves once the transport can actually receive messages
- RedisEventTransport returns the Redis SUBSCRIBE acknowledgment as the
  `ready` promise instead of issuing the command fire-and-forget
- GenerationJobManager.subscribe() awaits `ready` before setting
  hasSubscriber=true, keeping the earlyEventBuffer active during the
  subscription window so no events are lost
- GenerationJobManager.emitChunk() early-returns after buffering when no
  subscriber is connected, avoiding wasteful Redis PUBLISHes that nobody
  would receive
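
A sketch of the corrected flow, continuing the assumptions of the sketch above (the contract matches these bullets; the bodies are illustrative):

```typescript
// Transport (fixed): expose the SUBSCRIBE acknowledgment as `ready`.
function subscribeWithReady(
  streamId: string,
  _handlers: Handlers,
): { unsubscribe: () => void; ready?: Promise<void> } {
  const ready = subscriber.subscribe(channelFor(streamId)); // resolves on Redis ack
  return { unsubscribe: () => {}, ready };
}

// Manager (fixed): keep the early-event buffer active until Redis is listening.
async function attachSubscriber(
  job: { hasSubscriber: boolean },
  streamId: string,
  handlers: Handlers,
) {
  const { ready } = subscribeWithReady(streamId, handlers);
  await ready; // awaiting undefined is a no-op for transports without `ready`
  job.hasSubscriber = true; // safe: the buffer stayed on through the gap
}

// Manager emitChunk (fixed): buffer and return early while nobody is listening,
// avoiding Redis PUBLISHes that no subscriber would receive.
function emitChunk(
  job: { hasSubscriber: boolean; earlyEventBuffer: unknown[] },
  event: unknown,
) {
  if (!job.hasSubscriber) {
    job.earlyEventBuffer.push(event);
    return;
  }
  // ...publish via the transport...
}
```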

Adds 5 regression tests covering the race condition for both in-memory
and Redis transports, verifying that events emitted before subscribe are
buffered and replayed, that the ready promise contract is correct for
both transport implementations, and that no events are lost across the
subscribe boundary.

* refactor: Update import paths in GenerationJobManager integration tests

- Refactored import statements in the GenerationJobManager integration test file to use absolute paths instead of relative paths, improving code readability and maintainability.
- Removed redundant imports and ensured consistent usage of the updated import structure across the test cases.

* chore: Remove redundant await from GenerationJobManager initialization in tests

- Updated multiple test cases to call GenerationJobManager.initialize() without awaiting, improving test performance and clarity.
- Ensured consistent initialization across various scenarios in the CollectedUsage and AbortJob test suites.

* refactor: Enhance GenerationJobManager integration tests and RedisEventTransport cleanup

- Updated GenerationJobManager integration tests to utilize dynamic Redis clients and removed unnecessary awaits from initialization calls, improving test performance.
- Refactored RedisEventTransport's destroy method to safely disconnect the subscriber, improving resource management and preventing errors during cleanup (a sketch follows).
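
A plausible shape for that cleanup, as a hedged sketch (the client API is assumed):

```typescript
// Guarded disconnect: destroy() must not throw mid-cleanup, even if the
// subscriber never connected or is already closed.
async function destroySubscriber(
  subscriber?: { unsubscribe(): Promise<void>; disconnect(): void },
): Promise<void> {
  try {
    await subscriber?.unsubscribe();
    subscriber?.disconnect();
  } catch (err) {
    console.warn('RedisEventTransport.destroy: failed to disconnect subscriber', err);
  }
}
```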

* feat: Enhance GenerationJobManager and RedisEventTransport for improved event handling

- Added a resetSequence method to IEventTransport and implemented it in RedisEventTransport to manage publish sequence counters (a sketch follows this list).
- Updated GenerationJobManager to utilize the new resetSequence method, ensuring proper event handling during stream operations.
- Introduced integration tests for GenerationJobManager to validate cross-replica event publishing and subscriber readiness in Redis, enhancing test coverage and reliability.
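
A minimal sketch of such a reset (the internal state names are assumptions):

```typescript
// Assumed internal state: a per-stream monotonic publish counter.
const publishSeq = new Map<string, number>();

function resetSequence(streamId: string): void {
  // The next publish for this stream starts again at seq 0.
  publishSeq.delete(streamId);
}
```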

* test: Add integration tests for GenerationJobManager sequence reset and error recovery with Redis

- Introduced new tests to validate the behavior of GenerationJobManager during sequence resets, ensuring no stale events are received after a reset.
- Added tests to confirm that the sequence is not reset when a second subscriber joins mid-stream, maintaining event integrity.
- Implemented a test for resubscription after a Redis subscribe failure, verifying that events can still be received post-error.
- Enhanced overall test coverage for Redis-related functionality in GenerationJobManager.

* fix: Update GenerationJobManager and RedisEventTransport for improved event synchronization

- Replaced the resetSequence method with syncReorderBuffer in GenerationJobManager to enhance cross-replica event handling without resetting the publisher sequence.
- Added a new syncReorderBuffer method in RedisEventTransport to advance the subscriber reorder buffer safely, ensuring no data loss during subscriber transitions (sketched after this list).
- Introduced a new integration test to validate that local subscribers joining do not cause data loss for cross-replica subscribers, enhancing the reliability of event delivery.
- Updated existing tests to reflect changes in event handling logic, improving overall test coverage and robustness.
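
A sketch of the idea (internal state names are assumptions): rather than zeroing the publisher counter, which would desynchronize replicas that are still publishing, only the local subscriber's reorder cursor is advanced:

```typescript
// Assumed internal state: per-stream reorder buffers with an expected-sequence
// cursor; publishSeq is the per-stream publish counter from the sketch above.
interface ReorderBuffer {
  expectedSeq: number;
  pending: Map<number, unknown>;
  flushTimeout?: ReturnType<typeof setTimeout>;
}
const reorderBuffers = new Map<string, ReorderBuffer>();
const publishSeq = new Map<string, number>();

function syncReorderBuffer(streamId: string): void {
  const buf = reorderBuffers.get(streamId);
  if (!buf) return;
  if (buf.flushTimeout) {
    clearTimeout(buf.flushTimeout); // see the follow-up commit below
    buf.flushTimeout = undefined;
  }
  // Advance the cursor to the publisher's current position without touching
  // the publisher counter itself; stale buffered events are simply discarded.
  buf.expectedSeq = publishSeq.get(streamId) ?? 0;
  buf.pending.clear();
}
```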

* fix: Clear flushTimeout in RedisEventTransport to prevent potential memory leaks

- Added logic to clear the flushTimeout in the reorderBuffer when resetting the sequence counters, ensuring proper resource management and preventing memory leaks during state transitions in RedisEventTransport.
2026-02-10 13:16:29 -05:00

346 lines
10 KiB
TypeScript

import type { Agents } from 'librechat-data-provider';
import type { StandardGraph } from '@librechat/agents';
/**
* Job status values
*/
export type JobStatus = 'running' | 'complete' | 'error' | 'aborted';
/**
* Serializable job data - no object references, suitable for Redis/external storage
*/
export interface SerializableJobData {
streamId: string;
userId: string;
status: JobStatus;
createdAt: number;
completedAt?: number;
conversationId?: string;
error?: string;
/** User message metadata */
userMessage?: {
  messageId: string;
  parentMessageId?: string;
  conversationId?: string;
  text?: string;
};
/** Response message ID for reconnection */
responseMessageId?: string;
/** Sender name for UI display */
sender?: string;
/** Whether sync has been sent to a client */
syncSent: boolean;
/** Serialized final event for replay */
finalEvent?: string;
/** Endpoint metadata for abort handling - avoids storing functions */
endpoint?: string;
iconURL?: string;
model?: string;
promptTokens?: number;
}
/**
* Usage metadata for token spending across different LLM providers.
*
* This interface supports two mutually exclusive cache token formats:
*
* **OpenAI format** (GPT-4, o1, etc.):
* - Uses `input_token_details.cache_creation` and `input_token_details.cache_read`
* - Cache tokens are nested under the `input_token_details` object
*
* **Anthropic format** (Claude models):
* - Uses `cache_creation_input_tokens` and `cache_read_input_tokens`
* - Cache tokens are top-level properties
*
* When processing usage data, check both formats:
* ```typescript
* const cacheCreation = usage.input_token_details?.cache_creation
*   || usage.cache_creation_input_tokens || 0;
* const cacheRead = usage.input_token_details?.cache_read
*   || usage.cache_read_input_tokens || 0;
* ```
*/
export interface UsageMetadata {
/** Total input tokens (prompt tokens) */
input_tokens?: number;
/** Total output tokens (completion tokens) */
output_tokens?: number;
/** Model identifier that generated this usage */
model?: string;
/**
* OpenAI-style cache token details.
* Present for OpenAI models (GPT-4, o1, etc.)
*/
input_token_details?: {
  /** Tokens written to cache */
  cache_creation?: number;
  /** Tokens read from cache */
  cache_read?: number;
};
/**
* Anthropic-style cache creation tokens.
* Present for Claude models. Mutually exclusive with input_token_details.
*/
cache_creation_input_tokens?: number;
/**
* Anthropic-style cache read tokens.
* Present for Claude models. Mutually exclusive with input_token_details.
*/
cache_read_input_tokens?: number;
}
/**
* Result returned from aborting a job - contains all data needed
* for token spending and message saving without storing callbacks
*/
export interface AbortResult {
/** Whether the abort was successful */
success: boolean;
/** The job data at time of abort */
jobData: SerializableJobData | null;
/** Aggregated content from the stream */
content: Agents.MessageContentComplex[];
/** Final event to send to client */
finalEvent: unknown;
/** Concatenated text from all content parts for token counting fallback */
text: string;
/** Collected usage metadata from all models for token spending */
collectedUsage: UsageMetadata[];
}
/**
* Resume state for reconnecting clients
*/
export interface ResumeState {
runSteps: Agents.RunStep[];
aggregatedContent: Agents.MessageContentComplex[];
userMessage?: SerializableJobData['userMessage'];
responseMessageId?: string;
conversationId?: string;
sender?: string;
}
/**
* Interface for job storage backend.
* Implementations can use in-memory Map, Redis, KV store, etc.
*
* Content state is tied to jobs:
* - In-memory: Holds WeakRef to graph for live content/run steps access
* - Redis: Persists chunks, reconstructs content on reconnect
*
* This consolidates job metadata + content state into a single interface.
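*
* A sketch of a typical call sequence against this interface (the order is
* illustrative; error handling omitted):
* ```typescript
* const job = await store.createJob(streamId, userId, conversationId);
* await store.appendChunk(streamId, event); // Redis persists; in-memory no-ops
* const parts = await store.getContentParts(streamId); // live or reconstructed
* await store.updateJob(streamId, { status: 'complete', completedAt: Date.now() });
* store.clearContentState(streamId);
* await store.deleteJob(streamId);
* ```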
*/
export interface IJobStore {
/** Initialize the store (e.g., connect to Redis, start cleanup intervals) */
initialize(): Promise<void>;
/** Create a new job */
createJob(
  streamId: string,
  userId: string,
  conversationId?: string,
): Promise<SerializableJobData>;
/** Get a job by streamId (streamId === conversationId) */
getJob(streamId: string): Promise<SerializableJobData | null>;
/** Update job data */
updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void>;
/** Delete a job */
deleteJob(streamId: string): Promise<void>;
/** Check if job exists */
hasJob(streamId: string): Promise<boolean>;
/** Get all running jobs (for cleanup) */
getRunningJobs(): Promise<SerializableJobData[]>;
/** Cleanup expired jobs */
cleanup(): Promise<number>;
/** Get total job count */
getJobCount(): Promise<number>;
/** Get job count by status */
getJobCountByStatus(status: JobStatus): Promise<number>;
/** Destroy the store and release resources */
destroy(): Promise<void>;
/**
* Get active job IDs for a user.
* Returns conversation IDs of running jobs belonging to the user.
* Also performs self-healing cleanup of stale entries.
*
* @param userId - The user ID to query
* @returns Array of conversation IDs with active jobs
*/
getActiveJobIdsByUser(userId: string): Promise<string[]>;
// ===== Content State Methods =====
// These methods manage volatile content state tied to each job.
// In-memory: Uses WeakRef to graph for live access
// Redis: Persists chunks and reconstructs on demand
/**
* Set the graph reference for a job (in-memory only).
* The graph provides live access to contentParts and contentData (run steps).
*
* In-memory: Stores WeakRef to graph
* Redis: No-op (graph not transferable, uses chunks instead)
*
* @param streamId - The stream identifier
* @param graph - The StandardGraph instance
*/
setGraph(streamId: string, graph: StandardGraph): void;
/**
* Set content parts reference for a job.
*
* In-memory: Stores direct reference to content array
* Redis: No-op (content built from chunks)
*
* @param streamId - The stream identifier
* @param contentParts - The content parts array
*/
setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void;
/**
* Get aggregated content for a job.
*
* In-memory: Returns live content from graph.contentParts or stored reference
* Redis: Reconstructs from stored chunks
*
* @param streamId - The stream identifier
* @returns Content parts or null if not available
*/
getContentParts(streamId: string): Promise<{
  content: Agents.MessageContentComplex[];
} | null>;
/**
* Get run steps for a job (for resume state).
*
* In-memory: Returns live run steps from graph.contentData
* Redis: Fetches from persistent storage
*
* @param streamId - The stream identifier
* @returns Run steps or empty array
*/
getRunSteps(streamId: string): Promise<Agents.RunStep[]>;
/**
* Append a streaming chunk for later reconstruction.
*
* In-memory: No-op (content available via graph reference)
* Redis: Uses XADD for append-only log efficiency
*
* @param streamId - The stream identifier
* @param event - The SSE event to append
*/
appendChunk(streamId: string, event: unknown): Promise<void>;
/**
* Clear all content state for a job.
* Called on job completion/cleanup.
*
* @param streamId - The stream identifier
*/
clearContentState(streamId: string): void;
/**
* Save run steps to persistent storage.
* In-memory: No-op (run steps accessed via graph reference)
* Redis: Persists for resume across instances
*
* @param streamId - The stream identifier
* @param runSteps - Run steps to save
*/
saveRunSteps?(streamId: string, runSteps: Agents.RunStep[]): Promise<void>;
/**
* Set collected usage reference for a job.
* This array accumulates token usage from all models during generation.
*
* @param streamId - The stream identifier
* @param collectedUsage - Array of usage metadata from all models
*/
setCollectedUsage(streamId: string, collectedUsage: UsageMetadata[]): void;
/**
* Get collected usage for a job.
*
* @param streamId - The stream identifier
* @returns Array of usage metadata or empty array
*/
getCollectedUsage(streamId: string): UsageMetadata[];
}
/**
* Interface for pub/sub event transport.
* Implementations can use EventEmitter, Redis Pub/Sub, etc.
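*
* Consumer sketch (illustrative; `send` is a placeholder): await `ready`
* before treating the subscription as live, so events are not dropped
* during the Redis SUBSCRIBE round-trip:
* ```typescript
* const { unsubscribe, ready } = transport.subscribe(streamId, {
*   onChunk: (event) => send(event),
* });
* await ready; // no-op when the transport returns no promise (in-memory)
* // ...consume the stream...
* unsubscribe();
* ```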
*/
export interface IEventTransport {
/** Subscribe to events for a stream. `ready` resolves once the transport can receive messages. */
subscribe(
  streamId: string,
  handlers: {
    onChunk: (event: unknown) => void;
    onDone?: (event: unknown) => void;
    onError?: (error: string) => void;
  },
): { unsubscribe: () => void; ready?: Promise<void> };
/** Publish a chunk event - returns Promise in Redis mode for ordered delivery */
emitChunk(streamId: string, event: unknown): void | Promise<void>;
/** Publish a done event - returns Promise in Redis mode for ordered delivery */
emitDone(streamId: string, event: unknown): void | Promise<void>;
/** Publish an error event - returns Promise in Redis mode for ordered delivery */
emitError(streamId: string, error: string): void | Promise<void>;
/**
* Publish an abort signal to all replicas (Redis mode).
* Enables cross-replica abort: user aborts on Replica B,
* generating Replica A receives signal and stops.
* Optional - only implemented in Redis transport.
*/
emitAbort?(streamId: string): void;
/**
* Register callback for abort signals from any replica (Redis mode).
* Called when abort is triggered from any replica.
* Optional - only implemented in Redis transport.
*/
onAbort?(streamId: string, callback: () => void): void;
/** Get subscriber count for a stream */
getSubscriberCount(streamId: string): number;
/** Check if this is the first subscriber (for ready signaling) */
isFirstSubscriber(streamId: string): boolean;
/** Listen for all subscribers leaving */
onAllSubscribersLeft(streamId: string, callback: () => void): void;
/** Reset publish sequence counter for a stream (used during full stream cleanup) */
resetSequence?(streamId: string): void;
/** Advance subscriber reorder buffer to match publisher sequence (cross-replica safe: doesn't reset publisher counter) */
syncReorderBuffer?(streamId: string): void;
/** Cleanup transport resources for a specific stream */
cleanup(streamId: string): void;
/** Get all tracked stream IDs (for orphan cleanup) */
getTrackedStreamIds(): string[];
/** Destroy all transport resources */
destroy(): void;
}