refactor: Convert GenerationJobManager methods to async for improved performance
- Updated methods in GenerationJobManager and InMemoryJobStore to be asynchronous, improving how job creation, retrieval, and management are handled.
- Adjusted ResumableAgentController and the related routes to await job operations, ensuring proper flow and error handling.
- Increased the subscriber-readiness timeout in ResumableAgentController's startGeneration function from 2500ms to 3500ms.
Parent: 3d28289b52
Commit: 56bca2e45c
5 changed files with 104 additions and 107 deletions
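Because the refactor turns previously synchronous Express route handlers into async ones, rejected promises in those handlers are no longer plain exceptions. If the app runs on Express 4, such rejections are not forwarded to error middleware automatically; a small wrapper is a common pattern. This is an illustrative sketch, not part of this commit:

import type { Request, Response, NextFunction, RequestHandler } from 'express';

/** Forward rejections from async route handlers to Express error middleware (needed on Express 4). */
const asyncHandler =
  (fn: (req: Request, res: Response, next: NextFunction) => Promise<unknown>): RequestHandler =>
  (req, res, next) => {
    Promise.resolve(fn(req, res, next)).catch(next);
  };

// Hypothetical usage with one of the now-async routes from this commit:
// router.get('/chat/stream/:streamId', asyncHandler(async (req, res) => { /* ... */ }));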
ResumableAgentController:

@@ -63,7 +63,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
     }
   });

-  const job = GenerationJobManager.createJob(streamId, userId, reqConversationId);
+  const job = await GenerationJobManager.createJob(streamId, userId, reqConversationId);
   req._resumableStreamId = streamId;

   // Track if partial response was already saved to avoid duplicates

@@ -83,7 +83,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
       return;
     }

-    const resumeState = GenerationJobManager.getResumeState(streamId);
+    const resumeState = await GenerationJobManager.getResumeState(streamId);
     if (!resumeState?.userMessage) {
       logger.debug('[ResumableAgentController] No user message to save partial response for');
       return;

@@ -166,7 +166,7 @@ const ResumableAgentController = async (req, res, next, initializeClient, addTit
   // Start background generation - wait for subscriber with timeout fallback
   const startGeneration = async () => {
     try {
-      await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 2500))]);
+      await Promise.race([job.readyPromise, new Promise((resolve) => setTimeout(resolve, 3500))]);
     } catch (waitError) {
       logger.warn(
         `[ResumableAgentController] Error waiting for subscriber: ${waitError.message}`,
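The startGeneration change above waits briefly for the first SSE subscriber before generation begins, with a timeout fallback that this commit raises from 2500ms to 3500ms. A minimal sketch of that pattern, assuming a readyPromise that resolves when the first subscriber attaches (names are illustrative, not the project's exact code):

/**
 * Wait for a "subscriber ready" signal, but never block generation for more
 * than timeoutMs. Errors are logged and swallowed so generation still starts.
 */
async function waitForSubscriber(readyPromise: Promise<void>, timeoutMs = 3500): Promise<void> {
  try {
    await Promise.race([
      readyPromise,
      new Promise<void>((resolve) => setTimeout(resolve, timeoutMs)),
    ]);
  } catch (waitError) {
    console.warn(`Error waiting for subscriber: ${(waitError as Error).message}`);
  }
}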
Agent stream routes:

@@ -35,11 +35,11 @@ router.use('/', v1);
  * @description Sends sync event with resume state, replays missed chunks, then streams live
  * @query resume=true - Indicates this is a reconnection (sends sync event)
  */
-router.get('/chat/stream/:streamId', (req, res) => {
+router.get('/chat/stream/:streamId', async (req, res) => {
   const { streamId } = req.params;
   const isResume = req.query.resume === 'true';

-  const job = GenerationJobManager.getJob(streamId);
+  const job = await GenerationJobManager.getJob(streamId);
   if (!job) {
     return res.status(404).json({
       error: 'Stream not found',

@@ -59,7 +59,7 @@ router.get('/chat/stream/:streamId', (req, res) => {
   // Send sync event with resume state for ALL reconnecting clients
   // This supports multi-tab scenarios where each tab needs run step data
   if (isResume) {
-    const resumeState = GenerationJobManager.getResumeState(streamId);
+    const resumeState = await GenerationJobManager.getResumeState(streamId);
     if (resumeState && !res.writableEnded) {
       // Send sync event with run steps AND aggregatedContent
       // Client will use aggregatedContent to initialize message state

@@ -74,7 +74,7 @@ router.get('/chat/stream/:streamId', (req, res) => {
     }
   }

-  const result = GenerationJobManager.subscribe(
+  const result = await GenerationJobManager.subscribe(
     streamId,
     (event) => {
       if (!res.writableEnded) {

@@ -120,10 +120,10 @@ router.get('/chat/stream/:streamId', (req, res) => {
  * @access Private
  * @returns { active, streamId, status, aggregatedContent, createdAt, resumeState }
  */
-router.get('/chat/status/:conversationId', (req, res) => {
+router.get('/chat/status/:conversationId', async (req, res) => {
   const { conversationId } = req.params;

-  const job = GenerationJobManager.getJobByConversation(conversationId);
+  const job = await GenerationJobManager.getJobByConversation(conversationId);

   if (!job) {
     return res.json({ active: false });

@@ -133,8 +133,8 @@ router.get('/chat/status/:conversationId', (req, res) => {
     return res.status(403).json({ error: 'Unauthorized' });
   }

-  const info = GenerationJobManager.getStreamInfo(job.streamId);
-  const resumeState = GenerationJobManager.getResumeState(job.streamId);
+  const info = await GenerationJobManager.getStreamInfo(job.streamId);
+  const resumeState = await GenerationJobManager.getResumeState(job.streamId);

   res.json({
     active: info?.active ?? false,
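Taken together, the two GET routes above let a client check whether a conversation still has an active generation and, if so, reattach to its stream. A hedged client-side sketch (the URL prefix and response handling are illustrative assumptions, not the project's client code):

// Illustrative only: the real client and route prefix may differ.
async function reattachIfActive(baseUrl: string, conversationId: string): Promise<EventSource | null> {
  const statusRes = await fetch(`${baseUrl}/chat/status/${conversationId}`);
  const status = await statusRes.json(); // { active, streamId, status, aggregatedContent, createdAt, resumeState }
  if (!status.active) {
    return null;
  }
  // resume=true asks the server to send a sync event with run steps and aggregatedContent first.
  const source = new EventSource(`${baseUrl}/chat/stream/${status.streamId}?resume=true`);
  source.onmessage = (event) => {
    console.log('stream event', event.data);
  };
  return source;
}

The abort route, shown next, completes the picture for stopping a background generation.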
@@ -152,7 +152,7 @@ router.get('/chat/status/:conversationId', (req, res) => {
  * @access Private
  * @description Mounted before chatRouter to bypass buildEndpointOption middleware
  */
-router.post('/chat/abort', (req, res) => {
+router.post('/chat/abort', async (req, res) => {
   logger.debug(`[AgentStream] ========== ABORT ENDPOINT HIT ==========`);
   logger.debug(`[AgentStream] Method: ${req.method}, Path: ${req.path}`);
   logger.debug(`[AgentStream] Body:`, req.body);

@@ -161,10 +161,10 @@ router.post('/chat/abort', (req, res) => {

   // Try to find job by streamId first, then by conversationId, then by abortKey
   let jobStreamId = streamId;
-  let job = jobStreamId ? GenerationJobManager.getJob(jobStreamId) : null;
+  let job = jobStreamId ? await GenerationJobManager.getJob(jobStreamId) : null;

   if (!job && conversationId) {
-    job = GenerationJobManager.getJobByConversation(conversationId);
+    job = await GenerationJobManager.getJobByConversation(conversationId);
     if (job) {
       jobStreamId = job.streamId;
     }

@@ -172,14 +172,14 @@ router.post('/chat/abort', (req, res) => {

   if (!job && abortKey) {
     jobStreamId = abortKey.split(':')[0];
-    job = GenerationJobManager.getJob(jobStreamId);
+    job = await GenerationJobManager.getJob(jobStreamId);
   }

   logger.debug(`[AgentStream] Computed jobStreamId: ${jobStreamId}`);

   if (job && jobStreamId) {
     logger.debug(`[AgentStream] Job found, aborting: ${jobStreamId}`);
-    GenerationJobManager.abortJob(jobStreamId);
+    await GenerationJobManager.abortJob(jobStreamId);
     logger.debug(`[AgentStream] Job aborted successfully: ${jobStreamId}`);
     return res.json({ success: true, aborted: jobStreamId });
   }
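The abort route resolves the job from whichever identifier the client has, falling back from streamId to conversationId to the stream portion of an abortKey. A hedged sketch of calling it from a client (path prefix and headers are assumptions):

// Illustrative only: send whichever identifiers are available; the server
// falls back from streamId to conversationId to abortKey.
async function abortGeneration(
  baseUrl: string,
  ids: { streamId?: string; conversationId?: string; abortKey?: string },
) {
  const res = await fetch(`${baseUrl}/chat/abort`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(ids),
  });
  return res.json(); // e.g. { success: true, aborted: '<streamId>' }
}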
GenerationJobManager:

@@ -1,7 +1,12 @@
 import { logger } from '@librechat/data-schemas';
 import type { Agents } from 'librechat-data-provider';
 import type { StandardGraph } from '@librechat/agents';
-import type { SerializableJobData } from './interfaces/IJobStore';
+import type {
+  IContentStateManager,
+  SerializableJobData,
+  IEventTransport,
+  IJobStore,
+} from './interfaces/IJobStore';
 import type * as t from '~/types';
 import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
 import { InMemoryContentState } from './implementations/InMemoryContentState';

@@ -34,15 +39,14 @@ interface RuntimeJobState {
 /**
  * Manages generation jobs for resumable LLM streams.
  *
- * Architecture: Composes three pluggable services for clean separation:
+ * Architecture: Composes three pluggable services via dependency injection:
  * - jobStore: Serializable job metadata (InMemory → Redis/KV for horizontal scaling)
  * - eventTransport: Pub/sub events (InMemory → Redis Pub/Sub for horizontal scaling)
  * - contentState: Volatile content refs with WeakRef (always in-memory, not shared)
  *
- * Current implementation uses sync methods for performance. When adding Redis support,
- * the manager methods will need to become async, or use a sync-capable Redis client.
+ * All storage methods are async to support both in-memory and external stores (Redis, etc.).
  *
- * @example Future Redis injection (requires async refactor):
+ * @example Redis injection:
  * ```ts
  * const manager = new GenerationJobManagerClass({
  *   jobStore: new RedisJobStore(redisClient),

@@ -53,11 +57,11 @@ interface RuntimeJobState {
  */
 class GenerationJobManagerClass {
   /** Job metadata storage - swappable for Redis, KV store, etc. */
-  private jobStore: InMemoryJobStore;
+  private jobStore: IJobStore;
   /** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */
-  private eventTransport: InMemoryEventTransport;
+  private eventTransport: IEventTransport;
   /** Volatile content state with WeakRef - always in-memory per instance */
-  private contentState: InMemoryContentState;
+  private contentState: IContentStateManager;

   /** Runtime state - always in-memory, not serializable */
   private runtimeState = new Map<string, RuntimeJobState>();

@@ -106,11 +110,14 @@ class GenerationJobManagerClass {
    * @param streamId - Unique identifier for this stream
    * @param userId - User who initiated the request
    * @param conversationId - Optional conversation ID for lookup
-   * @returns A facade object compatible with the old GenerationJob interface
+   * @returns A facade object for the GenerationJob
    */
-  createJob(streamId: string, userId: string, conversationId?: string): t.GenerationJob {
-    // Create serializable job data (sync for in-memory)
-    const jobData = this.jobStore.createJobSync(streamId, userId, conversationId);
+  async createJob(
+    streamId: string,
+    userId: string,
+    conversationId?: string,
+  ): Promise<t.GenerationJob> {
+    const jobData = await this.jobStore.createJob(streamId, userId, conversationId);

    /**
     * Create runtime state with readyPromise.

@@ -243,8 +250,8 @@ class GenerationJobManagerClass {
   /**
    * Get a job by streamId.
    */
-  getJob(streamId: string): t.GenerationJob | undefined {
-    const jobData = this.jobStore.getJobSync(streamId);
+  async getJob(streamId: string): Promise<t.GenerationJob | undefined> {
+    const jobData = await this.jobStore.getJob(streamId);
     const runtime = this.runtimeState.get(streamId);
     if (!jobData || !runtime) {
       return undefined;

@@ -255,8 +262,8 @@ class GenerationJobManagerClass {
   /**
    * Find an active job by conversationId.
    */
-  getJobByConversation(conversationId: string): t.GenerationJob | undefined {
-    const jobData = this.jobStore.getJobByConversationSync(conversationId);
+  async getJobByConversation(conversationId: string): Promise<t.GenerationJob | undefined> {
+    const jobData = await this.jobStore.getJobByConversation(conversationId);
     if (!jobData) {
       return undefined;
     }

@@ -270,15 +277,15 @@ class GenerationJobManagerClass {
   /**
    * Check if a job exists.
    */
-  hasJob(streamId: string): boolean {
-    return this.jobStore.hasJobSync(streamId);
+  async hasJob(streamId: string): Promise<boolean> {
+    return this.jobStore.hasJob(streamId);
   }

   /**
    * Get job status.
    */
-  getJobStatus(streamId: string): t.GenerationJobStatus | undefined {
-    const jobData = this.jobStore.getJobSync(streamId);
+  async getJobStatus(streamId: string): Promise<t.GenerationJobStatus | undefined> {
+    const jobData = await this.jobStore.getJob(streamId);
     return jobData?.status as t.GenerationJobStatus | undefined;
   }

@@ -302,7 +309,7 @@ class GenerationJobManagerClass {
    * Abort a job (user-initiated).
    */
   async abortJob(streamId: string): Promise<void> {
-    const jobData = this.jobStore.getJobSync(streamId);
+    const jobData = await this.jobStore.getJob(streamId);
     const runtime = this.runtimeState.get(streamId);

     if (!jobData) {

@@ -376,18 +383,18 @@ class GenerationJobManagerClass {
    * @param onError - Handler for error events
    * @returns Subscription object with unsubscribe function, or null if job not found
    */
-  subscribe(
+  async subscribe(
     streamId: string,
     onChunk: t.ChunkHandler,
     onDone?: t.DoneHandler,
     onError?: t.ErrorHandler,
-  ): { unsubscribe: t.UnsubscribeFn } | null {
+  ): Promise<{ unsubscribe: t.UnsubscribeFn } | null> {
     const runtime = this.runtimeState.get(streamId);
     if (!runtime) {
       return null;
     }

-    const jobData = this.jobStore.getJobSync(streamId);
+    const jobData = await this.jobStore.getJob(streamId);

     // If job already complete, send final event
     setImmediate(() => {

@@ -429,10 +436,11 @@ class GenerationJobManagerClass {

   /**
    * Emit a chunk event to all subscribers.
+   * Uses runtime state check for performance (avoids async job store lookup per token).
    */
   emitChunk(streamId: string, event: t.ServerSentEvent): void {
-    const jobData = this.jobStore.getJobSync(streamId);
-    if (!jobData || jobData.status !== 'running') {
+    const runtime = this.runtimeState.get(streamId);
+    if (!runtime || runtime.abortController.signal.aborted) {
       return;
     }

@@ -494,7 +502,8 @@ class GenerationJobManagerClass {
    * Set reference to the graph's contentParts array.
    */
   setContentParts(streamId: string, contentParts: Agents.MessageContentComplex[]): void {
-    if (!this.jobStore.hasJobSync(streamId)) {
+    // Use runtime state check for performance (sync check)
+    if (!this.runtimeState.has(streamId)) {
       return;
     }
     this.contentState.setContentParts(streamId, contentParts);

@@ -505,7 +514,8 @@ class GenerationJobManagerClass {
    * Set reference to the graph instance.
    */
   setGraph(streamId: string, graph: StandardGraph): void {
-    if (!this.jobStore.hasJobSync(streamId)) {
+    // Use runtime state check for performance (sync check)
+    if (!this.runtimeState.has(streamId)) {
       return;
     }
     this.contentState.setGraph(streamId, graph);

@@ -515,8 +525,8 @@ class GenerationJobManagerClass {
   /**
    * Get resume state for reconnecting clients.
    */
-  getResumeState(streamId: string): t.ResumeState | null {
-    const jobData = this.jobStore.getJobSync(streamId);
+  async getResumeState(streamId: string): Promise<t.ResumeState | null> {
+    const jobData = await this.jobStore.getJob(streamId);
     if (!jobData) {
       return null;
     }

@@ -583,7 +593,7 @@ class GenerationJobManagerClass {

     // Cleanup runtime state for deleted jobs
     for (const streamId of this.runtimeState.keys()) {
-      if (!this.jobStore.hasJobSync(streamId)) {
+      if (!(await this.jobStore.hasJob(streamId))) {
         this.runtimeState.delete(streamId);
         this.contentState.clearContentState(streamId);
         this.eventTransport.cleanup(streamId);

@@ -598,13 +608,13 @@ class GenerationJobManagerClass {
   /**
    * Get stream info for status endpoint.
    */
-  getStreamInfo(streamId: string): {
+  async getStreamInfo(streamId: string): Promise<{
     active: boolean;
     status: t.GenerationJobStatus;
     aggregatedContent?: Agents.MessageContentComplex[];
     createdAt: number;
-  } | null {
-    const jobData = this.jobStore.getJobSync(streamId);
+  } | null> {
+    const jobData = await this.jobStore.getJob(streamId);
     if (!jobData) {
       return null;
     }

@@ -620,27 +630,33 @@ class GenerationJobManagerClass {
   /**
    * Get total job count.
    */
-  getJobCount(): number {
+  async getJobCount(): Promise<number> {
     return this.jobStore.getJobCount();
   }

   /**
    * Get job count by status.
    */
-  getJobCountByStatus(): Record<t.GenerationJobStatus, number> {
-    return this.jobStore.getJobCountByStatus() as Record<t.GenerationJobStatus, number>;
+  async getJobCountByStatus(): Promise<Record<t.GenerationJobStatus, number>> {
+    const [running, complete, error, aborted] = await Promise.all([
+      this.jobStore.getJobCountByStatus('running'),
+      this.jobStore.getJobCountByStatus('complete'),
+      this.jobStore.getJobCountByStatus('error'),
+      this.jobStore.getJobCountByStatus('aborted'),
+    ]);
+    return { running, complete, error, aborted };
   }

   /**
    * Destroy the manager.
    */
-  destroy(): void {
+  async destroy(): Promise<void> {
     if (this.cleanupInterval) {
       clearInterval(this.cleanupInterval);
       this.cleanupInterval = null;
     }

-    this.jobStore.destroy();
+    await this.jobStore.destroy();
     this.eventTransport.destroy();
     this.contentState.destroy();
     this.runtimeState.clear();
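For orientation, a hedged usage sketch of the now-async manager surface, using only methods visible in this diff; the import path is an assumption, and the producer side (emitChunk, setContentParts, setGraph) is omitted:

// Illustrative only.
import { GenerationJobManager } from './GenerationJobManager';

async function exampleLifecycle(streamId: string, userId: string, conversationId: string) {
  // Create the job; the returned facade exposes streamId, readyPromise, etc.
  const job = await GenerationJobManager.createJob(streamId, userId, conversationId);
  console.log('created job for stream', job.streamId);

  // A consumer (e.g. the SSE route) subscribes; subscribe() now resolves to
  // { unsubscribe } or null when the job is unknown.
  const subscription = await GenerationJobManager.subscribe(
    streamId,
    (event) => console.log('chunk', event),
    () => console.log('done'),
    (err) => console.error('error', err),
  );

  // A user-initiated stop goes through abortJob, which callers now await.
  await GenerationJobManager.abortJob(streamId);
  subscription?.unsubscribe();
  console.log('job still present?', await GenerationJobManager.hasJob(streamId));
}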
InMemoryJobStore:

@@ -10,7 +10,7 @@ export class InMemoryJobStore implements IJobStore {
   private jobs = new Map<string, SerializableJobData>();
   private cleanupInterval: NodeJS.Timeout | null = null;

-  /** Time to keep completed jobs before cleanup (5 minutes - reduced from 1 hour) */
+  /** Time to keep completed jobs before cleanup (5 minutes) */
   private ttlAfterComplete = 300000;

   /** Maximum number of concurrent jobs */

@@ -25,7 +25,7 @@ export class InMemoryJobStore implements IJobStore {
     }
   }

-  initialize(): void {
+  async initialize(): Promise<void> {
     if (this.cleanupInterval) {
       return;
     }

@@ -46,13 +46,8 @@ export class InMemoryJobStore implements IJobStore {
     userId: string,
     conversationId?: string,
   ): Promise<SerializableJobData> {
-    return this.createJobSync(streamId, userId, conversationId);
-  }
-
-  /** Synchronous version for in-memory use */
-  createJobSync(streamId: string, userId: string, conversationId?: string): SerializableJobData {
     if (this.jobs.size >= this.maxJobs) {
-      this.evictOldestSync();
+      await this.evictOldest();
     }

     const job: SerializableJobData = {

@@ -71,20 +66,10 @@ export class InMemoryJobStore implements IJobStore {
   }

   async getJob(streamId: string): Promise<SerializableJobData | null> {
-    return this.getJobSync(streamId);
-  }
-
-  /** Synchronous version for in-memory use */
-  getJobSync(streamId: string): SerializableJobData | null {
     return this.jobs.get(streamId) ?? null;
   }

   async getJobByConversation(conversationId: string): Promise<SerializableJobData | null> {
-    return this.getJobByConversationSync(conversationId);
-  }
-
-  /** Synchronous version for in-memory use */
-  getJobByConversationSync(conversationId: string): SerializableJobData | null {
     // Direct match first (streamId === conversationId for existing conversations)
     const directMatch = this.jobs.get(conversationId);
     if (directMatch && directMatch.status === 'running') {

@@ -102,11 +87,6 @@ export class InMemoryJobStore implements IJobStore {
   }

   async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
-    this.updateJobSync(streamId, updates);
-  }
-
-  /** Synchronous version for in-memory use */
-  updateJobSync(streamId: string, updates: Partial<SerializableJobData>): void {
     const job = this.jobs.get(streamId);
     if (!job) {
       return;

@@ -115,21 +95,11 @@ export class InMemoryJobStore implements IJobStore {
   }

   async deleteJob(streamId: string): Promise<void> {
-    this.deleteJobSync(streamId);
-  }
-
-  /** Synchronous version for in-memory use */
-  deleteJobSync(streamId: string): void {
     this.jobs.delete(streamId);
     logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
   }

   async hasJob(streamId: string): Promise<boolean> {
-    return this.hasJobSync(streamId);
-  }
-
-  /** Synchronous version for in-memory use */
-  hasJobSync(streamId: string): boolean {
     return this.jobs.has(streamId);
   }

@@ -166,11 +136,6 @@ export class InMemoryJobStore implements IJobStore {
   }

   private async evictOldest(): Promise<void> {
-    this.evictOldestSync();
-  }
-
-  /** Synchronous version for in-memory use */
-  private evictOldestSync(): void {
     let oldestId: string | null = null;
     let oldestTime = Infinity;

@@ -183,32 +148,27 @@ export class InMemoryJobStore implements IJobStore {

     if (oldestId) {
       logger.warn(`[InMemoryJobStore] Evicting oldest job: ${oldestId}`);
-      this.deleteJobSync(oldestId);
+      await this.deleteJob(oldestId);
     }
   }

   /** Get job count (for monitoring) */
-  getJobCount(): number {
+  async getJobCount(): Promise<number> {
     return this.jobs.size;
   }

   /** Get job count by status (for monitoring) */
-  getJobCountByStatus(): Record<JobStatus, number> {
-    const counts: Record<JobStatus, number> = {
-      running: 0,
-      complete: 0,
-      error: 0,
-      aborted: 0,
-    };
+  async getJobCountByStatus(status: JobStatus): Promise<number> {
+    let count = 0;

     for (const job of this.jobs.values()) {
-      counts[job.status]++;
+      if (job.status === status) {
+        count++;
+      }
     }
-
-    return counts;
+    return count;
   }

-  destroy(): void {
+  async destroy(): Promise<void> {
     if (this.cleanupInterval) {
       clearInterval(this.cleanupInterval);
       this.cleanupInterval = null;
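A hedged, test-style sketch exercising the store's async surface as it appears after this change. It assumes InMemoryJobStore can be constructed without arguments (its constructor is not shown in this diff) and that new jobs start in the 'running' status checked by getJobByConversation:

// Illustrative only; import path and constructor are assumptions.
import { InMemoryJobStore } from './InMemoryJobStore';

async function smokeTest() {
  const store = new InMemoryJobStore();
  await store.initialize();

  await store.createJob('stream-1', 'user-1', 'conv-1');
  console.log(await store.hasJob('stream-1')); // true
  console.log(await store.getJobCountByStatus('running')); // 1 (assumed initial status)

  await store.updateJob('stream-1', { status: 'complete' });
  console.log(await store.getJobCountByStatus('complete')); // 1

  await store.deleteJob('stream-1');
  console.log(await store.getJobCount()); // 0

  await store.destroy();
}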
interfaces/IJobStore:

@@ -56,6 +56,9 @@ export interface ResumeState
  * Implementations can use in-memory Map, Redis, KV store, etc.
  */
 export interface IJobStore {
+  /** Initialize the store (e.g., connect to Redis, start cleanup intervals) */
+  initialize(): Promise<void>;
+
   /** Create a new job */
   createJob(
     streamId: string,

@@ -83,6 +86,15 @@ export interface IJobStore {
   /** Cleanup expired jobs */
   cleanup(): Promise<number>;
+
+  /** Get total job count */
+  getJobCount(): Promise<number>;
+
+  /** Get job count by status */
+  getJobCountByStatus(status: JobStatus): Promise<number>;
+
+  /** Destroy the store and release resources */
+  destroy(): Promise<void>;
 }

 /**

@@ -117,6 +129,12 @@ export interface IEventTransport {
   /** Listen for all subscribers leaving */
   onAllSubscribersLeft(streamId: string, callback: () => void): void;
+
+  /** Cleanup transport resources for a specific stream */
+  cleanup(streamId: string): void;
+
+  /** Destroy all transport resources */
+  destroy(): void;
 }

 /**

@@ -139,4 +157,7 @@ export interface IContentStateManager {
   /** Clear content state for a job */
   clearContentState(streamId: string): void;
+
+  /** Destroy all content state resources */
+  destroy(): void;
 }
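Because the IJobStore contract is now fully Promise-based, an external store can slot in without further manager changes. A minimal sketch of a Redis-backed store is shown below: RedisJobStore is the hypothetical name already used in the manager's @example, the client surface and key scheme are assumptions, the local types stand in for the real SerializableJobData, and only the methods visible in this diff are sketched.

// Hypothetical Redis-backed job store; not part of this commit.
type JobStatus = 'running' | 'complete' | 'error' | 'aborted';

interface JobData {
  streamId: string;
  userId: string;
  conversationId?: string;
  status: JobStatus;
  createdAt: number;
}

/** Minimal client surface assumed for the sketch; map these onto your Redis client of choice. */
interface RedisLikeClient {
  get(key: string): Promise<string | null>;
  set(key: string, value: string): Promise<unknown>;
  del(key: string): Promise<unknown>;
  keys(pattern: string): Promise<string[]>;
}

export class RedisJobStore {
  constructor(
    private client: RedisLikeClient,
    private prefix = 'genjob:',
  ) {}

  async initialize(): Promise<void> {
    // Connection handling / TTL policies would go here.
  }

  async createJob(streamId: string, userId: string, conversationId?: string): Promise<JobData> {
    const job: JobData = { streamId, userId, conversationId, status: 'running', createdAt: Date.now() };
    await this.client.set(this.prefix + streamId, JSON.stringify(job));
    return job;
  }

  async getJob(streamId: string): Promise<JobData | null> {
    const raw = await this.client.get(this.prefix + streamId);
    return raw ? (JSON.parse(raw) as JobData) : null;
  }

  async updateJob(streamId: string, updates: Partial<JobData>): Promise<void> {
    const job = await this.getJob(streamId);
    if (job) {
      await this.client.set(this.prefix + streamId, JSON.stringify({ ...job, ...updates }));
    }
  }

  async deleteJob(streamId: string): Promise<void> {
    await this.client.del(this.prefix + streamId);
  }

  async hasJob(streamId: string): Promise<boolean> {
    return (await this.getJob(streamId)) !== null;
  }

  async getJobCountByStatus(status: JobStatus): Promise<number> {
    // Naive scan for illustration; a real store would keep per-status indexes.
    let count = 0;
    for (const key of await this.client.keys(this.prefix + '*')) {
      const raw = await this.client.get(key);
      if (raw && (JSON.parse(raw) as JobData).status === status) {
        count++;
      }
    }
    return count;
  }

  async destroy(): Promise<void> {
    // Close client connections here.
  }
}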