refactor: cleanupOnComplete option to GenerationJobManager for flexible resource management

- Introduced a new configuration option, cleanupOnComplete, allowing immediate cleanup of event transport and job resources upon job completion.
- Updated completeJob and abortJob methods to respect the cleanupOnComplete setting, enhancing memory management.
- Improved cleanup logic in the cleanup method to handle orphaned resources effectively.
- Enhanced documentation and comments for better clarity on the new functionality.
This commit is contained in:
Danny Avila 2025-12-15 09:16:06 -05:00
parent 2b3f4d58db
commit e01684a30a
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956

View file

@ -17,6 +17,12 @@ import { InMemoryJobStore } from './implementations/InMemoryJobStore';
export interface GenerationJobManagerOptions { export interface GenerationJobManagerOptions {
jobStore?: IJobStore; jobStore?: IJobStore;
eventTransport?: IEventTransport; eventTransport?: IEventTransport;
/**
* If true, cleans up event transport immediately when job completes.
* If false, keeps EventEmitters until periodic cleanup for late reconnections.
* Default: true (immediate cleanup to save memory)
*/
cleanupOnComplete?: boolean;
} }
/** /**
@ -78,10 +84,14 @@ class GenerationJobManagerClass {
/** Whether we're using Redis stores */ /** Whether we're using Redis stores */
private _isRedis = false; private _isRedis = false;
/** Whether to cleanup event transport immediately on job completion */
private _cleanupOnComplete = true;
constructor(options?: GenerationJobManagerOptions) { constructor(options?: GenerationJobManagerOptions) {
this.jobStore = this.jobStore =
options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 }); options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 0, maxJobs: 1000 });
this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport(); this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport();
this._cleanupOnComplete = options?.cleanupOnComplete ?? true;
} }
/** /**
@ -124,6 +134,7 @@ class GenerationJobManagerClass {
jobStore: IJobStore; jobStore: IJobStore;
eventTransport: IEventTransport; eventTransport: IEventTransport;
isRedis?: boolean; isRedis?: boolean;
cleanupOnComplete?: boolean;
}): void { }): void {
if (this.cleanupInterval) { if (this.cleanupInterval) {
logger.warn( logger.warn(
@ -135,6 +146,7 @@ class GenerationJobManagerClass {
this.jobStore = services.jobStore; this.jobStore = services.jobStore;
this.eventTransport = services.eventTransport; this.eventTransport = services.eventTransport;
this._isRedis = services.isRedis ?? false; this._isRedis = services.isRedis ?? false;
this._cleanupOnComplete = services.cleanupOnComplete ?? true;
logger.info( logger.info(
`[GenerationJobManager] Configured with ${this._isRedis ? 'Redis' : 'in-memory'} stores`, `[GenerationJobManager] Configured with ${this._isRedis ? 'Redis' : 'in-memory'} stores`,
@ -337,17 +349,26 @@ class GenerationJobManagerClass {
/** /**
* Mark job as complete. * Mark job as complete.
* If cleanupOnComplete is true (default), immediately cleans up all job resources.
*/ */
async completeJob(streamId: string, error?: string): Promise<void> { async completeJob(streamId: string, error?: string): Promise<void> {
await this.jobStore.updateJob(streamId, { // Clear content state and run step buffer (Redis only)
status: error ? 'error' : 'complete',
completedAt: Date.now(),
error,
});
// Clear content state and run step buffer
this.jobStore.clearContentState(streamId); this.jobStore.clearContentState(streamId);
this.runStepBuffers.delete(streamId); this.runStepBuffers?.delete(streamId);
// Immediate cleanup if configured (default: true)
if (this._cleanupOnComplete) {
this.runtimeState.delete(streamId);
this.eventTransport.cleanup(streamId);
await this.jobStore.deleteJob(streamId);
} else {
// Only update status if keeping the job around
await this.jobStore.updateJob(streamId, {
status: error ? 'error' : 'complete',
completedAt: Date.now(),
error,
});
}
logger.debug(`[GenerationJobManager] Job completed: ${streamId}`); logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
} }
@ -369,12 +390,7 @@ class GenerationJobManagerClass {
runtime.abortController.abort(); runtime.abortController.abort();
} }
await this.jobStore.updateJob(streamId, { // Get content before clearing state
status: 'aborted',
completedAt: Date.now(),
});
// Get content and extract text
const content = (await this.jobStore.getContentParts(streamId)) ?? []; const content = (await this.jobStore.getContentParts(streamId)) ?? [];
const text = this.extractTextFromContent(content); const text = this.extractTextFromContent(content);
@ -414,6 +430,20 @@ class GenerationJobManagerClass {
this.eventTransport.emitDone(streamId, abortFinalEvent); this.eventTransport.emitDone(streamId, abortFinalEvent);
this.jobStore.clearContentState(streamId); this.jobStore.clearContentState(streamId);
this.runStepBuffers?.delete(streamId);
// Immediate cleanup if configured (default: true)
if (this._cleanupOnComplete) {
this.runtimeState.delete(streamId);
this.eventTransport.cleanup(streamId);
await this.jobStore.deleteJob(streamId);
} else {
// Only update status if keeping the job around
await this.jobStore.updateJob(streamId, {
status: 'aborted',
completedAt: Date.now(),
});
}
logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`); logger.debug(`[GenerationJobManager] Job aborted: ${streamId}`);
@ -562,12 +592,18 @@ class GenerationJobManagerClass {
} }
/** /**
* Accumulate run steps for a stream. * Accumulate run steps for a stream (Redis mode only).
* Uses a simple in-memory buffer that gets flushed to Redis. * Uses a simple in-memory buffer that gets flushed to Redis.
* Not used in in-memory mode - run steps come from live graph via WeakRef.
*/ */
private runStepBuffers = new Map<string, Agents.RunStep[]>(); private runStepBuffers: Map<string, Agents.RunStep[]> | null = null;
private accumulateRunStep(streamId: string, runStep: Agents.RunStep): void { private accumulateRunStep(streamId: string, runStep: Agents.RunStep): void {
// Lazy initialization - only create map when first used (Redis mode)
if (!this.runStepBuffers) {
this.runStepBuffers = new Map();
}
let buffer = this.runStepBuffers.get(streamId); let buffer = this.runStepBuffers.get(streamId);
if (!buffer) { if (!buffer) {
buffer = []; buffer = [];
@ -582,7 +618,7 @@ class GenerationJobManagerClass {
buffer.push(runStep); buffer.push(runStep);
} }
// Debounced save to Redis // Save to Redis
if (this.jobStore.saveRunSteps) { if (this.jobStore.saveRunSteps) {
this.jobStore.saveRunSteps(streamId, buffer).catch((err) => { this.jobStore.saveRunSteps(streamId, buffer).catch((err) => {
logger.error(`[GenerationJobManager] Failed to save run steps:`, err); logger.error(`[GenerationJobManager] Failed to save run steps:`, err);
@ -619,7 +655,10 @@ class GenerationJobManagerClass {
/** /**
* Update job metadata. * Update job metadata.
*/ */
updateMetadata(streamId: string, metadata: Partial<t.GenerationJobMetadata>): void { async updateMetadata(
streamId: string,
metadata: Partial<t.GenerationJobMetadata>,
): Promise<void> {
const updates: Partial<SerializableJobData> = {}; const updates: Partial<SerializableJobData> = {};
if (metadata.responseMessageId) { if (metadata.responseMessageId) {
updates.responseMessageId = metadata.responseMessageId; updates.responseMessageId = metadata.responseMessageId;
@ -645,7 +684,7 @@ class GenerationJobManagerClass {
if (metadata.promptTokens !== undefined) { if (metadata.promptTokens !== undefined) {
updates.promptTokens = metadata.promptTokens; updates.promptTokens = metadata.promptTokens;
} }
this.jobStore.updateJob(streamId, updates); await this.jobStore.updateJob(streamId, updates);
} }
/** /**
@ -735,6 +774,7 @@ class GenerationJobManagerClass {
/** /**
* Cleanup expired jobs. * Cleanup expired jobs.
* Also cleans up any orphaned runtime state, buffers, and event transport entries.
*/ */
private async cleanup(): Promise<void> { private async cleanup(): Promise<void> {
const count = await this.jobStore.cleanup(); const count = await this.jobStore.cleanup();
@ -743,11 +783,21 @@ class GenerationJobManagerClass {
for (const streamId of this.runtimeState.keys()) { for (const streamId of this.runtimeState.keys()) {
if (!(await this.jobStore.hasJob(streamId))) { if (!(await this.jobStore.hasJob(streamId))) {
this.runtimeState.delete(streamId); this.runtimeState.delete(streamId);
this.runStepBuffers?.delete(streamId);
this.jobStore.clearContentState(streamId); this.jobStore.clearContentState(streamId);
this.eventTransport.cleanup(streamId); this.eventTransport.cleanup(streamId);
} }
} }
// Also check runStepBuffers for any orphaned entries (Redis mode only)
if (this.runStepBuffers) {
for (const streamId of this.runStepBuffers.keys()) {
if (!(await this.jobStore.hasJob(streamId))) {
this.runStepBuffers.delete(streamId);
}
}
}
if (count > 0) { if (count > 0) {
logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`); logger.debug(`[GenerationJobManager] Cleaned up ${count} expired jobs`);
} }
@ -799,6 +849,7 @@ class GenerationJobManagerClass {
/** /**
* Destroy the manager. * Destroy the manager.
* Cleans up all resources including runtime state, buffers, and stores.
*/ */
async destroy(): Promise<void> { async destroy(): Promise<void> {
if (this.cleanupInterval) { if (this.cleanupInterval) {
@ -809,6 +860,7 @@ class GenerationJobManagerClass {
await this.jobStore.destroy(); await this.jobStore.destroy();
this.eventTransport.destroy(); this.eventTransport.destroy();
this.runtimeState.clear(); this.runtimeState.clear();
this.runStepBuffers?.clear();
logger.debug('[GenerationJobManager] Destroyed'); logger.debug('[GenerationJobManager] Destroyed');
} }