LibreChat/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts

/**
* Integration tests for GenerationJobManager.
*
* Tests the job manager with both in-memory and Redis backends
* to ensure consistent behavior across deployment modes.
*
* Run with: USE_REDIS=true npx jest GenerationJobManager.stream_integration
*/
import type { Redis, Cluster } from 'ioredis';
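// Note: the Redis-mode tests below bail out early (with a console warning) when
// `ioredisClient` is unavailable, so they are effectively skipped without a live
// Redis instance. To provide one locally, something like
// `docker run --rm -p 6379:6379 redis:7` should work, assuming Docker is available.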
describe('GenerationJobManager Integration Tests', () => {
let originalEnv: NodeJS.ProcessEnv;
let ioredisClient: Redis | Cluster | null = null;
const testPrefix = 'JobManager-Integration-Test';
beforeAll(async () => {
originalEnv = { ...process.env };
// Set up test environment
process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
process.env.REDIS_KEY_PREFIX = testPrefix;
jest.resetModules();
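// resetModules() clears Jest's module registry, so the dynamic import below
// evaluates the Redis client module with the env vars set above
// (including the test key prefix).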
const { ioredisClient: client } = await import('../../cache/redisClients');
ioredisClient = client;
});
afterEach(async () => {
// Clean up module state
jest.resetModules();
// Clean up Redis keys (delete individually for cluster compatibility)
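// (a multi-key DEL can fail with a CROSSSLOT error when keys hash to different
// slots in cluster mode, hence one del() per key; keys() scans the whole
// keyspace, which is acceptable here but should be avoided outside of tests)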
if (ioredisClient) {
try {
const keys = await ioredisClient.keys(`${testPrefix}*`);
const streamKeys = await ioredisClient.keys(`stream:*`);
const allKeys = [...keys, ...streamKeys];
await Promise.all(allKeys.map((key) => ioredisClient!.del(key)));
} catch {
// Ignore cleanup errors
}
}
});
afterAll(async () => {
if (ioredisClient) {
try {
// Use quit() for a graceful close; it waits for pending commands to finish
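// (disconnect(), used as the fallback below, closes the connection immediately instead)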
await ioredisClient.quit();
} catch {
// Fall back to disconnect() if quit() fails
try {
ioredisClient.disconnect();
} catch {
// Ignore
}
}
}
process.env = originalEnv;
});
describe('In-Memory Mode', () => {
test('should create and manage jobs', async () => {
const { GenerationJobManager } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
// Configure with in-memory
// cleanupOnComplete: false so we can verify completed status
GenerationJobManager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
cleanupOnComplete: false,
});
await GenerationJobManager.initialize();
const streamId = `inmem-job-${Date.now()}`;
const userId = 'test-user-1';
// Create job (async)
const job = await GenerationJobManager.createJob(streamId, userId);
expect(job.streamId).toBe(streamId);
expect(job.status).toBe('running');
// Check job exists
const hasJob = await GenerationJobManager.hasJob(streamId);
expect(hasJob).toBe(true);
// Get job
const retrieved = await GenerationJobManager.getJob(streamId);
expect(retrieved?.streamId).toBe(streamId);
// Update job
await GenerationJobManager.updateMetadata(streamId, { sender: 'TestAgent' });
const updated = await GenerationJobManager.getJob(streamId);
expect(updated?.metadata?.sender).toBe('TestAgent');
// Complete job
await GenerationJobManager.completeJob(streamId);
const completed = await GenerationJobManager.getJob(streamId);
expect(completed?.status).toBe('complete');
await GenerationJobManager.destroy();
});
test('should handle event streaming', async () => {
const { GenerationJobManager } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
GenerationJobManager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
});
await GenerationJobManager.initialize();
const streamId = `inmem-events-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const receivedChunks: unknown[] = [];
// Subscribe to events (subscribe takes separate args, not an object)
const subscription = await GenerationJobManager.subscribe(streamId, (event) =>
receivedChunks.push(event),
);
const { unsubscribe } = subscription!;
// Wait for first subscriber to be registered
await new Promise((resolve) => setTimeout(resolve, 10));
// Emit chunks (emitChunk takes { event, data } format)
GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: { type: 'text', text: 'Hello' },
});
GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: { type: 'text', text: ' world' },
});
// Give time for events to propagate
await new Promise((resolve) => setTimeout(resolve, 50));
// Verify chunks were received
expect(receivedChunks.length).toBeGreaterThan(0);
// Complete the job (this cleans up resources)
await GenerationJobManager.completeJob(streamId);
unsubscribe();
await GenerationJobManager.destroy();
});
});
describe('Redis Mode', () => {
test('should create and manage jobs via Redis', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
// Create Redis services
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
expect(services.isRedis).toBe(true);
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `redis-job-${Date.now()}`;
const userId = 'test-user-redis';
// Create job (async)
const job = await GenerationJobManager.createJob(streamId, userId);
expect(job.streamId).toBe(streamId);
// Verify in Redis
const hasJob = await GenerationJobManager.hasJob(streamId);
expect(hasJob).toBe(true);
// Update and verify
await GenerationJobManager.updateMetadata(streamId, { sender: 'RedisAgent' });
const updated = await GenerationJobManager.getJob(streamId);
expect(updated?.metadata?.sender).toBe('RedisAgent');
await GenerationJobManager.destroy();
});
test('should persist chunks for cross-instance resume', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `redis-chunks-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
// Emit chunks (these should be persisted to Redis)
// emitChunk takes { event, data } format
GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step',
data: {
id: 'step-1',
runId: 'run-1',
index: 0,
stepDetails: { type: 'message_creation' },
},
});
GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: {
id: 'step-1',
delta: { content: { type: 'text', text: 'Persisted ' } },
},
});
GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: {
id: 'step-1',
delta: { content: { type: 'text', text: 'content' } },
},
});
// Wait for async operations
await new Promise((resolve) => setTimeout(resolve, 100));
// Simulate getting resume state (as if from different instance)
const resumeState = await GenerationJobManager.getResumeState(streamId);
expect(resumeState).not.toBeNull();
expect(resumeState!.aggregatedContent?.length).toBeGreaterThan(0);
await GenerationJobManager.destroy();
});
test('should handle abort and return content', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `redis-abort-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
// Emit some content (emitChunk takes { event, data } format)
GenerationJobManager.emitChunk(streamId, {
event: 'on_run_step',
data: {
id: 'step-1',
runId: 'run-1',
index: 0,
stepDetails: { type: 'message_creation' },
},
});
GenerationJobManager.emitChunk(streamId, {
event: 'on_message_delta',
data: {
id: 'step-1',
delta: { content: { type: 'text', text: 'Partial response...' } },
},
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the job
const abortResult = await GenerationJobManager.abortJob(streamId);
expect(abortResult.success).toBe(true);
expect(abortResult.content.length).toBeGreaterThan(0);
await GenerationJobManager.destroy();
});
});
describe('Cross-Mode Consistency', () => {
test('should have consistent API between in-memory and Redis modes', async () => {
// This test verifies that the same operations work identically
// regardless of backend mode
const runTestWithMode = async (isRedis: boolean) => {
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
if (isRedis && ioredisClient) {
const { createStreamServices } = await import('../createStreamServices');
GenerationJobManager.configure({
...createStreamServices({
useRedis: true,
redisClient: ioredisClient,
}),
cleanupOnComplete: false, // Keep job for verification
});
} else {
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import(
'../implementations/InMemoryEventTransport'
);
GenerationJobManager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
cleanupOnComplete: false,
});
}
await GenerationJobManager.initialize();
const streamId = `consistency-${isRedis ? 'redis' : 'inmem'}-${Date.now()}`;
// Test sequence
const job = await GenerationJobManager.createJob(streamId, 'user-1');
expect(job.streamId).toBe(streamId);
expect(job.status).toBe('running');
const hasJob = await GenerationJobManager.hasJob(streamId);
expect(hasJob).toBe(true);
await GenerationJobManager.updateMetadata(streamId, {
sender: 'ConsistencyAgent',
responseMessageId: 'resp-123',
});
const updated = await GenerationJobManager.getJob(streamId);
expect(updated?.metadata?.sender).toBe('ConsistencyAgent');
expect(updated?.metadata?.responseMessageId).toBe('resp-123');
await GenerationJobManager.completeJob(streamId);
const completed = await GenerationJobManager.getJob(streamId);
expect(completed?.status).toBe('complete');
await GenerationJobManager.destroy();
};
// Test in-memory mode
await runTestWithMode(false);
// Test Redis mode if available
if (ioredisClient) {
await runTestWithMode(true);
}
});
});
describe('createStreamServices Auto-Detection', () => {
test('should auto-detect Redis when USE_REDIS is true', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// Force USE_REDIS to true
process.env.USE_REDIS = 'true';
jest.resetModules();
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices();
// Should detect Redis
expect(services.isRedis).toBe(true);
});
test('should fall back to in-memory when USE_REDIS is false', async () => {
process.env.USE_REDIS = 'false';
jest.resetModules();
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices();
expect(services.isRedis).toBe(false);
});
test('should allow forcing in-memory via config override', async () => {
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({ useRedis: false });
expect(services.isRedis).toBe(false);
});
});
});