refactor: Simplify job deletion logic by removing user job cleanup from InMemoryJobStore and RedisJobStore

Danny Avila 2025-12-17 13:26:05 -05:00
parent 55adcda11f
commit 5402f0f1a8
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956
2 changed files with 6 additions and 36 deletions


@@ -106,18 +106,6 @@ export class InMemoryJobStore implements IJobStore {
   }
   async deleteJob(streamId: string): Promise<void> {
-    // Remove from user's job set before deleting
-    const job = this.jobs.get(streamId);
-    if (job) {
-      const userJobs = this.userJobMap.get(job.userId);
-      if (userJobs) {
-        userJobs.delete(streamId);
-        if (userJobs.size === 0) {
-          this.userJobMap.delete(job.userId);
-        }
-      }
-    }
     this.jobs.delete(streamId);
     this.contentState.delete(streamId);
     logger.debug(`[InMemoryJobStore] Deleted job: ${streamId}`);
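
The deleted bookkeeping moves to the read path: stale entries in userJobMap are pruned the next time a user's jobs are looked up. A minimal sketch of what that lazy self-healing could look like in InMemoryJobStore, assuming the jobs and userJobMap fields referenced above; this is illustrative, not the code from this commit:

// Sketch only (assumed, not from this commit): with deleteJob no longer
// pruning userJobMap, getActiveJobIdsByUser can self-heal on read by
// dropping stream ids whose job entry is gone.
getActiveJobIdsByUser(userId: string): string[] {
  const userJobs = this.userJobMap.get(userId);
  if (!userJobs) {
    return [];
  }
  const active: string[] = [];
  for (const streamId of userJobs) {
    if (this.jobs.has(streamId)) {
      active.push(streamId);
    } else {
      // Lazy cleanup: the job was deleted, so drop the stale reference
      userJobs.delete(streamId);
    }
  }
  if (userJobs.size === 0) {
    this.userJobMap.delete(userId);
  }
  return active;
}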


@@ -171,10 +171,8 @@ export class RedisJobStore implements IJobStore {
   async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
     const key = KEYS.job(streamId);
-    // Get existing job data (needed for userId when removing from user jobs set)
-    const job = await this.getJob(streamId);
-    if (!job) {
+    const exists = await this.redis.exists(key);
+    if (!exists) {
       return;
     }
@@ -185,17 +183,13 @@ export class RedisJobStore implements IJobStore {
     await this.redis.hmset(key, serialized);
-    // If status changed to complete/error/aborted, update TTL and remove from running/user sets
+    // If status changed to complete/error/aborted, update TTL and remove from running set
+    // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser
     if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
-      const userId = job.userId;
       // In cluster mode, separate runningJobs (global) from stream-specific keys
       if (this.isCluster) {
         await this.redis.expire(key, this.ttl.completed);
         await this.redis.srem(KEYS.runningJobs, streamId);
-        if (userId) {
-          await this.redis.srem(KEYS.userJobs(userId), streamId);
-        }
         if (this.ttl.chunksAfterComplete === 0) {
           await this.redis.del(KEYS.chunks(streamId));
@@ -212,9 +206,6 @@ export class RedisJobStore implements IJobStore {
         const pipeline = this.redis.pipeline();
         pipeline.expire(key, this.ttl.completed);
         pipeline.srem(KEYS.runningJobs, streamId);
-        if (userId) {
-          pipeline.srem(KEYS.userJobs(userId), streamId);
-        }
         if (this.ttl.chunksAfterComplete === 0) {
           pipeline.del(KEYS.chunks(streamId));
@@ -237,10 +228,7 @@ export class RedisJobStore implements IJobStore {
     // Clear local cache
     this.localGraphCache.delete(streamId);
-    // Get job to find userId for user jobs cleanup
-    const job = await this.getJob(streamId);
-    const userId = job?.userId;
+    // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser
     // In cluster mode, separate runningJobs (global) from stream-specific keys (same slot)
     if (this.isCluster) {
       // Stream-specific keys all hash to same slot due to {streamId}
@@ -249,20 +237,14 @@ export class RedisJobStore implements IJobStore {
       pipeline.del(KEYS.chunks(streamId));
       pipeline.del(KEYS.runSteps(streamId));
       await pipeline.exec();
-      // Global sets are on different slots - execute separately
+      // Global set is on different slot - execute separately
      await this.redis.srem(KEYS.runningJobs, streamId);
-      if (userId) {
-        await this.redis.srem(KEYS.userJobs(userId), streamId);
-      }
     } else {
       const pipeline = this.redis.pipeline();
       pipeline.del(KEYS.job(streamId));
       pipeline.del(KEYS.chunks(streamId));
       pipeline.del(KEYS.runSteps(streamId));
       pipeline.srem(KEYS.runningJobs, streamId);
-      if (userId) {
-        pipeline.srem(KEYS.userJobs(userId), streamId);
-      }
       await pipeline.exec();
     }
     logger.debug(`[RedisJobStore] Deleted job: ${streamId}`);
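
The Redis store takes the same approach: the per-user set under KEYS.userJobs(userId) is allowed to hold stale stream ids after deleteJob or a TTL expiry, and the read path reconciles it. A minimal sketch of that self-healing, assuming an ioredis-style client and the KEYS helpers used in the diff; the method shown here is illustrative, not the code from this commit, and the real getActiveJobIdsByUser may also filter on job status:

// Sketch only (assumed, not from this commit): read-side reconciliation of
// the per-user job set.
async getActiveJobIdsByUser(userId: string): Promise<string[]> {
  const streamIds = await this.redis.smembers(KEYS.userJobs(userId));
  const active: string[] = [];
  const stale: string[] = [];
  for (const streamId of streamIds) {
    const exists = await this.redis.exists(KEYS.job(streamId));
    if (exists) {
      active.push(streamId);
    } else {
      stale.push(streamId);
    }
  }
  if (stale.length > 0) {
    // Lazy cleanup: the job hash expired or was deleted, so prune the set
    await this.redis.srem(KEYS.userJobs(userId), ...stale);
  }
  return active;
}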