From c378e777efa23479b27db729955087f86f500514 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Thu, 15 Jan 2026 23:02:03 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=B5=20refactor:=20Preserve=20Job=20Err?= =?UTF-8?q?or=20State=20for=20Late=20Stream=20Subscribers=20(#11372)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🪵 refactor: Preserve job error state for late stream subscribers * 🔧 fix: Enhance error handling for late subscribers in GenerationJobManager - Implemented a cleanup strategy for error jobs to prevent immediate deletion, allowing late clients to receive error messages. - Updated job status handling to prioritize error notifications over completion events. - Added integration tests to verify error preservation and proper notification to late subscribers, including scenarios with Redis support. --- .../api/src/stream/GenerationJobManager.ts | 64 +++- ...ationJobManager.stream_integration.spec.ts | 276 ++++++++++++++++++ .../implementations/InMemoryEventTransport.ts | 6 +- 3 files changed, 335 insertions(+), 11 deletions(-) diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 44b38f48f6..13544fc445 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -33,6 +33,7 @@ export interface GenerationJobManagerOptions { * @property readyPromise - Resolves immediately (legacy, kept for API compatibility) * @property resolveReady - Function to resolve readyPromise * @property finalEvent - Cached final event for late subscribers + * @property errorEvent - Cached error event for late subscribers (errors before client connects) * @property syncSent - Whether sync event was sent (reset when all subscribers leave) * @property earlyEventBuffer - Buffer for events emitted before first subscriber connects * @property hasSubscriber - Whether at least one subscriber has connected @@ -47,6 +48,7 @@ interface RuntimeJobState { readyPromise: Promise; resolveReady: () => void; finalEvent?: t.ServerSentEvent; + errorEvent?: string; syncSent: boolean; earlyEventBuffer: t.ServerSentEvent[]; hasSubscriber: boolean; @@ -421,6 +423,7 @@ class GenerationJobManagerClass { earlyEventBuffer: [], hasSubscriber: false, finalEvent, + errorEvent: jobData.error, }; this.runtimeState.set(streamId, runtime); @@ -510,6 +513,8 @@ class GenerationJobManagerClass { /** * Mark job as complete. * If cleanupOnComplete is true (default), immediately cleans up job resources. + * Exception: Jobs with errors are NOT immediately deleted to allow late-connecting + * clients to receive the error (race condition where error occurs before client connects). * Note: eventTransport is NOT cleaned up here to allow the final event to be * fully transmitted. It will be cleaned up when subscribers disconnect or * by the periodic cleanup job. @@ -527,7 +532,29 @@ class GenerationJobManagerClass { this.jobStore.clearContentState(streamId); this.runStepBuffers?.delete(streamId); - // Immediate cleanup if configured (default: true) + // For error jobs, DON'T delete immediately - keep around so late-connecting + // clients can receive the error. This handles the race condition where error + // occurs before client connects to SSE stream. + // + // Cleanup strategy: Error jobs are cleaned up by periodic cleanup (every 60s) + // via jobStore.cleanup() which checks for jobs with status 'error' and + // completedAt set. The TTL is configurable via jobStore options (default: 0, + // meaning cleanup on next interval). This gives clients ~60s to connect and + // receive the error before the job is removed. + if (error) { + await this.jobStore.updateJob(streamId, { + status: 'error', + completedAt: Date.now(), + error, + }); + // Keep runtime state so subscribe() can access errorEvent + logger.debug( + `[GenerationJobManager] Job completed with error (keeping for late subscribers): ${streamId}`, + ); + return; + } + + // Immediate cleanup if configured (default: true) - only for successful completions if (this._cleanupOnComplete) { this.runtimeState.delete(streamId); // Don't cleanup eventTransport here - let the done event fully transmit first. @@ -536,9 +563,8 @@ class GenerationJobManagerClass { } else { // Only update status if keeping the job around await this.jobStore.updateJob(streamId, { - status: error ? 'error' : 'complete', + status: 'complete', completedAt: Date.now(), - error, }); } @@ -678,14 +704,22 @@ class GenerationJobManagerClass { const jobData = await this.jobStore.getJob(streamId); - // If job already complete, send final event + // If job already complete/error, send final event or error + // Error status takes precedence to ensure errors aren't misreported as successes setImmediate(() => { - if ( - runtime.finalEvent && - jobData && - ['complete', 'error', 'aborted'].includes(jobData.status) - ) { - onDone?.(runtime.finalEvent); + if (jobData && ['complete', 'error', 'aborted'].includes(jobData.status)) { + // Check for error status FIRST and prioritize error handling + if (jobData.status === 'error' && (runtime.errorEvent || jobData.error)) { + const errorToSend = runtime.errorEvent ?? jobData.error; + if (errorToSend) { + logger.debug( + `[GenerationJobManager] Sending stored error to late subscriber: ${streamId}`, + ); + onError?.(errorToSend); + } + } else if (runtime.finalEvent) { + onDone?.(runtime.finalEvent); + } } }); @@ -986,8 +1020,18 @@ class GenerationJobManagerClass { /** * Emit an error event. + * Stores the error for late-connecting subscribers (race condition where error + * occurs before client connects to SSE stream). */ emitError(streamId: string, error: string): void { + const runtime = this.runtimeState.get(streamId); + if (runtime) { + runtime.errorEvent = error; + } + // Persist error to job store for cross-replica consistency + this.jobStore.updateJob(streamId, { error }).catch((err) => { + logger.error(`[GenerationJobManager] Failed to persist error:`, err); + }); this.eventTransport.emitError(streamId, error); } diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index 4471d8c95d..e3ea16c8f0 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -796,6 +796,282 @@ describe('GenerationJobManager Integration Tests', () => { }); }); + describe('Error Preservation for Late Subscribers', () => { + /** + * These tests verify the fix for the race condition where errors + * (like INPUT_LENGTH) occur before the SSE client connects. + * + * Problem: Error → emitError → completeJob → job deleted → client connects → 404 + * Fix: Store error, don't delete job immediately, send error to late subscriber + */ + + test('should store error in emitError for late-connecting subscribers', async () => { + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); + + GenerationJobManager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + cleanupOnComplete: false, + }); + + await GenerationJobManager.initialize(); + + const streamId = `error-store-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }'; + + // Emit error (no subscribers yet - simulates race condition) + GenerationJobManager.emitError(streamId, errorMessage); + + // Wait for async job store update + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Verify error is stored in job store + const job = await GenerationJobManager.getJob(streamId); + expect(job?.error).toBe(errorMessage); + + await GenerationJobManager.destroy(); + }); + + test('should NOT delete job immediately when completeJob is called with error', async () => { + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); + + GenerationJobManager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + cleanupOnComplete: true, // Default behavior + }); + + await GenerationJobManager.initialize(); + + const streamId = `error-no-delete-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + const errorMessage = 'Test error message'; + + // Complete with error + await GenerationJobManager.completeJob(streamId, errorMessage); + + // Job should still exist (not deleted) + const hasJob = await GenerationJobManager.hasJob(streamId); + expect(hasJob).toBe(true); + + // Job should have error status + const job = await GenerationJobManager.getJob(streamId); + expect(job?.status).toBe('error'); + expect(job?.error).toBe(errorMessage); + + await GenerationJobManager.destroy(); + }); + + test('should send stored error to late-connecting subscriber', async () => { + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); + + GenerationJobManager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + cleanupOnComplete: true, + }); + + await GenerationJobManager.initialize(); + + const streamId = `error-late-sub-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }'; + + // Simulate race condition: error occurs before client connects + GenerationJobManager.emitError(streamId, errorMessage); + await GenerationJobManager.completeJob(streamId, errorMessage); + + // Wait for async operations + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Now client connects (late subscriber) + let receivedError: string | undefined; + const subscription = await GenerationJobManager.subscribe( + streamId, + () => {}, // onChunk + () => {}, // onDone + (error) => { + receivedError = error; + }, // onError + ); + + expect(subscription).not.toBeNull(); + + // Wait for setImmediate in subscribe to fire + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Late subscriber should receive the stored error + expect(receivedError).toBe(errorMessage); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + }); + + test('should prioritize error status over finalEvent in subscribe', async () => { + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); + + GenerationJobManager.configure({ + jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }), + eventTransport: new InMemoryEventTransport(), + isRedis: false, + cleanupOnComplete: false, + }); + + await GenerationJobManager.initialize(); + + const streamId = `error-priority-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + const errorMessage = 'Error should take priority'; + + // Emit error and complete with error + GenerationJobManager.emitError(streamId, errorMessage); + await GenerationJobManager.completeJob(streamId, errorMessage); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Subscribe and verify error is received (not a done event) + let receivedError: string | undefined; + let receivedDone = false; + + const subscription = await GenerationJobManager.subscribe( + streamId, + () => {}, + () => { + receivedDone = true; + }, + (error) => { + receivedError = error; + }, + ); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Error should be received, not done + expect(receivedError).toBe(errorMessage); + expect(receivedDone).toBe(false); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + }); + + test('should handle error preservation in Redis mode (cross-replica)', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { createStreamServices } = await import('../createStreamServices'); + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + + // === Replica A: Creates job and emits error === + const replicaAJobStore = new RedisJobStore(ioredisClient); + await replicaAJobStore.initialize(); + + const streamId = `redis-error-${Date.now()}`; + const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }'; + + await replicaAJobStore.createJob(streamId, 'user-1'); + await replicaAJobStore.updateJob(streamId, { + status: 'error', + error: errorMessage, + completedAt: Date.now(), + }); + + // === Replica B: Fresh manager receives client connection === + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure({ + ...services, + cleanupOnComplete: false, + }); + await GenerationJobManager.initialize(); + + // Client connects to Replica B (job created on Replica A) + let receivedError: string | undefined; + const subscription = await GenerationJobManager.subscribe( + streamId, + () => {}, + () => {}, + (error) => { + receivedError = error; + }, + ); + + expect(subscription).not.toBeNull(); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Error should be loaded from Redis and sent to subscriber + expect(receivedError).toBe(errorMessage); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); + + test('error jobs should be cleaned up by periodic cleanup after TTL', async () => { + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore'); + const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport'); + + // Use a very short TTL for testing + const jobStore = new InMemoryJobStore({ ttlAfterComplete: 100 }); + + GenerationJobManager.configure({ + jobStore, + eventTransport: new InMemoryEventTransport(), + isRedis: false, + cleanupOnComplete: true, + }); + + await GenerationJobManager.initialize(); + + const streamId = `error-cleanup-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + // Complete with error + await GenerationJobManager.completeJob(streamId, 'Test error'); + + // Job should exist immediately after error + let hasJob = await GenerationJobManager.hasJob(streamId); + expect(hasJob).toBe(true); + + // Wait for TTL to expire + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Trigger cleanup + await jobStore.cleanup(); + + // Job should be cleaned up after TTL + hasJob = await GenerationJobManager.hasJob(streamId); + expect(hasJob).toBe(false); + + await GenerationJobManager.destroy(); + }); + }); + describe('createStreamServices Auto-Detection', () => { test('should auto-detect Redis when USE_REDIS is true', async () => { if (!ioredisClient) { diff --git a/packages/api/src/stream/implementations/InMemoryEventTransport.ts b/packages/api/src/stream/implementations/InMemoryEventTransport.ts index fd9c65e239..39b3d6029d 100644 --- a/packages/api/src/stream/implementations/InMemoryEventTransport.ts +++ b/packages/api/src/stream/implementations/InMemoryEventTransport.ts @@ -79,7 +79,11 @@ export class InMemoryEventTransport implements IEventTransport { emitError(streamId: string, error: string): void { const state = this.streams.get(streamId); - state?.emitter.emit('error', error); + // Only emit if there are listeners - Node.js throws on unhandled 'error' events + // This is intentional for the race condition where error occurs before client connects + if (state?.emitter.listenerCount('error') ?? 0 > 0) { + state?.emitter.emit('error', error); + } } getSubscriberCount(streamId: string): number {