diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index c09dafd6de..bb1a268ad0 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -357,7 +357,10 @@ class GenerationJobManagerClass { /** * Mark job as complete. - * If cleanupOnComplete is true (default), immediately cleans up all job resources. + * If cleanupOnComplete is true (default), immediately cleans up job resources. + * Note: eventTransport is NOT cleaned up here to allow the final event to be + * fully transmitted. It will be cleaned up when subscribers disconnect or + * by the periodic cleanup job. */ async completeJob(streamId: string, error?: string): Promise { // Clear content state and run step buffer (Redis only) @@ -367,7 +370,8 @@ class GenerationJobManagerClass { // Immediate cleanup if configured (default: true) if (this._cleanupOnComplete) { this.runtimeState.delete(streamId); - this.eventTransport.cleanup(streamId); + // Don't cleanup eventTransport here - let the done event fully transmit first. + // EventTransport will be cleaned up when subscribers disconnect or by periodic cleanup. await this.jobStore.deleteJob(streamId); } else { // Only update status if keeping the job around @@ -443,7 +447,7 @@ class GenerationJobManagerClass { // Immediate cleanup if configured (default: true) if (this._cleanupOnComplete) { this.runtimeState.delete(streamId); - this.eventTransport.cleanup(streamId); + // Don't cleanup eventTransport here - let the abort event fully transmit first. await this.jobStore.deleteJob(streamId); } else { // Only update status if keeping the job around @@ -806,6 +810,14 @@ class GenerationJobManagerClass { } } + // Check eventTransport for orphaned streams (e.g., connections dropped without clean close) + // These are streams that exist in eventTransport but have no corresponding job + for (const streamId of this.eventTransport.getTrackedStreamIds()) { + if (!(await this.jobStore.hasJob(streamId)) && !this.runtimeState.has(streamId)) { + this.eventTransport.cleanup(streamId); + } + } + if (count > 0) { logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`); } diff --git a/packages/api/src/stream/implementations/InMemoryEventTransport.ts b/packages/api/src/stream/implementations/InMemoryEventTransport.ts index e4ac88b19e..fd9c65e239 100644 --- a/packages/api/src/stream/implementations/InMemoryEventTransport.ts +++ b/packages/api/src/stream/implementations/InMemoryEventTransport.ts @@ -55,9 +55,12 @@ export class InMemoryEventTransport implements IEventTransport { currentState.emitter.off('done', doneHandler); currentState.emitter.off('error', errorHandler); - // Check if all subscribers left + // Check if all subscribers left - cleanup and notify if (currentState.emitter.listenerCount('chunk') === 0) { currentState.allSubscribersLeftCallback?.(); + // Auto-cleanup the stream entry when no subscribers remain + currentState.emitter.removeAllListeners(); + this.streams.delete(streamId); } } }, @@ -117,6 +120,13 @@ export class InMemoryEventTransport implements IEventTransport { return this.streams.size; } + /** + * Get all tracked stream IDs (for orphan cleanup) + */ + getTrackedStreamIds(): string[] { + return Array.from(this.streams.keys()); + } + destroy(): void { for (const state of this.streams.values()) { state.emitter.removeAllListeners(); diff --git a/packages/api/src/stream/implementations/RedisEventTransport.ts b/packages/api/src/stream/implementations/RedisEventTransport.ts index c2df372df7..79aa05699a 100644 --- a/packages/api/src/stream/implementations/RedisEventTransport.ts +++ b/packages/api/src/stream/implementations/RedisEventTransport.ts @@ -267,6 +267,13 @@ export class RedisEventTransport implements IEventTransport { } } + /** + * Get all tracked stream IDs (for orphan cleanup) + */ + getTrackedStreamIds(): string[] { + return Array.from(this.streams.keys()); + } + /** * Cleanup resources for a specific stream. */ diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index 186c2525ba..b1670a57ed 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -238,6 +238,9 @@ export interface IEventTransport { /** Cleanup transport resources for a specific stream */ cleanup(streamId: string): void; + /** Get all tracked stream IDs (for orphan cleanup) */ + getTrackedStreamIds(): string[]; + /** Destroy all transport resources */ destroy(): void; }