diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index 7997fb15ee..86dccf3ab2 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -1,8 +1,9 @@ import { logger } from '@librechat/data-schemas'; import { createContentAggregator } from '@librechat/agents'; +import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore'; +import type { StandardGraph } from '@librechat/agents'; import type { Agents } from 'librechat-data-provider'; import type { Redis, Cluster } from 'ioredis'; -import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore'; /** * Key prefixes for Redis storage. @@ -72,6 +73,13 @@ export class RedisJobStore implements IJobStore { private cleanupInterval: NodeJS.Timeout | null = null; private ttl: typeof DEFAULT_TTL; + /** + * Local cache for graph references on THIS instance. + * Enables fast reconnects when client returns to the same server. + * Uses WeakRef to allow garbage collection when graph is no longer needed. + */ + private localGraphCache = new Map>(); + /** Cleanup interval in ms (1 minute) */ private cleanupIntervalMs = 60000; @@ -180,6 +188,9 @@ export class RedisJobStore implements IJobStore { } async deleteJob(streamId: string): Promise { + // Clear local cache + this.localGraphCache.delete(streamId); + const pipeline = this.redis.pipeline(); pipeline.del(KEYS.job(streamId)); pipeline.del(KEYS.chunks(streamId)); @@ -215,12 +226,20 @@ export class RedisJobStore implements IJobStore { const streamIds = await this.redis.smembers(KEYS.runningJobs); let cleaned = 0; + // Clean up stale local graph cache entries (WeakRefs that were collected) + for (const [streamId, graphRef] of this.localGraphCache) { + if (!graphRef.deref()) { + this.localGraphCache.delete(streamId); + } + } + for (const streamId of streamIds) { const job = await this.getJob(streamId); // Job no longer exists (TTL expired) - remove from set if (!job) { await this.redis.srem(KEYS.runningJobs, streamId); + this.localGraphCache.delete(streamId); cleaned++; continue; } @@ -228,6 +247,7 @@ export class RedisJobStore implements IJobStore { // Job completed but still in running set (shouldn't happen, but handle it) if (job.status !== 'running') { await this.redis.srem(KEYS.runningJobs, streamId); + this.localGraphCache.delete(streamId); cleaned++; continue; } @@ -269,20 +289,26 @@ export class RedisJobStore implements IJobStore { clearInterval(this.cleanupInterval); this.cleanupInterval = null; } + // Clear local cache + this.localGraphCache.clear(); // Don't close the Redis connection - it's shared logger.info('[RedisJobStore] Destroyed'); } // ===== Content State Methods ===== - // For Redis, graph/contentParts are NOT stored locally. - // Content is reconstructed from chunks on demand. + // For Redis, content is primarily reconstructed from chunks. + // However, we keep a LOCAL graph cache for fast same-instance reconnects. /** - * No-op for Redis - graph can't be serialized/transferred. - * Content is reconstructed from chunks instead. + * Store graph reference in local cache. + * This enables fast reconnects when client returns to the same instance. + * Falls back to Redis chunk reconstruction for cross-instance reconnects. + * + * @param streamId - The stream identifier + * @param graph - The graph instance (stored as WeakRef) */ - setGraph(): void { - // No-op: Redis uses chunks for content reconstruction + setGraph(streamId: string, graph: StandardGraph): void { + this.localGraphCache.set(streamId, new WeakRef(graph)); } /** @@ -293,10 +319,32 @@ export class RedisJobStore implements IJobStore { } /** - * Get aggregated content from chunks. - * Reconstructs message content from Redis Streams on demand. + * Get aggregated content - tries local cache first, falls back to Redis reconstruction. + * + * Optimization: If this instance has the live graph (same-instance reconnect), + * we return the content directly without Redis round-trip. + * For cross-instance reconnects, we reconstruct from Redis Streams. + * + * @param streamId - The stream identifier + * @returns Content parts array, or null if not found */ async getContentParts(streamId: string): Promise { + // 1. Try local graph cache first (fast path for same-instance reconnect) + const graphRef = this.localGraphCache.get(streamId); + if (graphRef) { + const graph = graphRef.deref(); + if (graph) { + const localParts = graph.getContentParts(); + if (localParts && localParts.length > 0) { + return localParts; + } + } else { + // WeakRef was collected, remove from cache + this.localGraphCache.delete(streamId); + } + } + + // 2. Fall back to Redis chunk reconstruction (cross-instance reconnect) const chunks = await this.getChunks(streamId); if (chunks.length === 0) { return null; @@ -337,9 +385,30 @@ export class RedisJobStore implements IJobStore { } /** - * Get run steps from Redis. + * Get run steps - tries local cache first, falls back to Redis. + * + * Optimization: If this instance has the live graph, we get run steps + * directly without Redis round-trip. + * + * @param streamId - The stream identifier + * @returns Run steps array */ async getRunSteps(streamId: string): Promise { + // 1. Try local graph cache first (fast path for same-instance reconnect) + const graphRef = this.localGraphCache.get(streamId); + if (graphRef) { + const graph = graphRef.deref(); + if (graph) { + const localSteps = graph.getRunSteps(); + if (localSteps && localSteps.length > 0) { + return localSteps; + } + } + // Note: Don't delete from cache here - graph may still be valid + // but just not have run steps yet + } + + // 2. Fall back to Redis (cross-instance reconnect) const key = KEYS.runSteps(streamId); const data = await this.redis.get(key); if (!data) { @@ -354,9 +423,13 @@ export class RedisJobStore implements IJobStore { /** * Clear content state for a job. + * Removes both local cache and Redis data. */ clearContentState(streamId: string): void { - // Fire and forget - async cleanup + // Clear local cache immediately + this.localGraphCache.delete(streamId); + + // Fire and forget - async cleanup for Redis this.clearContentStateAsync(streamId).catch((err) => { logger.error(`[RedisJobStore] Failed to clear content state for ${streamId}:`, err); });