feat: Add integration tests for GenerationJobManager, RedisEventTransport, and RedisJobStore; add Redis Cluster support

- Introduced comprehensive integration tests for GenerationJobManager, covering both in-memory and Redis modes to ensure consistent job management and event handling.
- Added tests for RedisEventTransport to validate pub/sub functionality, including cross-instance event delivery and error handling.
- Implemented integration tests for RedisJobStore, focusing on multi-instance job access, content reconstruction from chunks, and consumer group behavior.
- Enhanced test setup and teardown so each test run starts from a clean environment, improving reliability and maintainability.
- Added Redis Cluster support to RedisJobStore: keys now use {streamId} hash tags so all keys for a stream map to the same slot, and the global running-jobs set (which lives in a different slot) is updated outside of pipelines when the client is a cluster.
Danny Avila 2025-12-15 09:32:01 -05:00
parent 30957ccc44
commit 059ab12bef
7 changed files with 1520 additions and 43 deletions


@@ -9,15 +9,20 @@ import type { Redis, Cluster } from 'ioredis';
  * Key prefixes for Redis storage.
  * All keys include the streamId for easy cleanup.
  * Note: streamId === conversationId, so no separate mapping needed.
+ *
+ * IMPORTANT: Uses hash tags {streamId} for Redis Cluster compatibility.
+ * All keys for the same stream hash to the same slot, enabling:
+ * - Pipeline operations across related keys
+ * - Atomic multi-key operations
  */
 const KEYS = {
-  /** Job metadata: stream:job:{streamId} */
-  job: (streamId: string) => `stream:job:${streamId}`,
-  /** Chunk stream (Redis Streams): stream:chunks:{streamId} */
-  chunks: (streamId: string) => `stream:chunks:${streamId}`,
-  /** Run steps: stream:runsteps:{streamId} */
-  runSteps: (streamId: string) => `stream:runsteps:${streamId}`,
-  /** Running jobs set for cleanup */
+  /** Job metadata: stream:{streamId}:job */
+  job: (streamId: string) => `stream:{${streamId}}:job`,
+  /** Chunk stream (Redis Streams): stream:{streamId}:chunks */
+  chunks: (streamId: string) => `stream:{${streamId}}:chunks`,
+  /** Run steps: stream:{streamId}:runsteps */
+  runSteps: (streamId: string) => `stream:{${streamId}}:runsteps`,
+  /** Running jobs set for cleanup (global set - single slot) */
   runningJobs: 'stream:running',
 };
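Not part of the diff: a quick way to see the hash-tag behavior the new comment describes, reusing the KEYS helpers above for clarity. This is a sketch to run inside an async context against an ioredis Cluster client named cluster (connection details and the streamId value are placeholders). Redis Cluster hashes only the text inside the braces, so every key for one stream resolves to the same slot, while the global running set does not.

// Illustrative only: cluster is an ioredis Cluster client, streamId is arbitrary.
const streamId = 'abc123';
const jobSlot = await cluster.cluster('KEYSLOT', KEYS.job(streamId));
const chunksSlot = await cluster.cluster('KEYSLOT', KEYS.chunks(streamId));
const runningSlot = await cluster.cluster('KEYSLOT', KEYS.runningJobs);
console.log(jobSlot === chunksSlot);  // true: same {streamId} tag, same slot
console.log(jobSlot === runningSlot); // almost certainly false: different slot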
@@ -73,6 +78,9 @@ export class RedisJobStore implements IJobStore {
   private cleanupInterval: NodeJS.Timeout | null = null;
   private ttl: typeof DEFAULT_TTL;
+  /** Whether Redis client is in cluster mode (affects pipeline usage) */
+  private isCluster: boolean;
   /**
    * Local cache for graph references on THIS instance.
    * Enables fast reconnects when client returns to the same server.
@@ -91,6 +99,8 @@ export class RedisJobStore implements IJobStore {
       chunksAfterComplete: options?.chunksAfterCompleteTtl ?? DEFAULT_TTL.chunksAfterComplete,
       runStepsAfterComplete: options?.runStepsAfterCompleteTtl ?? DEFAULT_TTL.runStepsAfterComplete,
     };
+    // Detect cluster mode using ioredis's isCluster property
+    this.isCluster = (redis as Cluster).isCluster === true;
   }
 
   async initialize(): Promise<void> {
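For context (not part of the diff): ioredis marks Cluster instances with isCluster, which is what the cast in the constructor checks. A minimal sketch, with placeholder connection details:

import Redis, { Cluster } from 'ioredis';

const standalone = new Redis('redis://127.0.0.1:6379');
const clustered = new Cluster([{ host: '127.0.0.1', port: 7000 }]);

// Cluster instances report isCluster === true; on a standalone client the
// strict comparison is false, so the store keeps using ordinary pipelines.
console.log((standalone as unknown as Cluster).isCluster === true); // false
console.log(clustered.isCluster === true);                          // true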
@@ -127,16 +137,20 @@ export class RedisJobStore implements IJobStore {
     };
 
     const key = KEYS.job(streamId);
-    const pipeline = this.redis.pipeline();
-    // Store job as hash
-    pipeline.hmset(key, this.serializeJob(job));
-    pipeline.expire(key, this.ttl.running);
-    // Add to running jobs set
-    pipeline.sadd(KEYS.runningJobs, streamId);
-    await pipeline.exec();
+    // For cluster mode, we can't pipeline keys on different slots
+    // The job key uses hash tag {streamId}, runningJobs is global
+    if (this.isCluster) {
+      await this.redis.hmset(key, this.serializeJob(job));
+      await this.redis.expire(key, this.ttl.running);
+      await this.redis.sadd(KEYS.runningJobs, streamId);
+    } else {
+      const pipeline = this.redis.pipeline();
+      pipeline.hmset(key, this.serializeJob(job));
+      pipeline.expire(key, this.ttl.running);
+      pipeline.sadd(KEYS.runningJobs, streamId);
+      await pipeline.exec();
+    }
 
     logger.debug(`[RedisJobStore] Created job: ${streamId}`);
     return job;
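To illustrate the constraint behind this branch (sketch only, not part of the diff; the cluster client, streamId, TTL value, and field values are placeholders): a cluster pipeline may only touch keys that resolve to one slot, so the {streamId}-tagged keys can be batched while the global set gets its own command.

const pipeline = cluster.pipeline();
pipeline.hmset(KEYS.job(streamId), { status: 'running' });
pipeline.expire(KEYS.job(streamId), 600);
await pipeline.exec(); // fine: both commands hit the {streamId} slot

// 'stream:running' hashes to a different slot, so it cannot ride in the
// pipeline above on a cluster; it is issued as a separate command instead.
await cluster.sadd(KEYS.runningJobs, streamId);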
@@ -166,24 +180,41 @@ export class RedisJobStore implements IJobStore {
     // If status changed to complete/error/aborted, update TTL and remove from running set
     if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
-      const pipeline = this.redis.pipeline();
-      pipeline.expire(key, this.ttl.completed);
-      pipeline.srem(KEYS.runningJobs, streamId);
-      // Delete or set TTL on related keys based on config
-      if (this.ttl.chunksAfterComplete === 0) {
-        pipeline.del(KEYS.chunks(streamId));
-      } else {
-        pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
-      }
-      if (this.ttl.runStepsAfterComplete === 0) {
-        pipeline.del(KEYS.runSteps(streamId));
-      } else {
-        pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
-      }
-      await pipeline.exec();
+      // In cluster mode, separate runningJobs (global) from stream-specific keys
+      if (this.isCluster) {
+        await this.redis.expire(key, this.ttl.completed);
+        await this.redis.srem(KEYS.runningJobs, streamId);
+        // Delete or set TTL on related keys based on config
+        if (this.ttl.chunksAfterComplete === 0) {
+          await this.redis.del(KEYS.chunks(streamId));
+        } else {
+          await this.redis.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
+        }
+        if (this.ttl.runStepsAfterComplete === 0) {
+          await this.redis.del(KEYS.runSteps(streamId));
+        } else {
+          await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
+        }
+      } else {
+        const pipeline = this.redis.pipeline();
+        pipeline.expire(key, this.ttl.completed);
+        pipeline.srem(KEYS.runningJobs, streamId);
+        if (this.ttl.chunksAfterComplete === 0) {
+          pipeline.del(KEYS.chunks(streamId));
+        } else {
+          pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
+        }
+        if (this.ttl.runStepsAfterComplete === 0) {
+          pipeline.del(KEYS.runSteps(streamId));
+        } else {
+          pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
+        }
+        await pipeline.exec();
+      }
     }
   }
 }
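One consequence of the === 0 checks above: the post-completion TTL options double as a delete-immediately switch. A hedged construction example (the option names come from the constructor hunk earlier in this diff; the argument order is an assumption):

const store = new RedisJobStore(redis, {
  chunksAfterCompleteTtl: 0,      // 0 => DEL the chunk stream as soon as the job completes
  runStepsAfterCompleteTtl: 600,  // > 0 => EXPIRE the run steps after 600 seconds
});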
@@ -191,12 +222,24 @@ export class RedisJobStore implements IJobStore {
     // Clear local cache
     this.localGraphCache.delete(streamId);
-    const pipeline = this.redis.pipeline();
-    pipeline.del(KEYS.job(streamId));
-    pipeline.del(KEYS.chunks(streamId));
-    pipeline.del(KEYS.runSteps(streamId));
-    pipeline.srem(KEYS.runningJobs, streamId);
-    await pipeline.exec();
+    // In cluster mode, separate runningJobs (global) from stream-specific keys (same slot)
+    if (this.isCluster) {
+      // Stream-specific keys all hash to same slot due to {streamId}
+      const pipeline = this.redis.pipeline();
+      pipeline.del(KEYS.job(streamId));
+      pipeline.del(KEYS.chunks(streamId));
+      pipeline.del(KEYS.runSteps(streamId));
+      await pipeline.exec();
+      // Global set is on different slot - execute separately
+      await this.redis.srem(KEYS.runningJobs, streamId);
+    } else {
+      const pipeline = this.redis.pipeline();
+      pipeline.del(KEYS.job(streamId));
+      pipeline.del(KEYS.chunks(streamId));
+      pipeline.del(KEYS.runSteps(streamId));
+      pipeline.srem(KEYS.runningJobs, streamId);
+      await pipeline.exec();
+    }
 
     logger.debug(`[RedisJobStore] Deleted job: ${streamId}`);
   }
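For a sense of what the multi-instance integration tests described in the commit message exercise, a rough Jest-style sketch follows. The method names (createJob, getJob, deleteJob), their signatures, the expected 'running' status, and the connection details are assumptions for illustration, not the actual test code.

import Redis from 'ioredis';
// RedisJobStore import omitted; the path depends on package layout.

describe('RedisJobStore across instances', () => {
  const redisA = new Redis(process.env.REDIS_URI ?? 'redis://127.0.0.1:6379');
  const redisB = new Redis(process.env.REDIS_URI ?? 'redis://127.0.0.1:6379');
  const storeA = new RedisJobStore(redisA);
  const storeB = new RedisJobStore(redisB);

  afterAll(async () => {
    await Promise.all([redisA.quit(), redisB.quit()]);
  });

  it('makes a job created on one instance visible to another', async () => {
    const streamId = `it-${Date.now()}`;
    await storeA.createJob(streamId);
    const job = await storeB.getJob(streamId);
    expect(job?.status).toBe('running');
    await storeB.deleteJob(streamId);
  });
});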