🪵 refactor: Preserve Job Error State for Late Stream Subscribers (#11372)
Some checks are pending
Docker Dev Branch Images Build / build (Dockerfile, lc-dev, node) (push) Waiting to run
Docker Dev Branch Images Build / build (Dockerfile.multi, lc-dev-api, api-build) (push) Waiting to run
Publish `@librechat/data-schemas` to NPM / build-and-publish (push) Waiting to run
Docker Dev Images Build / build (Dockerfile, librechat-dev, node) (push) Waiting to run
Docker Dev Images Build / build (Dockerfile.multi, librechat-dev-api, api-build) (push) Waiting to run
Sync Locize Translations & Create Translation PR / Sync Translation Keys with Locize (push) Waiting to run
Sync Locize Translations & Create Translation PR / Create Translation PR on Version Published (push) Blocked by required conditions

* 🪵 refactor: Preserve job error state for late stream subscribers

* 🔧 fix: Enhance error handling for late subscribers in GenerationJobManager

- Implemented a cleanup strategy for error jobs to prevent immediate deletion, allowing late clients to receive error messages.
- Updated job status handling to prioritize error notifications over completion events.
- Added integration tests to verify error preservation and proper notification to late subscribers, including scenarios with Redis support.
This commit is contained in:
Danny Avila 2026-01-15 23:02:03 -05:00 committed by GitHub
parent 81f4af55b5
commit c378e777ef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 335 additions and 11 deletions

View file

@ -33,6 +33,7 @@ export interface GenerationJobManagerOptions {
* @property readyPromise - Resolves immediately (legacy, kept for API compatibility)
* @property resolveReady - Function to resolve readyPromise
* @property finalEvent - Cached final event for late subscribers
* @property errorEvent - Cached error event for late subscribers (errors before client connects)
* @property syncSent - Whether sync event was sent (reset when all subscribers leave)
* @property earlyEventBuffer - Buffer for events emitted before first subscriber connects
* @property hasSubscriber - Whether at least one subscriber has connected
@ -47,6 +48,7 @@ interface RuntimeJobState {
readyPromise: Promise<void>;
resolveReady: () => void;
finalEvent?: t.ServerSentEvent;
errorEvent?: string;
syncSent: boolean;
earlyEventBuffer: t.ServerSentEvent[];
hasSubscriber: boolean;
@ -421,6 +423,7 @@ class GenerationJobManagerClass {
earlyEventBuffer: [],
hasSubscriber: false,
finalEvent,
errorEvent: jobData.error,
};
this.runtimeState.set(streamId, runtime);
@ -510,6 +513,8 @@ class GenerationJobManagerClass {
/**
* Mark job as complete.
* If cleanupOnComplete is true (default), immediately cleans up job resources.
* Exception: Jobs with errors are NOT immediately deleted to allow late-connecting
* clients to receive the error (race condition where error occurs before client connects).
* Note: eventTransport is NOT cleaned up here to allow the final event to be
* fully transmitted. It will be cleaned up when subscribers disconnect or
* by the periodic cleanup job.
@ -527,7 +532,29 @@ class GenerationJobManagerClass {
this.jobStore.clearContentState(streamId);
this.runStepBuffers?.delete(streamId);
// Immediate cleanup if configured (default: true)
// For error jobs, DON'T delete immediately - keep around so late-connecting
// clients can receive the error. This handles the race condition where error
// occurs before client connects to SSE stream.
//
// Cleanup strategy: Error jobs are cleaned up by periodic cleanup (every 60s)
// via jobStore.cleanup() which checks for jobs with status 'error' and
// completedAt set. The TTL is configurable via jobStore options (default: 0,
// meaning cleanup on next interval). This gives clients ~60s to connect and
// receive the error before the job is removed.
if (error) {
await this.jobStore.updateJob(streamId, {
status: 'error',
completedAt: Date.now(),
error,
});
// Keep runtime state so subscribe() can access errorEvent
logger.debug(
`[GenerationJobManager] Job completed with error (keeping for late subscribers): ${streamId}`,
);
return;
}
// Immediate cleanup if configured (default: true) - only for successful completions
if (this._cleanupOnComplete) {
this.runtimeState.delete(streamId);
// Don't cleanup eventTransport here - let the done event fully transmit first.
@ -536,9 +563,8 @@ class GenerationJobManagerClass {
} else {
// Only update status if keeping the job around
await this.jobStore.updateJob(streamId, {
status: error ? 'error' : 'complete',
status: 'complete',
completedAt: Date.now(),
error,
});
}
@ -678,14 +704,22 @@ class GenerationJobManagerClass {
const jobData = await this.jobStore.getJob(streamId);
// If job already complete, send final event
// If job already complete/error, send final event or error
// Error status takes precedence to ensure errors aren't misreported as successes
setImmediate(() => {
if (
runtime.finalEvent &&
jobData &&
['complete', 'error', 'aborted'].includes(jobData.status)
) {
onDone?.(runtime.finalEvent);
if (jobData && ['complete', 'error', 'aborted'].includes(jobData.status)) {
// Check for error status FIRST and prioritize error handling
if (jobData.status === 'error' && (runtime.errorEvent || jobData.error)) {
const errorToSend = runtime.errorEvent ?? jobData.error;
if (errorToSend) {
logger.debug(
`[GenerationJobManager] Sending stored error to late subscriber: ${streamId}`,
);
onError?.(errorToSend);
}
} else if (runtime.finalEvent) {
onDone?.(runtime.finalEvent);
}
}
});
@ -986,8 +1020,18 @@ class GenerationJobManagerClass {
/**
* Emit an error event.
* Stores the error for late-connecting subscribers (race condition where error
* occurs before client connects to SSE stream).
*/
emitError(streamId: string, error: string): void {
const runtime = this.runtimeState.get(streamId);
if (runtime) {
runtime.errorEvent = error;
}
// Persist error to job store for cross-replica consistency
this.jobStore.updateJob(streamId, { error }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to persist error:`, err);
});
this.eventTransport.emitError(streamId, error);
}

View file

@ -796,6 +796,282 @@ describe('GenerationJobManager Integration Tests', () => {
});
});
describe('Error Preservation for Late Subscribers', () => {
/**
* These tests verify the fix for the race condition where errors
* (like INPUT_LENGTH) occur before the SSE client connects.
*
* Problem: Error emitError completeJob job deleted client connects 404
* Fix: Store error, don't delete job immediately, send error to late subscriber
*/
test('should store error in emitError for late-connecting subscribers', async () => {
const { GenerationJobManager } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
GenerationJobManager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
cleanupOnComplete: false,
});
await GenerationJobManager.initialize();
const streamId = `error-store-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }';
// Emit error (no subscribers yet - simulates race condition)
GenerationJobManager.emitError(streamId, errorMessage);
// Wait for async job store update
await new Promise((resolve) => setTimeout(resolve, 50));
// Verify error is stored in job store
const job = await GenerationJobManager.getJob(streamId);
expect(job?.error).toBe(errorMessage);
await GenerationJobManager.destroy();
});
test('should NOT delete job immediately when completeJob is called with error', async () => {
const { GenerationJobManager } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
GenerationJobManager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
cleanupOnComplete: true, // Default behavior
});
await GenerationJobManager.initialize();
const streamId = `error-no-delete-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const errorMessage = 'Test error message';
// Complete with error
await GenerationJobManager.completeJob(streamId, errorMessage);
// Job should still exist (not deleted)
const hasJob = await GenerationJobManager.hasJob(streamId);
expect(hasJob).toBe(true);
// Job should have error status
const job = await GenerationJobManager.getJob(streamId);
expect(job?.status).toBe('error');
expect(job?.error).toBe(errorMessage);
await GenerationJobManager.destroy();
});
test('should send stored error to late-connecting subscriber', async () => {
const { GenerationJobManager } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
GenerationJobManager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
cleanupOnComplete: true,
});
await GenerationJobManager.initialize();
const streamId = `error-late-sub-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }';
// Simulate race condition: error occurs before client connects
GenerationJobManager.emitError(streamId, errorMessage);
await GenerationJobManager.completeJob(streamId, errorMessage);
// Wait for async operations
await new Promise((resolve) => setTimeout(resolve, 50));
// Now client connects (late subscriber)
let receivedError: string | undefined;
const subscription = await GenerationJobManager.subscribe(
streamId,
() => {}, // onChunk
() => {}, // onDone
(error) => {
receivedError = error;
}, // onError
);
expect(subscription).not.toBeNull();
// Wait for setImmediate in subscribe to fire
await new Promise((resolve) => setTimeout(resolve, 50));
// Late subscriber should receive the stored error
expect(receivedError).toBe(errorMessage);
subscription?.unsubscribe();
await GenerationJobManager.destroy();
});
test('should prioritize error status over finalEvent in subscribe', async () => {
const { GenerationJobManager } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
GenerationJobManager.configure({
jobStore: new InMemoryJobStore({ ttlAfterComplete: 60000 }),
eventTransport: new InMemoryEventTransport(),
isRedis: false,
cleanupOnComplete: false,
});
await GenerationJobManager.initialize();
const streamId = `error-priority-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
const errorMessage = 'Error should take priority';
// Emit error and complete with error
GenerationJobManager.emitError(streamId, errorMessage);
await GenerationJobManager.completeJob(streamId, errorMessage);
await new Promise((resolve) => setTimeout(resolve, 50));
// Subscribe and verify error is received (not a done event)
let receivedError: string | undefined;
let receivedDone = false;
const subscription = await GenerationJobManager.subscribe(
streamId,
() => {},
() => {
receivedDone = true;
},
(error) => {
receivedError = error;
},
);
await new Promise((resolve) => setTimeout(resolve, 50));
// Error should be received, not done
expect(receivedError).toBe(errorMessage);
expect(receivedDone).toBe(false);
subscription?.unsubscribe();
await GenerationJobManager.destroy();
});
test('should handle error preservation in Redis mode (cross-replica)', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { createStreamServices } = await import('../createStreamServices');
const { RedisJobStore } = await import('../implementations/RedisJobStore');
// === Replica A: Creates job and emits error ===
const replicaAJobStore = new RedisJobStore(ioredisClient);
await replicaAJobStore.initialize();
const streamId = `redis-error-${Date.now()}`;
const errorMessage = '{ "type": "INPUT_LENGTH", "info": "234856 / 172627" }';
await replicaAJobStore.createJob(streamId, 'user-1');
await replicaAJobStore.updateJob(streamId, {
status: 'error',
error: errorMessage,
completedAt: Date.now(),
});
// === Replica B: Fresh manager receives client connection ===
jest.resetModules();
const { GenerationJobManager } = await import('../GenerationJobManager');
const services = createStreamServices({
useRedis: true,
redisClient: ioredisClient,
});
GenerationJobManager.configure({
...services,
cleanupOnComplete: false,
});
await GenerationJobManager.initialize();
// Client connects to Replica B (job created on Replica A)
let receivedError: string | undefined;
const subscription = await GenerationJobManager.subscribe(
streamId,
() => {},
() => {},
(error) => {
receivedError = error;
},
);
expect(subscription).not.toBeNull();
await new Promise((resolve) => setTimeout(resolve, 100));
// Error should be loaded from Redis and sent to subscriber
expect(receivedError).toBe(errorMessage);
subscription?.unsubscribe();
await GenerationJobManager.destroy();
await replicaAJobStore.destroy();
});
test('error jobs should be cleaned up by periodic cleanup after TTL', async () => {
const { GenerationJobManager } = await import('../GenerationJobManager');
const { InMemoryJobStore } = await import('../implementations/InMemoryJobStore');
const { InMemoryEventTransport } = await import('../implementations/InMemoryEventTransport');
// Use a very short TTL for testing
const jobStore = new InMemoryJobStore({ ttlAfterComplete: 100 });
GenerationJobManager.configure({
jobStore,
eventTransport: new InMemoryEventTransport(),
isRedis: false,
cleanupOnComplete: true,
});
await GenerationJobManager.initialize();
const streamId = `error-cleanup-${Date.now()}`;
await GenerationJobManager.createJob(streamId, 'user-1');
// Complete with error
await GenerationJobManager.completeJob(streamId, 'Test error');
// Job should exist immediately after error
let hasJob = await GenerationJobManager.hasJob(streamId);
expect(hasJob).toBe(true);
// Wait for TTL to expire
await new Promise((resolve) => setTimeout(resolve, 150));
// Trigger cleanup
await jobStore.cleanup();
// Job should be cleaned up after TTL
hasJob = await GenerationJobManager.hasJob(streamId);
expect(hasJob).toBe(false);
await GenerationJobManager.destroy();
});
});
describe('createStreamServices Auto-Detection', () => {
test('should auto-detect Redis when USE_REDIS is true', async () => {
if (!ioredisClient) {

View file

@ -79,7 +79,11 @@ export class InMemoryEventTransport implements IEventTransport {
emitError(streamId: string, error: string): void {
const state = this.streams.get(streamId);
state?.emitter.emit('error', error);
// Only emit if there are listeners - Node.js throws on unhandled 'error' events
// This is intentional for the race condition where error occurs before client connects
if (state?.emitter.listenerCount('error') ?? 0 > 0) {
state?.emitter.emit('error', error);
}
}
getSubscriberCount(streamId: string): number {