import type { Redis, Cluster } from 'ioredis'; /** * Integration tests for GenerationJobManager. * * Tests the job manager with both in-memory and Redis backends * to ensure consistent behavior across deployment modes. * * Run with: USE_REDIS=true npx jest GenerationJobManager.stream_integration */ describe('GenerationJobManager Integration Tests', () => { let originalEnv: NodeJS.ProcessEnv; let ioredisClient: Redis | Cluster | null = null; const testPrefix = 'JobManager-Integration-Test'; beforeAll(async () => { originalEnv = { ...process.env }; // Set up test environment process.env.USE_REDIS = process.env.USE_REDIS ?? 'true'; process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379'; process.env.REDIS_KEY_PREFIX = testPrefix; jest.resetModules(); const { ioredisClient: client } = await import('../../cache/redisClients'); ioredisClient = client; }); afterEach(async () => { // Clean up module state jest.resetModules(); // Clean up Redis keys (delete individually for cluster compatibility) if (ioredisClient) { try { const keys = await ioredisClient.keys(`${testPrefix}*`); const streamKeys = await ioredisClient.keys(`stream:*`); const allKeys = [...keys, ...streamKeys]; await Promise.all(allKeys.map((key) => ioredisClient!.del(key))); } catch { // Ignore cleanup errors } } }); afterAll(async () => { if (ioredisClient && 'disconnect' in ioredisClient) { try { ioredisClient.disconnect(); } catch { // Ignore disconnect errors } } process.env = originalEnv; }); describe('In-Memory Mode', () => { test('should create and manage jobs', async () => { const { GenerationJobManager } = await import('../GenerationJobManager'); const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); // Configure with in-memory // cleanupOnComplete: false so we can verify completed status GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), isRedis: false, cleanupOnComplete: false, }); await GenerationJobManager.initialize(); const streamId = `inmem-job-${Date.now()}`; const userId = 'test-user-1'; // Create job (async) const job = await GenerationJobManager.createJob(streamId, userId); expect(job.streamId).toBe(streamId); expect(job.status).toBe('running'); // Check job exists const hasJob = await GenerationJobManager.hasJob(streamId); expect(hasJob).toBe(true); // Get job const retrieved = await GenerationJobManager.getJob(streamId); expect(retrieved?.streamId).toBe(streamId); // Update job await GenerationJobManager.updateMetadata(streamId, { sender: 'TestAgent' }); const updated = await GenerationJobManager.getJob(streamId); expect(updated?.metadata?.sender).toBe('TestAgent'); // Complete job await GenerationJobManager.completeJob(streamId); const completed = await GenerationJobManager.getJob(streamId); expect(completed?.status).toBe('complete'); await GenerationJobManager.destroy(); }); test('should handle event streaming', async () => { const { GenerationJobManager } = await import('../GenerationJobManager'); const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), isRedis: false, }); await GenerationJobManager.initialize(); const streamId = `inmem-events-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); const receivedChunks: unknown[] = []; // Subscribe to events (subscribe takes separate args, not an object) const subscription = await GenerationJobManager.subscribe(streamId, (event) => receivedChunks.push(event), ); const { unsubscribe } = subscription!; // Wait for first subscriber to be registered await new Promise((resolve) => setTimeout(resolve, 10)); // Emit chunks (emitChunk takes { event, data } format) GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { type: 'text', text: 'Hello' }, }); GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { type: 'text', text: ' world' }, }); // Give time for events to propagate await new Promise((resolve) => setTimeout(resolve, 50)); // Verify chunks were received expect(receivedChunks.length).toBeGreaterThan(0); // Complete the job (this cleans up resources) await GenerationJobManager.completeJob(streamId); unsubscribe(); await GenerationJobManager.destroy(); }); }); describe('Redis Mode', () => { test('should create and manage jobs via Redis', async () => { if (!ioredisClient) { console.warn('Redis not available, skipping test'); return; } const { GenerationJobManager } = await import('../GenerationJobManager'); const { createStreamServices } = await import('../createStreamServices'); // Create Redis services const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); expect(services.isRedis).toBe(true); GenerationJobManager.configure(services); await GenerationJobManager.initialize(); const streamId = `redis-job-${Date.now()}`; const userId = 'test-user-redis'; // Create job (async) const job = await GenerationJobManager.createJob(streamId, userId); expect(job.streamId).toBe(streamId); // Verify in Redis const hasJob = await GenerationJobManager.hasJob(streamId); expect(hasJob).toBe(true); // Update and verify await GenerationJobManager.updateMetadata(streamId, { sender: 'RedisAgent' }); const updated = await GenerationJobManager.getJob(streamId); expect(updated?.metadata?.sender).toBe('RedisAgent'); await GenerationJobManager.destroy(); }); test('should persist chunks for cross-instance resume', async () => { if (!ioredisClient) { console.warn('Redis not available, skipping test'); return; } const { GenerationJobManager } = await import('../GenerationJobManager'); const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); GenerationJobManager.configure(services); await GenerationJobManager.initialize(); const streamId = `redis-chunks-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); // Emit chunks (these should be persisted to Redis) // emitChunk takes { event, data } format GenerationJobManager.emitChunk(streamId, { event: 'on_run_step', data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' }, }, }); GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { id: 'step-1', delta: { content: { type: 'text', text: 'Persisted ' } }, }, }); GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { id: 'step-1', delta: { content: { type: 'text', text: 'content' } }, }, }); // Wait for async operations await new Promise((resolve) => setTimeout(resolve, 100)); // Simulate getting resume state (as if from different instance) const resumeState = await GenerationJobManager.getResumeState(streamId); expect(resumeState).not.toBeNull(); expect(resumeState!.aggregatedContent?.length).toBeGreaterThan(0); await GenerationJobManager.destroy(); }); test('should handle abort and return content', async () => { if (!ioredisClient) { console.warn('Redis not available, skipping test'); return; } const { GenerationJobManager } = await import('../GenerationJobManager'); const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: true, redisClient: ioredisClient, }); GenerationJobManager.configure(services); await GenerationJobManager.initialize(); const streamId = `redis-abort-${Date.now()}`; await GenerationJobManager.createJob(streamId, 'user-1'); // Emit some content (emitChunk takes { event, data } format) GenerationJobManager.emitChunk(streamId, { event: 'on_run_step', data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' }, }, }); GenerationJobManager.emitChunk(streamId, { event: 'on_message_delta', data: { id: 'step-1', delta: { content: { type: 'text', text: 'Partial response...' } }, }, }); await new Promise((resolve) => setTimeout(resolve, 100)); // Abort the job const abortResult = await GenerationJobManager.abortJob(streamId); expect(abortResult.success).toBe(true); expect(abortResult.content.length).toBeGreaterThan(0); await GenerationJobManager.destroy(); }); }); describe('Cross-Mode Consistency', () => { test('should have consistent API between in-memory and Redis modes', async () => { // This test verifies that the same operations work identically // regardless of backend mode const runTestWithMode = async (isRedis: boolean) => { jest.resetModules(); const { GenerationJobManager } = await import('../GenerationJobManager'); if (isRedis && ioredisClient) { const { createStreamServices } = await import('../createStreamServices'); GenerationJobManager.configure({ ...createStreamServices({ useRedis: true, redisClient: ioredisClient, }), cleanupOnComplete: false, // Keep job for verification }); } else { const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); const { InMemoryEventTransport } = await import( '../implementations/InMemoryEventTransport' ); GenerationJobManager.configure({ jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), eventTransport: new InMemoryEventTransport(), isRedis: false, cleanupOnComplete: false, }); } await GenerationJobManager.initialize(); const streamId = `consistency-${isRedis ? 'redis' : 'inmem'}-${Date.now()}`; // Test sequence const job = await GenerationJobManager.createJob(streamId, 'user-1'); expect(job.streamId).toBe(streamId); expect(job.status).toBe('running'); const hasJob = await GenerationJobManager.hasJob(streamId); expect(hasJob).toBe(true); await GenerationJobManager.updateMetadata(streamId, { sender: 'ConsistencyAgent', responseMessageId: 'resp-123', }); const updated = await GenerationJobManager.getJob(streamId); expect(updated?.metadata?.sender).toBe('ConsistencyAgent'); expect(updated?.metadata?.responseMessageId).toBe('resp-123'); await GenerationJobManager.completeJob(streamId); const completed = await GenerationJobManager.getJob(streamId); expect(completed?.status).toBe('complete'); await GenerationJobManager.destroy(); }; // Test in-memory mode await runTestWithMode(false); // Test Redis mode if available if (ioredisClient) { await runTestWithMode(true); } }); }); describe('createStreamServices Auto-Detection', () => { test('should auto-detect Redis when USE_REDIS is true', async () => { if (!ioredisClient) { console.warn('Redis not available, skipping test'); return; } // Force USE_REDIS to true process.env.USE_REDIS = 'true'; jest.resetModules(); const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices(); // Should detect Redis expect(services.isRedis).toBe(true); }); test('should fall back to in-memory when USE_REDIS is false', async () => { process.env.USE_REDIS = 'false'; jest.resetModules(); const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices(); expect(services.isRedis).toBe(false); }); test('should allow forcing in-memory via config override', async () => { const { createStreamServices } = await import('../createStreamServices'); const services = createStreamServices({ useRedis: false }); expect(services.isRedis).toBe(false); }); }); });