From 2b3f4d58dbc65898cf5642aad3c2f1d74ae8ec60 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Mon, 15 Dec 2025 01:44:57 -0500 Subject: [PATCH] refactor: Enhance job state management and TTL configuration in RedisJobStore - Updated the RedisJobStore to allow customizable TTL values for job states, improving flexibility in job management. - Refactored the handling of job expiration and cleanup processes to align with new TTL configurations. - Simplified the response structure in the chat status endpoint by consolidating state retrieval, enhancing clarity and performance. - Improved comments and documentation for better understanding of the changes made. --- api/server/routes/agents/index.js | 12 +-- .../stream/implementations/RedisJobStore.ts | 77 ++++++++++++++----- 2 files changed, 66 insertions(+), 23 deletions(-) diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index 3b2d3d5f38..ddce168962 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -133,15 +133,17 @@ router.get('/chat/status/:conversationId', async (req, res) => { return res.status(403).json({ error: 'Unauthorized' }); } - const info = await GenerationJobManager.getStreamInfo(conversationId); + // Get resume state which contains aggregatedContent + // Avoid calling both getStreamInfo and getResumeState (both fetch content) const resumeState = await GenerationJobManager.getResumeState(conversationId); + const isActive = job.status === 'running'; res.json({ - active: info?.active ?? false, + active: isActive, streamId: conversationId, - status: info?.status ?? job.status, - aggregatedContent: info?.aggregatedContent, - createdAt: info?.createdAt ?? job.createdAt, + status: job.status, + aggregatedContent: resumeState?.aggregatedContent ?? [], + createdAt: job.createdAt, resumeState, }); }); diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index 8a6084ca30..7997fb15ee 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -21,17 +21,18 @@ const KEYS = { }; /** - * Default TTL values in seconds + * Default TTL values in seconds. + * Can be overridden via constructor options. */ -const TTL = { +const DEFAULT_TTL = { /** TTL for completed jobs (5 minutes) */ completed: 300, - /** TTL for running jobs (30 minutes - failsafe) */ - running: 1800, - /** TTL for chunks stream (5 minutes after completion) */ - chunks: 300, - /** TTL for run steps (5 minutes after completion) */ - runSteps: 300, + /** TTL for running jobs/chunks (20 minutes - failsafe for crashed jobs) */ + running: 1200, + /** TTL for chunks after completion (0 = delete immediately) */ + chunksAfterComplete: 0, + /** TTL for run steps after completion (0 = delete immediately) */ + runStepsAfterComplete: 0, }; /** @@ -52,15 +53,36 @@ const TTL = { * await store.initialize(); * ``` */ +/** + * Configuration options for RedisJobStore + */ +export interface RedisJobStoreOptions { + /** TTL for completed jobs in seconds (default: 300 = 5 minutes) */ + completedTtl?: number; + /** TTL for running jobs/chunks in seconds (default: 1200 = 20 minutes) */ + runningTtl?: number; + /** TTL for chunks after completion in seconds (default: 0 = delete immediately) */ + chunksAfterCompleteTtl?: number; + /** TTL for run steps after completion in seconds (default: 0 = delete immediately) */ + runStepsAfterCompleteTtl?: number; +} + export class RedisJobStore implements IJobStore { private redis: Redis | Cluster; private cleanupInterval: NodeJS.Timeout | null = null; + private ttl: typeof DEFAULT_TTL; /** Cleanup interval in ms (1 minute) */ private cleanupIntervalMs = 60000; - constructor(redis: Redis | Cluster) { + constructor(redis: Redis | Cluster, options?: RedisJobStoreOptions) { this.redis = redis; + this.ttl = { + completed: options?.completedTtl ?? DEFAULT_TTL.completed, + running: options?.runningTtl ?? DEFAULT_TTL.running, + chunksAfterComplete: options?.chunksAfterCompleteTtl ?? DEFAULT_TTL.chunksAfterComplete, + runStepsAfterComplete: options?.runStepsAfterCompleteTtl ?? DEFAULT_TTL.runStepsAfterComplete, + }; } async initialize(): Promise { @@ -101,7 +123,7 @@ export class RedisJobStore implements IJobStore { // Store job as hash pipeline.hmset(key, this.serializeJob(job)); - pipeline.expire(key, TTL.running); + pipeline.expire(key, this.ttl.running); // Add to running jobs set pipeline.sadd(KEYS.runningJobs, streamId); @@ -137,12 +159,21 @@ export class RedisJobStore implements IJobStore { // If status changed to complete/error/aborted, update TTL and remove from running set if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) { const pipeline = this.redis.pipeline(); - pipeline.expire(key, TTL.completed); + pipeline.expire(key, this.ttl.completed); pipeline.srem(KEYS.runningJobs, streamId); - // Also set TTL on related keys - pipeline.expire(KEYS.chunks(streamId), TTL.chunks); - pipeline.expire(KEYS.runSteps(streamId), TTL.runSteps); + // Delete or set TTL on related keys based on config + if (this.ttl.chunksAfterComplete === 0) { + pipeline.del(KEYS.chunks(streamId)); + } else { + pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete); + } + + if (this.ttl.runStepsAfterComplete === 0) { + pipeline.del(KEYS.runSteps(streamId)); + } else { + pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete); + } await pipeline.exec(); } @@ -201,8 +232,8 @@ export class RedisJobStore implements IJobStore { continue; } - // Stale running job (failsafe - running for > 30 minutes) - if (now - job.createdAt > TTL.running * 1000) { + // Stale running job (failsafe - running for > configured TTL) + if (now - job.createdAt > this.ttl.running * 1000) { logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`); await this.deleteJob(streamId); cleaned++; @@ -344,10 +375,20 @@ export class RedisJobStore implements IJobStore { /** * Append a streaming chunk to Redis Stream. * Uses XADD for efficient append-only storage. + * Sets TTL on first chunk to ensure cleanup if job crashes. */ async appendChunk(streamId: string, event: unknown): Promise { const key = KEYS.chunks(streamId); - await this.redis.xadd(key, '*', 'event', JSON.stringify(event)); + const added = await this.redis.xadd(key, '*', 'event', JSON.stringify(event)); + + // Set TTL on first chunk (when stream is created) + // Subsequent chunks inherit the stream's TTL + if (added) { + const len = await this.redis.xlen(key); + if (len === 1) { + await this.redis.expire(key, this.ttl.running); + } + } } /** @@ -377,7 +418,7 @@ export class RedisJobStore implements IJobStore { */ async saveRunSteps(streamId: string, runSteps: Agents.RunStep[]): Promise { const key = KEYS.runSteps(streamId); - await this.redis.set(key, JSON.stringify(runSteps), 'EX', TTL.running); + await this.redis.set(key, JSON.stringify(runSteps), 'EX', this.ttl.running); } // ===== Consumer Group Methods =====