🌊 fix: Prevent Buffered Event Duplication on SSE Resume Connections (#12225)

* fix: skipBufferReplay for job resume connections

- Introduced a new option `skipBufferReplay` in the `subscribe` method of `GenerationJobManagerClass` to prevent duplication of events when resuming a connection.
- Updated the logic to conditionally skip replaying buffered events if a sync event has already been sent, enhancing the efficiency of event handling during reconnections.
- Added integration tests to verify the correct behavior of the new option, ensuring that no buffered events are replayed when `skipBufferReplay` is true, while still allowing for normal replay behavior when false.

* refactor: Update GenerationJobManager to handle sync events more efficiently

- Modified the `subscribe` method to utilize a new `skipBufferReplay` option, allowing for the prevention of duplicate events during resume connections.
- Enhanced the logic in the `chat/stream` route to conditionally skip replaying buffered events if a sync event has already been sent, improving event handling efficiency.
- Updated integration tests to verify the correct behavior of the new option, ensuring that no buffered events are replayed when `skipBufferReplay` is true, while maintaining normal replay behavior when false.

* test: Enhance GenerationJobManager integration tests for Redis mode

- Updated integration tests to conditionally run based on the USE_REDIS environment variable, allowing for better control over Redis-related tests.
- Refactored test descriptions to utilize a dynamic `describeRedis` function, improving clarity and organization of tests related to Redis functionality.
- Removed redundant checks for Redis availability within individual tests, streamlining the test logic and enhancing readability.

* fix: sync handler state for new messages on resume

The sync event's else branch (new response message) was missing
resetContentHandler() and syncStepMessage() calls, leaving stale
handler state that caused subsequent deltas to build on partial
content instead of the synced aggregatedContent.

* feat: atomic subscribeWithResume to close resume event gap

Replaces separate getResumeState() + subscribe() calls with a single
subscribeWithResume() that atomically drains earlyEventBuffer between
the resume snapshot and the subscribe. In in-memory mode, drained events
are returned as pendingEvents for the client to replay after sync.
In Redis mode, pendingEvents is empty since chunks are already persisted.

The route handler now uses the atomic method for resume connections and
extracted shared SSE write helpers to reduce duplication. The client
replays any pendingEvents through the existing step/content handlers
after applying aggregatedContent from the sync payload.

* fix: only capture gap events in subscribeWithResume, not pre-snapshot buffer

The previous implementation drained the entire earlyEventBuffer into
pendingEvents, but pre-snapshot events are already reflected in
aggregatedContent. Replaying them re-introduced the duplication bug
through a different vector.

Now records buffer length before getResumeState() and slices from that
index, so only events arriving during the async gap are returned as
pendingEvents.

Also:
- Handle pendingEvents when resumeState is null (replay directly)
- Hoist duplicate test helpers to shared scope
- Remove redundant writableEnded guard in onDone
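
The snapshot-then-slice ordering can be sketched like this. It is a simplified illustration: the `buffer` parameter and the return shape are stand-ins for the manager's internal state, not its real signature:

```typescript
// Simplified sketch of the gap capture: record the buffer length before the
// async snapshot, then slice from that index afterwards so only events that
// arrived during the await are returned as pendingEvents.
async function subscribeWithResume(
  buffer: string[],
  getResumeState: () => Promise<string>,
): Promise<{ resumeState: string; pendingEvents: string[] }> {
  // Pre-snapshot events are already reflected in aggregatedContent.
  const lengthAtSnapshot = buffer.length;
  // Async gap: new events may land in the buffer while this await is pending.
  const resumeState = await getResumeState();
  // Only the gap events need separate delivery to the client.
  const pendingEvents = buffer.slice(lengthAtSnapshot);
  // Drain so the subsequent subscribe has nothing left to replay.
  buffer.length = 0;
  return { resumeState, pendingEvents };
}
```

Slicing from the recorded index (rather than draining the whole buffer) is what avoids re-introducing the duplication through the pre-snapshot events.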
Danny Avila 2026-03-14 10:54:26 -04:00 committed by GitHub
parent cbdc6f6060
commit 7bc793b18d
5 changed files with 700 additions and 259 deletions


@@ -76,52 +76,62 @@ router.get('/chat/stream/:streamId', async (req, res) => {
  logger.debug(`[AgentStream] Client subscribed to ${streamId}, resume: ${isResume}`);

  const writeEvent = (event) => {
    if (!res.writableEnded) {
      res.write(`event: message\ndata: ${JSON.stringify(event)}\n\n`);
      if (typeof res.flush === 'function') {
        res.flush();
      }
    }
  };

  const onDone = (event) => {
    writeEvent(event);
    res.end();
  };

  const onError = (error) => {
    if (!res.writableEnded) {
      res.write(`event: error\ndata: ${JSON.stringify({ error })}\n\n`);
      if (typeof res.flush === 'function') {
        res.flush();
      }
      res.end();
    }
  };

  let result;
  if (isResume) {
    const { subscription, resumeState, pendingEvents } =
      await GenerationJobManager.subscribeWithResume(streamId, writeEvent, onDone, onError);
    if (!res.writableEnded) {
      if (resumeState) {
        res.write(
          `event: message\ndata: ${JSON.stringify({ sync: true, resumeState, pendingEvents })}\n\n`,
        );
        if (typeof res.flush === 'function') {
          res.flush();
        }
        GenerationJobManager.markSyncSent(streamId);
        logger.debug(
          `[AgentStream] Sent sync event for ${streamId} with ${resumeState.runSteps.length} run steps, ${pendingEvents.length} pending events`,
        );
      } else if (pendingEvents.length > 0) {
        for (const event of pendingEvents) {
          writeEvent(event);
        }
        logger.warn(
          `[AgentStream] Resume state null for ${streamId}, replayed ${pendingEvents.length} gap events directly`,
        );
      }
    }
    result = subscription;
  } else {
    result = await GenerationJobManager.subscribe(streamId, writeEvent, onDone, onError);
  }

  if (!result) {
    return res.status(404).json({ error: 'Failed to subscribe to stream' });


@@ -226,12 +226,12 @@ export default function useResumableSSE(
      if (data.sync != null) {
        console.log('[ResumableSSE] SYNC received', {
          runSteps: data.resumeState?.runSteps?.length ?? 0,
          pendingEvents: data.pendingEvents?.length ?? 0,
        });
        const runId = v4();
        setActiveRunId(runId);
        if (data.resumeState?.runSteps) {
          for (const runStep of data.resumeState.runSteps) {
            stepHandler({ event: 'on_run_step', data: runStep }, {
@@ -241,19 +241,15 @@
          }
        }
        if (data.resumeState?.aggregatedContent && userMessage?.messageId) {
          const messages = getMessages() ?? [];
          const userMsgId = userMessage.messageId;
          const serverResponseId = data.resumeState.responseMessageId;
          let responseIdx = -1;
          if (serverResponseId) {
            responseIdx = messages.findIndex((m) => m.messageId === serverResponseId);
          }
          if (responseIdx < 0) {
            responseIdx = messages.findIndex(
              (m) =>
@@ -272,7 +268,6 @@
          });
          if (responseIdx >= 0) {
            const updated = [...messages];
            const oldContent = updated[responseIdx]?.content;
            updated[responseIdx] = {
@@ -285,25 +280,34 @@
              newContentLength: data.resumeState.aggregatedContent?.length,
            });
            setMessages(updated);
            resetContentHandler();
            syncStepMessage(updated[responseIdx]);
            console.log('[ResumableSSE] SYNC complete, handlers synced');
          } else {
            const responseId = serverResponseId ?? `${userMsgId}_`;
            const newMessage = {
              messageId: responseId,
              parentMessageId: userMsgId,
              conversationId: currentSubmission.conversation?.conversationId ?? '',
              text: '',
              content: data.resumeState.aggregatedContent,
              isCreatedByUser: false,
            } as TMessage;
            setMessages([...messages, newMessage]);
            resetContentHandler();
            syncStepMessage(newMessage);
          }
        }
        if (data.pendingEvents?.length > 0) {
          console.log(`[ResumableSSE] Replaying ${data.pendingEvents.length} pending events`);
          const submission = { ...currentSubmission, userMessage } as EventSubmission;
          for (const pendingEvent of data.pendingEvents) {
            if (pendingEvent.event != null) {
              stepHandler(pendingEvent, submission);
            } else if (pendingEvent.type != null) {
              contentHandler({ data: pendingEvent, submission });
            }
          }
        }
      }


@@ -707,6 +707,10 @@ class GenerationJobManagerClass {
   * @param onChunk - Handler for chunk events (streamed tokens, run steps, etc.)
   * @param onDone - Handler for completion event (includes final message)
   * @param onError - Handler for error events
   * @param options - Subscription configuration
   * @param options.skipBufferReplay - When true, skips replaying the earlyEventBuffer.
   *   Use this when a sync event was already sent (resume), since the sync's
   *   aggregatedContent already includes all buffered events.
   * @returns Subscription object with unsubscribe function, or null if job not found
   */
  async subscribe(
@@ -714,6 +718,7 @@
    onChunk: t.ChunkHandler,
    onDone?: t.DoneHandler,
    onError?: t.ErrorHandler,
    options?: t.SubscribeOptions,
  ): Promise<{ unsubscribe: t.UnsubscribeFn } | null> {
    // Use lazy initialization to support cross-replica subscriptions
    const runtime = await this.getOrCreateRuntimeState(streamId);
@@ -763,11 +768,17 @@
    runtime.hasSubscriber = true;
    if (runtime.earlyEventBuffer.length > 0) {
      if (options?.skipBufferReplay) {
        logger.debug(
          `[GenerationJobManager] Skipping ${runtime.earlyEventBuffer.length} buffered events for ${streamId} (skipBufferReplay)`,
        );
      } else {
        logger.debug(
          `[GenerationJobManager] Replaying ${runtime.earlyEventBuffer.length} buffered events for ${streamId}`,
        );
        for (const bufferedEvent of runtime.earlyEventBuffer) {
          onChunk(bufferedEvent);
        }
      }
      runtime.earlyEventBuffer = [];
    }
@@ -785,6 +796,52 @@
    return subscription;
  }

  /**
   * Atomic resume + subscribe: snapshots resume state and drains the early event buffer
   * in one synchronous step, then subscribes with skipBufferReplay.
   *
   * Closes the timing gap between separate `getResumeState()` and `subscribe()` calls
   * where events could arrive in earlyEventBuffer after the snapshot but before subscribe
   * clears the buffer.
   *
   * In-memory mode: drained buffer events are returned as `pendingEvents` since
   * they exist nowhere else. The caller must deliver them after the sync payload.
   * Redis mode: `pendingEvents` is empty; chunks are persisted via appendChunk
   * and will appear in aggregatedContent on the next resume.
   */
  async subscribeWithResume(
    streamId: string,
    onChunk: t.ChunkHandler,
    onDone?: t.DoneHandler,
    onError?: t.ErrorHandler,
  ): Promise<t.SubscribeWithResumeResult> {
    const bufferLengthAtSnapshot = !this._isRedis
      ? (this.runtimeState.get(streamId)?.earlyEventBuffer.length ?? 0)
      : 0;
    const resumeState = await this.getResumeState(streamId);
    let pendingEvents: t.ServerSentEvent[] = [];
    if (!this._isRedis) {
      const runtime = this.runtimeState.get(streamId);
      if (runtime) {
        pendingEvents = runtime.earlyEventBuffer.slice(bufferLengthAtSnapshot);
        runtime.earlyEventBuffer = [];
        if (pendingEvents.length > 0) {
          logger.debug(
            `[GenerationJobManager] Captured ${pendingEvents.length} gap events for ${streamId}`,
          );
        }
      }
    }
    const subscription = await this.subscribe(streamId, onChunk, onDone, onError, {
      skipBufferReplay: true,
    });
    return { subscription, resumeState, pendingEvents };
  }

  /**
   * Emit a chunk event to all subscribers.
   * Uses runtime state check for performance (avoids async job store lookup per token).


@@ -1,3 +1,4 @@
/* eslint jest/no-standalone-expect: ["error", { "additionalTestBlockFunctions": ["testRedis"] }] */
import type { Redis, Cluster } from 'ioredis';
import type { ServerSentEvent } from '~/types/events';
import { InMemoryEventTransport } from '~/stream/implementations/InMemoryEventTransport';
@@ -27,6 +28,9 @@ describe('GenerationJobManager Integration Tests', () => {
  let dynamicKeyvClient: unknown = null;
  let dynamicKeyvReady: Promise<unknown> | null = null;
  const testPrefix = 'JobManager-Integration-Test';
  const redisConfigured = process.env.USE_REDIS === 'true';
  const describeRedis = redisConfigured ? describe : describe.skip;
  const testRedis = redisConfigured ? test : test.skip;

  beforeAll(async () => {
    originalEnv = { ...process.env };
@@ -82,6 +86,68 @@
    process.env = originalEnv;
  });
function createInMemoryManager(): GenerationJobManagerClass {
const manager = new GenerationJobManagerClass();
manager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
});
manager.initialize();
return manager;
}
function createRedisManager(): GenerationJobManagerClass {
const manager = new GenerationJobManagerClass();
manager.configure(
createStreamServices({
useRedis: true,
redisClient: ioredisClient!,
}),
);
manager.initialize();
return manager;
}
async function setupDisconnectedStream(
manager: GenerationJobManagerClass,
streamId: string,
delay: number,
): Promise<ServerSentEvent[]> {
const firstEvents: ServerSentEvent[] = [];
const sub = await manager.subscribe(streamId, (event) => firstEvents.push(event));
await new Promise((resolve) => setTimeout(resolve, delay));
await manager.emitChunk(streamId, {
event: 'on_run_step',
data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' } },
});
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: 'Hello' } } },
});
await new Promise((resolve) => setTimeout(resolve, delay));
expect(firstEvents.length).toBe(2);
sub?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, delay));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: ' world' } } },
});
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: '!' } } },
});
await new Promise((resolve) => setTimeout(resolve, delay));
return firstEvents;
}
describe('In-Memory Mode', () => { describe('In-Memory Mode', () => {
test('should create and manage jobs', async () => { test('should create and manage jobs', async () => {
// Configure with in-memory // Configure with in-memory
@ -171,13 +237,8 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
}); });
describe('Redis Mode', () => { describeRedis('Redis Mode', () => {
test('should create and manage jobs via Redis', async () => { test('should create and manage jobs via Redis', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// Create Redis services // Create Redis services
const services = createStreamServices({ const services = createStreamServices({
useRedis: true, useRedis: true,
@ -209,11 +270,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should persist chunks for cross-instance resume', async () => { test('should persist chunks for cross-instance resume', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const services = createStreamServices({ const services = createStreamServices({
useRedis: true, useRedis: true,
redisClient: ioredisClient, redisClient: ioredisClient,
@ -264,11 +320,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should handle abort and return content', async () => { test('should handle abort and return content', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const services = createStreamServices({ const services = createStreamServices({
useRedis: true, useRedis: true,
redisClient: ioredisClient, redisClient: ioredisClient,
@ -374,7 +425,7 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
}); });
describe('Cross-Replica Support (Redis)', () => { describeRedis('Cross-Replica Support (Redis)', () => {
/** /**
* Problem: In k8s with Redis and multiple replicas, when a user sends a message: * Problem: In k8s with Redis and multiple replicas, when a user sends a message:
* 1. POST /api/agents/chat hits Replica A, creates job * 1. POST /api/agents/chat hits Replica A, creates job
@ -387,15 +438,10 @@ describe('GenerationJobManager Integration Tests', () => {
* when the job exists in Redis but not in local memory. * when the job exists in Redis but not in local memory.
*/ */
test('should NOT return 404 when stream endpoint hits different replica than job creator', async () => { test('should NOT return 404 when stream endpoint hits different replica than job creator', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// === REPLICA A: Creates the job === // === REPLICA A: Creates the job ===
// Simulate Replica A creating the job directly in Redis // Simulate Replica A creating the job directly in Redis
// (In real scenario, this happens via GenerationJobManager.createJob on Replica A) // (In real scenario, this happens via GenerationJobManager.createJob on Replica A)
const replicaAJobStore = new RedisJobStore(ioredisClient); const replicaAJobStore = new RedisJobStore(ioredisClient!);
await replicaAJobStore.initialize(); await replicaAJobStore.initialize();
const streamId = `cross-replica-404-test-${Date.now()}`; const streamId = `cross-replica-404-test-${Date.now()}`;
@ -452,13 +498,8 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should lazily create runtime state for jobs created on other replicas', async () => { test('should lazily create runtime state for jobs created on other replicas', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// Instance 1: Create the job directly in Redis (simulating another replica) // Instance 1: Create the job directly in Redis (simulating another replica)
const jobStore = new RedisJobStore(ioredisClient); const jobStore = new RedisJobStore(ioredisClient!);
await jobStore.initialize(); await jobStore.initialize();
const streamId = `cross-replica-${Date.now()}`; const streamId = `cross-replica-${Date.now()}`;
@ -500,11 +541,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should persist syncSent to Redis for cross-replica consistency', async () => { test('should persist syncSent to Redis for cross-replica consistency', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const services = createStreamServices({ const services = createStreamServices({
useRedis: true, useRedis: true,
redisClient: ioredisClient, redisClient: ioredisClient,
@ -539,11 +575,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should persist finalEvent to Redis for cross-replica access', async () => { test('should persist finalEvent to Redis for cross-replica access', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const services = createStreamServices({ const services = createStreamServices({
useRedis: true, useRedis: true,
redisClient: ioredisClient, redisClient: ioredisClient,
@ -581,11 +612,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should emit cross-replica abort signal via Redis pub/sub', async () => { test('should emit cross-replica abort signal via Redis pub/sub', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const services = createStreamServices({ const services = createStreamServices({
useRedis: true, useRedis: true,
redisClient: ioredisClient, redisClient: ioredisClient,
@ -620,16 +646,11 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should handle abort for lazily-initialized cross-replica jobs', async () => { test('should handle abort for lazily-initialized cross-replica jobs', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// This test validates that jobs created on Replica A and lazily-initialized // This test validates that jobs created on Replica A and lazily-initialized
// on Replica B can still receive and handle abort signals. // on Replica B can still receive and handle abort signals.
// === Replica A: Create job directly in Redis === // === Replica A: Create job directly in Redis ===
const replicaAJobStore = new RedisJobStore(ioredisClient); const replicaAJobStore = new RedisJobStore(ioredisClient!);
await replicaAJobStore.initialize(); await replicaAJobStore.initialize();
const streamId = `lazy-abort-${Date.now()}`; const streamId = `lazy-abort-${Date.now()}`;
@ -675,11 +696,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should abort generation when abort signal received from another replica', async () => { test('should abort generation when abort signal received from another replica', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// This test simulates: // This test simulates:
// 1. Replica A creates a job and starts generation // 1. Replica A creates a job and starts generation
// 2. Replica B receives abort request and emits abort signal // 2. Replica B receives abort request and emits abort signal
@ -729,13 +745,8 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should handle wasSyncSent for cross-replica scenarios', async () => { test('should handle wasSyncSent for cross-replica scenarios', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
// Create job directly in Redis with syncSent: true // Create job directly in Redis with syncSent: true
const jobStore = new RedisJobStore(ioredisClient); const jobStore = new RedisJobStore(ioredisClient!);
await jobStore.initialize(); await jobStore.initialize();
const streamId = `cross-sync-${Date.now()}`; const streamId = `cross-sync-${Date.now()}`;
@ -762,7 +773,7 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
}); });
describe('Sequential Event Ordering (Redis)', () => { describeRedis('Sequential Event Ordering (Redis)', () => {
/** /**
* These tests verify that events are delivered in strict sequential order * These tests verify that events are delivered in strict sequential order
* when using Redis mode. This is critical because: * when using Redis mode. This is critical because:
@ -773,11 +784,6 @@ describe('GenerationJobManager Integration Tests', () => {
* The fix: emitChunk now awaits Redis publish to ensure ordered delivery. * The fix: emitChunk now awaits Redis publish to ensure ordered delivery.
*/ */
test('should maintain strict order for rapid sequential emits', async () => { test('should maintain strict order for rapid sequential emits', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules(); jest.resetModules();
const services = createStreamServices({ const services = createStreamServices({
@ -823,11 +829,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should maintain order for tool call argument deltas', async () => { test('should maintain order for tool call argument deltas', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules(); jest.resetModules();
const services = createStreamServices({ const services = createStreamServices({
@ -882,11 +883,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should maintain order: on_run_step before on_run_step_delta', async () => { test('should maintain order: on_run_step before on_run_step_delta', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules(); jest.resetModules();
const services = createStreamServices({ const services = createStreamServices({
@ -945,11 +941,6 @@ describe('GenerationJobManager Integration Tests', () => {
}); });
test('should not block other streams when awaiting emitChunk', async () => { test('should not block other streams when awaiting emitChunk', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
jest.resetModules(); jest.resetModules();
const services = createStreamServices({ const services = createStreamServices({
@ -1069,12 +1060,7 @@ describe('GenerationJobManager Integration Tests', () => {
await manager.destroy(); await manager.destroy();
}); });
test('should buffer and replay events emitted before subscribe (Redis)', async () => { testRedis('should buffer and replay events emitted before subscribe (Redis)', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const manager = new GenerationJobManagerClass(); const manager = new GenerationJobManagerClass();
const services = createStreamServices({ const services = createStreamServices({
useRedis: true, useRedis: true,
@ -1118,67 +1104,60 @@ describe('GenerationJobManager Integration Tests', () => {
await manager.destroy(); await manager.destroy();
}); });
test('should not lose events when emitting before and after subscribe (Redis)', async () => { testRedis(
if (!ioredisClient) { 'should not lose events when emitting before and after subscribe (Redis)',
console.warn('Redis not available, skipping test'); async () => {
return; const manager = new GenerationJobManagerClass();
} const services = createStreamServices({
useRedis: true,
const manager = new GenerationJobManagerClass(); redisClient: ioredisClient,
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
manager.configure(services);
manager.initialize();
const streamId = `no-loss-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
await manager.emitChunk(streamId, {
created: true,
message: { text: 'hello' },
streamId,
} as unknown as ServerSentEvent);
await manager.emitChunk(streamId, {
event: 'on_run_step',
data: { id: 'step-1', type: 'message_creation', index: 0 },
});
const receivedEvents: unknown[] = [];
const subscription = await manager.subscribe(streamId, (event: unknown) =>
receivedEvents.push(event),
);
await new Promise((resolve) => setTimeout(resolve, 100));
for (let i = 0; i < 10; i++) {
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: `word${i} ` } }, index: i },
}); });
}
await new Promise((resolve) => setTimeout(resolve, 300));
expect(receivedEvents.length).toBe(12);
expect((receivedEvents[0] as Record<string, unknown>).created).toBe(true);
expect((receivedEvents[1] as Record<string, unknown>).event).toBe('on_run_step');
for (let i = 0; i < 10; i++) {
expect((receivedEvents[i + 2] as Record<string, unknown>).event).toBe('on_message_delta');
}
subscription?.unsubscribe();
await manager.destroy();
},
);
testRedis('RedisEventTransport.subscribe() should return a ready promise', async () => {
const subscriber = (ioredisClient as unknown as { duplicate: () => unknown }).duplicate();
const transport = new RedisEventTransport(ioredisClient as never, subscriber as never);
@@ -1211,6 +1190,421 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describe('Resume: skipBufferReplay prevents duplication', () => {
/**
* Verifies the fix for duplicated content when navigating away from an
* in-progress conversation and back. Events accumulate in earlyEventBuffer
* while the subscriber is absent. On resume, the sync event delivers all
* accumulated content via aggregatedContent, so buffer replay must be
* skipped to prevent duplication.
*/
test('should NOT replay buffer when skipBufferReplay is true (resume scenario)', async () => {
const manager = createInMemoryManager();
const streamId = `skip-buf-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
await setupDisconnectedStream(manager, streamId, 10);
const resumeState = await manager.getResumeState(streamId);
expect(resumeState).not.toBeNull();
const resumeEvents: ServerSentEvent[] = [];
const sub2 = await manager.subscribe(
streamId,
(event) => resumeEvents.push(event),
undefined,
undefined,
{ skipBufferReplay: true },
);
await new Promise((resolve) => setTimeout(resolve, 20));
expect(resumeEvents.length).toBe(0);
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: ' Live!' } } },
});
await new Promise((resolve) => setTimeout(resolve, 20));
expect(resumeEvents.length).toBe(1);
expect(resumeEvents[0].event).toBe('on_message_delta');
sub2?.unsubscribe();
await manager.destroy();
});
test('should replay buffer by default when no options are passed', async () => {
const manager = createInMemoryManager();
const streamId = `replay-buf-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1Events: ServerSentEvent[] = [];
const sub1 = await manager.subscribe(streamId, (event) => sub1Events.push(event));
await new Promise((resolve) => setTimeout(resolve, 10));
await manager.emitChunk(streamId, {
event: 'on_run_step',
data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' } },
});
await new Promise((resolve) => setTimeout(resolve, 10));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: 'buffered' } } },
});
const sub2Events: ServerSentEvent[] = [];
const sub2 = await manager.subscribe(streamId, (event) => sub2Events.push(event));
await new Promise((resolve) => setTimeout(resolve, 20));
expect(sub2Events.length).toBe(1);
expect(sub2Events[0].event).toBe('on_message_delta');
sub2?.unsubscribe();
await manager.destroy();
});
test('should clear earlyEventBuffer even when skipping replay (no memory leak)', async () => {
const manager = createInMemoryManager();
const streamId = `buf-clear-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1 = await manager.subscribe(streamId, () => {});
await new Promise((resolve) => setTimeout(resolve, 10));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'buf1' } } },
});
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'buf2' } } },
});
const sub2Events: ServerSentEvent[] = [];
const sub2 = await manager.subscribe(
streamId,
(event) => sub2Events.push(event),
undefined,
undefined,
{ skipBufferReplay: true },
);
await new Promise((resolve) => setTimeout(resolve, 20));
expect(sub2Events.length).toBe(0);
sub2?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'new-event' } } },
});
const sub3Events: ServerSentEvent[] = [];
const sub3 = await manager.subscribe(streamId, (event) => sub3Events.push(event));
await new Promise((resolve) => setTimeout(resolve, 20));
expect(sub3Events.length).toBe(1);
const event = sub3Events[0] as {
event: string;
data: { delta: { content: { text: string } } };
};
expect(event.data.delta.content.text).toBe('new-event');
sub3?.unsubscribe();
await manager.destroy();
});
test('should handle multiple disconnect/reconnect cycles with skipBufferReplay', async () => {
const manager = createInMemoryManager();
const streamId = `multi-reconnect-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1 = await manager.subscribe(streamId, () => {});
await new Promise((resolve) => setTimeout(resolve, 10));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'initial' } } },
});
await new Promise((resolve) => setTimeout(resolve, 10));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'buffered-1' } } },
});
const resumeState1 = await manager.getResumeState(streamId);
expect(resumeState1).not.toBeNull();
const sub2Events: ServerSentEvent[] = [];
const sub2 = await manager.subscribe(
streamId,
(event) => sub2Events.push(event),
undefined,
undefined,
{ skipBufferReplay: true },
);
await new Promise((resolve) => setTimeout(resolve, 10));
expect(sub2Events.length).toBe(0);
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'live-1' } } },
});
await new Promise((resolve) => setTimeout(resolve, 10));
expect(sub2Events.length).toBe(1);
sub2?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'buffered-2' } } },
});
const resumeState2 = await manager.getResumeState(streamId);
expect(resumeState2).not.toBeNull();
const sub3Events: ServerSentEvent[] = [];
const sub3 = await manager.subscribe(
streamId,
(event) => sub3Events.push(event),
undefined,
undefined,
{ skipBufferReplay: true },
);
await new Promise((resolve) => setTimeout(resolve, 10));
expect(sub3Events.length).toBe(0);
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'live-2' } } },
});
await new Promise((resolve) => setTimeout(resolve, 10));
expect(sub3Events.length).toBe(1);
sub3?.unsubscribe();
await manager.destroy();
});
testRedis('should NOT replay buffer when skipBufferReplay is true (Redis)', async () => {
const manager = createRedisManager();
const streamId = `skip-buf-redis-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
await setupDisconnectedStream(manager, streamId, 100);
const resumeState = await manager.getResumeState(streamId);
expect(resumeState).not.toBeNull();
expect(resumeState!.aggregatedContent?.length).toBeGreaterThan(0);
const resumeEvents: ServerSentEvent[] = [];
const sub2 = await manager.subscribe(
streamId,
(event) => resumeEvents.push(event),
undefined,
undefined,
{ skipBufferReplay: true },
);
await new Promise((resolve) => setTimeout(resolve, 200));
expect(resumeEvents.length).toBe(0);
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: ' Live!' } } },
});
await new Promise((resolve) => setTimeout(resolve, 200));
expect(resumeEvents.length).toBe(1);
expect(resumeEvents[0].event).toBe('on_message_delta');
sub2?.unsubscribe();
await manager.destroy();
});
testRedis(
'should replay buffer without skipBufferReplay after disconnect (Redis)',
async () => {
const manager = createRedisManager();
const streamId = `replay-buf-redis-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1 = await manager.subscribe(streamId, () => {});
await new Promise((resolve) => setTimeout(resolve, 100));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 100));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'buffered-redis' } } },
});
await new Promise((resolve) => setTimeout(resolve, 100));
const sub2Events: ServerSentEvent[] = [];
const sub2 = await manager.subscribe(streamId, (event) => sub2Events.push(event));
await new Promise((resolve) => setTimeout(resolve, 200));
expect(sub2Events.length).toBe(1);
expect(sub2Events[0].event).toBe('on_message_delta');
sub2?.unsubscribe();
await manager.destroy();
},
);
});
describe('Atomic subscribeWithResume', () => {
test('should return empty pendingEvents for pre-snapshot buffer events (in-memory)', async () => {
const manager = createInMemoryManager();
const streamId = `atomic-drain-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1 = await manager.subscribe(streamId, () => {});
await new Promise((resolve) => setTimeout(resolve, 10));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
await manager.emitChunk(streamId, {
event: 'on_run_step',
data: { id: 'step-1', runId: 'run-1', index: 0, stepDetails: { type: 'message_creation' } },
});
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: 'buffered' } } },
});
const liveEvents: ServerSentEvent[] = [];
const { subscription, resumeState, pendingEvents } = await manager.subscribeWithResume(
streamId,
(event) => liveEvents.push(event),
);
expect(resumeState).not.toBeNull();
expect(pendingEvents.length).toBe(0);
expect(liveEvents.length).toBe(0);
subscription?.unsubscribe();
await manager.destroy();
});
test('should return empty pendingEvents when buffer is empty', async () => {
const manager = createInMemoryManager();
const streamId = `atomic-empty-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1 = await manager.subscribe(streamId, () => {});
await new Promise((resolve) => setTimeout(resolve, 10));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'delivered' } } },
});
await new Promise((resolve) => setTimeout(resolve, 10));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
const { pendingEvents } = await manager.subscribeWithResume(streamId, () => {});
expect(pendingEvents.length).toBe(0);
await manager.destroy();
});
test('should deliver live events after subscribeWithResume', async () => {
const manager = createInMemoryManager();
const streamId = `atomic-live-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1 = await manager.subscribe(streamId, () => {});
await new Promise((resolve) => setTimeout(resolve, 10));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 20));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'buffered-pre-snapshot' } } },
});
const liveEvents: ServerSentEvent[] = [];
const { subscription, pendingEvents } = await manager.subscribeWithResume(streamId, (event) =>
liveEvents.push(event),
);
expect(pendingEvents.length).toBe(0);
expect(liveEvents.length).toBe(0);
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'live-after' } } },
});
await new Promise((resolve) => setTimeout(resolve, 20));
expect(liveEvents.length).toBe(1);
const liveEvent = liveEvents[0] as {
event: string;
data: { delta: { content: { text: string } } };
};
expect(liveEvent.data.delta.content.text).toBe('live-after');
subscription?.unsubscribe();
await manager.destroy();
});
testRedis(
'should return empty pendingEvents in Redis mode (chunks already persisted)',
async () => {
const manager = createRedisManager();
const streamId = `atomic-redis-${Date.now()}`;
await manager.createJob(streamId, 'user-1');
const sub1 = await manager.subscribe(streamId, () => {});
await new Promise((resolve) => setTimeout(resolve, 100));
sub1?.unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 100));
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'buffered-redis' } } },
});
await new Promise((resolve) => setTimeout(resolve, 100));
const liveEvents: ServerSentEvent[] = [];
const { subscription, resumeState, pendingEvents } = await manager.subscribeWithResume(
streamId,
(event) => liveEvents.push(event),
);
expect(resumeState).not.toBeNull();
expect(pendingEvents.length).toBe(0);
await manager.emitChunk(streamId, {
event: 'on_message_delta',
data: { delta: { content: { type: 'text', text: 'live-redis' } } },
});
await new Promise((resolve) => setTimeout(resolve, 200));
expect(liveEvents.length).toBe(1);
subscription?.unsubscribe();
await manager.destroy();
},
);
});
describe('Error Preservation for Late Subscribers', () => {
/**
* These tests verify the fix for the race condition where errors
@@ -1369,14 +1763,9 @@ describe('GenerationJobManager Integration Tests', () => {
await GenerationJobManager.destroy();
});
testRedis('should handle error preservation in Redis mode (cross-replica)', async () => {
// === Replica A: Creates job and emits error ===
const replicaAJobStore = new RedisJobStore(ioredisClient!);
await replicaAJobStore.initialize();
const streamId = `redis-error-${Date.now()}`;
@@ -1463,13 +1852,8 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describeRedis('Cross-Replica Live Streaming (Redis)', () => {
test('should publish events to Redis even when no local subscriber exists', async () => {
const replicaA = new GenerationJobManagerClass();
const servicesA = createStreamServices({
useRedis: true,
@@ -1489,7 +1873,7 @@ describe('GenerationJobManager Integration Tests', () => {
const streamId = `cross-live-${Date.now()}`;
await replicaA.createJob(streamId, 'user-1');
const replicaBJobStore = new RedisJobStore(ioredisClient!);
await replicaBJobStore.initialize();
await replicaBJobStore.createJob(streamId, 'user-1');
@@ -1519,11 +1903,6 @@ describe('GenerationJobManager Integration Tests', () => {
});
test('should not cause data loss on cross-replica subscribers when local subscriber joins', async () => {
const replicaA = new GenerationJobManagerClass();
const servicesA = createStreamServices({
useRedis: true,
@@ -1543,7 +1922,7 @@ describe('GenerationJobManager Integration Tests', () => {
const streamId = `cross-seq-safe-${Date.now()}`;
await replicaA.createJob(streamId, 'user-1');
const replicaBJobStore = new RedisJobStore(ioredisClient!);
await replicaBJobStore.initialize();
await replicaBJobStore.createJob(streamId, 'user-1');
@@ -1603,11 +1982,6 @@ describe('GenerationJobManager Integration Tests', () => {
});
test('should deliver buffered events locally AND publish live events cross-replica', async () => {
const replicaA = new GenerationJobManagerClass();
const servicesA = createStreamServices({
useRedis: true,
@@ -1641,7 +2015,7 @@ describe('GenerationJobManager Integration Tests', () => {
replicaB.configure(servicesB);
replicaB.initialize();
const replicaBJobStore = new RedisJobStore(ioredisClient!);
await replicaBJobStore.initialize();
await replicaBJobStore.createJob(streamId, 'user-1');
@@ -1671,13 +2045,8 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describeRedis('Concurrent Subscriber Readiness (Redis)', () => {
test('should return ready promise to all concurrent subscribers for same stream', async () => {
const subscriber = (
ioredisClient as unknown as { duplicate: () => typeof ioredisClient }
).duplicate()!;
@@ -1706,13 +2075,8 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describeRedis('Sequence Reset Safety (Redis)', () => {
test('should not receive stale pre-subscribe events via Redis after sequence reset', async () => {
const manager = new GenerationJobManagerClass();
const services = createStreamServices({
useRedis: true,
@@ -1774,11 +2138,6 @@ describe('GenerationJobManager Integration Tests', () => {
});
test('should not reset sequence when second subscriber joins mid-stream', async () => {
const manager = new GenerationJobManagerClass();
const services = createStreamServices({
useRedis: true,
@@ -1837,13 +2196,8 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describeRedis('Subscribe Error Recovery (Redis)', () => {
test('should allow resubscription after Redis subscribe failure', async () => {
const subscriber = (
ioredisClient as unknown as { duplicate: () => typeof ioredisClient }
).duplicate()!;
@@ -1892,12 +2246,7 @@ describe('GenerationJobManager Integration Tests', () => {
});
describe('createStreamServices Auto-Detection', () => {
testRedis('should use Redis when useRedis is true and client is available', () => {
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,


@@ -47,3 +47,24 @@ export type ChunkHandler = (event: ServerSentEvent) => void;
export type DoneHandler = (event: ServerSentEvent) => void;
export type ErrorHandler = (error: string) => void;
export type UnsubscribeFn = () => void;
/** Options for subscribing to a job event stream */
export interface SubscribeOptions {
/**
* When true, skips replaying the earlyEventBuffer.
* Use for resume connections after a sync event has been sent.
*/
skipBufferReplay?: boolean;
}
/** Result of an atomic subscribe-with-resume operation */
export interface SubscribeWithResumeResult {
subscription: { unsubscribe: UnsubscribeFn } | null;
resumeState: ResumeState | null;
/**
* Events that arrived between the resume snapshot and the subscribe call.
* In-memory mode: drained from earlyEventBuffer (only place they exist).
* Redis mode: empty; chunks are persisted to the store and appear in aggregatedContent on next resume.
*/
pendingEvents: ServerSentEvent[];
}
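
The two interfaces above capture the resume contract the tests exercise: events that accumulate while no subscriber is attached must be replayed by default, but skipped (and still drained) on a resume connection whose sync event already carried the aggregated content. As a rough illustration of that semantics, here is a minimal sketch; `MiniJobManager` and its methods are hypothetical stand-ins, not the real `GenerationJobManagerClass` API.

```typescript
interface SSE {
  event?: string;
  [key: string]: unknown;
}

interface SubscribeOptions {
  skipBufferReplay?: boolean;
}

class MiniJobManager {
  private earlyEventBuffer: SSE[] = [];
  private listener: ((e: SSE) => void) | null = null;

  emitChunk(event: SSE): void {
    if (this.listener) {
      this.listener(event); // live delivery
    } else {
      this.earlyEventBuffer.push(event); // accumulate while subscriber is absent
    }
  }

  subscribe(onEvent: (e: SSE) => void, options?: SubscribeOptions) {
    if (!options?.skipBufferReplay) {
      // Normal path: replay events buffered while disconnected.
      for (const e of this.earlyEventBuffer) onEvent(e);
    }
    // Drain the buffer either way so skipped events cannot leak memory.
    this.earlyEventBuffer = [];
    this.listener = onEvent;
    return { unsubscribe: () => (this.listener = null) };
  }
}

const manager = new MiniJobManager();
manager.emitChunk({ event: 'on_message_delta' }); // buffered: no subscriber yet

const received: SSE[] = [];
// Resume connection: the sync event already delivered aggregated content,
// so the buffered copy is skipped to avoid duplication.
manager.subscribe((e) => received.push(e), { skipBufferReplay: true });
console.log(received.length); // 0: buffer was not replayed

manager.emitChunk({ event: 'on_message_delta' }); // live events still arrive
console.log(received.length); // 1
```

Note the buffer is cleared even on the skip path, mirroring the "no memory leak" test above: a later subscriber must see only events emitted after its own subscription.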