refactor: Enhance job state management and TTL configuration in RedisJobStore

- Updated RedisJobStore to accept customizable TTL values for job state, giving deployments finer control over how long job data is retained (a usage sketch follows this list).
- Refactored job expiration and cleanup so they honor the configured TTLs instead of hard-coded constants.
- Simplified the chat status endpoint response by consolidating state retrieval into a single getResumeState call, avoiding a redundant content fetch.
- Improved comments and documentation around the changed code.
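
As a quick illustration of the new configuration surface, the following minimal sketch is based on the RedisJobStoreOptions interface and constructor added in this commit. The ioredis client setup, the import path, and the specific TTL values are illustrative assumptions rather than part of the diff; any option left undefined falls back to DEFAULT_TTL inside the constructor.

import Redis from 'ioredis';
import { RedisJobStore } from './RedisJobStore'; // hypothetical import path for this sketch

// Illustrative client setup; connection details are an assumption.
const redis = new Redis('redis://localhost:6379');

const store = new RedisJobStore(redis, {
  completedTtl: 600, // keep completed jobs for 10 minutes instead of the 5-minute default
  runningTtl: 1200, // failsafe TTL for running jobs/chunks (matches the new default)
  chunksAfterCompleteTtl: 0, // 0 = delete the chunk stream immediately on completion
  runStepsAfterCompleteTtl: 60, // keep run steps for 1 minute after completion
});

await store.initialize();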
Danny Avila 2025-12-15 01:44:57 -05:00
parent ca21f16848
commit 2b3f4d58db
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
2 changed files with 66 additions and 23 deletions


@@ -133,15 +133,17 @@ router.get('/chat/status/:conversationId', async (req, res) => {
     return res.status(403).json({ error: 'Unauthorized' });
   }
 
-  const info = await GenerationJobManager.getStreamInfo(conversationId);
+  // Get resume state which contains aggregatedContent
+  // Avoid calling both getStreamInfo and getResumeState (both fetch content)
   const resumeState = await GenerationJobManager.getResumeState(conversationId);
+  const isActive = job.status === 'running';
 
   res.json({
-    active: info?.active ?? false,
+    active: isActive,
     streamId: conversationId,
-    status: info?.status ?? job.status,
-    aggregatedContent: info?.aggregatedContent,
-    createdAt: info?.createdAt ?? job.createdAt,
+    status: job.status,
+    aggregatedContent: resumeState?.aggregatedContent ?? [],
+    createdAt: job.createdAt,
     resumeState,
   });
 });


@@ -21,17 +21,18 @@ const KEYS = {
 };
 
 /**
- * Default TTL values in seconds
+ * Default TTL values in seconds.
+ * Can be overridden via constructor options.
  */
-const TTL = {
+const DEFAULT_TTL = {
   /** TTL for completed jobs (5 minutes) */
   completed: 300,
-  /** TTL for running jobs (30 minutes - failsafe) */
-  running: 1800,
-  /** TTL for chunks stream (5 minutes after completion) */
-  chunks: 300,
-  /** TTL for run steps (5 minutes after completion) */
-  runSteps: 300,
+  /** TTL for running jobs/chunks (20 minutes - failsafe for crashed jobs) */
+  running: 1200,
+  /** TTL for chunks after completion (0 = delete immediately) */
+  chunksAfterComplete: 0,
+  /** TTL for run steps after completion (0 = delete immediately) */
+  runStepsAfterComplete: 0,
 };
 
 /**
@@ -52,15 +53,36 @@ const TTL = {
  * await store.initialize();
  * ```
  */
+/**
+ * Configuration options for RedisJobStore
+ */
+export interface RedisJobStoreOptions {
+  /** TTL for completed jobs in seconds (default: 300 = 5 minutes) */
+  completedTtl?: number;
+  /** TTL for running jobs/chunks in seconds (default: 1200 = 20 minutes) */
+  runningTtl?: number;
+  /** TTL for chunks after completion in seconds (default: 0 = delete immediately) */
+  chunksAfterCompleteTtl?: number;
+  /** TTL for run steps after completion in seconds (default: 0 = delete immediately) */
+  runStepsAfterCompleteTtl?: number;
+}
+
 export class RedisJobStore implements IJobStore {
   private redis: Redis | Cluster;
   private cleanupInterval: NodeJS.Timeout | null = null;
+  private ttl: typeof DEFAULT_TTL;
 
   /** Cleanup interval in ms (1 minute) */
   private cleanupIntervalMs = 60000;
 
-  constructor(redis: Redis | Cluster) {
+  constructor(redis: Redis | Cluster, options?: RedisJobStoreOptions) {
     this.redis = redis;
+    this.ttl = {
+      completed: options?.completedTtl ?? DEFAULT_TTL.completed,
+      running: options?.runningTtl ?? DEFAULT_TTL.running,
+      chunksAfterComplete: options?.chunksAfterCompleteTtl ?? DEFAULT_TTL.chunksAfterComplete,
+      runStepsAfterComplete: options?.runStepsAfterCompleteTtl ?? DEFAULT_TTL.runStepsAfterComplete,
+    };
   }
 
   async initialize(): Promise<void> {
@@ -101,7 +123,7 @@ export class RedisJobStore implements IJobStore {
 
     // Store job as hash
     pipeline.hmset(key, this.serializeJob(job));
-    pipeline.expire(key, TTL.running);
+    pipeline.expire(key, this.ttl.running);
 
     // Add to running jobs set
     pipeline.sadd(KEYS.runningJobs, streamId);
@@ -137,12 +159,21 @@ export class RedisJobStore implements IJobStore {
     // If status changed to complete/error/aborted, update TTL and remove from running set
     if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
       const pipeline = this.redis.pipeline();
-      pipeline.expire(key, TTL.completed);
+      pipeline.expire(key, this.ttl.completed);
       pipeline.srem(KEYS.runningJobs, streamId);
 
-      // Also set TTL on related keys
-      pipeline.expire(KEYS.chunks(streamId), TTL.chunks);
-      pipeline.expire(KEYS.runSteps(streamId), TTL.runSteps);
+      // Delete or set TTL on related keys based on config
+      if (this.ttl.chunksAfterComplete === 0) {
+        pipeline.del(KEYS.chunks(streamId));
+      } else {
+        pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
+      }
+
+      if (this.ttl.runStepsAfterComplete === 0) {
+        pipeline.del(KEYS.runSteps(streamId));
+      } else {
+        pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
+      }
 
       await pipeline.exec();
     }
@@ -201,8 +232,8 @@ export class RedisJobStore implements IJobStore {
         continue;
       }
 
-      // Stale running job (failsafe - running for > 30 minutes)
-      if (now - job.createdAt > TTL.running * 1000) {
+      // Stale running job (failsafe - running for > configured TTL)
+      if (now - job.createdAt > this.ttl.running * 1000) {
         logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
         await this.deleteJob(streamId);
         cleaned++;
@@ -344,10 +375,20 @@ export class RedisJobStore implements IJobStore {
   /**
    * Append a streaming chunk to Redis Stream.
    * Uses XADD for efficient append-only storage.
+   * Sets TTL on first chunk to ensure cleanup if job crashes.
    */
   async appendChunk(streamId: string, event: unknown): Promise<void> {
     const key = KEYS.chunks(streamId);
-    await this.redis.xadd(key, '*', 'event', JSON.stringify(event));
+    const added = await this.redis.xadd(key, '*', 'event', JSON.stringify(event));
+
+    // Set TTL on first chunk (when stream is created)
+    // Subsequent chunks inherit the stream's TTL
+    if (added) {
+      const len = await this.redis.xlen(key);
+      if (len === 1) {
+        await this.redis.expire(key, this.ttl.running);
+      }
+    }
   }
 
   /**
@@ -377,7 +418,7 @@ export class RedisJobStore implements IJobStore {
    */
   async saveRunSteps(streamId: string, runSteps: Agents.RunStep[]): Promise<void> {
     const key = KEYS.runSteps(streamId);
-    await this.redis.set(key, JSON.stringify(runSteps), 'EX', TTL.running);
+    await this.redis.set(key, JSON.stringify(runSteps), 'EX', this.ttl.running);
   }
 
   // ===== Consumer Group Methods =====