diff --git a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts
index 95ea15dbb3..d3fc2d0813 100644
--- a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts
+++ b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts
@@ -613,6 +613,273 @@ describe('RedisJobStore Integration Tests', () => {
     });
   });
 
+  describe('Active Jobs by User', () => {
+    test('should return active job IDs for a user', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId = `test-user-${Date.now()}`;
+      const streamId1 = `stream-1-${Date.now()}`;
+      const streamId2 = `stream-2-${Date.now()}`;
+
+      // Create two jobs for the same user
+      await store.createJob(streamId1, userId, streamId1);
+      await store.createJob(streamId2, userId, streamId2);
+
+      // Get active jobs for user
+      const activeJobs = await store.getActiveJobIdsByUser(userId);
+
+      expect(activeJobs).toHaveLength(2);
+      expect(activeJobs).toContain(streamId1);
+      expect(activeJobs).toContain(streamId2);
+
+      await store.destroy();
+    });
+
+    test('should return empty array for user with no jobs', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId = `nonexistent-user-${Date.now()}`;
+
+      const activeJobs = await store.getActiveJobIdsByUser(userId);
+
+      expect(activeJobs).toHaveLength(0);
+
+      await store.destroy();
+    });
+
+    test('should not return completed jobs', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId = `test-user-${Date.now()}`;
+      const streamId1 = `stream-1-${Date.now()}`;
+      const streamId2 = `stream-2-${Date.now()}`;
+
+      // Create two jobs
+      await store.createJob(streamId1, userId, streamId1);
+      await store.createJob(streamId2, userId, streamId2);
+
+      // Complete one job
+      await store.updateJob(streamId1, { status: 'complete', completedAt: Date.now() });
+
+      // Get active jobs - should only return the running one
+      const activeJobs = await store.getActiveJobIdsByUser(userId);
+
+      expect(activeJobs).toHaveLength(1);
+      expect(activeJobs).toContain(streamId2);
+      expect(activeJobs).not.toContain(streamId1);
+
+      await store.destroy();
+    });
+
+    test('should not return aborted jobs', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId = `test-user-${Date.now()}`;
+      const streamId = `stream-${Date.now()}`;
+
+      // Create a job and abort it
+      await store.createJob(streamId, userId, streamId);
+      await store.updateJob(streamId, { status: 'aborted', completedAt: Date.now() });
+
+      // Get active jobs - should be empty
+      const activeJobs = await store.getActiveJobIdsByUser(userId);
+
+      expect(activeJobs).toHaveLength(0);
+
+      await store.destroy();
+    });
+
+    test('should not return error jobs', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId = `test-user-${Date.now()}`;
+      const streamId = `stream-${Date.now()}`;
+
+      // Create a job with error status
+      await store.createJob(streamId, userId, streamId);
+      await store.updateJob(streamId, {
+        status: 'error',
+        error: 'Test error',
+        completedAt: Date.now(),
+      });
+
+      // Get active jobs - should be empty
+      const activeJobs = await store.getActiveJobIdsByUser(userId);
+
+      expect(activeJobs).toHaveLength(0);
+
+      await store.destroy();
+    });
+
+    test('should perform self-healing cleanup of stale entries', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId = `test-user-${Date.now()}`;
+      const streamId = `stream-${Date.now()}`;
+      const staleStreamId = `stale-stream-${Date.now()}`;
+
+      // Create a real job
+      await store.createJob(streamId, userId, streamId);
+
+      // Manually add a stale entry to the user's job set (simulating orphaned data)
+      const userJobsKey = `stream:user:{${userId}}:jobs`;
+      await ioredisClient.sadd(userJobsKey, staleStreamId);
+
+      // Verify both entries exist in the set
+      const beforeCleanup = await ioredisClient.smembers(userJobsKey);
+      expect(beforeCleanup).toContain(streamId);
+      expect(beforeCleanup).toContain(staleStreamId);
+
+      // Get active jobs - should trigger self-healing
+      const activeJobs = await store.getActiveJobIdsByUser(userId);
+
+      // Should only return the real job
+      expect(activeJobs).toHaveLength(1);
+      expect(activeJobs).toContain(streamId);
+
+      // Verify stale entry was removed
+      const afterCleanup = await ioredisClient.smembers(userJobsKey);
+      expect(afterCleanup).toContain(streamId);
+      expect(afterCleanup).not.toContain(staleStreamId);
+
+      await store.destroy();
+    });
+
+    test('should isolate jobs between different users', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId1 = `user-1-${Date.now()}`;
+      const userId2 = `user-2-${Date.now()}`;
+      const streamId1 = `stream-1-${Date.now()}`;
+      const streamId2 = `stream-2-${Date.now()}`;
+
+      // Create jobs for different users
+      await store.createJob(streamId1, userId1, streamId1);
+      await store.createJob(streamId2, userId2, streamId2);
+
+      // Get active jobs for user 1
+      const user1Jobs = await store.getActiveJobIdsByUser(userId1);
+      expect(user1Jobs).toHaveLength(1);
+      expect(user1Jobs).toContain(streamId1);
+      expect(user1Jobs).not.toContain(streamId2);
+
+      // Get active jobs for user 2
+      const user2Jobs = await store.getActiveJobIdsByUser(userId2);
+      expect(user2Jobs).toHaveLength(1);
+      expect(user2Jobs).toContain(streamId2);
+      expect(user2Jobs).not.toContain(streamId1);
+
+      await store.destroy();
+    });
+
+    test('should work across multiple store instances (horizontal scaling)', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+
+      // Simulate two server instances
+      const instance1 = new RedisJobStore(ioredisClient);
+      const instance2 = new RedisJobStore(ioredisClient);
+
+      await instance1.initialize();
+      await instance2.initialize();
+
+      const userId = `test-user-${Date.now()}`;
+      const streamId = `stream-${Date.now()}`;
+
+      // Instance 1 creates a job
+      await instance1.createJob(streamId, userId, streamId);
+
+      // Instance 2 should see the active job
+      const activeJobs = await instance2.getActiveJobIdsByUser(userId);
+      expect(activeJobs).toHaveLength(1);
+      expect(activeJobs).toContain(streamId);
+
+      // Instance 1 completes the job
+      await instance1.updateJob(streamId, { status: 'complete', completedAt: Date.now() });
+
+      // Instance 2 should no longer see the job as active
+      const activeJobsAfter = await instance2.getActiveJobIdsByUser(userId);
+      expect(activeJobsAfter).toHaveLength(0);
+
+      await instance1.destroy();
+      await instance2.destroy();
+    });
+
+    test('should clean up user jobs set when job is deleted', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const userId = `test-user-${Date.now()}`;
+      const streamId = `stream-${Date.now()}`;
+
+      // Create a job
+      await store.createJob(streamId, userId, streamId);
+
+      // Verify job is in active list
+      let activeJobs = await store.getActiveJobIdsByUser(userId);
+      expect(activeJobs).toContain(streamId);
+
+      // Delete the job
+      await store.deleteJob(streamId);
+
+      // Job should no longer be in active list
+      activeJobs = await store.getActiveJobIdsByUser(userId);
+      expect(activeJobs).not.toContain(streamId);
+
+      await store.destroy();
+    });
+  });
+
   describe('Local Graph Cache Optimization', () => {
     test('should use local cache when available', async () => {
       if (!ioredisClient) {
diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts
index 6f851be286..bc8895cea0 100644
--- a/packages/api/src/stream/implementations/RedisJobStore.ts
+++ b/packages/api/src/stream/implementations/RedisJobStore.ts
@@ -24,6 +24,8 @@ const KEYS = {
   runSteps: (streamId: string) => `stream:{${streamId}}:runsteps`,
   /** Running jobs set for cleanup (global set - single slot) */
   runningJobs: 'stream:running',
+  /** User's active jobs set: stream:user:{userId}:jobs */
+  userJobs: (userId: string) => `stream:user:{${userId}}:jobs`,
 };
 
 /**
@@ -137,18 +139,21 @@
     };
 
     const key = KEYS.job(streamId);
+    const userJobsKey = KEYS.userJobs(userId);
 
     // For cluster mode, we can't pipeline keys on different slots
-    // The job key uses hash tag {streamId}, runningJobs is global
+    // The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots
     if (this.isCluster) {
       await this.redis.hmset(key, this.serializeJob(job));
       await this.redis.expire(key, this.ttl.running);
       await this.redis.sadd(KEYS.runningJobs, streamId);
+      await this.redis.sadd(userJobsKey, streamId);
     } else {
       const pipeline = this.redis.pipeline();
       pipeline.hmset(key, this.serializeJob(job));
       pipeline.expire(key, this.ttl.running);
       pipeline.sadd(KEYS.runningJobs, streamId);
+      pipeline.sadd(userJobsKey, streamId);
       await pipeline.exec();
     }
@@ -166,8 +171,10 @@
   async updateJob(streamId: string, updates: Partial<StreamJob>): Promise<void> {
     const key = KEYS.job(streamId);
-    const exists = await this.redis.exists(key);
-    if (!exists) {
+
+    // Get existing job data (needed for userId when removing from user jobs set)
+    const job = await this.getJob(streamId);
+    if (!job) {
       return;
     }
@@ -178,12 +185,17 @@
     await this.redis.hmset(key, serialized);
 
-    // If status changed to complete/error/aborted, update TTL and remove from running set
+    // If status changed to complete/error/aborted, update TTL and remove from running/user sets
     if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
+      const userId = job.userId;
+
       // In cluster mode, separate runningJobs (global) from stream-specific keys
       if (this.isCluster) {
         await this.redis.expire(key, this.ttl.completed);
         await this.redis.srem(KEYS.runningJobs, streamId);
+        if (userId) {
+          await this.redis.srem(KEYS.userJobs(userId), streamId);
+        }
 
         if (this.ttl.chunksAfterComplete === 0) {
           await this.redis.del(KEYS.chunks(streamId));
@@ -200,6 +212,9 @@
         const pipeline = this.redis.pipeline();
         pipeline.expire(key, this.ttl.completed);
         pipeline.srem(KEYS.runningJobs, streamId);
+        if (userId) {
+          pipeline.srem(KEYS.userJobs(userId), streamId);
+        }
 
         if (this.ttl.chunksAfterComplete === 0) {
           pipeline.del(KEYS.chunks(streamId));
@@ -222,6 +237,10 @@
     // Clear local cache
     this.localGraphCache.delete(streamId);
 
+    // Get job to find userId for user jobs cleanup
+    const job = await this.getJob(streamId);
+    const userId = job?.userId;
+
     // In cluster mode, separate runningJobs (global) from stream-specific keys (same slot)
     if (this.isCluster) {
       // Stream-specific keys all hash to same slot due to {streamId}
@@ -230,14 +249,20 @@
       pipeline.del(KEYS.chunks(streamId));
       pipeline.del(KEYS.runSteps(streamId));
       await pipeline.exec();
-      // Global set is on different slot - execute separately
+      // Global sets are on different slots - execute separately
       await this.redis.srem(KEYS.runningJobs, streamId);
+      if (userId) {
+        await this.redis.srem(KEYS.userJobs(userId), streamId);
+      }
     } else {
       const pipeline = this.redis.pipeline();
       pipeline.del(KEYS.job(streamId));
       pipeline.del(KEYS.chunks(streamId));
       pipeline.del(KEYS.runSteps(streamId));
       pipeline.srem(KEYS.runningJobs, streamId);
+      if (userId) {
+        pipeline.srem(KEYS.userJobs(userId), streamId);
+      }
       await pipeline.exec();
     }
     logger.debug(`[RedisJobStore] Deleted job: ${streamId}`);
@@ -327,6 +352,47 @@
     return 0;
   }
 
+  /**
+   * Get active job IDs for a user.
+   * Returns conversation IDs of running jobs belonging to the user.
+   * Also performs self-healing cleanup: removes stale entries for jobs that no longer exist.
+   *
+   * @param userId - The user ID to query
+   * @returns Array of conversation IDs with active jobs
+   */
+  async getActiveJobIdsByUser(userId: string): Promise<string[]> {
+    const userJobsKey = KEYS.userJobs(userId);
+    const trackedIds = await this.redis.smembers(userJobsKey);
+
+    if (trackedIds.length === 0) {
+      return [];
+    }
+
+    const activeIds: string[] = [];
+    const staleIds: string[] = [];
+
+    for (const streamId of trackedIds) {
+      const job = await this.getJob(streamId);
+      // Only include if job exists AND is still running
+      if (job && job.status === 'running') {
+        activeIds.push(streamId);
+      } else {
+        // Self-healing: job completed/deleted but mapping wasn't cleaned - mark for removal
+        staleIds.push(streamId);
+      }
+    }
+
+    // Clean up stale entries
+    if (staleIds.length > 0) {
+      await this.redis.srem(userJobsKey, ...staleIds);
+      logger.debug(
+        `[RedisJobStore] Self-healed ${staleIds.length} stale job entries for user ${userId}`,
+      );
+    }
+
+    return activeIds;
+  }
+
   async destroy(): Promise<void> {
     if (this.cleanupInterval) {
       clearInterval(this.cleanupInterval);
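Reviewer note: the diff adds the per-user active-jobs index but shows no caller. Below is a minimal, hypothetical caller-side sketch of how `getActiveJobIdsByUser` might be used, for example to cap concurrent streams per user before starting a new job. `MAX_ACTIVE_STREAMS` and `startStreamIfAllowed` are illustrative names that do not exist in this PR; only `createJob` and `getActiveJobIdsByUser` come from the store API above.

```ts
import type { RedisJobStore } from '../implementations/RedisJobStore';

/** Hypothetical per-user concurrency cap (not defined anywhere in this PR). */
const MAX_ACTIVE_STREAMS = 3;

/**
 * Start a stream only if the user is under the concurrency cap.
 * Relies on the self-healing read path: stale set entries left behind by
 * crashed or expired jobs are pruned inside getActiveJobIdsByUser, so the
 * returned count reflects jobs that are actually still running.
 */
async function startStreamIfAllowed(
  store: RedisJobStore,
  userId: string,
  streamId: string,
  conversationId: string,
): Promise<boolean> {
  const activeIds = await store.getActiveJobIdsByUser(userId);
  if (activeIds.length >= MAX_ACTIVE_STREAMS) {
    return false; // caller surfaces a "too many active streams" error
  }
  await store.createJob(streamId, userId, conversationId);
  return true;
}
```

One design observation: the lookup is O(n) in the size of the user's own set, with one `getJob` round trip per tracked ID. Because the set key hash-tags `{userId}`, all of a user's entries live on one cluster slot, while the per-job keys hash on `{streamId}` and may live elsewhere, which is why the implementation cannot pipeline across them in cluster mode.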