From 5402f0f1a8698fe07af6be22d3b8c1941a53719c Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Wed, 17 Dec 2025 13:26:05 -0500 Subject: [PATCH] refactor: Simplify job deletion logic by removing user job cleanup from InMemoryJobStore and RedisJobStore --- .../implementations/InMemoryJobStore.ts | 12 -------- .../stream/implementations/RedisJobStore.ts | 30 ++++--------------- 2 files changed, 6 insertions(+), 36 deletions(-) diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index 1f7b4ac901..273935ec57 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -106,18 +106,6 @@ export class InMemoryJobStore implements IJobStore { } async deleteJob(streamId: string): Promise { - // Remove from user's job set before deleting - const job = this.jobs.get(streamId); - if (job) { - const userJobs = this.userJobMap.get(job.userId); - if (userJobs) { - userJobs.delete(streamId); - if (userJobs.size === 0) { - this.userJobMap.delete(job.userId); - } - } - } - this.jobs.delete(streamId); this.contentState.delete(streamId); logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`); diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index bc8895cea0..b234c14166 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -171,10 +171,8 @@ export class RedisJobStore implements IJobStore { async updateJob(streamId: string, updates: Partial): Promise { const key = KEYS.job(streamId); - - // Get existing job data (needed for userId when removing from user jobs set) - const job = await this.getJob(streamId); - if (!job) { + const exists = await this.redis.exists(key); + if (!exists) { return; } @@ -185,17 +183,13 @@ export class RedisJobStore implements IJobStore { await this.redis.hmset(key, serialized); - // If status changed to complete/error/aborted, update TTL and remove from running/user sets + // If status changed to complete/error/aborted, update TTL and remove from running set + // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) { - const userId = job.userId; - // In cluster mode, separate runningJobs (global) from stream-specific keys if (this.isCluster) { await this.redis.expire(key, this.ttl.completed); await this.redis.srem(KEYS.runningJobs, streamId); - if (userId) { - await this.redis.srem(KEYS.userJobs(userId), streamId); - } if (this.ttl.chunksAfterComplete === 0) { await this.redis.del(KEYS.chunks(streamId)); @@ -212,9 +206,6 @@ export class RedisJobStore implements IJobStore { const pipeline = this.redis.pipeline(); pipeline.expire(key, this.ttl.completed); pipeline.srem(KEYS.runningJobs, streamId); - if (userId) { - pipeline.srem(KEYS.userJobs(userId), streamId); - } if (this.ttl.chunksAfterComplete === 0) { pipeline.del(KEYS.chunks(streamId)); @@ -237,10 +228,7 @@ export class RedisJobStore implements IJobStore { // Clear local cache this.localGraphCache.delete(streamId); - // Get job to find userId for user jobs cleanup - const job = await this.getJob(streamId); - const userId = job?.userId; - + // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser // In cluster mode, separate runningJobs (global) from stream-specific keys (same slot) if (this.isCluster) { // Stream-specific keys all hash to same slot due to {streamId} @@ -249,20 +237,14 @@ export class RedisJobStore implements IJobStore { pipeline.del(KEYS.chunks(streamId)); pipeline.del(KEYS.runSteps(streamId)); await pipeline.exec(); - // Global sets are on different slots - execute separately + // Global set is on different slot - execute separately await this.redis.srem(KEYS.runningJobs, streamId); - if (userId) { - await this.redis.srem(KEYS.userJobs(userId), streamId); - } } else { const pipeline = this.redis.pipeline(); pipeline.del(KEYS.job(streamId)); pipeline.del(KEYS.chunks(streamId)); pipeline.del(KEYS.runSteps(streamId)); pipeline.srem(KEYS.runningJobs, streamId); - if (userId) { - pipeline.srem(KEYS.userJobs(userId), streamId); - } await pipeline.exec(); } logger.debug(`[RedisJobStore] Deleted job: ${streamId}`);