feat: Implement active job tracking by user in RedisJobStore

- Added retrieval of active job IDs for a specific user, giving users visibility into their ongoing tasks (usage sketch after this list).
- Implemented self-healing cleanup of stale entries in the per-user job set, keeping active-job tracking accurate.
- Updated job creation, update, and deletion to maintain the per-user job sets.
- Extended the integration tests to validate the new user-specific job management.
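
A minimal usage sketch of the new query (hypothetical wiring; assumes the store API shown in the diff below):

```ts
import Redis from 'ioredis';
import { RedisJobStore } from './implementations/RedisJobStore';

// Illustrative setup only; connection options depend on the deployment.
const redis = new Redis();
const store = new RedisJobStore(redis);
await store.initialize();

// Returns IDs of this user's jobs still in 'running' status; completed,
// aborted, and errored jobs are excluded, and stale set entries are
// removed as a side effect (self-healing).
const activeIds = await store.getActiveJobIdsByUser('some-user-id');
console.log(activeIds);
```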
Danny Avila 2025-12-17 12:47:25 -05:00
parent e43ea7a4f4
commit 3c52ac0bce
2 changed files with 338 additions and 5 deletions

@@ -613,6 +613,273 @@ describe('RedisJobStore Integration Tests', () => {
});
});
describe('Active Jobs by User', () => {
test('should return active job IDs for a user', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId = `test-user-${Date.now()}`;
const streamId1 = `stream-1-${Date.now()}`;
const streamId2 = `stream-2-${Date.now()}`;
// Create two jobs for the same user
await store.createJob(streamId1, userId, streamId1);
await store.createJob(streamId2, userId, streamId2);
// Get active jobs for user
const activeJobs = await store.getActiveJobIdsByUser(userId);
expect(activeJobs).toHaveLength(2);
expect(activeJobs).toContain(streamId1);
expect(activeJobs).toContain(streamId2);
await store.destroy();
});
test('should return empty array for user with no jobs', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId = `nonexistent-user-${Date.now()}`;
const activeJobs = await store.getActiveJobIdsByUser(userId);
expect(activeJobs).toHaveLength(0);
await store.destroy();
});
test('should not return completed jobs', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId = `test-user-${Date.now()}`;
const streamId1 = `stream-1-${Date.now()}`;
const streamId2 = `stream-2-${Date.now()}`;
// Create two jobs
await store.createJob(streamId1, userId, streamId1);
await store.createJob(streamId2, userId, streamId2);
// Complete one job
await store.updateJob(streamId1, { status: 'complete', completedAt: Date.now() });
// Get active jobs - should only return the running one
const activeJobs = await store.getActiveJobIdsByUser(userId);
expect(activeJobs).toHaveLength(1);
expect(activeJobs).toContain(streamId2);
expect(activeJobs).not.toContain(streamId1);
await store.destroy();
});
test('should not return aborted jobs', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId = `test-user-${Date.now()}`;
const streamId = `stream-${Date.now()}`;
// Create a job and abort it
await store.createJob(streamId, userId, streamId);
await store.updateJob(streamId, { status: 'aborted', completedAt: Date.now() });
// Get active jobs - should be empty
const activeJobs = await store.getActiveJobIdsByUser(userId);
expect(activeJobs).toHaveLength(0);
await store.destroy();
});
test('should not return error jobs', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId = `test-user-${Date.now()}`;
const streamId = `stream-${Date.now()}`;
// Create a job with error status
await store.createJob(streamId, userId, streamId);
await store.updateJob(streamId, {
status: 'error',
error: 'Test error',
completedAt: Date.now(),
});
// Get active jobs - should be empty
const activeJobs = await store.getActiveJobIdsByUser(userId);
expect(activeJobs).toHaveLength(0);
await store.destroy();
});
test('should perform self-healing cleanup of stale entries', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId = `test-user-${Date.now()}`;
const streamId = `stream-${Date.now()}`;
const staleStreamId = `stale-stream-${Date.now()}`;
// Create a real job
await store.createJob(streamId, userId, streamId);
// Manually add a stale entry to the user's job set (simulating orphaned data)
const userJobsKey = `stream:user:{${userId}}:jobs`;
await ioredisClient.sadd(userJobsKey, staleStreamId);
// Verify both entries exist in the set
const beforeCleanup = await ioredisClient.smembers(userJobsKey);
expect(beforeCleanup).toContain(streamId);
expect(beforeCleanup).toContain(staleStreamId);
// Get active jobs - should trigger self-healing
const activeJobs = await store.getActiveJobIdsByUser(userId);
// Should only return the real job
expect(activeJobs).toHaveLength(1);
expect(activeJobs).toContain(streamId);
// Verify stale entry was removed
const afterCleanup = await ioredisClient.smembers(userJobsKey);
expect(afterCleanup).toContain(streamId);
expect(afterCleanup).not.toContain(staleStreamId);
await store.destroy();
});
test('should isolate jobs between different users', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId1 = `user-1-${Date.now()}`;
const userId2 = `user-2-${Date.now()}`;
const streamId1 = `stream-1-${Date.now()}`;
const streamId2 = `stream-2-${Date.now()}`;
// Create jobs for different users
await store.createJob(streamId1, userId1, streamId1);
await store.createJob(streamId2, userId2, streamId2);
// Get active jobs for user 1
const user1Jobs = await store.getActiveJobIdsByUser(userId1);
expect(user1Jobs).toHaveLength(1);
expect(user1Jobs).toContain(streamId1);
expect(user1Jobs).not.toContain(streamId2);
// Get active jobs for user 2
const user2Jobs = await store.getActiveJobIdsByUser(userId2);
expect(user2Jobs).toHaveLength(1);
expect(user2Jobs).toContain(streamId2);
expect(user2Jobs).not.toContain(streamId1);
await store.destroy();
});
test('should work across multiple store instances (horizontal scaling)', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
// Simulate two server instances
const instance1 = new RedisJobStore(ioredisClient);
const instance2 = new RedisJobStore(ioredisClient);
await instance1.initialize();
await instance2.initialize();
const userId = `test-user-${Date.now()}`;
const streamId = `stream-${Date.now()}`;
// Instance 1 creates a job
await instance1.createJob(streamId, userId, streamId);
// Instance 2 should see the active job
const activeJobs = await instance2.getActiveJobIdsByUser(userId);
expect(activeJobs).toHaveLength(1);
expect(activeJobs).toContain(streamId);
// Instance 1 completes the job
await instance1.updateJob(streamId, { status: 'complete', completedAt: Date.now() });
// Instance 2 should no longer see the job as active
const activeJobsAfter = await instance2.getActiveJobIdsByUser(userId);
expect(activeJobsAfter).toHaveLength(0);
await instance1.destroy();
await instance2.destroy();
});
test('should clean up user jobs set when job is deleted', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const userId = `test-user-${Date.now()}`;
const streamId = `stream-${Date.now()}`;
// Create a job
await store.createJob(streamId, userId, streamId);
// Verify job is in active list
let activeJobs = await store.getActiveJobIdsByUser(userId);
expect(activeJobs).toContain(streamId);
// Delete the job
await store.deleteJob(streamId);
// Job should no longer be in active list
activeJobs = await store.getActiveJobIdsByUser(userId);
expect(activeJobs).not.toContain(streamId);
await store.destroy();
});
});
describe('Local Graph Cache Optimization', () => {
test('should use local cache when available', async () => {
if (!ioredisClient) {

@@ -24,6 +24,8 @@ const KEYS = {
runSteps: (streamId: string) => `stream:{${streamId}}:runsteps`,
/** Running jobs set for cleanup (global set - single slot) */
runningJobs: 'stream:running',
/** User's active jobs set: stream:user:{userId}:jobs */
userJobs: (userId: string) => `stream:user:{${userId}}:jobs`,
};
/**
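
For context on the key shape: `{userId}` is a Redis Cluster hash tag, so only the text inside the braces is hashed, and every key carrying the same tag lands in the same slot (and therefore on the same node). A quick probe of that behavior, sketched with a hypothetical user ID:

```ts
import Redis from 'ioredis';

// CLUSTER KEYSLOT reports the hash slot a key maps to. With a hash tag,
// only the {...} portion is hashed, so keys sharing a tag share a slot.
async function probeSlots(redis: Redis): Promise<void> {
  const userId = 'user-123';
  const userJobsSlot = await redis.call('cluster', 'keyslot', `stream:user:{${userId}}:jobs`);
  const sameTagSlot = await redis.call('cluster', 'keyslot', `other:{${userId}}:key`);
  console.log(userJobsSlot === sameTagSlot); // true: identical hash tag
  const runningSlot = await redis.call('cluster', 'keyslot', 'stream:running');
  console.log(runningSlot === userJobsSlot); // generally false: hashed on the full key
}
```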
@@ -137,18 +139,21 @@ export class RedisJobStore implements IJobStore {
};
const key = KEYS.job(streamId);
const userJobsKey = KEYS.userJobs(userId);
// For cluster mode, we can't pipeline keys on different slots
// The job key uses hash tag {streamId}, runningJobs is global
// The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots
if (this.isCluster) {
await this.redis.hmset(key, this.serializeJob(job));
await this.redis.expire(key, this.ttl.running);
await this.redis.sadd(KEYS.runningJobs, streamId);
await this.redis.sadd(userJobsKey, streamId);
} else {
const pipeline = this.redis.pipeline();
pipeline.hmset(key, this.serializeJob(job));
pipeline.expire(key, this.ttl.running);
pipeline.sadd(KEYS.runningJobs, streamId);
pipeline.sadd(userJobsKey, streamId);
await pipeline.exec();
}
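
An aside on the branch above: ioredis restricts a cluster pipeline to keys in a single hash slot, which is why the cluster path issues sequential commands instead of one pipeline. A rough sketch of the constraint (hypothetical keys; the exact error surface may vary by ioredis version):

```ts
import { Cluster } from 'ioredis';

// Sketch: stream:{abc}:job hashes on "abc" while stream:running hashes on
// the full key, so the two almost certainly live in different slots and
// cannot share one pipeline in cluster mode.
async function writeAcrossSlots(cluster: Cluster): Promise<void> {
  // Sequential commands are each routed to the node owning their slot:
  await cluster.hset('stream:{abc}:job', 'status', 'running');
  await cluster.sadd('stream:running', 'abc');
}
```

In non-cluster mode a single pipeline covering all keys saves round-trips, which is what the else branch does.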
@@ -166,8 +171,10 @@ export class RedisJobStore implements IJobStore {
async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
const key = KEYS.job(streamId);
const exists = await this.redis.exists(key);
if (!exists) {
// Get existing job data (needed for userId when removing from user jobs set)
const job = await this.getJob(streamId);
if (!job) {
return;
}
@@ -178,12 +185,17 @@
await this.redis.hmset(key, serialized);
// If status changed to complete/error/aborted, update TTL and remove from running set
// If status changed to complete/error/aborted, update TTL and remove from running/user sets
if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
const userId = job.userId;
// In cluster mode, separate runningJobs (global) from stream-specific keys
if (this.isCluster) {
await this.redis.expire(key, this.ttl.completed);
await this.redis.srem(KEYS.runningJobs, streamId);
if (userId) {
await this.redis.srem(KEYS.userJobs(userId), streamId);
}
if (this.ttl.chunksAfterComplete === 0) {
await this.redis.del(KEYS.chunks(streamId));
@@ -200,6 +212,9 @@
const pipeline = this.redis.pipeline();
pipeline.expire(key, this.ttl.completed);
pipeline.srem(KEYS.runningJobs, streamId);
if (userId) {
pipeline.srem(KEYS.userJobs(userId), streamId);
}
if (this.ttl.chunksAfterComplete === 0) {
pipeline.del(KEYS.chunks(streamId));
@@ -222,6 +237,10 @@
// Clear local cache
this.localGraphCache.delete(streamId);
// Get job to find userId for user jobs cleanup
const job = await this.getJob(streamId);
const userId = job?.userId;
// In cluster mode, separate runningJobs (global) from stream-specific keys (same slot)
if (this.isCluster) {
// Stream-specific keys all hash to same slot due to {streamId}
@@ -230,14 +249,20 @@
pipeline.del(KEYS.chunks(streamId));
pipeline.del(KEYS.runSteps(streamId));
await pipeline.exec();
// Global set is on different slot - execute separately
// Global sets are on different slots - execute separately
await this.redis.srem(KEYS.runningJobs, streamId);
if (userId) {
await this.redis.srem(KEYS.userJobs(userId), streamId);
}
} else {
const pipeline = this.redis.pipeline();
pipeline.del(KEYS.job(streamId));
pipeline.del(KEYS.chunks(streamId));
pipeline.del(KEYS.runSteps(streamId));
pipeline.srem(KEYS.runningJobs, streamId);
if (userId) {
pipeline.srem(KEYS.userJobs(userId), streamId);
}
await pipeline.exec();
}
logger.debug(`[RedisJobStore] Deleted job: ${streamId}`);
@@ -327,6 +352,47 @@
return 0;
}
/**
* Get active job IDs for a user.
* Returns conversation IDs of running jobs belonging to the user.
* Also performs self-healing cleanup: removes stale entries for jobs that no longer exist.
*
* @param userId - The user ID to query
* @returns Array of conversation IDs with active jobs
*/
async getActiveJobIdsByUser(userId: string): Promise<string[]> {
const userJobsKey = KEYS.userJobs(userId);
const trackedIds = await this.redis.smembers(userJobsKey);
if (trackedIds.length === 0) {
return [];
}
const activeIds: string[] = [];
const staleIds: string[] = [];
for (const streamId of trackedIds) {
const job = await this.getJob(streamId);
// Only include if job exists AND is still running
if (job && job.status === 'running') {
activeIds.push(streamId);
} else {
// Self-healing: job completed/deleted but mapping wasn't cleaned - mark for removal
staleIds.push(streamId);
}
}
// Clean up stale entries
if (staleIds.length > 0) {
await this.redis.srem(userJobsKey, ...staleIds);
logger.debug(
`[RedisJobStore] Self-healed ${staleIds.length} stale job entries for user ${userId}`,
);
}
return activeIds;
}
async destroy(): Promise<void> {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);