feat: Add active job management for user and show progress in conversation list

- Implemented a new endpoint to retrieve active generation job IDs for the current user, enhancing user experience by allowing visibility of ongoing tasks.
- Integrated active job tracking in the Conversations component, displaying generation indicators based on active jobs.
- Optimized job management in the GenerationJobManager and InMemoryJobStore to support user-specific job queries, ensuring efficient resource handling and cleanup.
- Updated relevant components and hooks to utilize the new active jobs feature, improving overall application responsiveness and user feedback.
This commit is contained in:
Danny Avila 2025-12-17 12:22:51 -05:00
parent 339e222fe9
commit e43ea7a4f4
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
11 changed files with 279 additions and 20 deletions

View file

@ -903,6 +903,18 @@ class GenerationJobManagerClass {
return { running, complete, error, aborted };
}
/**
* Get active job IDs for a user.
* Returns conversation IDs of running jobs belonging to the user.
* Performs self-healing cleanup of stale entries.
*
* @param userId - The user ID to query
* @returns Array of conversation IDs with active jobs
*/
async getActiveJobIdsForUser(userId: string): Promise<string[]> {
return this.jobStore.getActiveJobIdsByUser(userId);
}
/**
* Destroy the manager.
* Cleans up all resources including runtime state, buffers, and stores.

View file

@ -26,6 +26,9 @@ export class InMemoryJobStore implements IJobStore {
private contentState = new Map<string, ContentState>();
private cleanupInterval: NodeJS.Timeout | null = null;
/** Maps userId -> Set of streamIds (conversationIds) for active jobs */
private userJobMap = new Map<string, Set<string>>();
/** Time to keep completed jobs before cleanup (0 = immediate) */
private ttlAfterComplete = 0;
@ -76,6 +79,15 @@ export class InMemoryJobStore implements IJobStore {
};
this.jobs.set(streamId, job);
// Track job by userId for efficient user-scoped queries
let userJobs = this.userJobMap.get(userId);
if (!userJobs) {
userJobs = new Set();
this.userJobMap.set(userId, userJobs);
}
userJobs.add(streamId);
logger.debug(`[InMemoryJobStore] Created job: ${streamId}`);
return job;
@ -94,6 +106,18 @@ export class InMemoryJobStore implements IJobStore {
}
async deleteJob(streamId: string): Promise<void> {
// Remove from user's job set before deleting
const job = this.jobs.get(streamId);
if (job) {
const userJobs = this.userJobMap.get(job.userId);
if (userJobs) {
userJobs.delete(streamId);
if (userJobs.size === 0) {
this.userJobMap.delete(job.userId);
}
}
}
this.jobs.delete(streamId);
this.contentState.delete(streamId);
logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
@ -178,9 +202,42 @@ export class InMemoryJobStore implements IJobStore {
}
this.jobs.clear();
this.contentState.clear();
this.userJobMap.clear();
logger.debug('[InMemoryJobStore] Destroyed');
}
/**
* Get active job IDs for a user.
* Returns conversation IDs of running jobs belonging to the user.
* Also performs self-healing cleanup: removes stale entries for jobs that no longer exist.
*/
async getActiveJobIdsByUser(userId: string): Promise<string[]> {
const trackedIds = this.userJobMap.get(userId);
if (!trackedIds || trackedIds.size === 0) {
return [];
}
const activeIds: string[] = [];
for (const streamId of trackedIds) {
const job = this.jobs.get(streamId);
// Only include if job exists AND is still running
if (job && job.status === 'running') {
activeIds.push(streamId);
} else {
// Self-healing: job completed/deleted but mapping wasn't cleaned - fix it now
trackedIds.delete(streamId);
}
}
// Clean up empty set
if (trackedIds.size === 0) {
this.userJobMap.delete(userId);
}
return activeIds;
}
// ===== Content State Methods =====
/**

View file

@ -122,6 +122,16 @@ export interface IJobStore {
/** Destroy the store and release resources */
destroy(): Promise<void>;
/**
* Get active job IDs for a user.
* Returns conversation IDs of running jobs belonging to the user.
* Also performs self-healing cleanup of stale entries.
*
* @param userId - The user ID to query
* @returns Array of conversation IDs with active jobs
*/
getActiveJobIdsByUser(userId: string): Promise<string[]>;
// ===== Content State Methods =====
// These methods manage volatile content state tied to each job.
// In-memory: Uses WeakRef to graph for live access