feat: Introduce Redis-backed stream services for enhanced job management

- Added createStreamServices function to configure job store and event transport, supporting both Redis and in-memory options.
- Updated GenerationJobManager to allow configuration with custom job stores and event transports, improving flexibility for different deployment scenarios.
- Refactored IJobStore interface to support asynchronous content retrieval, ensuring compatibility with Redis implementations.
- Implemented RedisEventTransport for real-time event delivery across instances, enhancing scalability and responsiveness.
- Updated InMemoryJobStore to align with new async patterns for content and run step retrieval, ensuring consistent behavior across storage options.
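
For orientation, a minimal sketch of how these pieces might be wired together. The import paths and constructor option names below are assumptions based on this commit message, not verbatim from the code:

// Hypothetical wiring (paths and option names assumed from the commit description).
import { createStreamServices } from './stream/services';
import { GenerationJobManager } from './GenerationJobManager';

// createStreamServices selects RedisJobStore + RedisEventTransport when Redis
// is configured, or the in-memory equivalents otherwise.
const { jobStore, eventTransport } = createStreamServices({ useRedis: true });

// GenerationJobManager now accepts custom job stores and event transports.
const jobManager = new GenerationJobManager({ jobStore, eventTransport });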

@@ -262,24 +262,13 @@ export class RedisJobStore implements IJobStore {
   }
   /**
-   * For Redis, this returns null - caller should use getAggregatedContentAsync().
-   * This sync method exists for interface compatibility with in-memory.
-   *
-   * Note: GenerationJobManager should check for null and call the async version.
+   * Get aggregated content from chunks.
+   * Reconstructs message content from Redis Streams on demand.
    */
-  getContentParts(): Agents.MessageContentComplex[] | null {
-    // Redis can't return content synchronously - must use chunks
-    return null;
-  }
-  /**
-   * Get aggregated content from chunks (async version for Redis).
-   * Called on client reconnection to reconstruct message content.
-   */
-  async getAggregatedContentAsync(
-    streamId: string,
-  ): Promise<Agents.MessageContentComplex[] | null> {
+  async getContentParts(streamId: string): Promise<Agents.MessageContentComplex[] | null> {
     const chunks = await this.getChunks(streamId);
+    logger.debug(`[RedisJobStore] getContentParts: ${streamId} has ${chunks.length} chunks`);
     if (chunks.length === 0) {
       return null;
     }
@@ -319,25 +308,19 @@ export class RedisJobStore implements IJobStore {
   }
   /**
-   * For Redis, run steps must be fetched async.
-   * This sync method returns empty - caller should use getRunStepsAsync().
+   * Get run steps from Redis.
    */
-  getRunSteps(): Agents.RunStep[] {
-    // Redis can't return run steps synchronously
-    return [];
-  }
-  /**
-   * Get run steps (async version for Redis).
-   */
-  async getRunStepsAsync(streamId: string): Promise<Agents.RunStep[]> {
+  async getRunSteps(streamId: string): Promise<Agents.RunStep[]> {
     const key = KEYS.runSteps(streamId);
     const data = await this.redis.get(key);
     if (!data) {
+      logger.debug(`[RedisJobStore] getRunSteps: ${streamId} has no run steps`);
       return [];
     }
     try {
-      return JSON.parse(data);
+      const runSteps = JSON.parse(data);
+      logger.debug(`[RedisJobStore] getRunSteps: ${streamId} has ${runSteps.length} run steps`);
+      return runSteps;
     } catch {
       return [];
     }
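
With getContentParts and getRunSteps now async on IJobStore, callers no longer branch between sync and async variants; the call shape is the same for RedisJobStore and InMemoryJobStore. A minimal sketch of a reconnection call site, assuming only the interface shown in this diff (restoreStreamState itself is illustrative):

// Works identically for Redis-backed and in-memory stores now that
// IJobStore exposes a single async method for each accessor.
async function restoreStreamState(store: IJobStore, streamId: string) {
  const contentParts = await store.getContentParts(streamId);
  const runSteps = await store.getRunSteps(streamId);
  if (contentParts == null) {
    return null; // nothing buffered yet for this stream
  }
  return { contentParts, runSteps };
}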
@@ -369,6 +352,8 @@ export class RedisJobStore implements IJobStore {
    */
   async appendChunk(streamId: string, event: unknown): Promise<void> {
     const key = KEYS.chunks(streamId);
+    const eventObj = event as { event?: string };
+    logger.debug(`[RedisJobStore] appendChunk: ${streamId} event=${eventObj.event}`);
     await this.redis.xadd(key, '*', 'event', JSON.stringify(event));
   }
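
On the producer side, every generation event lands in the stream's chunk log via appendChunk. XADD with '*' lets Redis assign a monotonically increasing entry ID, which the consumer-group methods in the next hunk rely on for delivery tracking. A hedged sketch of the producer loop (the variable names are illustrative, not from this commit):

// Illustrative producer loop: persist each streamed event so that a
// reconnecting client can replay the ones it missed.
declare const generationEvents: AsyncIterable<unknown>; // assumed event source
for await (const event of generationEvents) {
  await jobStore.appendChunk(streamId, event);
}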
@@ -402,6 +387,197 @@ export class RedisJobStore implements IJobStore {
     await this.redis.set(key, JSON.stringify(runSteps), 'EX', TTL.running);
   }
+  // ===== Consumer Group Methods =====
+  // These enable tracking which chunks each client has seen.
+  // Based on https://upstash.com/blog/resumable-llm-streams
+  /**
+   * Create a consumer group for a stream.
+   * Used to track which chunks a client has already received.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Unique name for the consumer group (e.g., session ID)
+   * @param startFrom - Where to start reading ('0' = from beginning, '$' = only new)
+   */
+  async createConsumerGroup(
+    streamId: string,
+    groupName: string,
+    startFrom: '0' | '$' = '0',
+  ): Promise<void> {
+    const key = KEYS.chunks(streamId);
+    try {
+      await this.redis.xgroup('CREATE', key, groupName, startFrom, 'MKSTREAM');
+      logger.debug(`[RedisJobStore] Created consumer group ${groupName} for ${streamId}`);
+    } catch (err) {
+      // BUSYGROUP error means group already exists - that's fine
+      const error = err as Error;
+      if (!error.message?.includes('BUSYGROUP')) {
+        throw err;
+      }
+    }
+  }
+  /**
+   * Read chunks from a consumer group (only unseen chunks).
+   * This is the key to the resumable stream pattern.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name
+   * @param consumerName - Name of the consumer within the group
+   * @param count - Maximum number of chunks to read (default: all available)
+   * @returns Array of { id, event } where id is the Redis stream entry ID
+   */
+  async readChunksFromGroup(
+    streamId: string,
+    groupName: string,
+    consumerName: string = 'consumer-1',
+    count?: number,
+  ): Promise<Array<{ id: string; event: unknown }>> {
+    const key = KEYS.chunks(streamId);
+    try {
+      // XREADGROUP GROUP groupName consumerName [COUNT count] STREAMS key >
+      // The '>' means only read new messages not yet delivered to this consumer
+      let result;
+      if (count) {
+        result = await this.redis.xreadgroup(
+          'GROUP',
+          groupName,
+          consumerName,
+          'COUNT',
+          count,
+          'STREAMS',
+          key,
+          '>',
+        );
+      } else {
+        result = await this.redis.xreadgroup('GROUP', groupName, consumerName, 'STREAMS', key, '>');
+      }
+      if (!result || result.length === 0) {
+        return [];
+      }
+      // Result format: [[streamKey, [[id, [field, value, ...]], ...]]]
+      const [, messages] = result[0] as [string, Array<[string, string[]]>];
+      const chunks: Array<{ id: string; event: unknown }> = [];
+      for (const [id, fields] of messages) {
+        const eventIdx = fields.indexOf('event');
+        if (eventIdx >= 0 && eventIdx + 1 < fields.length) {
+          try {
+            chunks.push({
+              id,
+              event: JSON.parse(fields[eventIdx + 1]),
+            });
+          } catch {
+            // Skip malformed entries
+          }
+        }
+      }
+      return chunks;
+    } catch (err) {
+      const error = err as Error;
+      // NOGROUP error means the group doesn't exist yet
+      if (error.message?.includes('NOGROUP')) {
+        return [];
+      }
+      throw err;
+    }
+  }
+  /**
+   * Acknowledge that chunks have been processed.
+   * This tells Redis we've successfully delivered these chunks to the client.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name
+   * @param messageIds - Array of Redis stream entry IDs to acknowledge
+   */
+  async acknowledgeChunks(
+    streamId: string,
+    groupName: string,
+    messageIds: string[],
+  ): Promise<void> {
+    if (messageIds.length === 0) {
+      return;
+    }
+    const key = KEYS.chunks(streamId);
+    await this.redis.xack(key, groupName, ...messageIds);
+  }
+  /**
+   * Delete a consumer group.
+   * Called when a client disconnects and won't reconnect.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name to delete
+   */
+  async deleteConsumerGroup(streamId: string, groupName: string): Promise<void> {
+    const key = KEYS.chunks(streamId);
+    try {
+      await this.redis.xgroup('DESTROY', key, groupName);
+      logger.debug(`[RedisJobStore] Deleted consumer group ${groupName} for ${streamId}`);
+    } catch {
+      // Ignore errors - group may not exist
+    }
+  }
+  /**
+   * Get pending chunks for a consumer (chunks delivered but not acknowledged).
+   * Useful for recovering from crashes.
+   *
+   * @param streamId - The stream identifier
+   * @param groupName - Consumer group name
+   * @param consumerName - Consumer name
+   */
+  async getPendingChunks(
+    streamId: string,
+    groupName: string,
+    consumerName: string = 'consumer-1',
+  ): Promise<Array<{ id: string; event: unknown }>> {
+    const key = KEYS.chunks(streamId);
+    try {
+      // Read pending messages (delivered but not acked) by using '0' instead of '>'
+      const result = await this.redis.xreadgroup(
+        'GROUP',
+        groupName,
+        consumerName,
+        'STREAMS',
+        key,
+        '0',
+      );
+      if (!result || result.length === 0) {
+        return [];
+      }
+      const [, messages] = result[0] as [string, Array<[string, string[]]>];
+      const chunks: Array<{ id: string; event: unknown }> = [];
+      for (const [id, fields] of messages) {
+        const eventIdx = fields.indexOf('event');
+        if (eventIdx >= 0 && eventIdx + 1 < fields.length) {
+          try {
+            chunks.push({
+              id,
+              event: JSON.parse(fields[eventIdx + 1]),
+            });
+          } catch {
+            // Skip malformed entries
+          }
+        }
+      }
+      return chunks;
+    } catch {
+      return [];
+    }
+  }
   /**
    * Serialize job data for Redis hash storage.
    * Converts complex types to strings.
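
Taken together, these methods implement the resumable-stream pattern from the Upstash post cited above. A sketch of what a reconnect handler could look like on top of this API; resumeStream and sendToClient are illustrative, not part of this commit:

declare function sendToClient(event: unknown): void; // e.g., an SSE write (assumed)

async function resumeStream(store: RedisJobStore, streamId: string, sessionId: string) {
  // Ensure a group exists that remembers this client's read position;
  // '0' replays from the beginning on first connect, BUSYGROUP is ignored.
  await store.createConsumerGroup(streamId, sessionId, '0');

  // First drain chunks delivered before a crash but never acknowledged...
  const pending = await store.getPendingChunks(streamId, sessionId);
  // ...then read chunks this client has not seen at all.
  const fresh = await store.readChunksFromGroup(streamId, sessionId);

  const chunks = [...pending, ...fresh];
  for (const { event } of chunks) {
    sendToClient(event);
  }

  // Acknowledge only after successful delivery, so a crash here re-sends them.
  await store.acknowledgeChunks(streamId, sessionId, chunks.map((c) => c.id));
}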