diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts
index 61646b3aa5..44b38f48f6 100644
--- a/packages/api/src/stream/GenerationJobManager.ts
+++ b/packages/api/src/stream/GenerationJobManager.ts
@@ -227,13 +227,17 @@ class GenerationJobManagerClass {
     /**
      * Set up all-subscribers-left callback.
      * When all SSE clients disconnect, this:
-     * 1. Resets syncSent so reconnecting clients get sync event
+     * 1. Resets syncSent so reconnecting clients get sync event (persisted to Redis)
      * 2. Calls any registered allSubscribersLeft handlers (e.g., to save partial responses)
      */
     this.eventTransport.onAllSubscribersLeft(streamId, () => {
       const currentRuntime = this.runtimeState.get(streamId);
       if (currentRuntime) {
         currentRuntime.syncSent = false;
+        // Persist syncSent=false to Redis for cross-replica consistency
+        this.jobStore.updateJob(streamId, { syncSent: false }).catch((err) => {
+          logger.error(`[GenerationJobManager] Failed to persist syncSent=false:`, err);
+        });
         // Call registered handlers (from job.emitter.on('allSubscribersLeft', ...))
         if (currentRuntime.allSubscribersLeftHandlers) {
           this.jobStore
@@ -258,6 +262,21 @@ class GenerationJobManagerClass {
       }
     });
 
+    /**
+     * Set up cross-replica abort listener (Redis mode only).
+     * When abort is triggered on ANY replica, this replica receives the signal
+     * and aborts its local AbortController (if it's the one running generation).
+     */
+    if (this.eventTransport.onAbort) {
+      this.eventTransport.onAbort(streamId, () => {
+        const currentRuntime = this.runtimeState.get(streamId);
+        if (currentRuntime && !currentRuntime.abortController.signal.aborted) {
+          logger.debug(`[GenerationJobManager] Received cross-replica abort for ${streamId}`);
+          currentRuntime.abortController.abort();
+        }
+      });
+    }
+
     logger.debug(`[GenerationJobManager] Created job: ${streamId}`);
 
     // Return facade for backwards compatibility
@@ -343,15 +362,133 @@ class GenerationJobManagerClass {
     };
   }
 
+  /**
+   * Get or create runtime state for a job.
+   *
+   * This enables cross-replica support in Redis mode:
+   * - If runtime exists locally (same replica), return it
+   * - If job exists in Redis but not locally (cross-replica), create minimal runtime
+   *
+   * The lazily-created runtime state is sufficient for:
+   * - Subscribing to events (via Redis pub/sub)
+   * - Getting resume state
+   * - Handling reconnections
+   * - Receiving cross-replica abort signals (via Redis pub/sub)
+   *
+   * @param streamId - The stream identifier
+   * @returns Runtime state or null if job doesn't exist anywhere
+   */
+  private async getOrCreateRuntimeState(streamId: string): Promise<RuntimeJobState | null> {
+    const existingRuntime = this.runtimeState.get(streamId);
+    if (existingRuntime) {
+      return existingRuntime;
+    }
+
+    // Job doesn't exist locally - check Redis
+    const jobData = await this.jobStore.getJob(streamId);
+    if (!jobData) {
+      return null;
+    }
+
+    // Cross-replica scenario: job exists in Redis but not locally
+    // Create minimal runtime state for handling reconnection/subscription
+    logger.debug(`[GenerationJobManager] Creating cross-replica runtime for ${streamId}`);
+
+    let resolveReady: () => void;
+    const readyPromise = new Promise<void>((resolve) => {
+      resolveReady = resolve;
+    });
+
+    // For jobs created on other replicas, readyPromise should be pre-resolved
+    // since generation has already started
+    resolveReady!();
+
+    // Parse finalEvent from Redis if available
+    let finalEvent: t.ServerSentEvent | undefined;
+    if (jobData.finalEvent) {
+      try {
+        finalEvent = JSON.parse(jobData.finalEvent) as t.ServerSentEvent;
+      } catch {
+        // Ignore parse errors
+      }
+    }
+
+    const runtime: RuntimeJobState = {
+      abortController: new AbortController(),
+      readyPromise,
+      resolveReady: resolveReady!,
+      syncSent: jobData.syncSent ?? false,
+      earlyEventBuffer: [],
+      hasSubscriber: false,
+      finalEvent,
+    };
+
+    this.runtimeState.set(streamId, runtime);
+
+    // Set up all-subscribers-left callback for this replica
+    this.eventTransport.onAllSubscribersLeft(streamId, () => {
+      const currentRuntime = this.runtimeState.get(streamId);
+      if (currentRuntime) {
+        currentRuntime.syncSent = false;
+        // Persist syncSent=false to Redis
+        this.jobStore.updateJob(streamId, { syncSent: false }).catch((err) => {
+          logger.error(`[GenerationJobManager] Failed to persist syncSent=false:`, err);
+        });
+        // Call registered handlers
+        if (currentRuntime.allSubscribersLeftHandlers) {
+          this.jobStore
+            .getContentParts(streamId)
+            .then((result) => {
+              const parts = result?.content ?? [];
+              for (const handler of currentRuntime.allSubscribersLeftHandlers ?? []) {
+                try {
+                  handler(parts);
+                } catch (err) {
+                  logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
+                }
+              }
+            })
+            .catch((err) => {
+              logger.error(
+                `[GenerationJobManager] Failed to get content parts for allSubscribersLeft handlers:`,
+                err,
+              );
+            });
+        }
+      }
+    });
+
+    // Set up cross-replica abort listener (Redis mode only)
+    // This ensures lazily-initialized jobs can receive abort signals
+    if (this.eventTransport.onAbort) {
+      this.eventTransport.onAbort(streamId, () => {
+        const currentRuntime = this.runtimeState.get(streamId);
+        if (currentRuntime && !currentRuntime.abortController.signal.aborted) {
+          logger.debug(
+            `[GenerationJobManager] Received cross-replica abort for lazily-init job ${streamId}`,
+          );
+          currentRuntime.abortController.abort();
+        }
+      });
+    }
+
+    return runtime;
+  }
+
   /**
    * Get a job by streamId.
    */
   async getJob(streamId: string): Promise {
     const jobData = await this.jobStore.getJob(streamId);
-    const runtime = this.runtimeState.get(streamId);
-    if (!jobData || !runtime) {
+    if (!jobData) {
       return undefined;
     }
+
+    const runtime = await this.getOrCreateRuntimeState(streamId);
+    if (!runtime) {
+      return undefined;
+    }
+
     return this.buildJobFacade(streamId, jobData, runtime);
   }
 
@@ -411,6 +548,10 @@ class GenerationJobManagerClass {
   /**
    * Abort a job (user-initiated).
    * Returns all data needed for token spending and message saving.
+   *
+   * Cross-replica support (Redis mode):
+   * - Emits abort signal via Redis pub/sub
+   * - The replica running generation receives signal and aborts its AbortController
    */
   async abortJob(streamId: string): Promise {
     const jobData = await this.jobStore.getJob(streamId);
@@ -421,6 +562,13 @@ class GenerationJobManagerClass {
       return { success: false, jobData: null, content: [], finalEvent: null };
     }
 
+    // Emit abort signal for cross-replica support (Redis mode)
+    // This ensures the generating replica receives the abort signal
+    if (this.eventTransport.emitAbort) {
+      this.eventTransport.emitAbort(streamId);
+    }
+
+    // Also abort local controller if we have it (same-replica abort)
     if (runtime) {
       runtime.abortController.abort();
     }
@@ -506,6 +654,10 @@ class GenerationJobManagerClass {
    * - Resolves readyPromise (legacy, for API compatibility)
    * - Replays any buffered early events (e.g., 'created' event)
    *
+   * Supports cross-replica reconnection in Redis mode:
+   * - If job exists in Redis but not locally, creates minimal runtime state
+   * - Events are delivered via Redis pub/sub, not in-memory EventEmitter
+   *
    * @param streamId - The stream to subscribe to
    * @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.)
    * @param onDone - Handler for completion event (includes final message)
@@ -518,7 +670,8 @@ class GenerationJobManagerClass {
     onDone?: t.DoneHandler,
     onError?: t.ErrorHandler,
   ): Promise<{ unsubscribe: t.UnsubscribeFn } | null> {
-    const runtime = this.runtimeState.get(streamId);
+    // Use lazy initialization to support cross-replica subscriptions
+    const runtime = await this.getOrCreateRuntimeState(streamId);
     if (!runtime) {
       return null;
     }
@@ -788,29 +941,46 @@ class GenerationJobManagerClass {
   /**
    * Mark that sync has been sent.
+   * Persists to Redis for cross-replica consistency.
    */
   markSyncSent(streamId: string): void {
     const runtime = this.runtimeState.get(streamId);
     if (runtime) {
       runtime.syncSent = true;
     }
+    // Persist to Redis for cross-replica consistency
+    this.jobStore.updateJob(streamId, { syncSent: true }).catch((err) => {
+      logger.error(`[GenerationJobManager] Failed to persist syncSent flag:`, err);
+    });
   }
 
   /**
    * Check if sync has been sent.
+   * Checks local runtime first, then falls back to Redis for cross-replica scenarios.
    */
-  wasSyncSent(streamId: string): boolean {
-    return this.runtimeState.get(streamId)?.syncSent ?? false;
+  async wasSyncSent(streamId: string): Promise<boolean> {
+    const localSyncSent = this.runtimeState.get(streamId)?.syncSent;
+    if (localSyncSent !== undefined) {
+      return localSyncSent;
+    }
+    // Cross-replica: check Redis
+    const jobData = await this.jobStore.getJob(streamId);
+    return jobData?.syncSent ?? false;
   }
 
   /**
    * Emit a done event.
+   * Persists finalEvent to Redis for cross-replica access.
*/ emitDone(streamId: string, event: t.ServerSentEvent): void { const runtime = this.runtimeState.get(streamId); if (runtime) { runtime.finalEvent = event; } + // Persist finalEvent to Redis for cross-replica consistency + this.jobStore.updateJob(streamId, { finalEvent: JSON.stringify(event) }).catch((err) => { + logger.error(`[GenerationJobManager] Failed to persist finalEvent:`, err); + }); this.eventTransport.emitDone(streamId, event); } diff --git a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts index c593d3d15a..d95376f782 100644 --- a/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/GenerationJobManager.stream_integration.spec.ts @@ -377,6 +377,426 @@ describe('GenerationJobManager Integration Tests', () => { }); }); + describe('Cross-Replica Support (Redis)', () => { + /** + * Problem: In k8s with Redis and multiple replicas, when a user sends a message: + * 1. POST /api/agents/chat hits Replica A, creates job + * 2. GET /api/agents/chat/stream/:streamId hits Replica B + * 3. Replica B calls getJob() which returned undefined because runtimeState + * was only in Replica A's memory + * 4. Stream endpoint returns 404 + * + * Fix: getJob() and subscribe() now lazily create runtime state from Redis + * when the job exists in Redis but not in local memory. + */ + test('should NOT return 404 when stream endpoint hits different replica than job creator', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + + // === REPLICA A: Creates the job === + // Simulate Replica A creating the job directly in Redis + // (In real scenario, this happens via GenerationJobManager.createJob on Replica A) + const replicaAJobStore = new RedisJobStore(ioredisClient); + await replicaAJobStore.initialize(); + + const streamId = `cross-replica-404-test-${Date.now()}`; + const userId = 'test-user'; + + // Create job in Redis (simulates Replica A's createJob) + await replicaAJobStore.createJob(streamId, userId); + + // === REPLICA B: Receives the stream request === + // Fresh GenerationJobManager that does NOT have this job in its local runtimeState + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + // This is what the stream endpoint does: + // const job = await GenerationJobManager.getJob(streamId); + // if (!job) return res.status(404).json({ error: 'Stream not found' }); + + const job = await GenerationJobManager.getJob(streamId); + + // BEFORE FIX: job would be undefined → 404 + // AFTER FIX: job should exist via lazy runtime state creation + expect(job).not.toBeNull(); + expect(job).toBeDefined(); + expect(job?.streamId).toBe(streamId); + + // The stream endpoint then calls subscribe: + // const result = await GenerationJobManager.subscribe(streamId, onChunk, onDone, onError); + // if (!result) return res.status(404).json({ error: 'Failed to subscribe' }); + + const subscription = await GenerationJobManager.subscribe( + streamId, + () => {}, // onChunk + () => {}, // onDone + () 
=> {}, // onError + ); + + // BEFORE FIX: subscription would be null → 404 + // AFTER FIX: subscription should succeed + expect(subscription).not.toBeNull(); + expect(subscription).toBeDefined(); + expect(typeof subscription?.unsubscribe).toBe('function'); + + // Cleanup + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); + + test('should lazily create runtime state for jobs created on other replicas', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + // Simulate two instances - one creates job, other tries to get it + const { createStreamServices } = await import('../createStreamServices'); + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + + // Instance 1: Create the job directly in Redis (simulating another replica) + const jobStore = new RedisJobStore(ioredisClient); + await jobStore.initialize(); + + const streamId = `cross-replica-${Date.now()}`; + const userId = 'test-user'; + + // Create job data directly in jobStore (as if from another instance) + await jobStore.createJob(streamId, userId); + + // Instance 2: Fresh GenerationJobManager that doesn't have this job in memory + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + // This should work even though the job was created by "another instance" + // The manager should lazily create runtime state from Redis data + const job = await GenerationJobManager.getJob(streamId); + + expect(job).not.toBeNull(); + expect(job?.streamId).toBe(streamId); + expect(job?.status).toBe('running'); + + // Should also be able to subscribe + const chunks: unknown[] = []; + const subscription = await GenerationJobManager.subscribe(streamId, (event) => { + chunks.push(event); + }); + + expect(subscription).not.toBeNull(); + + subscription?.unsubscribe(); + await GenerationJobManager.destroy(); + await jobStore.destroy(); + }); + + test('should persist syncSent to Redis for cross-replica consistency', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + const streamId = `sync-sent-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + // Initially syncSent should be false + let wasSent = await GenerationJobManager.wasSyncSent(streamId); + expect(wasSent).toBe(false); + + // Mark sync sent + GenerationJobManager.markSyncSent(streamId); + + // Wait for async Redis update + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should now be true + wasSent = await GenerationJobManager.wasSyncSent(streamId); + expect(wasSent).toBe(true); + + // Verify it's actually in Redis by checking via jobStore + const jobStore = services.jobStore; + const jobData = await jobStore.getJob(streamId); + expect(jobData?.syncSent).toBe(true); + + await GenerationJobManager.destroy(); + }); + + test('should persist finalEvent to Redis for cross-replica access', async () => 
{ + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure({ + ...services, + cleanupOnComplete: false, // Keep job for verification + }); + await GenerationJobManager.initialize(); + + const streamId = `final-event-${Date.now()}`; + await GenerationJobManager.createJob(streamId, 'user-1'); + + // Emit done event with final data + const finalEventData = { + final: true, + conversation: { conversationId: streamId }, + responseMessage: { text: 'Hello world' }, + }; + GenerationJobManager.emitDone(streamId, finalEventData as never); + + // Wait for async Redis update + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Verify finalEvent is in Redis + const jobStore = services.jobStore; + const jobData = await jobStore.getJob(streamId); + expect(jobData?.finalEvent).toBeDefined(); + + const storedFinalEvent = JSON.parse(jobData!.finalEvent!); + expect(storedFinalEvent.final).toBe(true); + expect(storedFinalEvent.conversation.conversationId).toBe(streamId); + + await GenerationJobManager.destroy(); + }); + + test('should emit cross-replica abort signal via Redis pub/sub', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { GenerationJobManager } = await import('../GenerationJobManager'); + const { createStreamServices } = await import('../createStreamServices'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + const streamId = `abort-signal-${Date.now()}`; + const job = await GenerationJobManager.createJob(streamId, 'user-1'); + + // Track if abort controller was signaled + let abortSignaled = false; + job.abortController.signal.addEventListener('abort', () => { + abortSignaled = true; + }); + + // Wait for abort listener setup + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the job - this should emit abort signal via Redis + await GenerationJobManager.abortJob(streamId); + + // Wait for signal propagation + await new Promise((resolve) => setTimeout(resolve, 100)); + + // The local abort controller should be signaled + expect(abortSignaled).toBe(true); + expect(job.abortController.signal.aborted).toBe(true); + + await GenerationJobManager.destroy(); + }); + + test('should handle abort for lazily-initialized cross-replica jobs', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + // This test validates that jobs created on Replica A and lazily-initialized + // on Replica B can still receive and handle abort signals. 
+ + const { createStreamServices } = await import('../createStreamServices'); + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + + // === Replica A: Create job directly in Redis === + const replicaAJobStore = new RedisJobStore(ioredisClient); + await replicaAJobStore.initialize(); + + const streamId = `lazy-abort-${Date.now()}`; + await replicaAJobStore.createJob(streamId, 'user-1'); + + // === Replica B: Fresh manager that lazily initializes the job === + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + // Get job triggers lazy initialization of runtime state + const job = await GenerationJobManager.getJob(streamId); + expect(job).not.toBeNull(); + + // Track abort signal + let abortSignaled = false; + job!.abortController.signal.addEventListener('abort', () => { + abortSignaled = true; + }); + + // Wait for abort listener to be set up via Redis subscription + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Abort the job - this should emit abort signal via Redis pub/sub + // The lazily-initialized runtime should receive it + await GenerationJobManager.abortJob(streamId); + + // Wait for signal propagation + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Verify the lazily-initialized job received the abort signal + expect(abortSignaled).toBe(true); + expect(job!.abortController.signal.aborted).toBe(true); + + await GenerationJobManager.destroy(); + await replicaAJobStore.destroy(); + }); + + test('should abort generation when abort signal received from another replica', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + // This test simulates: + // 1. Replica A creates a job and starts generation + // 2. Replica B receives abort request and emits abort signal + // 3. 
Replica A receives signal and aborts its AbortController + + const { createStreamServices } = await import('../createStreamServices'); + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + // Create the job on "Replica A" + const { GenerationJobManager } = await import('../GenerationJobManager'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + const streamId = `cross-abort-${Date.now()}`; + const job = await GenerationJobManager.createJob(streamId, 'user-1'); + + let abortSignaled = false; + job.abortController.signal.addEventListener('abort', () => { + abortSignaled = true; + }); + + // Wait for abort listener to be set up via Redis subscription + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Simulate "Replica B" emitting abort signal directly via Redis + // This is what would happen if abortJob was called on a different replica + const subscriber2 = (ioredisClient as unknown as { duplicate: () => unknown }).duplicate(); + const replicaBTransport = new RedisEventTransport( + ioredisClient as never, + subscriber2 as never, + ); + + // Emit abort signal (as if from Replica B) + replicaBTransport.emitAbort(streamId); + + // Wait for cross-replica signal propagation + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Replica A's abort controller should be signaled + expect(abortSignaled).toBe(true); + expect(job.abortController.signal.aborted).toBe(true); + + replicaBTransport.destroy(); + (subscriber2 as { disconnect: () => void }).disconnect(); + await GenerationJobManager.destroy(); + }); + + test('should handle wasSyncSent for cross-replica scenarios', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { createStreamServices } = await import('../createStreamServices'); + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + + // Create job directly in Redis with syncSent: true + const jobStore = new RedisJobStore(ioredisClient); + await jobStore.initialize(); + + const streamId = `cross-sync-${Date.now()}`; + await jobStore.createJob(streamId, 'user-1'); + await jobStore.updateJob(streamId, { syncSent: true }); + + // Fresh manager that doesn't have this job locally + jest.resetModules(); + const { GenerationJobManager } = await import('../GenerationJobManager'); + + const services = createStreamServices({ + useRedis: true, + redisClient: ioredisClient, + }); + + GenerationJobManager.configure(services); + await GenerationJobManager.initialize(); + + // wasSyncSent should check Redis even without local runtime + const wasSent = await GenerationJobManager.wasSyncSent(streamId); + expect(wasSent).toBe(true); + + await GenerationJobManager.destroy(); + await jobStore.destroy(); + }); + }); + describe('createStreamServices Auto-Detection', () => { test('should auto-detect Redis when USE_REDIS is true', async () => { if (!ioredisClient) { diff --git a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts index b70e53012e..31266b3e11 100644 --- a/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/RedisEventTransport.stream_integration.spec.ts @@ -294,6 +294,154 @@ describe('RedisEventTransport Integration Tests', () 
=> { }); }); + describe('Cross-Replica Abort', () => { + test('should emit and receive abort signals on same instance', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const subscriber = (ioredisClient as Redis).duplicate(); + const transport = new RedisEventTransport(ioredisClient, subscriber); + + const streamId = `abort-same-${Date.now()}`; + let abortReceived = false; + + // Register abort callback + transport.onAbort(streamId, () => { + abortReceived = true; + }); + + // Wait for subscription to be established + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Emit abort + transport.emitAbort(streamId); + + // Wait for signal to propagate + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(abortReceived).toBe(true); + + transport.destroy(); + subscriber.disconnect(); + }); + + test('should deliver abort signals across transport instances', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + // Two separate instances (simulating two servers) + const subscriber1 = (ioredisClient as Redis).duplicate(); + const subscriber2 = (ioredisClient as Redis).duplicate(); + + const transport1 = new RedisEventTransport(ioredisClient, subscriber1); + const transport2 = new RedisEventTransport(ioredisClient, subscriber2); + + const streamId = `abort-cross-${Date.now()}`; + let instance1AbortReceived = false; + + // Instance 1 registers abort callback (simulates server running generation) + transport1.onAbort(streamId, () => { + instance1AbortReceived = true; + }); + + // Wait for subscription + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Instance 2 emits abort (simulates server receiving abort request) + transport2.emitAbort(streamId); + + // Wait for cross-instance delivery + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Instance 1 should receive abort signal + expect(instance1AbortReceived).toBe(true); + + transport1.destroy(); + transport2.destroy(); + subscriber1.disconnect(); + subscriber2.disconnect(); + }); + + test('should call multiple abort callbacks', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const subscriber = (ioredisClient as Redis).duplicate(); + const transport = new RedisEventTransport(ioredisClient, subscriber); + + const streamId = `abort-multi-${Date.now()}`; + let callback1Called = false; + let callback2Called = false; + + // Multiple abort callbacks + transport.onAbort(streamId, () => { + callback1Called = true; + }); + transport.onAbort(streamId, () => { + callback2Called = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + transport.emitAbort(streamId); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(callback1Called).toBe(true); + expect(callback2Called).toBe(true); + + transport.destroy(); + subscriber.disconnect(); + }); + + test('should cleanup abort callbacks on stream cleanup', async () => { + if (!ioredisClient) { + console.warn('Redis not available, skipping test'); + return; + } + + const { RedisEventTransport } = await import('../implementations/RedisEventTransport'); + + const subscriber 
= (ioredisClient as Redis).duplicate();
+      const transport = new RedisEventTransport(ioredisClient, subscriber);
+
+      const streamId = `abort-cleanup-${Date.now()}`;
+      let abortReceived = false;
+
+      transport.onAbort(streamId, () => {
+        abortReceived = true;
+      });
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      // Cleanup the stream
+      transport.cleanup(streamId);
+
+      // Emit abort after cleanup
+      transport.emitAbort(streamId);
+
+      await new Promise((resolve) => setTimeout(resolve, 200));
+
+      // Should NOT receive abort since stream was cleaned up
+      expect(abortReceived).toBe(false);
+
+      transport.destroy();
+      subscriber.disconnect();
+    });
+  });
+
   describe('Cleanup', () => {
     test('should clean up stream resources', async () => {
       if (!ioredisClient) {
diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts
index 79aa05699a..02d4cb69ed 100644
--- a/packages/api/src/stream/implementations/RedisEventTransport.ts
+++ b/packages/api/src/stream/implementations/RedisEventTransport.ts
@@ -17,6 +17,7 @@ const EventTypes = {
   CHUNK: 'chunk',
   DONE: 'done',
   ERROR: 'error',
+  ABORT: 'abort',
 } as const;
 
 interface PubSubMessage {
@@ -39,6 +40,8 @@ interface StreamSubscribers {
     }
   >;
   allSubscribersLeftCallbacks: Array<() => void>;
+  /** Abort callbacks - called when abort signal is received from any replica */
+  abortCallbacks: Array<() => void>;
 }
 
 /**
@@ -119,6 +122,20 @@ export class RedisEventTransport implements IEventTransport {
           case EventTypes.ERROR:
             handlers.onError?.(parsed.error ?? 'Unknown error');
             break;
+          case EventTypes.ABORT:
+            // Abort is handled at stream level, not per-handler
+            break;
+        }
+      }
+
+      // Handle abort signals at stream level (not per-handler)
+      if (parsed.type === EventTypes.ABORT) {
+        for (const callback of streamState.abortCallbacks) {
+          try {
+            callback();
+          } catch (err) {
+            logger.error(`[RedisEventTransport] Error in abort callback:`, err);
+          }
        }
      }
    } catch (err) {
@@ -149,6 +166,7 @@ export class RedisEventTransport implements IEventTransport {
       count: 0,
       handlers: new Map(),
       allSubscribersLeftCallbacks: [],
+      abortCallbacks: [],
     });
   }
 
@@ -263,6 +281,53 @@ export class RedisEventTransport implements IEventTransport {
         count: 0,
         handlers: new Map(),
         allSubscribersLeftCallbacks: [callback],
+        abortCallbacks: [],
+      });
+    }
+  }
+
+  /**
+   * Publish an abort signal to all replicas.
+   * This enables cross-replica abort: when a user aborts on Replica B,
+   * the generating Replica A receives the signal and stops.
+   */
+  emitAbort(streamId: string): void {
+    const channel = CHANNELS.events(streamId);
+    const message: PubSubMessage = { type: EventTypes.ABORT };
+
+    this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
+      logger.error(`[RedisEventTransport] Failed to publish abort:`, err);
+    });
+  }
+
+  /**
+   * Register callback for abort signals from any replica.
+   * Called when abort is triggered on any replica (including this one).
+   *
+   * @param streamId - The stream identifier
+   * @param callback - Called when abort signal is received
+   */
+  onAbort(streamId: string, callback: () => void): void {
+    const channel = CHANNELS.events(streamId);
+    let state = this.streams.get(streamId);
+
+    if (!state) {
+      state = {
+        count: 0,
+        handlers: new Map(),
+        allSubscribersLeftCallbacks: [],
+        abortCallbacks: [],
+      };
+      this.streams.set(streamId, state);
+    }
+
+    state.abortCallbacks.push(callback);
+
+    // Subscribe to Redis channel if not already subscribed
+    if (!this.subscribedChannels.has(channel)) {
+      this.subscribedChannels.add(channel);
+      this.subscriber.subscribe(channel).catch((err) => {
+        logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err);
      });
    }
  }
@@ -282,9 +347,10 @@ export class RedisEventTransport implements IEventTransport {
 
     const state = this.streams.get(streamId);
     if (state) {
-      // Clear all handlers
+      // Clear all handlers and callbacks
       state.handlers.clear();
       state.allSubscribersLeftCallbacks = [];
+      state.abortCallbacks = [];
     }
 
     // Unsubscribe from Redis channel
diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts
index 80cdd30d83..14611a7fad 100644
--- a/packages/api/src/stream/interfaces/IJobStore.ts
+++ b/packages/api/src/stream/interfaces/IJobStore.ts
@@ -236,6 +236,21 @@ export interface IEventTransport {
   /** Publish an error event */
   emitError(streamId: string, error: string): void;
 
+  /**
+   * Publish an abort signal to all replicas (Redis mode).
+   * Enables cross-replica abort: user aborts on Replica B,
+   * generating Replica A receives signal and stops.
+   * Optional - only implemented in Redis transport.
+   */
+  emitAbort?(streamId: string): void;
+
+  /**
+   * Register callback for abort signals from any replica (Redis mode).
+   * Called when abort is triggered from any replica.
+   * Optional - only implemented in Redis transport.
+   */
+  onAbort?(streamId: string, callback: () => void): void;
+
   /** Get subscriber count for a stream */
   getSubscriberCount(streamId: string): number;
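
Both new members are optional on `IEventTransport`, so call sites have to feature-detect them before use, as `GenerationJobManager` does above. A minimal sketch of that call pattern, assuming a transport wired up like the Redis implementation in this diff; the function names and the relative import path are illustrative, not part of the change:

```ts
import type { IEventTransport } from './interfaces/IJobStore';

/**
 * Sketch only: the replica running generation registers an abort listener,
 * and whichever replica receives the user's abort request broadcasts it.
 */
function wireCrossReplicaAbort(
  transport: IEventTransport,
  streamId: string,
  abortController: AbortController,
): void {
  // No-op when the transport does not implement cross-replica abort
  // (per the interface docs, only the Redis transport does).
  transport.onAbort?.(streamId, () => {
    if (!abortController.signal.aborted) {
      abortController.abort();
    }
  });
}

function broadcastAbort(transport: IEventTransport, streamId: string): void {
  // Publish the abort signal to every replica; the same-replica abort still
  // goes through the local AbortController, as in abortJob() above.
  transport.emitAbort?.(streamId);
}
```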