diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index a154435928..8a45c5445e 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -1,8 +1,7 @@ import { logger } from '@librechat/data-schemas'; -import type { Agents } from 'librechat-data-provider'; import type { StandardGraph } from '@librechat/agents'; +import type { Agents } from 'librechat-data-provider'; import type { - IContentStateManager, SerializableJobData, IEventTransport, AbortResult, @@ -10,7 +9,6 @@ import type { } from './interfaces/IJobStore'; import type * as t from '~/types'; import { InMemoryEventTransport } from './implementations/InMemoryEventTransport'; -import { InMemoryContentState } from './implementations/InMemoryContentState'; import { InMemoryJobStore } from './implementations/InMemoryJobStore'; /** @@ -40,10 +38,13 @@ interface RuntimeJobState { /** * Manages generation jobs for resumable LLM streams. * - * Architecture: Composes three pluggable services via dependency injection: - * - jobStore: Serializable job metadata (InMemory → Redis/KV for horizontal scaling) + * Architecture: Composes two pluggable services via dependency injection: + * - jobStore: Job metadata + content state (InMemory → Redis for horizontal scaling) * - eventTransport: Pub/sub events (InMemory → Redis Pub/Sub for horizontal scaling) - * - contentState: Volatile content refs with WeakRef (always in-memory, not shared) + * + * Content state is tied to jobs: + * - In-memory: jobStore holds WeakRef to graph for live content/run steps access + * - Redis: jobStore persists chunks, reconstructs content on demand * * All storage methods are async to support both in-memory and external stores (Redis, etc.). * @@ -52,17 +53,14 @@ interface RuntimeJobState { * const manager = new GenerationJobManagerClass({ * jobStore: new RedisJobStore(redisClient), * eventTransport: new RedisPubSubTransport(redisClient), - * contentState: new InMemoryContentState(), // Always local * }); * ``` */ class GenerationJobManagerClass { - /** Job metadata storage - swappable for Redis, KV store, etc. */ + /** Job metadata + content state storage - swappable for Redis, etc. */ private jobStore: IJobStore; /** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */ private eventTransport: IEventTransport; - /** Volatile content state with WeakRef - always in-memory per instance */ - private contentState: IContentStateManager; /** Runtime state - always in-memory, not serializable */ private runtimeState = new Map(); @@ -72,7 +70,6 @@ class GenerationJobManagerClass { constructor() { this.jobStore = new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 }); this.eventTransport = new InMemoryEventTransport(); - this.contentState = new InMemoryContentState(); } /** @@ -149,7 +146,7 @@ class GenerationJobManagerClass { if (currentRuntime) { currentRuntime.syncSent = false; // Call registered handlers (from job.emitter.on('allSubscribersLeft', ...)) - const content = this.contentState.getContentParts(streamId) ?? []; + const content = this.jobStore.getContentParts(streamId) ?? []; if (currentRuntime.allSubscribersLeftHandlers) { for (const handler of currentRuntime.allSubscribersLeftHandlers) { try { @@ -286,7 +283,7 @@ class GenerationJobManagerClass { }); // Clear content state - this.contentState.clearContentState(streamId); + this.jobStore.clearContentState(streamId); logger.debug(`[GenerationJobManager] Job completed: ${streamId}`); } @@ -314,7 +311,7 @@ class GenerationJobManagerClass { }); // Get content and extract text - const content = this.contentState.getContentParts(streamId) ?? []; + const content = this.jobStore.getContentParts(streamId) ?? []; const text = this.extractTextFromContent(content); // Create final event for abort @@ -352,7 +349,7 @@ class GenerationJobManagerClass { } this.eventTransport.emitDone(streamId, abortFinalEvent); - this.contentState.clearContentState(streamId); + this.jobStore.clearContentState(streamId); logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`); @@ -532,7 +529,7 @@ class GenerationJobManagerClass { if (!this.runtimeState.has(streamId)) { return; } - this.contentState.setContentParts(streamId, contentParts); + this.jobStore.setContentParts(streamId, contentParts); logger.debug(`[GenerationJobManager] Set contentParts for ${streamId}`); } @@ -544,7 +541,7 @@ class GenerationJobManagerClass { if (!this.runtimeState.has(streamId)) { return; } - this.contentState.setGraph(streamId, graph); + this.jobStore.setGraph(streamId, graph); logger.debug(`[GenerationJobManager] Set graph reference for ${streamId}`); } @@ -557,8 +554,8 @@ class GenerationJobManagerClass { return null; } - const aggregatedContent = this.contentState.getContentParts(streamId) ?? []; - const runSteps = this.contentState.getRunSteps(streamId); + const aggregatedContent = this.jobStore.getContentParts(streamId) ?? []; + const runSteps = this.jobStore.getRunSteps(streamId); logger.debug(`[GenerationJobManager] getResumeState:`, { streamId, @@ -621,7 +618,7 @@ class GenerationJobManagerClass { for (const streamId of this.runtimeState.keys()) { if (!(await this.jobStore.hasJob(streamId))) { this.runtimeState.delete(streamId); - this.contentState.clearContentState(streamId); + this.jobStore.clearContentState(streamId); this.eventTransport.cleanup(streamId); } } @@ -648,7 +645,7 @@ class GenerationJobManagerClass { return { active: jobData.status === 'running', status: jobData.status as t.GenerationJobStatus, - aggregatedContent: this.contentState.getContentParts(streamId) ?? [], + aggregatedContent: this.jobStore.getContentParts(streamId) ?? [], createdAt: jobData.createdAt, }; } @@ -684,7 +681,6 @@ class GenerationJobManagerClass { await this.jobStore.destroy(); this.eventTransport.destroy(); - this.contentState.destroy(); this.runtimeState.clear(); logger.debug('[GenerationJobManager] Destroyed'); diff --git a/packages/api/src/stream/implementations/InMemoryContentState.ts b/packages/api/src/stream/implementations/InMemoryContentState.ts deleted file mode 100644 index 29852458ab..0000000000 --- a/packages/api/src/stream/implementations/InMemoryContentState.ts +++ /dev/null @@ -1,107 +0,0 @@ -import type { Agents } from 'librechat-data-provider'; -import type { StandardGraph } from '@librechat/agents'; -import type { IContentStateManager } from '../interfaces/IJobStore'; - -/** - * Content state entry - volatile, in-memory only. - * Uses WeakRef to allow garbage collection of graph when no longer needed. - */ -interface ContentState { - contentParts: Agents.MessageContentComplex[]; - graphRef: WeakRef | null; -} - -/** - * In-memory content state manager. - * Manages volatile references to graph content that should NOT be persisted. - * Uses WeakRef for graph to allow garbage collection. - */ -export class InMemoryContentState implements IContentStateManager { - private state = new Map(); - - /** Cleanup interval for orphaned entries */ - private cleanupInterval: NodeJS.Timeout | null = null; - - constructor() { - // Cleanup orphaned content state every 5 minutes - this.cleanupInterval = setInterval(() => { - this.cleanupOrphaned(); - }, 300000); - - if (this.cleanupInterval.unref) { - this.cleanupInterval.unref(); - } - } - - setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void { - const existing = this.state.get(streamId); - if (existing) { - existing.contentParts = contentParts; - } else { - this.state.set(streamId, { contentParts, graphRef: null }); - } - } - - getContentParts(streamId: string): Agents.MessageContentComplex[] | null { - return this.state.get(streamId)?.contentParts ?? null; - } - - setGraph(streamId: string, graph: StandardGraph): void { - const existing = this.state.get(streamId); - if (existing) { - existing.graphRef = new WeakRef(graph); - } else { - this.state.set(streamId, { - contentParts: [], - graphRef: new WeakRef(graph), - }); - } - } - - getRunSteps(streamId: string): Agents.RunStep[] { - const state = this.state.get(streamId); - if (!state?.graphRef) { - return []; - } - - // Dereference WeakRef - may return undefined if GC'd - const graph = state.graphRef.deref(); - return graph?.contentData ?? []; - } - - clearContentState(streamId: string): void { - this.state.delete(streamId); - } - - /** - * Cleanup entries where graph has been garbage collected. - * These are orphaned states that are no longer useful. - */ - private cleanupOrphaned(): void { - const toDelete: string[] = []; - - for (const [streamId, state] of this.state) { - // If graphRef exists but has been GC'd, this state is orphaned - if (state.graphRef && !state.graphRef.deref()) { - toDelete.push(streamId); - } - } - - for (const id of toDelete) { - this.state.delete(id); - } - } - - /** Get count of tracked streams (for monitoring) */ - getStateCount(): number { - return this.state.size; - } - - destroy(): void { - if (this.cleanupInterval) { - clearInterval(this.cleanupInterval); - this.cleanupInterval = null; - } - this.state.clear(); - } -} diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index 10d9e18df2..e9391327d8 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -1,13 +1,29 @@ import { logger } from '@librechat/data-schemas'; -import type { IJobStore, SerializableJobData, JobStatus } from '../interfaces/IJobStore'; +import type { StandardGraph } from '@librechat/agents'; +import type { Agents } from 'librechat-data-provider'; +import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore'; + +/** + * Content state for a job - volatile, in-memory only. + * Uses WeakRef to allow garbage collection of graph when no longer needed. + */ +interface ContentState { + contentParts: Agents.MessageContentComplex[]; + graphRef: WeakRef | null; +} /** * In-memory implementation of IJobStore. * Suitable for single-instance deployments. * For horizontal scaling, use RedisJobStore. + * + * Content state is tied to jobs: + * - Uses WeakRef to graph for live access to contentParts and contentData (run steps) + * - No chunk persistence needed - same instance handles generation and reconnects */ export class InMemoryJobStore implements IJobStore { private jobs = new Map(); + private contentState = new Map(); private cleanupInterval: NodeJS.Timeout | null = null; /** Time to keep completed jobs before cleanup (5 minutes) */ @@ -79,6 +95,7 @@ export class InMemoryJobStore implements IJobStore { async deleteJob(streamId: string): Promise { this.jobs.delete(streamId); + this.contentState.delete(streamId); logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`); } @@ -157,6 +174,74 @@ export class InMemoryJobStore implements IJobStore { this.cleanupInterval = null; } this.jobs.clear(); + this.contentState.clear(); logger.debug('[InMemoryJobStore] Destroyed'); } + + // ===== Content State Methods ===== + + /** + * Set the graph reference for a job. + * Uses WeakRef to allow garbage collection when graph is no longer needed. + */ + setGraph(streamId: string, graph: StandardGraph): void { + const existing = this.contentState.get(streamId); + if (existing) { + existing.graphRef = new WeakRef(graph); + } else { + this.contentState.set(streamId, { + contentParts: [], + graphRef: new WeakRef(graph), + }); + } + } + + /** + * Set content parts reference for a job. + */ + setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void { + const existing = this.contentState.get(streamId); + if (existing) { + existing.contentParts = contentParts; + } else { + this.contentState.set(streamId, { contentParts, graphRef: null }); + } + } + + /** + * Get content parts for a job. + * Returns live content from stored reference. + */ + getContentParts(streamId: string): Agents.MessageContentComplex[] | null { + return this.contentState.get(streamId)?.contentParts ?? null; + } + + /** + * Get run steps for a job from graph.contentData. + * Uses WeakRef - may return empty if graph has been GC'd. + */ + getRunSteps(streamId: string): Agents.RunStep[] { + const state = this.contentState.get(streamId); + if (!state?.graphRef) { + return []; + } + + // Dereference WeakRef - may return undefined if GC'd + const graph = state.graphRef.deref(); + return graph?.contentData ?? []; + } + + /** + * No-op for in-memory - content available via graph reference. + */ + async appendChunk(): Promise { + // No-op: content available via graph reference + } + + /** + * Clear content state for a job. + */ + clearContentState(streamId: string): void { + this.contentState.delete(streamId); + } } diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts new file mode 100644 index 0000000000..e42a3b2b79 --- /dev/null +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -0,0 +1,452 @@ +import { logger } from '@librechat/data-schemas'; +import { createContentAggregator } from '@librechat/agents'; +import type { Agents } from 'librechat-data-provider'; +import type { Redis, Cluster } from 'ioredis'; +import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore'; + +/** + * Key prefixes for Redis storage. + * All keys include the streamId for easy cleanup. + * Note: streamId === conversationId, so no separate mapping needed. + */ +const KEYS = { + /** Job metadata: stream:job:{streamId} */ + job: (streamId: string) => `stream:job:${streamId}`, + /** Chunk stream (Redis Streams): stream:chunks:{streamId} */ + chunks: (streamId: string) => `stream:chunks:${streamId}`, + /** Run steps: stream:runsteps:{streamId} */ + runSteps: (streamId: string) => `stream:runsteps:${streamId}`, + /** Running jobs set for cleanup */ + runningJobs: 'stream:running', +}; + +/** + * Default TTL values in seconds + */ +const TTL = { + /** TTL for completed jobs (5 minutes) */ + completed: 300, + /** TTL for running jobs (30 minutes - failsafe) */ + running: 1800, + /** TTL for chunks stream (5 minutes after completion) */ + chunks: 300, + /** TTL for run steps (5 minutes after completion) */ + runSteps: 300, +}; + +/** + * Redis implementation of IJobStore. + * Enables horizontal scaling with multi-instance deployments. + * + * Storage strategy: + * - Job metadata: Redis Hash (fast field access) + * - Chunks: Redis Streams (append-only, efficient for streaming) + * - Run steps: Redis String (JSON serialized) + * + * Note: streamId === conversationId, so getJob(conversationId) works directly. + * + * @example + * ```ts + * import { ioredisClient } from '~/cache'; + * const store = new RedisJobStore(ioredisClient); + * await store.initialize(); + * ``` + */ +export class RedisJobStore implements IJobStore { + private redis: Redis | Cluster; + private cleanupInterval: NodeJS.Timeout | null = null; + + /** Cleanup interval in ms (1 minute) */ + private cleanupIntervalMs = 60000; + + constructor(redis: Redis | Cluster) { + this.redis = redis; + } + + async initialize(): Promise { + if (this.cleanupInterval) { + return; + } + + // Start periodic cleanup + this.cleanupInterval = setInterval(() => { + this.cleanup().catch((err) => { + logger.error('[RedisJobStore] Cleanup error:', err); + }); + }, this.cleanupIntervalMs); + + if (this.cleanupInterval.unref) { + this.cleanupInterval.unref(); + } + + logger.info('[RedisJobStore] Initialized with cleanup interval'); + } + + async createJob( + streamId: string, + userId: string, + conversationId?: string, + ): Promise { + const job: SerializableJobData = { + streamId, + userId, + status: 'running', + createdAt: Date.now(), + conversationId, + syncSent: false, + }; + + const key = KEYS.job(streamId); + const pipeline = this.redis.pipeline(); + + // Store job as hash + pipeline.hmset(key, this.serializeJob(job)); + pipeline.expire(key, TTL.running); + + // Add to running jobs set + pipeline.sadd(KEYS.runningJobs, streamId); + + await pipeline.exec(); + + logger.debug(`[RedisJobStore] Created job: ${streamId}`); + return job; + } + + async getJob(streamId: string): Promise { + const data = await this.redis.hgetall(KEYS.job(streamId)); + if (!data || Object.keys(data).length === 0) { + return null; + } + return this.deserializeJob(data); + } + + async updateJob(streamId: string, updates: Partial): Promise { + const key = KEYS.job(streamId); + const exists = await this.redis.exists(key); + if (!exists) { + return; + } + + const serialized = this.serializeJob(updates as SerializableJobData); + if (Object.keys(serialized).length === 0) { + return; + } + + await this.redis.hmset(key, serialized); + + // If status changed to complete/error/aborted, update TTL and remove from running set + if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) { + const pipeline = this.redis.pipeline(); + pipeline.expire(key, TTL.completed); + pipeline.srem(KEYS.runningJobs, streamId); + + // Also set TTL on related keys + pipeline.expire(KEYS.chunks(streamId), TTL.chunks); + pipeline.expire(KEYS.runSteps(streamId), TTL.runSteps); + + await pipeline.exec(); + } + } + + async deleteJob(streamId: string): Promise { + const pipeline = this.redis.pipeline(); + pipeline.del(KEYS.job(streamId)); + pipeline.del(KEYS.chunks(streamId)); + pipeline.del(KEYS.runSteps(streamId)); + pipeline.srem(KEYS.runningJobs, streamId); + await pipeline.exec(); + logger.debug(`[RedisJobStore] Deleted job: ${streamId}`); + } + + async hasJob(streamId: string): Promise { + const exists = await this.redis.exists(KEYS.job(streamId)); + return exists === 1; + } + + async getRunningJobs(): Promise { + const streamIds = await this.redis.smembers(KEYS.runningJobs); + if (streamIds.length === 0) { + return []; + } + + const jobs: SerializableJobData[] = []; + for (const streamId of streamIds) { + const job = await this.getJob(streamId); + if (job && job.status === 'running') { + jobs.push(job); + } + } + return jobs; + } + + async cleanup(): Promise { + const now = Date.now(); + const streamIds = await this.redis.smembers(KEYS.runningJobs); + let cleaned = 0; + + for (const streamId of streamIds) { + const job = await this.getJob(streamId); + + // Job no longer exists (TTL expired) - remove from set + if (!job) { + await this.redis.srem(KEYS.runningJobs, streamId); + cleaned++; + continue; + } + + // Job completed but still in running set (shouldn't happen, but handle it) + if (job.status !== 'running') { + await this.redis.srem(KEYS.runningJobs, streamId); + cleaned++; + continue; + } + + // Stale running job (failsafe - running for > 30 minutes) + if (now - job.createdAt > TTL.running * 1000) { + logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`); + await this.deleteJob(streamId); + cleaned++; + } + } + + if (cleaned > 0) { + logger.debug(`[RedisJobStore] Cleaned up ${cleaned} jobs`); + } + + return cleaned; + } + + async getJobCount(): Promise { + // This is approximate - counts jobs in running set + scans for job keys + // For exact count, would need to scan all job:* keys + const runningCount = await this.redis.scard(KEYS.runningJobs); + return runningCount; + } + + async getJobCountByStatus(status: JobStatus): Promise { + if (status === 'running') { + return this.redis.scard(KEYS.runningJobs); + } + + // For other statuses, we'd need to scan - return 0 for now + // In production, consider maintaining separate sets per status if needed + return 0; + } + + async destroy(): Promise { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + // Don't close the Redis connection - it's shared + logger.info('[RedisJobStore] Destroyed'); + } + + // ===== Content State Methods ===== + // For Redis, graph/contentParts are NOT stored locally. + // Content is reconstructed from chunks on demand. + + /** + * No-op for Redis - graph can't be serialized/transferred. + * Content is reconstructed from chunks instead. + */ + setGraph(): void { + // No-op: Redis uses chunks for content reconstruction + } + + /** + * No-op for Redis - content is built from chunks. + */ + setContentParts(): void { + // No-op: Redis uses chunks for content reconstruction + } + + /** + * For Redis, this returns null - caller should use getAggregatedContentAsync(). + * This sync method exists for interface compatibility with in-memory. + * + * Note: GenerationJobManager should check for null and call the async version. + */ + getContentParts(): Agents.MessageContentComplex[] | null { + // Redis can't return content synchronously - must use chunks + return null; + } + + /** + * Get aggregated content from chunks (async version for Redis). + * Called on client reconnection to reconstruct message content. + */ + async getAggregatedContentAsync( + streamId: string, + ): Promise { + const chunks = await this.getChunks(streamId); + if (chunks.length === 0) { + return null; + } + + // Use the same content aggregator as live streaming + const { contentParts, aggregateContent } = createContentAggregator(); + + // Valid event types for content aggregation + const validEvents = new Set([ + 'on_run_step', + 'on_message_delta', + 'on_reasoning_delta', + 'on_run_step_delta', + 'on_run_step_completed', + 'on_agent_update', + ]); + + for (const chunk of chunks) { + const event = chunk as { event?: string; data?: unknown }; + if (!event.event || !event.data || !validEvents.has(event.event)) { + continue; + } + + // Pass event string directly - GraphEvents values are lowercase strings + // eslint-disable-next-line @typescript-eslint/no-explicit-any + aggregateContent({ event: event.event as any, data: event.data as any }); + } + + // Filter out undefined entries + const filtered: Agents.MessageContentComplex[] = []; + for (const part of contentParts) { + if (part !== undefined) { + filtered.push(part); + } + } + return filtered; + } + + /** + * For Redis, run steps must be fetched async. + * This sync method returns empty - caller should use getRunStepsAsync(). + */ + getRunSteps(): Agents.RunStep[] { + // Redis can't return run steps synchronously + return []; + } + + /** + * Get run steps (async version for Redis). + */ + async getRunStepsAsync(streamId: string): Promise { + const key = KEYS.runSteps(streamId); + const data = await this.redis.get(key); + if (!data) { + return []; + } + try { + return JSON.parse(data); + } catch { + return []; + } + } + + /** + * Clear content state for a job. + */ + clearContentState(streamId: string): void { + // Fire and forget - async cleanup + this.clearContentStateAsync(streamId).catch((err) => { + logger.error(`[RedisJobStore] Failed to clear content state for ${streamId}:`, err); + }); + } + + /** + * Clear content state async. + */ + private async clearContentStateAsync(streamId: string): Promise { + const pipeline = this.redis.pipeline(); + pipeline.del(KEYS.chunks(streamId)); + pipeline.del(KEYS.runSteps(streamId)); + await pipeline.exec(); + } + + /** + * Append a streaming chunk to Redis Stream. + * Uses XADD for efficient append-only storage. + */ + async appendChunk(streamId: string, event: unknown): Promise { + const key = KEYS.chunks(streamId); + await this.redis.xadd(key, '*', 'event', JSON.stringify(event)); + } + + /** + * Get all chunks from Redis Stream. + */ + private async getChunks(streamId: string): Promise { + const key = KEYS.chunks(streamId); + const entries = await this.redis.xrange(key, '-', '+'); + + return entries + .map(([, fields]) => { + const eventIdx = fields.indexOf('event'); + if (eventIdx >= 0 && eventIdx + 1 < fields.length) { + try { + return JSON.parse(fields[eventIdx + 1]); + } catch { + return null; + } + } + return null; + }) + .filter(Boolean); + } + + /** + * Save run steps for resume state. + */ + async saveRunSteps(streamId: string, runSteps: Agents.RunStep[]): Promise { + const key = KEYS.runSteps(streamId); + await this.redis.set(key, JSON.stringify(runSteps), 'EX', TTL.running); + } + + /** + * Serialize job data for Redis hash storage. + * Converts complex types to strings. + */ + private serializeJob(job: Partial): Record { + const result: Record = {}; + + for (const [key, value] of Object.entries(job)) { + if (value === undefined) { + continue; + } + + if (typeof value === 'object') { + result[key] = JSON.stringify(value); + } else if (typeof value === 'boolean') { + result[key] = value ? '1' : '0'; + } else { + result[key] = String(value); + } + } + + return result; + } + + /** + * Deserialize job data from Redis hash. + */ + private deserializeJob(data: Record): SerializableJobData { + return { + streamId: data.streamId, + userId: data.userId, + status: data.status as JobStatus, + createdAt: parseInt(data.createdAt, 10), + completedAt: data.completedAt ? parseInt(data.completedAt, 10) : undefined, + conversationId: data.conversationId || undefined, + error: data.error || undefined, + userMessage: data.userMessage ? JSON.parse(data.userMessage) : undefined, + responseMessageId: data.responseMessageId || undefined, + sender: data.sender || undefined, + syncSent: data.syncSent === '1', + finalEvent: data.finalEvent || undefined, + endpoint: data.endpoint || undefined, + iconURL: data.iconURL || undefined, + model: data.model || undefined, + promptTokens: data.promptTokens ? parseInt(data.promptTokens, 10) : undefined, + }; + } +} diff --git a/packages/api/src/stream/implementations/index.ts b/packages/api/src/stream/implementations/index.ts index 4060943e69..945c59cf4c 100644 --- a/packages/api/src/stream/implementations/index.ts +++ b/packages/api/src/stream/implementations/index.ts @@ -1,3 +1,3 @@ export * from './InMemoryJobStore'; -export * from './InMemoryContentState'; export * from './InMemoryEventTransport'; +export * from './RedisJobStore'; diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index d66db06039..ef4615c3ea 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -77,6 +77,12 @@ export interface ResumeState { /** * Interface for job storage backend. * Implementations can use in-memory Map, Redis, KV store, etc. + * + * Content state is tied to jobs: + * - In-memory: Holds WeakRef to graph for live content/run steps access + * - Redis: Persists chunks, reconstructs content on reconnect + * + * This consolidates job metadata + content state into a single interface. */ export interface IJobStore { /** Initialize the store (e.g., connect to Redis, start cleanup intervals) */ @@ -115,6 +121,75 @@ export interface IJobStore { /** Destroy the store and release resources */ destroy(): Promise; + + // ===== Content State Methods ===== + // These methods manage volatile content state tied to each job. + // In-memory: Uses WeakRef to graph for live access + // Redis: Persists chunks and reconstructs on demand + + /** + * Set the graph reference for a job (in-memory only). + * The graph provides live access to contentParts and contentData (run steps). + * + * In-memory: Stores WeakRef to graph + * Redis: No-op (graph not transferable, uses chunks instead) + * + * @param streamId - The stream identifier + * @param graph - The StandardGraph instance + */ + setGraph(streamId: string, graph: StandardGraph): void; + + /** + * Set content parts reference for a job. + * + * In-memory: Stores direct reference to content array + * Redis: No-op (content built from chunks) + * + * @param streamId - The stream identifier + * @param contentParts - The content parts array + */ + setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void; + + /** + * Get aggregated content for a job. + * + * In-memory: Returns live content from graph.contentParts or stored reference + * Redis: Reconstructs from stored chunks + * + * @param streamId - The stream identifier + * @returns Content parts or null if not available + */ + getContentParts(streamId: string): Agents.MessageContentComplex[] | null; + + /** + * Get run steps for a job (for resume state). + * + * In-memory: Returns live run steps from graph.contentData + * Redis: Fetches from persistent storage + * + * @param streamId - The stream identifier + * @returns Run steps or empty array + */ + getRunSteps(streamId: string): Agents.RunStep[]; + + /** + * Append a streaming chunk for later reconstruction. + * + * In-memory: No-op (content available via graph reference) + * Redis: Uses XADD for append-only log efficiency + * + * @param streamId - The stream identifier + * @param event - The SSE event to append + */ + appendChunk(streamId: string, event: unknown): Promise; + + /** + * Clear all content state for a job. + * Called on job completion/cleanup. + * + * @param streamId - The stream identifier + */ + clearContentState(streamId: string): void; } /** @@ -156,28 +231,3 @@ export interface IEventTransport { /** Destroy all transport resources */ destroy(): void; } - -/** - * Interface for content state management. - * Separates volatile content state from persistent job data. - * In-memory only - not persisted to external storage. - */ -export interface IContentStateManager { - /** Set content parts reference (in-memory only) */ - setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void; - - /** Get content parts */ - getContentParts(streamId: string): Agents.MessageContentComplex[] | null; - - /** Set graph reference for run steps */ - setGraph(streamId: string, graph: StandardGraph): void; - - /** Get run steps from graph */ - getRunSteps(streamId: string): Agents.RunStep[]; - - /** Clear content state for a job */ - clearContentState(streamId: string): void; - - /** Destroy all content state resources */ - destroy(): void; -}