From e01684a30ae11901e6b53c4fde04ba74b9004198 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Mon, 15 Dec 2025 09:16:06 -0500 Subject: [PATCH] refactor: cleanupOnComplete option to GenerationJobManager for flexible resource management - Introduced a new configuration option, cleanupOnComplete, allowing immediate cleanup of event transport and job resources upon job completion. - Updated completeJob and abortJob methods to respect the cleanupOnComplete setting, enhancing memory management. - Improved cleanup logic in the cleanup method to handle orphaned resources effectively. - Enhanced documentation and comments for better clarity on the new functionality. --- .../api/src/stream/GenerationJobManager.ts | 92 +++++++++++++++---- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/packages/api/src/stream/GenerationJobManager.ts b/packages/api/src/stream/GenerationJobManager.ts index 46669c0b5a..75a4182405 100644 --- a/packages/api/src/stream/GenerationJobManager.ts +++ b/packages/api/src/stream/GenerationJobManager.ts @@ -17,6 +17,12 @@ import { InMemoryJobStore } from './implementations/InMemoryJobStore'; export interface GenerationJobManagerOptions { jobStore?: IJobStore; eventTransport?: IEventTransport; + /** + * If true, cleans up event transport immediately when job completes. + * If false, keeps EventEmitters until periodic cleanup for late reconnections. + * Default: true (immediate cleanup to save memory) + */ + cleanupOnComplete?: boolean; } /** @@ -78,10 +84,14 @@ class GenerationJobManagerClass { /** Whether we're using Redis stores */ private _isRedis = false; + /** Whether to cleanup event transport immediately on job completion */ + private _cleanupOnComplete = true; + constructor(options?: GenerationJobManagerOptions) { this.jobStore = - options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 }); + options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 0, maxJobs: 1000 }); this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport(); + this._cleanupOnComplete = options?.cleanupOnComplete ?? true; } /** @@ -124,6 +134,7 @@ class GenerationJobManagerClass { jobStore: IJobStore; eventTransport: IEventTransport; isRedis?: boolean; + cleanupOnComplete?: boolean; }): void { if (this.cleanupInterval) { logger.warn( @@ -135,6 +146,7 @@ class GenerationJobManagerClass { this.jobStore = services.jobStore; this.eventTransport = services.eventTransport; this._isRedis = services.isRedis ?? false; + this._cleanupOnComplete = services.cleanupOnComplete ?? true; logger.info( `[GenerationJobManager] Configured with ${this._isRedis ? 'Redis' : 'in-memory'} stores`, @@ -337,17 +349,26 @@ class GenerationJobManagerClass { /** * Mark job as complete. + * If cleanupOnComplete is true (default), immediately cleans up all job resources. */ async completeJob(streamId: string, error?: string): Promise { - await this.jobStore.updateJob(streamId, { - status: error ? 'error' : 'complete', - completedAt: Date.now(), - error, - }); - - // Clear content state and run step buffer + // Clear content state and run step buffer (Redis only) this.jobStore.clearContentState(streamId); - this.runStepBuffers.delete(streamId); + this.runStepBuffers?.delete(streamId); + + // Immediate cleanup if configured (default: true) + if (this._cleanupOnComplete) { + this.runtimeState.delete(streamId); + this.eventTransport.cleanup(streamId); + await this.jobStore.deleteJob(streamId); + } else { + // Only update status if keeping the job around + await this.jobStore.updateJob(streamId, { + status: error ? 'error' : 'complete', + completedAt: Date.now(), + error, + }); + } logger.debug(`[GenerationJobManager] Job completed: ${streamId}`); } @@ -369,12 +390,7 @@ class GenerationJobManagerClass { runtime.abortController.abort(); } - await this.jobStore.updateJob(streamId, { - status: 'aborted', - completedAt: Date.now(), - }); - - // Get content and extract text + // Get content before clearing state const content = (await this.jobStore.getContentParts(streamId)) ?? []; const text = this.extractTextFromContent(content); @@ -414,6 +430,20 @@ class GenerationJobManagerClass { this.eventTransport.emitDone(streamId, abortFinalEvent); this.jobStore.clearContentState(streamId); + this.runStepBuffers?.delete(streamId); + + // Immediate cleanup if configured (default: true) + if (this._cleanupOnComplete) { + this.runtimeState.delete(streamId); + this.eventTransport.cleanup(streamId); + await this.jobStore.deleteJob(streamId); + } else { + // Only update status if keeping the job around + await this.jobStore.updateJob(streamId, { + status: 'aborted', + completedAt: Date.now(), + }); + } logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`); @@ -562,12 +592,18 @@ class GenerationJobManagerClass { } /** - * Accumulate run steps for a stream. + * Accumulate run steps for a stream (Redis mode only). * Uses a simple in-memory buffer that gets flushed to Redis. + * Not used in in-memory mode - run steps come from live graph via WeakRef. */ - private runStepBuffers = new Map(); + private runStepBuffers: Map | null = null; private accumulateRunStep(streamId: string, runStep: Agents.RunStep): void { + // Lazy initialization - only create map when first used (Redis mode) + if (!this.runStepBuffers) { + this.runStepBuffers = new Map(); + } + let buffer = this.runStepBuffers.get(streamId); if (!buffer) { buffer = []; @@ -582,7 +618,7 @@ class GenerationJobManagerClass { buffer.push(runStep); } - // Debounced save to Redis + // Save to Redis if (this.jobStore.saveRunSteps) { this.jobStore.saveRunSteps(streamId, buffer).catch((err) => { logger.error(`[GenerationJobManager] Failed to save run steps:`, err); @@ -619,7 +655,10 @@ class GenerationJobManagerClass { /** * Update job metadata. */ - updateMetadata(streamId: string, metadata: Partial): void { + async updateMetadata( + streamId: string, + metadata: Partial, + ): Promise { const updates: Partial = {}; if (metadata.responseMessageId) { updates.responseMessageId = metadata.responseMessageId; @@ -645,7 +684,7 @@ class GenerationJobManagerClass { if (metadata.promptTokens !== undefined) { updates.promptTokens = metadata.promptTokens; } - this.jobStore.updateJob(streamId, updates); + await this.jobStore.updateJob(streamId, updates); } /** @@ -735,6 +774,7 @@ class GenerationJobManagerClass { /** * Cleanup expired jobs. + * Also cleans up any orphaned runtime state, buffers, and event transport entries. */ private async cleanup(): Promise { const count = await this.jobStore.cleanup(); @@ -743,11 +783,21 @@ class GenerationJobManagerClass { for (const streamId of this.runtimeState.keys()) { if (!(await this.jobStore.hasJob(streamId))) { this.runtimeState.delete(streamId); + this.runStepBuffers?.delete(streamId); this.jobStore.clearContentState(streamId); this.eventTransport.cleanup(streamId); } } + // Also check runStepBuffers for any orphaned entries (Redis mode only) + if (this.runStepBuffers) { + for (const streamId of this.runStepBuffers.keys()) { + if (!(await this.jobStore.hasJob(streamId))) { + this.runStepBuffers.delete(streamId); + } + } + } + if (count > 0) { logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`); } @@ -799,6 +849,7 @@ class GenerationJobManagerClass { /** * Destroy the manager. + * Cleans up all resources including runtime state, buffers, and stores. */ async destroy(): Promise { if (this.cleanupInterval) { @@ -809,6 +860,7 @@ class GenerationJobManagerClass { await this.jobStore.destroy(); this.eventTransport.destroy(); this.runtimeState.clear(); + this.runStepBuffers?.clear(); logger.debug('[GenerationJobManager] Destroyed'); }