refactor: Enhance RedisJobStore with local graph caching for improved performance

- Introduced a local cache for graph references using WeakRef to optimize reconnects for the same instance.
- Updated job deletion and cleanup methods to manage the local cache effectively, ensuring stale entries are removed.
- Enhanced content retrieval methods to prioritize local cache access, reducing Redis round-trips for same-instance reconnects.
- Improved documentation and comments for clarity on the caching mechanism and its benefits.
This commit is contained in:
Danny Avila 2025-12-15 09:16:43 -05:00
parent 51c6d7ad8d
commit 10b4b6eeae
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956

View file

@ -1,8 +1,9 @@
import { logger } from '@librechat/data-schemas';
import { createContentAggregator } from '@librechat/agents';
import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore';
import type { StandardGraph } from '@librechat/agents';
import type { Agents } from 'librechat-data-provider';
import type { Redis, Cluster } from 'ioredis';
import type { IJobStore, SerializableJobData, JobStatus } from '~/stream/interfaces/IJobStore';
/**
* Key prefixes for Redis storage.
@ -72,6 +73,13 @@ export class RedisJobStore implements IJobStore {
private cleanupInterval: NodeJS.Timeout | null = null;
private ttl: typeof DEFAULT_TTL;
/**
* Local cache for graph references on THIS instance.
* Enables fast reconnects when client returns to the same server.
* Uses WeakRef to allow garbage collection when graph is no longer needed.
*/
private localGraphCache = new Map<string, WeakRef<StandardGraph>>();
/** Cleanup interval in ms (1 minute) */
private cleanupIntervalMs = 60000;
@ -180,6 +188,9 @@ export class RedisJobStore implements IJobStore {
}
async deleteJob(streamId: string): Promise<void> {
// Clear local cache
this.localGraphCache.delete(streamId);
const pipeline = this.redis.pipeline();
pipeline.del(KEYS.job(streamId));
pipeline.del(KEYS.chunks(streamId));
@ -215,12 +226,20 @@ export class RedisJobStore implements IJobStore {
const streamIds = await this.redis.smembers(KEYS.runningJobs);
let cleaned = 0;
// Clean up stale local graph cache entries (WeakRefs that were collected)
for (const [streamId, graphRef] of this.localGraphCache) {
if (!graphRef.deref()) {
this.localGraphCache.delete(streamId);
}
}
for (const streamId of streamIds) {
const job = await this.getJob(streamId);
// Job no longer exists (TTL expired) - remove from set
if (!job) {
await this.redis.srem(KEYS.runningJobs, streamId);
this.localGraphCache.delete(streamId);
cleaned++;
continue;
}
@ -228,6 +247,7 @@ export class RedisJobStore implements IJobStore {
// Job completed but still in running set (shouldn't happen, but handle it)
if (job.status !== 'running') {
await this.redis.srem(KEYS.runningJobs, streamId);
this.localGraphCache.delete(streamId);
cleaned++;
continue;
}
@ -269,20 +289,26 @@ export class RedisJobStore implements IJobStore {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
}
// Clear local cache
this.localGraphCache.clear();
// Don't close the Redis connection - it's shared
logger.info('[RedisJobStore] Destroyed');
}
// ===== Content State Methods =====
// For Redis, graph/contentParts are NOT stored locally.
// Content is reconstructed from chunks on demand.
// For Redis, content is primarily reconstructed from chunks.
// However, we keep a LOCAL graph cache for fast same-instance reconnects.
/**
* No-op for Redis - graph can't be serialized/transferred.
* Content is reconstructed from chunks instead.
* Store graph reference in local cache.
* This enables fast reconnects when client returns to the same instance.
* Falls back to Redis chunk reconstruction for cross-instance reconnects.
*
* @param streamId - The stream identifier
* @param graph - The graph instance (stored as WeakRef)
*/
setGraph(): void {
// No-op: Redis uses chunks for content reconstruction
setGraph(streamId: string, graph: StandardGraph): void {
this.localGraphCache.set(streamId, new WeakRef(graph));
}
/**
@ -293,10 +319,32 @@ export class RedisJobStore implements IJobStore {
}
/**
* Get aggregated content from chunks.
* Reconstructs message content from Redis Streams on demand.
* Get aggregated content - tries local cache first, falls back to Redis reconstruction.
*
* Optimization: If this instance has the live graph (same-instance reconnect),
* we return the content directly without Redis round-trip.
* For cross-instance reconnects, we reconstruct from Redis Streams.
*
* @param streamId - The stream identifier
* @returns Content parts array, or null if not found
*/
async getContentParts(streamId: string): Promise<Agents.MessageContentComplex[] | null> {
// 1. Try local graph cache first (fast path for same-instance reconnect)
const graphRef = this.localGraphCache.get(streamId);
if (graphRef) {
const graph = graphRef.deref();
if (graph) {
const localParts = graph.getContentParts();
if (localParts && localParts.length > 0) {
return localParts;
}
} else {
// WeakRef was collected, remove from cache
this.localGraphCache.delete(streamId);
}
}
// 2. Fall back to Redis chunk reconstruction (cross-instance reconnect)
const chunks = await this.getChunks(streamId);
if (chunks.length === 0) {
return null;
@ -337,9 +385,30 @@ export class RedisJobStore implements IJobStore {
}
/**
* Get run steps from Redis.
* Get run steps - tries local cache first, falls back to Redis.
*
* Optimization: If this instance has the live graph, we get run steps
* directly without Redis round-trip.
*
* @param streamId - The stream identifier
* @returns Run steps array
*/
async getRunSteps(streamId: string): Promise<Agents.RunStep[]> {
// 1. Try local graph cache first (fast path for same-instance reconnect)
const graphRef = this.localGraphCache.get(streamId);
if (graphRef) {
const graph = graphRef.deref();
if (graph) {
const localSteps = graph.getRunSteps();
if (localSteps && localSteps.length > 0) {
return localSteps;
}
}
// Note: Don't delete from cache here - graph may still be valid
// but just not have run steps yet
}
// 2. Fall back to Redis (cross-instance reconnect)
const key = KEYS.runSteps(streamId);
const data = await this.redis.get(key);
if (!data) {
@ -354,9 +423,13 @@ export class RedisJobStore implements IJobStore {
/**
* Clear content state for a job.
* Removes both local cache and Redis data.
*/
clearContentState(streamId: string): void {
// Fire and forget - async cleanup
// Clear local cache immediately
this.localGraphCache.delete(streamId);
// Fire and forget - async cleanup for Redis
this.clearContentStateAsync(streamId).catch((err) => {
logger.error(`[RedisJobStore] Failed to clear content state for ${streamId}:`, err);
});