🪐 feat: Cross-replica support in GenerationJobManager for Redis mode (#11233)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run

* feat: Implement cross-replica support in GenerationJobManager for Redis mode

- Enhanced GenerationJobManager to support cross-replica scenarios by lazily creating runtime states from Redis when jobs exist but are not present in local memory.
- Added functionality to persist `syncSent` and `finalEvent` states to Redis for consistency across replicas.
- Implemented abort signal handling to allow replicas to receive and respond to abort requests from other instances, ensuring proper job termination.
- Updated tests to validate cross-replica behavior, including job retrieval, subscription, and abort signal propagation.

This update improves the robustness and reliability of job management in distributed environments.

* fix: Enhance error handling and implement abort signal for cross-replica jobs in GenerationJobManager

- Added error handling for Redis job updates in GenerationJobManager to log failures when persisting `syncSent` and `finalEvent` states.
- Implemented a listener for cross-replica abort signals, ensuring that lazily-initialized jobs can respond to abort requests from other replicas.
- Introduced a new integration test to validate the handling of abort signals for lazily-initialized jobs across replicas.

These changes improve the reliability and robustness of job management in distributed environments.
This commit is contained in:
Danny Avila 2026-01-06 11:39:24 -05:00 committed by GitHub
parent b5aa38ff33
commit a7645f4705
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 826 additions and 7 deletions

View file

@ -227,13 +227,17 @@ class GenerationJobManagerClass {
/**
* Set up all-subscribers-left callback.
* When all SSE clients disconnect, this:
* 1. Resets syncSent so reconnecting clients get sync event
* 1. Resets syncSent so reconnecting clients get sync event (persisted to Redis)
* 2. Calls any registered allSubscribersLeft handlers (e.g., to save partial responses)
*/
this.eventTransport.onAllSubscribersLeft(streamId, () => {
const currentRuntime = this.runtimeState.get(streamId);
if (currentRuntime) {
currentRuntime.syncSent = false;
// Persist syncSent=false to Redis for cross-replica consistency
this.jobStore.updateJob(streamId, { syncSent: false }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to persist syncSent=false:`, err);
});
// Call registered handlers (from job.emitter.on('allSubscribersLeft', ...))
if (currentRuntime.allSubscribersLeftHandlers) {
this.jobStore
@ -258,6 +262,21 @@ class GenerationJobManagerClass {
}
});
/**
* Set up cross-replica abort listener (Redis mode only).
* When abort is triggered on ANY replica, this replica receives the signal
* and aborts its local AbortController (if it's the one running generation).
*/
if (this.eventTransport.onAbort) {
this.eventTransport.onAbort(streamId, () => {
const currentRuntime = this.runtimeState.get(streamId);
if (currentRuntime && !currentRuntime.abortController.signal.aborted) {
logger.debug(`[GenerationJobManager] Received cross-replica abort for ${streamId}`);
currentRuntime.abortController.abort();
}
});
}
logger.debug(`[GenerationJobManager] Created job: ${streamId}`);
// Return facade for backwards compatibility
@ -343,15 +362,133 @@ class GenerationJobManagerClass {
};
}
/**
* Get or create runtime state for a job.
*
* This enables cross-replica support in Redis mode:
* - If runtime exists locally (same replica), return it
* - If job exists in Redis but not locally (cross-replica), create minimal runtime
*
* The lazily-created runtime state is sufficient for:
* - Subscribing to events (via Redis pub/sub)
* - Getting resume state
* - Handling reconnections
* - Receiving cross-replica abort signals (via Redis pub/sub)
*
* @param streamId - The stream identifier
* @returns Runtime state or null if job doesn't exist anywhere
*/
private async getOrCreateRuntimeState(streamId: string): Promise<RuntimeJobState | null> {
const existingRuntime = this.runtimeState.get(streamId);
if (existingRuntime) {
return existingRuntime;
}
// Job doesn't exist locally - check Redis
const jobData = await this.jobStore.getJob(streamId);
if (!jobData) {
return null;
}
// Cross-replica scenario: job exists in Redis but not locally
// Create minimal runtime state for handling reconnection/subscription
logger.debug(`[GenerationJobManager] Creating cross-replica runtime for ${streamId}`);
let resolveReady: () => void;
const readyPromise = new Promise<void>((resolve) => {
resolveReady = resolve;
});
// For jobs created on other replicas, readyPromise should be pre-resolved
// since generation has already started
resolveReady!();
// Parse finalEvent from Redis if available
let finalEvent: t.ServerSentEvent | undefined;
if (jobData.finalEvent) {
try {
finalEvent = JSON.parse(jobData.finalEvent) as t.ServerSentEvent;
} catch {
// Ignore parse errors
}
}
const runtime: RuntimeJobState = {
abortController: new AbortController(),
readyPromise,
resolveReady: resolveReady!,
syncSent: jobData.syncSent ?? false,
earlyEventBuffer: [],
hasSubscriber: false,
finalEvent,
};
this.runtimeState.set(streamId, runtime);
// Set up all-subscribers-left callback for this replica
this.eventTransport.onAllSubscribersLeft(streamId, () => {
const currentRuntime = this.runtimeState.get(streamId);
if (currentRuntime) {
currentRuntime.syncSent = false;
// Persist syncSent=false to Redis
this.jobStore.updateJob(streamId, { syncSent: false }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to persist syncSent=false:`, err);
});
// Call registered handlers
if (currentRuntime.allSubscribersLeftHandlers) {
this.jobStore
.getContentParts(streamId)
.then((result) => {
const parts = result?.content ?? [];
for (const handler of currentRuntime.allSubscribersLeftHandlers ?? []) {
try {
handler(parts);
} catch (err) {
logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
}
}
})
.catch((err) => {
logger.error(
`[GenerationJobManager] Failed to get content parts for allSubscribersLeft handlers:`,
err,
);
});
}
}
});
// Set up cross-replica abort listener (Redis mode only)
// This ensures lazily-initialized jobs can receive abort signals
if (this.eventTransport.onAbort) {
this.eventTransport.onAbort(streamId, () => {
const currentRuntime = this.runtimeState.get(streamId);
if (currentRuntime && !currentRuntime.abortController.signal.aborted) {
logger.debug(
`[GenerationJobManager] Received cross-replica abort for lazily-init job ${streamId}`,
);
currentRuntime.abortController.abort();
}
});
}
return runtime;
}
/**
* Get a job by streamId.
*/
async getJob(streamId: string): Promise<t.GenerationJob | undefined> {
const jobData = await this.jobStore.getJob(streamId);
const runtime = this.runtimeState.get(streamId);
if (!jobData || !runtime) {
if (!jobData) {
return undefined;
}
const runtime = await this.getOrCreateRuntimeState(streamId);
if (!runtime) {
return undefined;
}
return this.buildJobFacade(streamId, jobData, runtime);
}
@ -411,6 +548,10 @@ class GenerationJobManagerClass {
/**
* Abort a job (user-initiated).
* Returns all data needed for token spending and message saving.
*
* Cross-replica support (Redis mode):
* - Emits abort signal via Redis pub/sub
* - The replica running generation receives signal and aborts its AbortController
*/
async abortJob(streamId: string): Promise<AbortResult> {
const jobData = await this.jobStore.getJob(streamId);
@ -421,6 +562,13 @@ class GenerationJobManagerClass {
return { success: false, jobData: null, content: [], finalEvent: null };
}
// Emit abort signal for cross-replica support (Redis mode)
// This ensures the generating replica receives the abort signal
if (this.eventTransport.emitAbort) {
this.eventTransport.emitAbort(streamId);
}
// Also abort local controller if we have it (same-replica abort)
if (runtime) {
runtime.abortController.abort();
}
@ -506,6 +654,10 @@ class GenerationJobManagerClass {
* - Resolves readyPromise (legacy, for API compatibility)
* - Replays any buffered early events (e.g., 'created' event)
*
* Supports cross-replica reconnection in Redis mode:
* - If job exists in Redis but not locally, creates minimal runtime state
* - Events are delivered via Redis pub/sub, not in-memory EventEmitter
*
* @param streamId - The stream to subscribe to
* @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.)
* @param onDone - Handler for completion event (includes final message)
@ -518,7 +670,8 @@ class GenerationJobManagerClass {
onDone?: t.DoneHandler,
onError?: t.ErrorHandler,
): Promise<{ unsubscribe: t.UnsubscribeFn } | null> {
const runtime = this.runtimeState.get(streamId);
// Use lazy initialization to support cross-replica subscriptions
const runtime = await this.getOrCreateRuntimeState(streamId);
if (!runtime) {
return null;
}
@ -788,29 +941,46 @@ class GenerationJobManagerClass {
/**
* Mark that sync has been sent.
* Persists to Redis for cross-replica consistency.
*/
markSyncSent(streamId: string): void {
const runtime = this.runtimeState.get(streamId);
if (runtime) {
runtime.syncSent = true;
}
// Persist to Redis for cross-replica consistency
this.jobStore.updateJob(streamId, { syncSent: true }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to persist syncSent flag:`, err);
});
}
/**
* Check if sync has been sent.
* Checks local runtime first, then falls back to Redis for cross-replica scenarios.
*/
wasSyncSent(streamId: string): boolean {
return this.runtimeState.get(streamId)?.syncSent ?? false;
async wasSyncSent(streamId: string): Promise<boolean> {
const localSyncSent = this.runtimeState.get(streamId)?.syncSent;
if (localSyncSent !== undefined) {
return localSyncSent;
}
// Cross-replica: check Redis
const jobData = await this.jobStore.getJob(streamId);
return jobData?.syncSent ?? false;
}
/**
* Emit a done event.
* Persists finalEvent to Redis for cross-replica access.
*/
emitDone(streamId: string, event: t.ServerSentEvent): void {
const runtime = this.runtimeState.get(streamId);
if (runtime) {
runtime.finalEvent = event;
}
// Persist finalEvent to Redis for cross-replica consistency
this.jobStore.updateJob(streamId, { finalEvent: JSON.stringify(event) }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to persist finalEvent:`, err);
});
this.eventTransport.emitDone(streamId, event);
}

View file

@ -377,6 +377,426 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describe('Cross-Replica Support (Redis)', () => {
/**
* Problem: In k8s with Redis and multiple replicas, when a user sends a message:
* 1. POST /api/agents/chat hits Replica A, creates job
* 2. GET /api/agents/chat/stream/:streamId hits Replica B
* 3. Replica B calls getJob() which returned undefined because runtimeState
* was only in Replica A's memory
* 4. Stream endpoint returns 404
*
* Fix: getJob() and subscribe() now lazily create runtime state from Redis
* when the job exists in Redis but not in local memory.
*/
test('should NOT return 404 when stream endpoint hits different replica than job creator', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
// === REPLICA A: Creates the job ===
// Simulate Replica A creating the job directly in Redis
// (In real scenario, this happens via GenerationJobManager.createJob on Replica A)
const replicaAJobStore = new RedisJobStore(ioredisClient);
await replicaAJobStore.initialize();
const streamId = `cross-replica-404-test-${Date.now()}`;
const userId = 'test-user';
// Create job in Redis (simulates Replica A's createJob)
await replicaAJobStore.createJob(streamId, userId);
// === REPLICA B: Receives the stream request ===
// Fresh GenerationJobManager that does NOT have this job in its local runtimeState
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
// This is what the stream endpoint does:
// const job = await GenerationJobManager.getJob(streamId);
// if (!job) return res.status(404).json({ error: 'Stream not found' });
const job = await GenerationJobManager.getJob(streamId);
// BEFORE FIX: job would be undefined → 404
// AFTER FIX: job should exist via lazy runtime state creation
expect(job).not.toBeNull();
expect(job).toBeDefined();
expect(job?.streamId).toBe(streamId);
// The stream endpoint then calls subscribe:
// const result = await GenerationJobManager.subscribe(streamId, onChunk, onDone, onError);
// if (!result) return res.status(404).json({ error: 'Failed to subscribe' });
const subscription = await GenerationJobManager.subscribe(
streamId,
() => {}, // onChunk
() => {}, // onDone
() => {}, // onError
);
// BEFORE FIX: subscription would be null → 404
// AFTER FIX: subscription should succeed
expect(subscription).not.toBeNull();
expect(subscription).toBeDefined();
expect(typeof subscription?.unsubscribe).toBe('function');
// Cleanup
subscription?.unsubscribe();
await GenerationJobManager.destroy();
await replicaAJobStore.destroy();
});
test('should lazily create runtime state for jobs created on other replicas', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// Simulate two instances - one creates job, other tries to get it
const { createStreamServices } = await import('../createStreamServices');
const { RedisJobStore } = await import('../implementations/RedisJobStore');
// Instance 1: Create the job directly in Redis (simulating another replica)
const jobStore = new RedisJobStore(ioredisClient);
await jobStore.initialize();
const streamId = `cross-replica-${Date.now()}`;
const userId = 'test-user';
// Create job data directly in jobStore (as if from another instance)
await jobStore.createJob(streamId, userId);
// Instance 2: Fresh GenerationJobManager that doesn't have this job in memory
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
// This should work even though the job was created by "another instance"
// The manager should lazily create runtime state from Redis data
const job = await GenerationJobManager.getJob(streamId);
expect(job).not.toBeNull();
expect(job?.streamId).toBe(streamId);
expect(job?.status).toBe('running');
// Should also be able to subscribe
const chunks: unknown[] = [];
const subscription = await GenerationJobManager.subscribe(streamId, (event) => {
chunks.push(event);
});
expect(subscription).not.toBeNull();
subscription?.unsubscribe();
await GenerationJobManager.destroy();
await jobStore.destroy();
});
test('should persist syncSent to Redis for cross-replica consistency', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `sync-sent-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
// Initially syncSent should be false
let wasSent = await GenerationJobManager.wasSyncSent(streamId);
expect(wasSent).toBe(false);
// Mark sync sent
GenerationJobManager.markSyncSent(streamId);
// Wait for async Redis update
await new Promise((resolve) => setTimeout(resolve, 50));
// Should now be true
wasSent = await GenerationJobManager.wasSyncSent(streamId);
expect(wasSent).toBe(true);
// Verify it's actually in Redis by checking via jobStore
const jobStore = services.jobStore;
const jobData = await jobStore.getJob(streamId);
expect(jobData?.syncSent).toBe(true);
await GenerationJobManager.destroy();
});
test('should persist finalEvent to Redis for cross-replica access', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure({
...services,
cleanupOnComplete: false, // Keep job for verification
});
await GenerationJobManager.initialize();
const streamId = `final-event-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
// Emit done event with final data
const finalEventData = {
final: true,
conversation: { conversationId: streamId },
responseMessage: { text: 'Hello world' },
};
GenerationJobManager.emitDone(streamId, finalEventData as never);
// Wait for async Redis update
await new Promise((resolve) => setTimeout(resolve, 50));
// Verify finalEvent is in Redis
const jobStore = services.jobStore;
const jobData = await jobStore.getJob(streamId);
expect(jobData?.finalEvent).toBeDefined();
const storedFinalEvent = JSON.parse(jobData!.finalEvent!);
expect(storedFinalEvent.final).toBe(true);
expect(storedFinalEvent.conversation.conversationId).toBe(streamId);
await GenerationJobManager.destroy();
});
test('should emit cross-replica abort signal via Redis pub/sub', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { GenerationJobManager } = await import('../GenerationJobManager');
const { createStreamServices } = await import('../createStreamServices');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `abort-signal-${Date.now()}`;
const job = await GenerationJobManager.createJob(streamId, 'user-1');
// Track if abort controller was signaled
let abortSignaled = false;
job.abortController.signal.addEventListener('abort', () => {
abortSignaled = true;
});
// Wait for abort listener setup
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the job - this should emit abort signal via Redis
await GenerationJobManager.abortJob(streamId);
// Wait for signal propagation
await new Promise((resolve) => setTimeout(resolve, 100));
// The local abort controller should be signaled
expect(abortSignaled).toBe(true);
expect(job.abortController.signal.aborted).toBe(true);
await GenerationJobManager.destroy();
});
test('should handle abort for lazily-initialized cross-replica jobs', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// This test validates that jobs created on Replica A and lazily-initialized
// on Replica B can still receive and handle abort signals.
const { createStreamServices } = await import('../createStreamServices');
const { RedisJobStore } = await import('../implementations/RedisJobStore');
// === Replica A: Create job directly in Redis ===
const replicaAJobStore = new RedisJobStore(ioredisClient);
await replicaAJobStore.initialize();
const streamId = `lazy-abort-${Date.now()}`;
await replicaAJobStore.createJob(streamId, 'user-1');
// === Replica B: Fresh manager that lazily initializes the job ===
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
// Get job triggers lazy initialization of runtime state
const job = await GenerationJobManager.getJob(streamId);
expect(job).not.toBeNull();
// Track abort signal
let abortSignaled = false;
job!.abortController.signal.addEventListener('abort', () => {
abortSignaled = true;
});
// Wait for abort listener to be set up via Redis subscription
await new Promise((resolve) => setTimeout(resolve, 150));
// Abort the job - this should emit abort signal via Redis pub/sub
// The lazily-initialized runtime should receive it
await GenerationJobManager.abortJob(streamId);
// Wait for signal propagation
await new Promise((resolve) => setTimeout(resolve, 150));
// Verify the lazily-initialized job received the abort signal
expect(abortSignaled).toBe(true);
expect(job!.abortController.signal.aborted).toBe(true);
await GenerationJobManager.destroy();
await replicaAJobStore.destroy();
});
test('should abort generation when abort signal received from another replica', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// This test simulates:
// 1. Replica A creates a job and starts generation
// 2. Replica B receives abort request and emits abort signal
// 3. Replica A receives signal and aborts its AbortController
const { createStreamServices } = await import('../createStreamServices');
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
// Create the job on "Replica A"
const { GenerationJobManager } = await import('../GenerationJobManager');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
const streamId = `cross-abort-${Date.now()}`;
const job = await GenerationJobManager.createJob(streamId, 'user-1');
let abortSignaled = false;
job.abortController.signal.addEventListener('abort', () => {
abortSignaled = true;
});
// Wait for abort listener to be set up via Redis subscription
await new Promise((resolve) => setTimeout(resolve, 150));
// Simulate "Replica B" emitting abort signal directly via Redis
// This is what would happen if abortJob was called on a different replica
const subscriber2 = (ioredisClient as unknown as { duplicate: () => unknown }).duplicate();
const replicaBTransport = new RedisEventTransport(
ioredisClient as never,
subscriber2 as never,
);
// Emit abort signal (as if from Replica B)
replicaBTransport.emitAbort(streamId);
// Wait for cross-replica signal propagation
await new Promise((resolve) => setTimeout(resolve, 200));
// Replica A's abort controller should be signaled
expect(abortSignaled).toBe(true);
expect(job.abortController.signal.aborted).toBe(true);
replicaBTransport.destroy();
(subscriber2 as { disconnect: () => void }).disconnect();
await GenerationJobManager.destroy();
});
test('should handle wasSyncSent for cross-replica scenarios', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { createStreamServices } = await import('../createStreamServices');
const { RedisJobStore } = await import('../implementations/RedisJobStore');
// Create job directly in Redis with syncSent: true
const jobStore = new RedisJobStore(ioredisClient);
await jobStore.initialize();
const streamId = `cross-sync-${Date.now()}`;
await jobStore.createJob(streamId, 'user-1');
await jobStore.updateJob(streamId, { syncSent: true });
// Fresh manager that doesn't have this job locally
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure(services);
await GenerationJobManager.initialize();
// wasSyncSent should check Redis even without local runtime
const wasSent = await GenerationJobManager.wasSyncSent(streamId);
expect(wasSent).toBe(true);
await GenerationJobManager.destroy();
await jobStore.destroy();
});
});
describe('createStreamServices Auto-Detection', () => {
test('should auto-detect Redis when USE_REDIS is true', async () => {
if (!ioredisClient) {

View file

@ -294,6 +294,154 @@ describe('RedisEventTransport Integration Tests', () => {
});
});
describe('Cross-Replica Abort', () => {
test('should emit and receive abort signals on same instance', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId = `abort-same-${Date.now()}`;
let abortReceived = false;
// Register abort callback
transport.onAbort(streamId, () => {
abortReceived = true;
});
// Wait for subscription to be established
await new Promise((resolve) => setTimeout(resolve, 100));
// Emit abort
transport.emitAbort(streamId);
// Wait for signal to propagate
await new Promise((resolve) => setTimeout(resolve, 200));
expect(abortReceived).toBe(true);
transport.destroy();
subscriber.disconnect();
});
test('should deliver abort signals across transport instances', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
// Two separate instances (simulating two servers)
const subscriber1 = (ioredisClient as Redis).duplicate();
const subscriber2 = (ioredisClient as Redis).duplicate();
const transport1 = new RedisEventTransport(ioredisClient, subscriber1);
const transport2 = new RedisEventTransport(ioredisClient, subscriber2);
const streamId = `abort-cross-${Date.now()}`;
let instance1AbortReceived = false;
// Instance 1 registers abort callback (simulates server running generation)
transport1.onAbort(streamId, () => {
instance1AbortReceived = true;
});
// Wait for subscription
await new Promise((resolve) => setTimeout(resolve, 100));
// Instance 2 emits abort (simulates server receiving abort request)
transport2.emitAbort(streamId);
// Wait for cross-instance delivery
await new Promise((resolve) => setTimeout(resolve, 200));
// Instance 1 should receive abort signal
expect(instance1AbortReceived).toBe(true);
transport1.destroy();
transport2.destroy();
subscriber1.disconnect();
subscriber2.disconnect();
});
test('should call multiple abort callbacks', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId = `abort-multi-${Date.now()}`;
let callback1Called = false;
let callback2Called = false;
// Multiple abort callbacks
transport.onAbort(streamId, () => {
callback1Called = true;
});
transport.onAbort(streamId, () => {
callback2Called = true;
});
await new Promise((resolve) => setTimeout(resolve, 100));
transport.emitAbort(streamId);
await new Promise((resolve) => setTimeout(resolve, 200));
expect(callback1Called).toBe(true);
expect(callback2Called).toBe(true);
transport.destroy();
subscriber.disconnect();
});
test('should cleanup abort callbacks on stream cleanup', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId = `abort-cleanup-${Date.now()}`;
let abortReceived = false;
transport.onAbort(streamId, () => {
abortReceived = true;
});
await new Promise((resolve) => setTimeout(resolve, 100));
// Cleanup the stream
transport.cleanup(streamId);
// Emit abort after cleanup
transport.emitAbort(streamId);
await new Promise((resolve) => setTimeout(resolve, 200));
// Should NOT receive abort since stream was cleaned up
expect(abortReceived).toBe(false);
transport.destroy();
subscriber.disconnect();
});
});
describe('Cleanup', () => {
test('should clean up stream resources', async () => {
if (!ioredisClient) {

View file

@ -17,6 +17,7 @@ const EventTypes = {
CHUNK: 'chunk',
DONE: 'done',
ERROR: 'error',
ABORT: 'abort',
} as const;
interface PubSubMessage {
@ -39,6 +40,8 @@ interface StreamSubscribers {
}
>;
allSubscribersLeftCallbacks: Array<() => void>;
/** Abort callbacks - called when abort signal is received from any replica */
abortCallbacks: Array<() => void>;
}
/**
@ -119,6 +122,20 @@ export class RedisEventTransport implements IEventTransport {
case EventTypes.ERROR:
handlers.onError?.(parsed.error ?? 'Unknown error');
break;
case EventTypes.ABORT:
// Abort is handled at stream level, not per-handler
break;
}
}
// Handle abort signals at stream level (not per-handler)
if (parsed.type === EventTypes.ABORT) {
for (const callback of streamState.abortCallbacks) {
try {
callback();
} catch (err) {
logger.error(`[RedisEventTransport] Error in abort callback:`, err);
}
}
}
} catch (err) {
@ -149,6 +166,7 @@ export class RedisEventTransport implements IEventTransport {
count: 0,
handlers: new Map(),
allSubscribersLeftCallbacks: [],
abortCallbacks: [],
});
}
@ -263,6 +281,53 @@ export class RedisEventTransport implements IEventTransport {
count: 0,
handlers: new Map(),
allSubscribersLeftCallbacks: [callback],
abortCallbacks: [],
});
}
}
/**
* Publish an abort signal to all replicas.
* This enables cross-replica abort: when a user aborts on Replica B,
* the generating Replica A receives the signal and stops.
*/
emitAbort(streamId: string): void {
const channel = CHANNELS.events(streamId);
const message: PubSubMessage = { type: EventTypes.ABORT };
this.publisher.publish(channel, JSON.stringify(message)).catch((err) => {
logger.error(`[RedisEventTransport] Failed to publish abort:`, err);
});
}
/**
* Register callback for abort signals from any replica.
* Called when abort is triggered on any replica (including this one).
*
* @param streamId - The stream identifier
* @param callback - Called when abort signal is received
*/
onAbort(streamId: string, callback: () => void): void {
const channel = CHANNELS.events(streamId);
let state = this.streams.get(streamId);
if (!state) {
state = {
count: 0,
handlers: new Map(),
allSubscribersLeftCallbacks: [],
abortCallbacks: [],
};
this.streams.set(streamId, state);
}
state.abortCallbacks.push(callback);
// Subscribe to Redis channel if not already subscribed
if (!this.subscribedChannels.has(channel)) {
this.subscribedChannels.add(channel);
this.subscriber.subscribe(channel).catch((err) => {
logger.error(`[RedisEventTransport] Failed to subscribe to ${channel}:`, err);
});
}
}
@ -282,9 +347,10 @@ export class RedisEventTransport implements IEventTransport {
const state = this.streams.get(streamId);
if (state) {
// Clear all handlers
// Clear all handlers and callbacks
state.handlers.clear();
state.allSubscribersLeftCallbacks = [];
state.abortCallbacks = [];
}
// Unsubscribe from Redis channel

View file

@ -236,6 +236,21 @@ export interface IEventTransport {
/** Publish an error event */
emitError(streamId: string, error: string): void;
/**
* Publish an abort signal to all replicas (Redis mode).
* Enables cross-replica abort: user aborts on Replica B,
* generating Replica A receives signal and stops.
* Optional - only implemented in Redis transport.
*/
emitAbort?(streamId: string): void;
/**
* Register callback for abort signals from any replica (Redis mode).
* Called when abort is triggered from any replica.
* Optional - only implemented in Redis transport.
*/
onAbort?(streamId: string, callback: () => void): void;
/** Get subscriber count for a stream */
getSubscriberCount(streamId: string): number;