diff --git a/api/server/index.js b/api/server/index.js
index acd376a514..a7ddd47f37 100644
--- a/api/server/index.js
+++ b/api/server/index.js
@@ -17,6 +17,7 @@ const {
   handleJsonParseError,
   initializeFileStorage,
   GenerationJobManager,
+  createStreamServices,
 } = require('@librechat/api');
 const { connectDb, indexSync } = require('~/db');
 const initializeOAuthReconnectManager = require('./services/initializeOAuthReconnectManager');
@@ -193,6 +194,10 @@ const startServer = async () => {
     await initializeMCPs();
     await initializeOAuthReconnectManager();
     await checkMigrations();
+
+    // Configure stream services (auto-detects Redis from USE_REDIS env var)
+    const streamServices = createStreamServices();
+    GenerationJobManager.configure(streamServices);
     GenerationJobManager.initialize();
   });
 };
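This wiring is the only integration point the server needs: `configure()` swaps in the backing stores, then `initialize()` starts the cleanup timer. A minimal TypeScript sketch of the same startup sequence, assuming the package root re-exports the `stream` module (as the `require('@librechat/api')` above implies):

```ts
import { GenerationJobManager, createStreamServices } from '@librechat/api';

// Auto-detects Redis from cacheConfig.USE_REDIS; falls back to in-memory
// stores when no Redis client is available.
const services = createStreamServices();

// configure() must run before initialize(): initialize() starts the periodic
// cleanup timer against whichever stores were configured.
GenerationJobManager.configure(services);
GenerationJobManager.initialize();
```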
diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts
index 8a45c5445e..b6dd4efa29 100644
--- a/packages/api/src/stream/GenerationJobManager.ts
+++ b/packages/api/src/stream/GenerationJobManager.ts
@@ -11,6 +11,14 @@ import type * as t from '~/types';
 import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
 import { InMemoryJobStore } from './implementations/InMemoryJobStore';
 
+/**
+ * Configuration options for GenerationJobManager
+ */
+export interface GenerationJobManagerOptions {
+  jobStore?: IJobStore;
+  eventTransport?: IEventTransport;
+}
+
 /**
  * Runtime state for active jobs - not serializable, kept in-memory per instance.
  * Contains AbortController, ready promise, and other non-serializable state.
@@ -67,13 +75,18 @@ class GenerationJobManagerClass {
 
   private cleanupInterval: NodeJS.Timeout | null = null;
 
-  constructor() {
-    this.jobStore = new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
-    this.eventTransport = new InMemoryEventTransport();
+  /** Whether we're using Redis stores */
+  private _isRedis = false;
+
+  constructor(options?: GenerationJobManagerOptions) {
+    this.jobStore =
+      options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
+    this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport();
   }
 
   /**
    * Initialize the job manager with periodic cleanup.
+   * Call this once at application startup.
    */
   initialize(): void {
     if (this.cleanupInterval) {
@@ -93,6 +106,55 @@ class GenerationJobManagerClass {
     logger.debug('[GenerationJobManager] Initialized');
   }
 
+  /**
+   * Configure the manager with custom stores.
+   * Call this BEFORE initialize() to use Redis or other stores.
+   *
+   * @example Using Redis
+   * ```ts
+   * import { createStreamServices } from '~/stream/createStreamServices';
+   *
+   * // Auto-detects Redis from cacheConfig.USE_REDIS and the shared ioredisClient
+   * const services = createStreamServices();
+   * GenerationJobManager.configure(services);
+   * GenerationJobManager.initialize();
+   * ```
+   */
+  configure(services: {
+    jobStore: IJobStore;
+    eventTransport: IEventTransport;
+    isRedis?: boolean;
+  }): void {
+    if (this.cleanupInterval) {
+      logger.warn(
+        '[GenerationJobManager] Reconfiguring after initialization - destroying existing services',
+      );
+      this.destroy();
+    }
+
+    this.jobStore = services.jobStore;
+    this.eventTransport = services.eventTransport;
+    this._isRedis = services.isRedis ?? false;
+
+    logger.info(
+      `[GenerationJobManager] Configured with ${this._isRedis ? 'Redis' : 'in-memory'} stores`,
+    );
+  }
+
+  /**
+   * Check if using Redis stores.
+   */
+  get isRedis(): boolean {
+    return this._isRedis;
+  }
+
+  /**
+   * Get the job store instance (for advanced use cases).
+   */
+  getJobStore(): IJobStore {
+    return this.jobStore;
+  }
+
   /**
    * Create a new generation job.
    *
@@ -146,15 +208,17 @@
     if (currentRuntime) {
       currentRuntime.syncSent = false;
       // Call registered handlers (from job.emitter.on('allSubscribersLeft', ...))
-      const content = this.jobStore.getContentParts(streamId) ?? [];
       if (currentRuntime.allSubscribersLeftHandlers) {
-        for (const handler of currentRuntime.allSubscribersLeftHandlers) {
-          try {
-            handler(content);
-          } catch (err) {
-            logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
+        this.jobStore.getContentParts(streamId).then((content) => {
+          const parts = content ?? [];
+          for (const handler of currentRuntime.allSubscribersLeftHandlers ?? []) {
+            try {
+              handler(parts);
+            } catch (err) {
+              logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
+            }
           }
-        }
+        });
       }
     }
     logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
@@ -282,8 +346,9 @@
       error,
     });
 
-    // Clear content state
+    // Clear content state and run step buffer
    this.jobStore.clearContentState(streamId);
+    this.runStepBuffers.delete(streamId);
 
     logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
   }
@@ -311,7 +376,7 @@
     });
 
     // Get content and extract text
-    const content = this.jobStore.getContentParts(streamId) ?? [];
+    const content = (await this.jobStore.getContentParts(streamId)) ?? [];
     const text = this.extractTextFromContent(content);
 
     // Create final event for abort
@@ -458,9 +523,74 @@
     // Track user message from created event
     this.trackUserMessage(streamId, event);
 
+    // For Redis mode, persist chunk for later reconstruction
+    if (this._isRedis) {
+      // The SSE event structure is { event: string, data: unknown, ... }
+      // The aggregator expects { event: string, data: unknown } where data is the payload
+      const eventObj = event as Record<string, unknown>;
+      const eventType = eventObj.event as string | undefined;
+      const eventData = eventObj.data;
+
+      if (eventType && eventData !== undefined) {
+        // Store in format expected by aggregateContent: { event, data }
+        this.jobStore.appendChunk(streamId, { event: eventType, data: eventData }).catch((err) => {
+          logger.error(`[GenerationJobManager] Failed to append chunk:`, err);
+        });
+
+        // For run step events, also save to run steps key for quick retrieval
+        if (eventType === 'on_run_step' || eventType === 'on_run_step_completed') {
+          this.saveRunStepFromEvent(streamId, eventData as Record<string, unknown>);
+        }
+      }
+    }
+
     this.eventTransport.emitChunk(streamId, event);
   }
 
+  /**
+   * Extract and save run step from event data.
+   * The data is already the run step object from the event payload.
+   */
+  private saveRunStepFromEvent(streamId: string, data: Record<string, unknown>): void {
+    // The data IS the run step object
+    const runStep = data as Agents.RunStep;
+    if (!runStep.id) {
+      return;
+    }
+
+    // Fire and forget - accumulate run steps
+    this.accumulateRunStep(streamId, runStep);
+  }
+
+  /**
+   * Accumulate run steps for a stream.
+   * Uses a simple in-memory buffer that gets flushed to Redis.
+   */
+  private runStepBuffers = new Map<string, Agents.RunStep[]>();
+
+  private accumulateRunStep(streamId: string, runStep: Agents.RunStep): void {
+    let buffer = this.runStepBuffers.get(streamId);
+    if (!buffer) {
+      buffer = [];
+      this.runStepBuffers.set(streamId, buffer);
+    }
+
+    // Update or add run step
+    const existingIdx = buffer.findIndex((rs) => rs.id === runStep.id);
+    if (existingIdx >= 0) {
+      buffer[existingIdx] = runStep;
+    } else {
+      buffer.push(runStep);
+    }
+
+    // Debounced save to Redis
+    if (this.jobStore.saveRunSteps) {
+      this.jobStore.saveRunSteps(streamId, buffer).catch((err) => {
+        logger.error(`[GenerationJobManager] Failed to save run steps:`, err);
+      });
+    }
+  }
+
   /**
    * Track user message from created event.
    */
@@ -554,8 +684,8 @@
       return null;
     }
 
-    const aggregatedContent = this.jobStore.getContentParts(streamId) ?? [];
-    const runSteps = this.jobStore.getRunSteps(streamId);
+    const aggregatedContent = (await this.jobStore.getContentParts(streamId)) ?? [];
+    const runSteps = await this.jobStore.getRunSteps(streamId);
 
     logger.debug(`[GenerationJobManager] getResumeState:`, {
       streamId,
@@ -642,10 +772,12 @@
       return null;
     }
 
+    const aggregatedContent = (await this.jobStore.getContentParts(streamId)) ?? [];
+
     return {
       active: jobData.status === 'running',
       status: jobData.status as t.GenerationJobStatus,
-      aggregatedContent: this.jobStore.getContentParts(streamId) ?? [],
+      aggregatedContent,
       createdAt: jobData.createdAt,
     };
   }
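The new `GenerationJobManagerOptions` constructor parameter also opens an injection seam for tests, independent of the shared singleton. A sketch under the same root-re-export assumption as above:

```ts
import {
  GenerationJobManagerClass,
  InMemoryJobStore,
  InMemoryEventTransport,
} from '@librechat/api';

// A throwaway manager with short TTLs, e.g. for an integration test; the
// shared GenerationJobManager singleton is left untouched.
const manager = new GenerationJobManagerClass({
  jobStore: new InMemoryJobStore({ ttlAfterComplete: 60_000, maxJobs: 100 }),
  eventTransport: new InMemoryEventTransport(),
});
manager.initialize();
// ... exercise the manager, then tear down its cleanup interval:
manager.destroy();
```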
diff --git a/packages/api/src/stream/createStreamServices.ts b/packages/api/src/stream/createStreamServices.ts
new file mode 100644
index 0000000000..6c8090c187
--- /dev/null
+++ b/packages/api/src/stream/createStreamServices.ts
@@ -0,0 +1,130 @@
+import type { Redis, Cluster } from 'ioredis';
+import { logger } from '@librechat/data-schemas';
+import type { IJobStore, IEventTransport } from './interfaces/IJobStore';
+import { InMemoryJobStore } from './implementations/InMemoryJobStore';
+import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
+import { RedisJobStore } from './implementations/RedisJobStore';
+import { RedisEventTransport } from './implementations/RedisEventTransport';
+import { cacheConfig } from '~/cache/cacheConfig';
+import { ioredisClient } from '~/cache/redisClients';
+
+/**
+ * Configuration for stream services (optional overrides)
+ */
+export interface StreamServicesConfig {
+  /**
+   * Override Redis detection. If not provided, uses cacheConfig.USE_REDIS.
+   */
+  useRedis?: boolean;
+
+  /**
+   * Override Redis client. If not provided, uses ioredisClient from cache.
+   */
+  redisClient?: Redis | Cluster | null;
+
+  /**
+   * Dedicated Redis client for pub/sub subscribing.
+   * If not provided, will duplicate the main client.
+   */
+  redisSubscriber?: Redis | Cluster | null;
+
+  /**
+   * Options for in-memory job store
+   */
+  inMemoryOptions?: {
+    ttlAfterComplete?: number;
+    maxJobs?: number;
+  };
+}
+
+/**
+ * Stream services result
+ */
+export interface StreamServices {
+  jobStore: IJobStore;
+  eventTransport: IEventTransport;
+  isRedis: boolean;
+}
+
+/**
+ * Create stream services (job store + event transport).
+ *
+ * Automatically detects Redis from cacheConfig.USE_REDIS and uses
+ * the existing ioredisClient. Falls back to in-memory if Redis
+ * is not configured or not available.
+ *
+ * @example Auto-detect (uses cacheConfig)
+ * ```ts
+ * const services = createStreamServices();
+ * // Uses Redis if USE_REDIS=true, otherwise in-memory
+ * ```
+ *
+ * @example Force in-memory
+ * ```ts
+ * const services = createStreamServices({ useRedis: false });
+ * ```
+ */
+export function createStreamServices(config: StreamServicesConfig = {}): StreamServices {
+  // Use provided config or fall back to cache config
+  const useRedis = config.useRedis ?? cacheConfig.USE_REDIS;
+  const redisClient = config.redisClient ?? ioredisClient;
+  const { redisSubscriber, inMemoryOptions } = config;
+
+  // Check if we should and can use Redis
+  if (useRedis && redisClient) {
+    try {
+      // For subscribing, we need a dedicated connection
+      // If subscriber not provided, duplicate the main client
+      let subscriber = redisSubscriber;
+
+      if (!subscriber && 'duplicate' in redisClient) {
+        subscriber = (redisClient as Redis).duplicate();
+        logger.info('[StreamServices] Duplicated Redis client for subscriber');
+      }
+
+      if (!subscriber) {
+        logger.warn('[StreamServices] No subscriber client available, falling back to in-memory');
+        return createInMemoryServices(inMemoryOptions);
+      }
+
+      const jobStore = new RedisJobStore(redisClient);
+      const eventTransport = new RedisEventTransport(redisClient, subscriber);
+
+      logger.info('[StreamServices] Created Redis-backed stream services');
+
+      return {
+        jobStore,
+        eventTransport,
+        isRedis: true,
+      };
+    } catch (err) {
+      logger.error(
+        '[StreamServices] Failed to create Redis services, falling back to in-memory:',
+        err,
+      );
+      return createInMemoryServices(inMemoryOptions);
+    }
+  }
+
+  return createInMemoryServices(inMemoryOptions);
+}
+
+/**
+ * Create in-memory stream services
+ */
+function createInMemoryServices(options?: StreamServicesConfig['inMemoryOptions']): StreamServices {
+  const jobStore = new InMemoryJobStore({
+    ttlAfterComplete: options?.ttlAfterComplete ?? 300000, // 5 minutes
+    maxJobs: options?.maxJobs ?? 1000,
+  });
+
+  const eventTransport = new InMemoryEventTransport();
+
+  logger.info('[StreamServices] Created in-memory stream services');
+
+  return {
+    jobStore,
+    eventTransport,
+    isRedis: false,
+  };
+}
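Beyond the zero-argument call shown in the JSDoc, `StreamServicesConfig` supports explicit overrides. A sketch of both paths (the `ioredis` connection details here are illustrative, not part of this diff):

```ts
import Redis from 'ioredis';
import { createStreamServices } from '@librechat/api';

// Force in-memory regardless of USE_REDIS, e.g. in unit tests:
const memoryServices = createStreamServices({ useRedis: false });

// Supply explicit clients so the factory does not have to duplicate() one;
// the subscriber connection must be dedicated to pub/sub.
const publisher = new Redis('redis://localhost:6379');
const redisServices = createStreamServices({
  useRedis: true,
  redisClient: publisher,
  redisSubscriber: publisher.duplicate(),
  inMemoryOptions: { ttlAfterComplete: 60_000 }, // used only on fallback
});
```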
diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts
index e9391327d8..0e60d28010 100644
--- a/packages/api/src/stream/implementations/InMemoryJobStore.ts
+++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts
@@ -212,7 +212,7 @@ export class InMemoryJobStore implements IJobStore {
    * Get content parts for a job.
    * Returns live content from stored reference.
    */
-  getContentParts(streamId: string): Agents.MessageContentComplex[] | null {
+  async getContentParts(streamId: string): Promise<Agents.MessageContentComplex[] | null> {
     return this.contentState.get(streamId)?.contentParts ?? null;
   }
 
@@ -220,7 +220,7 @@
    * Get run steps for a job from graph.contentData.
    * Uses WeakRef - may return empty if graph has been GC'd.
    */
-  getRunSteps(streamId: string): Agents.RunStep[] {
+  async getRunSteps(streamId: string): Promise<Agents.RunStep[]> {
     const state = this.contentState.get(streamId);
     if (!state?.graphRef) {
       return [];
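With both stores now returning promises, call sites no longer need to branch on the store type; the in-memory promises simply resolve immediately. A store-agnostic caller, assuming the `IJobStore` type is reachable from the package root as elsewhere in this diff:

```ts
import type { IJobStore } from '@librechat/api';

async function snapshot(store: IJobStore, streamId: string) {
  // Works identically for InMemoryJobStore and RedisJobStore.
  const contentParts = (await store.getContentParts(streamId)) ?? [];
  const runSteps = await store.getRunSteps(streamId);
  return { contentParts, runSteps };
}
```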
diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts
new file mode 100644
index 0000000000..858e5865bc
--- /dev/null
+++ b/packages/api/src/stream/implementations/RedisEventTransport.ts
@@ -0,0 +1,312 @@
+import type { Redis, Cluster } from 'ioredis';
+import { logger } from '@librechat/data-schemas';
+import type { IEventTransport } from '~/stream/interfaces/IJobStore';
+
+/**
+ * Redis key prefixes for pub/sub channels
+ */
+const CHANNELS = {
+  /** Main event channel: stream:events:{streamId} */
+  events: (streamId: string) => `stream:events:${streamId}`,
+};
+
+/**
+ * Event types for pub/sub messages
+ */
+const EventTypes = {
+  CHUNK: 'chunk',
+  DONE: 'done',
+  ERROR: 'error',
+} as const;
+
+interface PubSubMessage {
+  type: (typeof EventTypes)[keyof typeof EventTypes];
+  data?: unknown;
+  error?: string;
+}
+
+/**
+ * Subscriber state for a stream
+ */
+interface StreamSubscribers {
+  count: number;
+  handlers: Map<
+    string,
+    {
+      onChunk: (event: unknown) => void;
+      onDone?: (event: unknown) => void;
+      onError?: (error: string) => void;
+    }
+  >;
+  allSubscribersLeftCallbacks: Array<() => void>;
+}
+
+/**
+ * Redis Pub/Sub implementation of IEventTransport.
+ * Enables real-time event delivery across multiple instances.
+ *
+ * Architecture (inspired by https://upstash.com/blog/resumable-llm-streams):
+ * - Publisher: Emits events to Redis channel when chunks arrive
+ * - Subscriber: Listens to Redis channel and forwards to SSE clients
+ * - Decoupled: Generator and consumer don't need direct connection
+ *
+ * Note: Requires TWO Redis connections - one for publishing, one for subscribing.
+ * This is a Redis limitation: a client in subscribe mode can't publish.
+ *
+ * @example
+ * ```ts
+ * const transport = new RedisEventTransport(publisherClient, subscriberClient);
+ * transport.subscribe(streamId, { onChunk: (e) => res.write(e) });
+ * transport.emitChunk(streamId, { text: 'Hello' });
+ * ```
+ */
+export class RedisEventTransport implements IEventTransport {
+  /** Redis client for publishing events */
+  private publisher: Redis | Cluster;
+  /** Redis client for subscribing to events (separate connection required) */
+  private subscriber: Redis | Cluster;
+  /** Track subscribers per stream */
+  private streams = new Map<string, StreamSubscribers>();
+  /** Track which channels we're subscribed to */
+  private subscribedChannels = new Set<string>();
+  /** Counter for generating unique subscriber IDs */
+  private subscriberIdCounter = 0;
+
+  /**
+   * Create a new Redis event transport.
+   *
+   * @param publisher - Redis client for publishing (can be shared)
+   * @param subscriber - Redis client for subscribing (must be dedicated)
+   */
+  constructor(publisher: Redis | Cluster, subscriber: Redis | Cluster) {
+    this.publisher = publisher;
+    this.subscriber = subscriber;
+
+    // Set up message handler for all subscriptions
+    this.subscriber.on('message', (channel: string, message: string) => {
+      this.handleMessage(channel, message);
+    });
+  }
+
+  /**
+   * Handle incoming pub/sub message
+   */
+  private handleMessage(channel: string, message: string): void {
+    // Extract streamId from channel name
+    const prefix = 'stream:events:';
+    if (!channel.startsWith(prefix)) {
+      return;
+    }
+    const streamId = channel.slice(prefix.length);
+
+    const streamState = this.streams.get(streamId);
+    if (!streamState) {
+      return;
+    }
+
+    try {
+      const parsed = JSON.parse(message) as PubSubMessage;
+
+      for (const [, handlers] of streamState.handlers) {
+        switch (parsed.type) {
+          case EventTypes.CHUNK:
+            handlers.onChunk(parsed.data);
+            break;
+          case EventTypes.DONE:
+            handlers.onDone?.(parsed.data);
+            break;
+          case EventTypes.ERROR:
+            handlers.onError?.(parsed.error ?? 'Unknown error');
+            break;
+        }
+      }
+    } catch (err) {
+      logger.error(`[RedisEventTransport] Failed to parse message:`, err);
+    }
+  }
+
+  /**
+   * Subscribe to events for a stream.
+   *
+   * On first subscriber for a stream, subscribes to the Redis channel.
+   * Returns unsubscribe function that cleans up when last subscriber leaves.
+   */
+  subscribe(
+    streamId: string,
+    handlers: {
+      onChunk: (event: unknown) => void;
+      onDone?: (event: unknown) => void;
+      onError?: (error: string) => void;
+    },
+  ): { unsubscribe: () => void } {
+    const channel = CHANNELS.events(streamId);
+    const subscriberId = `sub_${++this.subscriberIdCounter}`;
+
+    // Initialize stream state if needed
+    if (!this.streams.has(streamId)) {
+      this.streams.set(streamId, {
+        count: 0,
+        handlers: new Map(),
+        allSubscribersLeftCallbacks: [],
+      });
+    }
+
+    const streamState = this.streams.get(streamId)!;
+    streamState.count++;
+    streamState.handlers.set(subscriberId, handlers);
+
+    // Subscribe to Redis channel if this is first subscriber
+    if (!this.subscribedChannels.has(channel)) {
+      this.subscribedChannels.add(channel);
+      this.subscriber.subscribe(channel).catch((err) => {
+        logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err);
+      });
+      logger.debug(`[RedisEventTransport] Subscribed to channel: ${channel}`);
+    }
+
+    // Return unsubscribe function
+    return {
+      unsubscribe: () => {
+        const state = this.streams.get(streamId);
+        if (!state) {
+          return;
+        }
+
+        state.handlers.delete(subscriberId);
+        state.count--;
+
+        // If last subscriber left, unsubscribe from Redis and notify
+        if (state.count === 0) {
+          this.subscriber.unsubscribe(channel).catch((err) => {
+            logger.error(`[RedisEventTransport] Failed to unsubscribe from ${channel}:`, err);
+          });
+          this.subscribedChannels.delete(channel);
+
+          // Call all-subscribers-left callbacks
+          for (const callback of state.allSubscribersLeftCallbacks) {
+            try {
+              callback();
+            } catch (err) {
+              logger.error(`[RedisEventTransport] Error in allSubscribersLeft callback:`, err);
+            }
+          }
+
+          this.streams.delete(streamId);
+          logger.debug(`[RedisEventTransport] All subscribers left ${streamId}`);
+        }
+      },
+    };
+  }
+
+  /**
+   * Publish a chunk event to all subscribers across all instances.
+   */
+  emitChunk(streamId: string, event: unknown): void {
+    const channel = CHANNELS.events(streamId);
+    const message: PubSubMessage = { type: EventTypes.CHUNK, data: event };
+
+    this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
+      logger.error(`[RedisEventTransport] Failed to publish chunk:`, err);
+    });
+  }
+
+  /**
+   * Publish a done event to all subscribers.
+   */
+  emitDone(streamId: string, event: unknown): void {
+    const channel = CHANNELS.events(streamId);
+    const message: PubSubMessage = { type: EventTypes.DONE, data: event };
+
+    this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
+      logger.error(`[RedisEventTransport] Failed to publish done:`, err);
+    });
+  }
+
+  /**
+   * Publish an error event to all subscribers.
+   */
+  emitError(streamId: string, error: string): void {
+    const channel = CHANNELS.events(streamId);
+    const message: PubSubMessage = { type: EventTypes.ERROR, error };
+
+    this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
+      logger.error(`[RedisEventTransport] Failed to publish error:`, err);
+    });
+  }
+
+  /**
+   * Get subscriber count for a stream (local instance only).
+   *
+   * Note: In a multi-instance setup, this only returns the local subscriber count.
+   * A global count would need to be tracked in Redis (e.g., with a counter key).
+   */
+  getSubscriberCount(streamId: string): number {
+    return this.streams.get(streamId)?.count ?? 0;
+  }
+
+  /**
+   * Check if this is the first subscriber (local instance only).
+   */
+  isFirstSubscriber(streamId: string): boolean {
+    return this.getSubscriberCount(streamId) === 1;
+  }
+
+  /**
+   * Register callback for when all subscribers leave.
+   */
+  onAllSubscribersLeft(streamId: string, callback: () => void): void {
+    const state = this.streams.get(streamId);
+    if (state) {
+      state.allSubscribersLeftCallbacks.push(callback);
+    } else {
+      // Create state just for the callback
+      this.streams.set(streamId, {
+        count: 0,
+        handlers: new Map(),
+        allSubscribersLeftCallbacks: [callback],
+      });
+    }
+  }
+
+  /**
+   * Cleanup resources for a specific stream.
+   */
+  cleanup(streamId: string): void {
+    const channel = CHANNELS.events(streamId);
+    const state = this.streams.get(streamId);
+
+    if (state) {
+      // Clear all handlers
+      state.handlers.clear();
+      state.allSubscribersLeftCallbacks = [];
+    }
+
+    // Unsubscribe from Redis channel
+    if (this.subscribedChannels.has(channel)) {
+      this.subscriber.unsubscribe(channel).catch((err) => {
+        logger.error(`[RedisEventTransport] Failed to cleanup ${channel}:`, err);
+      });
+      this.subscribedChannels.delete(channel);
+    }
+
+    this.streams.delete(streamId);
+  }
+
+  /**
+   * Destroy all resources.
+   */
+  destroy(): void {
+    // Unsubscribe from all channels
+    for (const channel of this.subscribedChannels) {
+      this.subscriber.unsubscribe(channel).catch(() => {
+        // Ignore errors during shutdown
+      });
+    }
+
+    this.subscribedChannels.clear();
+    this.streams.clear();
+
+    // Note: Don't close Redis connections - they may be shared
+    logger.info('[RedisEventTransport] Destroyed');
+  }
+}
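A hedged sketch of how a request handler might bridge this transport to an SSE response; the Express `Response` usage and SSE framing are illustrative, only `subscribe`/`unsubscribe` come from this diff:

```ts
import Redis from 'ioredis';
import type { Response } from 'express';
import { RedisEventTransport } from '@librechat/api';

const publisher = new Redis();
// Pub/sub requires a dedicated connection for subscribing.
const transport = new RedisEventTransport(publisher, publisher.duplicate());

function attachSSE(res: Response, streamId: string): void {
  const { unsubscribe } = transport.subscribe(streamId, {
    onChunk: (event) => res.write(`data: ${JSON.stringify(event)}\n\n`),
    onDone: () => res.end(),
    onError: (message) => res.write(`event: error\ndata: ${message}\n\n`),
  });
  // Drop the Redis subscription when the client disconnects.
  res.on('close', unsubscribe);
}
```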
diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts
index e42a3b2b79..112ab5dcea 100644
--- a/packages/api/src/stream/implementations/RedisJobStore.ts
+++ b/packages/api/src/stream/implementations/RedisJobStore.ts
@@ -262,24 +262,13 @@
   }
 
   /**
-   * For Redis, this returns null - caller should use getAggregatedContentAsync().
-   * This sync method exists for interface compatibility with in-memory.
-   *
-   * Note: GenerationJobManager should check for null and call the async version.
+   * Get aggregated content from chunks.
+   * Reconstructs message content from Redis Streams on demand.
    */
-  getContentParts(): Agents.MessageContentComplex[] | null {
-    // Redis can't return content synchronously - must use chunks
-    return null;
-  }
-
-  /**
-   * Get aggregated content from chunks (async version for Redis).
-   * Called on client reconnection to reconstruct message content.
-   */
-  async getAggregatedContentAsync(
-    streamId: string,
-  ): Promise<Agents.MessageContentComplex[] | null> {
+  async getContentParts(streamId: string): Promise<Agents.MessageContentComplex[] | null> {
     const chunks = await this.getChunks(streamId);
+    logger.debug(`[RedisJobStore] getContentParts: ${streamId} has ${chunks.length} chunks`);
+
     if (chunks.length === 0) {
       return null;
     }
@@ -319,25 +308,19 @@
   }
 
   /**
-   * For Redis, run steps must be fetched async.
-   * This sync method returns empty - caller should use getRunStepsAsync().
+   * Get run steps from Redis.
    */
-  getRunSteps(): Agents.RunStep[] {
-    // Redis can't return run steps synchronously
-    return [];
-  }
-
-  /**
-   * Get run steps (async version for Redis).
-   */
-  async getRunStepsAsync(streamId: string): Promise<Agents.RunStep[]> {
+  async getRunSteps(streamId: string): Promise<Agents.RunStep[]> {
     const key = KEYS.runSteps(streamId);
     const data = await this.redis.get(key);
     if (!data) {
+      logger.debug(`[RedisJobStore] getRunSteps: ${streamId} has no run steps`);
       return [];
     }
 
     try {
-      return JSON.parse(data);
+      const runSteps = JSON.parse(data);
+      logger.debug(`[RedisJobStore] getRunSteps: ${streamId} has ${runSteps.length} run steps`);
+      return runSteps;
     } catch {
       return [];
    }
@@ -369,6 +352,8 @@
    */
   async appendChunk(streamId: string, event: unknown): Promise<void> {
     const key = KEYS.chunks(streamId);
+    const eventObj = event as { event?: string };
+    logger.debug(`[RedisJobStore] appendChunk: ${streamId} event=${eventObj.event}`);
     await this.redis.xadd(key, '*', 'event', JSON.stringify(event));
   }
 
@@ -402,6 +387,197 @@
     await this.redis.set(key, JSON.stringify(runSteps), 'EX', TTL.running);
   }
 
+  // ===== Consumer Group Methods =====
+  // These enable tracking which chunks each client has seen.
+  // Based on https://upstash.com/blog/resumable-llm-streams
+
+  /**
+   * Create a consumer group for a stream.
+   * Used to track which chunks a client has already received.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Unique name for the consumer group (e.g., session ID)
+   * @param startFrom - Where to start reading ('0' = from beginning, '$' = only new)
+   */
+  async createConsumerGroup(
+    streamId: string,
+    groupName: string,
+    startFrom: '0' | '$' = '0',
+  ): Promise<void> {
+    const key = KEYS.chunks(streamId);
+    try {
+      await this.redis.xgroup('CREATE', key, groupName, startFrom, 'MKSTREAM');
+      logger.debug(`[RedisJobStore] Created consumer group ${groupName} for ${streamId}`);
+    } catch (err) {
+      // BUSYGROUP error means group already exists - that's fine
+      const error = err as Error;
+      if (!error.message?.includes('BUSYGROUP')) {
+        throw err;
+      }
+    }
+  }
+
+  /**
+   * Read chunks from a consumer group (only unseen chunks).
+   * This is the key to the resumable stream pattern.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name
+   * @param consumerName - Name of the consumer within the group
+   * @param count - Maximum number of chunks to read (default: all available)
+   * @returns Array of { id, event } where id is the Redis stream entry ID
+   */
+  async readChunksFromGroup(
+    streamId: string,
+    groupName: string,
+    consumerName: string = 'consumer-1',
+    count?: number,
+  ): Promise<Array<{ id: string; event: unknown }>> {
+    const key = KEYS.chunks(streamId);
+
+    try {
+      // XREADGROUP GROUP groupName consumerName [COUNT count] STREAMS key >
+      // The '>' means only read new messages not yet delivered to this consumer
+      let result;
+      if (count) {
+        result = await this.redis.xreadgroup(
+          'GROUP',
+          groupName,
+          consumerName,
+          'COUNT',
+          count,
+          'STREAMS',
+          key,
+          '>',
+        );
+      } else {
+        result = await this.redis.xreadgroup('GROUP', groupName, consumerName, 'STREAMS', key, '>');
+      }
+
+      if (!result || result.length === 0) {
+        return [];
+      }
+
+      // Result format: [[streamKey, [[id, [field, value, ...]], ...]]]
+      const [, messages] = result[0] as [string, Array<[string, string[]]>];
+      const chunks: Array<{ id: string; event: unknown }> = [];
+
+      for (const [id, fields] of messages) {
+        const eventIdx = fields.indexOf('event');
+        if (eventIdx >= 0 && eventIdx + 1 < fields.length) {
+          try {
+            chunks.push({
+              id,
+              event: JSON.parse(fields[eventIdx + 1]),
+            });
+          } catch {
+            // Skip malformed entries
+          }
+        }
+      }
+
+      return chunks;
+    } catch (err) {
+      const error = err as Error;
+      // NOGROUP error means the group doesn't exist yet
+      if (error.message?.includes('NOGROUP')) {
+        return [];
+      }
+      throw err;
+    }
+  }
+
+  /**
+   * Acknowledge that chunks have been processed.
+   * This tells Redis we've successfully delivered these chunks to the client.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name
+   * @param messageIds - Array of Redis stream entry IDs to acknowledge
+   */
+  async acknowledgeChunks(
+    streamId: string,
+    groupName: string,
+    messageIds: string[],
+  ): Promise<void> {
+    if (messageIds.length === 0) {
+      return;
+    }
+
+    const key = KEYS.chunks(streamId);
+    await this.redis.xack(key, groupName, ...messageIds);
+  }
+
+  /**
+   * Delete a consumer group.
+   * Called when a client disconnects and won't reconnect.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name to delete
+   */
+  async deleteConsumerGroup(streamId: string, groupName: string): Promise<void> {
+    const key = KEYS.chunks(streamId);
+    try {
+      await this.redis.xgroup('DESTROY', key, groupName);
+      logger.debug(`[RedisJobStore] Deleted consumer group ${groupName} for ${streamId}`);
+    } catch {
+      // Ignore errors - group may not exist
+    }
+  }
+
+  /**
+   * Get pending chunks for a consumer (chunks delivered but not acknowledged).
+   * Useful for recovering from crashes.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name
+   * @param consumerName - Consumer name
+   */
+  async getPendingChunks(
+    streamId: string,
+    groupName: string,
+    consumerName: string = 'consumer-1',
+  ): Promise<Array<{ id: string; event: unknown }>> {
+    const key = KEYS.chunks(streamId);
+
+    try {
+      // Read pending messages (delivered but not acked) by using '0' instead of '>'
+      const result = await this.redis.xreadgroup(
+        'GROUP',
+        groupName,
+        consumerName,
+        'STREAMS',
+        key,
+        '0',
+      );
+
+      if (!result || result.length === 0) {
+        return [];
+      }
+
+      const [, messages] = result[0] as [string, Array<[string, string[]]>];
+      const chunks: Array<{ id: string; event: unknown }> = [];
+
+      for (const [id, fields] of messages) {
+        const eventIdx = fields.indexOf('event');
+        if (eventIdx >= 0 && eventIdx + 1 < fields.length) {
+          try {
+            chunks.push({
+              id,
+              event: JSON.parse(fields[eventIdx + 1]),
+            });
+          } catch {
+            // Skip malformed entries
+          }
+        }
+      }
+
+      return chunks;
+    } catch {
+      return [];
+    }
+  }
+
   /**
    * Serialize job data for Redis hash storage.
    * Converts complex types to strings.
diff --git a/packages/api/src/stream/implementations/index.ts b/packages/api/src/stream/implementations/index.ts
index 945c59cf4c..6926938a46 100644
--- a/packages/api/src/stream/implementations/index.ts
+++ b/packages/api/src/stream/implementations/index.ts
@@ -1,3 +1,4 @@
 export * from './InMemoryJobStore';
 export * from './InMemoryEventTransport';
 export * from './RedisJobStore';
+export * from './RedisEventTransport';
diff --git a/packages/api/src/stream/index.ts b/packages/api/src/stream/index.ts
index c7ab2a07db..4e9bab324c 100644
--- a/packages/api/src/stream/index.ts
+++ b/packages/api/src/stream/index.ts
@@ -1,2 +1,22 @@
-export { GenerationJobManager, GenerationJobManagerClass } from './GenerationJobManager';
-export type { AbortResult, SerializableJobData, JobStatus } from './interfaces/IJobStore';
+export {
+  GenerationJobManager,
+  GenerationJobManagerClass,
+  type GenerationJobManagerOptions,
+} from './GenerationJobManager';
+
+export type {
+  AbortResult,
+  SerializableJobData,
+  JobStatus,
+  IJobStore,
+  IEventTransport,
+} from './interfaces/IJobStore';
+
+export { createStreamServices } from './createStreamServices';
+export type { StreamServicesConfig, StreamServices } from './createStreamServices';
+
+// Implementations (for advanced use cases)
+export { InMemoryJobStore } from './implementations/InMemoryJobStore';
+export { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
+export { RedisJobStore } from './implementations/RedisJobStore';
+export { RedisEventTransport } from './implementations/RedisEventTransport';
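The consumer-group methods added to `RedisJobStore` above compose into the resume flow from the Upstash pattern. A sketch of that flow; the session-scoped group name and the ack-after-delivery policy are assumptions, not prescribed by this diff:

```ts
import { RedisJobStore } from '@librechat/api';

async function replayForSession(
  store: RedisJobStore,
  streamId: string,
  sessionId: string,
): Promise<unknown[]> {
  const group = `session:${sessionId}`;

  // '0' makes a brand-new group replay the stream from the beginning.
  await store.createConsumerGroup(streamId, group, '0');

  // First recover chunks delivered before a crash but never acknowledged,
  // then read everything this group has not seen yet.
  const pending = await store.getPendingChunks(streamId, group);
  const fresh = await store.readChunksFromGroup(streamId, group);
  const chunks = [...pending, ...fresh];

  // Acknowledge only after the client has actually received the events.
  await store.acknowledgeChunks(streamId, group, chunks.map((c) => c.id));
  return chunks.map((c) => c.event);
}
```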
diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts
index ef4615c3ea..186c2525ba 100644
--- a/packages/api/src/stream/interfaces/IJobStore.ts
+++ b/packages/api/src/stream/interfaces/IJobStore.ts
@@ -159,7 +159,7 @@ export interface IJobStore {
    * @param streamId - The stream identifier
    * @returns Content parts or null if not available
    */
-  getContentParts(streamId: string): Agents.MessageContentComplex[] | null;
+  getContentParts(streamId: string): Promise<Agents.MessageContentComplex[] | null>;
 
   /**
    * Get run steps for a job (for resume state).
@@ -170,7 +170,7 @@
    * @param streamId - The stream identifier
    * @returns Run steps or empty array
    */
-  getRunSteps(streamId: string): Agents.RunStep[];
+  getRunSteps(streamId: string): Promise<Agents.RunStep[]>;
 
   /**
    * Append a streaming chunk for later reconstruction.
@@ -190,6 +190,16 @@
    * @param streamId - The stream identifier
    */
   clearContentState(streamId: string): void;
+
+  /**
+   * Save run steps to persistent storage.
+   * In-memory: No-op (run steps accessed via graph reference)
+   * Redis: Persists for resume across instances
+   *
+   * @param streamId - The stream identifier
+   * @param runSteps - Run steps to save
+   */
+  saveRunSteps?(streamId: string, runSteps: Agents.RunStep[]): Promise<void>;
 }
 
 /**
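Because `saveRunSteps` is optional, custom stores can omit it, and callers feature-test it exactly as `accumulateRunStep` does in the `GenerationJobManager` hunk above. A minimal sketch; the `Agents` type import is assumed to come from '@librechat/agents', mirroring the `Agents.RunStep` references in these files:

```ts
import type { Agents } from '@librechat/agents'; // assumed source of the Agents types
import type { IJobStore } from '@librechat/api';

async function persistRunSteps(
  store: IJobStore,
  streamId: string,
  runSteps: Agents.RunStep[],
): Promise<void> {
  // Redis-backed stores implement the method; the in-memory store omits it
  // because run steps stay reachable through the graph reference.
  if (store.saveRunSteps) {
    await store.saveRunSteps(streamId, runSteps);
  }
}
```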