feat: Introduce Redis-backed stream services for enhanced job management

- Added createStreamServices function to configure job store and event transport, supporting both Redis and in-memory options.
- Updated GenerationJobManager to allow configuration with custom job stores and event transports, improving flexibility for different deployment scenarios.
- Refactored IJobStore interface to support asynchronous content retrieval, ensuring compatibility with Redis implementations.
- Implemented RedisEventTransport for real-time event delivery across instances, enhancing scalability and responsiveness.
- Updated InMemoryJobStore to align with new async patterns for content and run step retrieval, ensuring consistent behavior across storage options.
This commit is contained in:
Danny Avila 2025-12-14 23:45:08 -05:00
parent e51c8870e6
commit 78848c4af9
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
9 changed files with 835 additions and 49 deletions

View file

@ -11,6 +11,14 @@ import type * as t from '~/types';
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
/**
* Configuration options for GenerationJobManager
*/
export interface GenerationJobManagerOptions {
jobStore?: IJobStore;
eventTransport?: IEventTransport;
}
/**
* Runtime state for active jobs - not serializable, kept in-memory per instance.
* Contains AbortController, ready promise, and other non-serializable state.
@ -67,13 +75,18 @@ class GenerationJobManagerClass {
private cleanupInterval: NodeJS.Timeout | null = null;
constructor() {
this.jobStore = new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
this.eventTransport = new InMemoryEventTransport();
/** Whether we're using Redis stores */
private _isRedis = false;
constructor(options?: GenerationJobManagerOptions) {
this.jobStore =
options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 300000, maxJobs: 1000 });
this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport();
}
/**
* Initialize the job manager with periodic cleanup.
* Call this once at application startup.
*/
initialize(): void {
if (this.cleanupInterval) {
@ -93,6 +106,55 @@ class GenerationJobManagerClass {
logger.debug('[GenerationJobManager] Initialized');
}
/**
* Configure the manager with custom stores.
* Call this BEFORE initialize() to use Redis or other stores.
*
* @example Using Redis
* ```ts
* import { createStreamServicesFromCache } from '~/stream/createStreamServices';
* import { cacheConfig, ioredisClient } from '~/cache';
*
* const services = createStreamServicesFromCache({ cacheConfig, ioredisClient });
* GenerationJobManager.configure(services);
* GenerationJobManager.initialize();
* ```
*/
configure(services: {
jobStore: IJobStore;
eventTransport: IEventTransport;
isRedis?: boolean;
}): void {
if (this.cleanupInterval) {
logger.warn(
'[GenerationJobManager] Reconfiguring after initialization - destroying existing services',
);
this.destroy();
}
this.jobStore = services.jobStore;
this.eventTransport = services.eventTransport;
this._isRedis = services.isRedis ?? false;
logger.info(
`[GenerationJobManager] Configured with ${this._isRedis ? 'Redis' : 'in-memory'} stores`,
);
}
/**
* Check if using Redis stores.
*/
get isRedis(): boolean {
return this._isRedis;
}
/**
* Get the job store instance (for advanced use cases).
*/
getJobStore(): IJobStore {
return this.jobStore;
}
/**
* Create a new generation job.
*
@ -146,15 +208,17 @@ class GenerationJobManagerClass {
if (currentRuntime) {
currentRuntime.syncSent = false;
// Call registered handlers (from job.emitter.on('allSubscribersLeft', ...))
const content = this.jobStore.getContentParts(streamId) ?? [];
if (currentRuntime.allSubscribersLeftHandlers) {
for (const handler of currentRuntime.allSubscribersLeftHandlers) {
try {
handler(content);
} catch (err) {
logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
this.jobStore.getContentParts(streamId).then((content) => {
const parts = content ?? [];
for (const handler of currentRuntime.allSubscribersLeftHandlers ?? []) {
try {
handler(parts);
} catch (err) {
logger.error(`[GenerationJobManager] Error in allSubscribersLeft handler:`, err);
}
}
}
});
}
}
logger.debug(`[GenerationJobManager] All subscribers left ${streamId}, reset syncSent`);
@ -282,8 +346,9 @@ class GenerationJobManagerClass {
error,
});
// Clear content state
// Clear content state and run step buffer
this.jobStore.clearContentState(streamId);
this.runStepBuffers.delete(streamId);
logger.debug(`[GenerationJobManager] Job completed: ${streamId}`);
}
@ -311,7 +376,7 @@ class GenerationJobManagerClass {
});
// Get content and extract text
const content = this.jobStore.getContentParts(streamId) ?? [];
const content = (await this.jobStore.getContentParts(streamId)) ?? [];
const text = this.extractTextFromContent(content);
// Create final event for abort
@ -458,9 +523,74 @@ class GenerationJobManagerClass {
// Track user message from created event
this.trackUserMessage(streamId, event);
// For Redis mode, persist chunk for later reconstruction
if (this._isRedis) {
// The SSE event structure is { event: string, data: unknown, ... }
// The aggregator expects { event: string, data: unknown } where data is the payload
const eventObj = event as Record<string, unknown>;
const eventType = eventObj.event as string | undefined;
const eventData = eventObj.data;
if (eventType && eventData !== undefined) {
// Store in format expected by aggregateContent: { event, data }
this.jobStore.appendChunk(streamId, { event: eventType, data: eventData }).catch((err) => {
logger.error(`[GenerationJobManager] Failed to append chunk:`, err);
});
// For run step events, also save to run steps key for quick retrieval
if (eventType === 'on_run_step' || eventType === 'on_run_step_completed') {
this.saveRunStepFromEvent(streamId, eventData as Record<string, unknown>);
}
}
}
this.eventTransport.emitChunk(streamId, event);
}
/**
* Extract and save run step from event data.
* The data is already the run step object from the event payload.
*/
private saveRunStepFromEvent(streamId: string, data: Record<string, unknown>): void {
// The data IS the run step object
const runStep = data as Agents.RunStep;
if (!runStep.id) {
return;
}
// Fire and forget - accumulate run steps
this.accumulateRunStep(streamId, runStep);
}
/**
* Accumulate run steps for a stream.
* Uses a simple in-memory buffer that gets flushed to Redis.
*/
private runStepBuffers = new Map<string, Agents.RunStep[]>();
private accumulateRunStep(streamId: string, runStep: Agents.RunStep): void {
let buffer = this.runStepBuffers.get(streamId);
if (!buffer) {
buffer = [];
this.runStepBuffers.set(streamId, buffer);
}
// Update or add run step
const existingIdx = buffer.findIndex((rs) => rs.id === runStep.id);
if (existingIdx >= 0) {
buffer[existingIdx] = runStep;
} else {
buffer.push(runStep);
}
// Debounced save to Redis
if (this.jobStore.saveRunSteps) {
this.jobStore.saveRunSteps(streamId, buffer).catch((err) => {
logger.error(`[GenerationJobManager] Failed to save run steps:`, err);
});
}
}
/**
* Track user message from created event.
*/
@ -554,8 +684,8 @@ class GenerationJobManagerClass {
return null;
}
const aggregatedContent = this.jobStore.getContentParts(streamId) ?? [];
const runSteps = this.jobStore.getRunSteps(streamId);
const aggregatedContent = (await this.jobStore.getContentParts(streamId)) ?? [];
const runSteps = await this.jobStore.getRunSteps(streamId);
logger.debug(`[GenerationJobManager] getResumeState:`, {
streamId,
@ -642,10 +772,12 @@ class GenerationJobManagerClass {
return null;
}
const aggregatedContent = (await this.jobStore.getContentParts(streamId)) ?? [];
return {
active: jobData.status === 'running',
status: jobData.status as t.GenerationJobStatus,
aggregatedContent: this.jobStore.getContentParts(streamId) ?? [],
aggregatedContent,
createdAt: jobData.createdAt,
};
}