From e142ab72da7ca53327543fcf2cc30461262f5e28 Mon Sep 17 00:00:00 2001
From: Danny Avila
Date: Thu, 12 Feb 2026 18:47:57 -0500
Subject: [PATCH] =?UTF-8?q?=F0=9F=94=92=20fix:=20Prevent=20Race=20Conditio?=
 =?UTF-8?q?n=20in=20RedisJobStore=20(#11764)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* 🔧 fix: Optimize job update logic in RedisJobStore

- Refactored the updateJob method to use a Lua script for atomic updates, ensuring that jobs are only updated if they exist in Redis. This closes the window between the former separate EXISTS check and the write, during which a concurrent deleteJob could remove the key and the write would re-create it.
- Removed the now-redundant existence check and streamlined the serialization process for better performance and clarity.

* 🔧 test: Add race condition tests for RedisJobStore

- Introduced tests to verify behavior of updateJob after deleteJob, ensuring no job hash is recreated post-deletion.
- Added a check that concurrent deleteJob and updateJob operations never leave an orphan job hash (a key that exists without a TTL), enhancing reliability in job management.

* 🔧 test: Refactor Redis client readiness checks in violationCache tests

- Introduced a new helper function `waitForRedisClients` to streamline the readiness checks for Redis clients (ioredis and keyv/node-redis) in the violationCache integration tests.
- Removed redundant Redis client readiness checks from individual test cases, improving code clarity and maintainability.

* 🔧 fix: Update RedisJobStore to use hset instead of hmset

- Replaced instances of `hmset` with `hset` in the RedisJobStore implementation, because HMSET is deprecated (since Redis 4.0.0) in favor of HSET, which accepts multiple field-value pairs.
- Updated the Lua script passed to `eval` in updateJob to call HSET as well, so that it uses the same command as createJob in both cluster and non-cluster (pipeline) modes.
--- .../violationCache.cache_integration.spec.ts | 58 +++++++----------- .../RedisJobStore.stream_integration.spec.ts | 61 +++++++++++++++++++ .../stream/implementations/RedisJobStore.ts | 20 +++--- 3 files changed, 96 insertions(+), 43 deletions(-) diff --git a/packages/api/src/cache/__tests__/cacheFactory/violationCache.cache_integration.spec.ts b/packages/api/src/cache/__tests__/cacheFactory/violationCache.cache_integration.spec.ts index 989008e82e..1978620c24 100644 --- a/packages/api/src/cache/__tests__/cacheFactory/violationCache.cache_integration.spec.ts +++ b/packages/api/src/cache/__tests__/cacheFactory/violationCache.cache_integration.spec.ts @@ -20,6 +20,24 @@ interface ViolationData { }; } +/** Waits for both Redis clients (ioredis + keyv/node-redis) to be ready */ +async function waitForRedisClients() { + const redisClients = await import('../../redisClients'); + const { ioredisClient, keyvRedisClientReady } = redisClients; + + if (ioredisClient && ioredisClient.status !== 'ready') { + await new Promise((resolve) => { + ioredisClient.once('ready', resolve); + }); + } + + if (keyvRedisClientReady) { + await keyvRedisClientReady; + } + + return redisClients; +} + describe('violationCache', () => { let originalEnv: NodeJS.ProcessEnv; @@ -45,17 +63,9 @@ describe('violationCache', () => { test('should create violation cache with Redis when USE_REDIS is true', async () => { const cacheFactory = await import('../../cacheFactory'); - const redisClients = await import('../../redisClients'); - const { ioredisClient } = redisClients; + await waitForRedisClients(); const cache = cacheFactory.violationCache('test-violations', 60000); // 60 second TTL - // Wait for Redis connection to be ready - if (ioredisClient && ioredisClient.status !== 'ready') { - await new Promise((resolve) => { - ioredisClient.once('ready', resolve); - }); - } - // Verify it returns a Keyv instance expect(cache).toBeDefined(); expect(cache.constructor.name).toBe('Keyv'); @@ -112,18 +122,10 
@@ describe('violationCache', () => { test('should respect namespace prefixing', async () => { const cacheFactory = await import('../../cacheFactory'); - const redisClients = await import('../../redisClients'); - const { ioredisClient } = redisClients; + await waitForRedisClients(); const cache1 = cacheFactory.violationCache('namespace1'); const cache2 = cacheFactory.violationCache('namespace2'); - // Wait for Redis connection to be ready - if (ioredisClient && ioredisClient.status !== 'ready') { - await new Promise((resolve) => { - ioredisClient.once('ready', resolve); - }); - } - const testKey = 'shared-key'; const value1: ViolationData = { namespace: 1 }; const value2: ViolationData = { namespace: 2 }; @@ -146,18 +148,10 @@ describe('violationCache', () => { test('should respect TTL settings', async () => { const cacheFactory = await import('../../cacheFactory'); - const redisClients = await import('../../redisClients'); - const { ioredisClient } = redisClients; + await waitForRedisClients(); const ttl = 1000; // 1 second TTL const cache = cacheFactory.violationCache('ttl-test', ttl); - // Wait for Redis connection to be ready - if (ioredisClient && ioredisClient.status !== 'ready') { - await new Promise((resolve) => { - ioredisClient.once('ready', resolve); - }); - } - const testKey = 'ttl-key'; const testValue: ViolationData = { data: 'expires soon' }; @@ -178,17 +172,9 @@ describe('violationCache', () => { test('should handle complex violation data structures', async () => { const cacheFactory = await import('../../cacheFactory'); - const redisClients = await import('../../redisClients'); - const { ioredisClient } = redisClients; + await waitForRedisClients(); const cache = cacheFactory.violationCache('complex-violations'); - // Wait for Redis connection to be ready - if (ioredisClient && ioredisClient.status !== 'ready') { - await new Promise((resolve) => { - ioredisClient.once('ready', resolve); - }); - } - const complexData: ViolationData = { userId: 
'user123', violations: [ diff --git a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts index 89c6f9e92e..d00321a77d 100644 --- a/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts +++ b/packages/api/src/stream/__tests__/RedisJobStore.stream_integration.spec.ts @@ -880,6 +880,67 @@ describe('RedisJobStore Integration Tests', () => { }); }); + describe('Race Condition: updateJob after deleteJob', () => { + test('should not re-create job hash when updateJob runs after deleteJob', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const streamId = `race-condition-${Date.now()}`; + await store.createJob(streamId, 'user-1', streamId); + + const jobKey = `stream:{${streamId}}:job`; + const ttlBefore = await ioredisClient.ttl(jobKey); + expect(ttlBefore).toBeGreaterThan(0); + + await store.deleteJob(streamId); + + const afterDelete = await ioredisClient.exists(jobKey); + expect(afterDelete).toBe(0); + + await store.updateJob(streamId, { finalEvent: JSON.stringify({ final: true }) }); + + const afterUpdate = await ioredisClient.exists(jobKey); + expect(afterUpdate).toBe(0); + + await store.destroy(); + }); + + test('should not leave orphan keys from concurrent emitDone and deleteJob', async () => { + if (!ioredisClient) { + return; + } + + const { RedisJobStore } = await import('../implementations/RedisJobStore'); + const store = new RedisJobStore(ioredisClient); + await store.initialize(); + + const streamId = `concurrent-race-${Date.now()}`; + await store.createJob(streamId, 'user-1', streamId); + + const jobKey = `stream:{${streamId}}:job`; + + await Promise.all([ + store.updateJob(streamId, { finalEvent: JSON.stringify({ final: true }) }), + store.deleteJob(streamId), + ]); + + await 
new Promise((resolve) => setTimeout(resolve, 100)); + + const exists = await ioredisClient.exists(jobKey); + const ttl = exists ? await ioredisClient.ttl(jobKey) : -2; + + expect(ttl === -2 || ttl > 0).toBe(true); + expect(ttl).not.toBe(-1); + + await store.destroy(); + }); + }); + describe('Local Graph Cache Optimization', () => { test('should use local cache when available', async () => { if (!ioredisClient) { diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index cce636d5a1..a0d26d087f 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -156,13 +156,13 @@ export class RedisJobStore implements IJobStore { // For cluster mode, we can't pipeline keys on different slots // The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots if (this.isCluster) { - await this.redis.hmset(key, this.serializeJob(job)); + await this.redis.hset(key, this.serializeJob(job)); await this.redis.expire(key, this.ttl.running); await this.redis.sadd(KEYS.runningJobs, streamId); await this.redis.sadd(userJobsKey, streamId); } else { const pipeline = this.redis.pipeline(); - pipeline.hmset(key, this.serializeJob(job)); + pipeline.hset(key, this.serializeJob(job)); pipeline.expire(key, this.ttl.running); pipeline.sadd(KEYS.runningJobs, streamId); pipeline.sadd(userJobsKey, streamId); @@ -183,17 +183,23 @@ export class RedisJobStore implements IJobStore { async updateJob(streamId: string, updates: Partial): Promise { const key = KEYS.job(streamId); - const exists = await this.redis.exists(key); - if (!exists) { - return; - } const serialized = this.serializeJob(updates as SerializableJobData); if (Object.keys(serialized).length === 0) { return; } - await this.redis.hmset(key, serialized); + const fields = Object.entries(serialized).flat(); + const updated = await this.redis.eval( + 'if 
redis.call("EXISTS", KEYS[1]) == 1 then redis.call("HSET", KEYS[1], unpack(ARGV)) return 1 else return 0 end', + 1, + key, + ...fields, + ); + + if (updated === 0) { + return; + } // If status changed to complete/error/aborted, update TTL and remove from running set // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser