Mirror of https://github.com/danny-avila/LibreChat.git
🔒 fix: Prevent Race Condition in RedisJobStore (#11764)
* 🔧 fix: Optimize job update logic in RedisJobStore
  - Refactored the updateJob method to use a Lua script for atomic updates, so jobs are only updated if they still exist in Redis.
  - Removed the redundant existence check and streamlined serialization for better performance and clarity.
* 🔧 test: Add race condition tests for RedisJobStore
  - Introduced tests verifying that updateJob called after deleteJob does not recreate the job hash.
  - Added checks for orphan keys when deleteJob and updateJob run concurrently.
* 🔧 test: Refactor Redis client readiness checks in violationCache tests
  - Introduced a `waitForRedisClients` helper to centralize readiness checks for both Redis clients in the violationCache integration tests.
  - Removed the duplicated readiness checks from individual test cases.
* 🔧 fix: Update RedisJobStore to use hset instead of hmset
  - Replaced `hmset` with `hset` in the RedisJobStore implementation to align with current Redis commands.
  - Updated the Lua script in the eval call to match, keeping job handling consistent in cluster and non-cluster modes.
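To make the first bullet concrete, here is a minimal sketch of the racy check-then-act that the Lua script replaces, assuming an ioredis client; the key, field list, and function names are illustrative rather than the store's actual API.

import Redis from 'ioredis';

const redis = new Redis();
const jobKey = 'stream:{example}:job';
const fields = ['finalEvent', JSON.stringify({ final: true })];

// Racy check-then-act: a concurrent DEL can land between EXISTS and HSET,
// so HSET recreates the hash without any TTL and leaves an orphan key.
async function updateRacy(): Promise<void> {
  if (await redis.exists(jobKey)) {
    await redis.hset(jobKey, ...fields);
  }
}

// Atomic variant: EXISTS and HSET run inside a single Lua script, so a
// concurrent DEL can only happen entirely before or entirely after it.
async function updateAtomic(): Promise<boolean> {
  const updated = await redis.eval(
    'if redis.call("EXISTS", KEYS[1]) == 1 then redis.call("HSET", KEYS[1], unpack(ARGV)) return 1 else return 0 end',
    1,
    jobKey,
    ...fields,
  );
  return updated === 1;
}

The tests added below assert exactly this invariant: after a delete, updateJob must not resurrect the hash, and a concurrent delete/update pair must never leave a key behind without a TTL.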
Parent: b8c31e7314
Commit: e142ab72da
3 changed files with 96 additions and 43 deletions
violationCache integration tests:

@@ -20,6 +20,24 @@ interface ViolationData {
   };
 }
 
+/** Waits for both Redis clients (ioredis + keyv/node-redis) to be ready */
+async function waitForRedisClients() {
+  const redisClients = await import('../../redisClients');
+  const { ioredisClient, keyvRedisClientReady } = redisClients;
+
+  if (ioredisClient && ioredisClient.status !== 'ready') {
+    await new Promise<void>((resolve) => {
+      ioredisClient.once('ready', resolve);
+    });
+  }
+
+  if (keyvRedisClientReady) {
+    await keyvRedisClientReady;
+  }
+
+  return redisClients;
+}
+
 describe('violationCache', () => {
   let originalEnv: NodeJS.ProcessEnv;
 
@@ -45,17 +63,9 @@ describe('violationCache', () => {
 
   test('should create violation cache with Redis when USE_REDIS is true', async () => {
     const cacheFactory = await import('../../cacheFactory');
-    const redisClients = await import('../../redisClients');
-    const { ioredisClient } = redisClients;
+    await waitForRedisClients();
     const cache = cacheFactory.violationCache('test-violations', 60000); // 60 second TTL
 
-    // Wait for Redis connection to be ready
-    if (ioredisClient && ioredisClient.status !== 'ready') {
-      await new Promise<void>((resolve) => {
-        ioredisClient.once('ready', resolve);
-      });
-    }
-
     // Verify it returns a Keyv instance
     expect(cache).toBeDefined();
     expect(cache.constructor.name).toBe('Keyv');
@@ -112,18 +122,10 @@ describe('violationCache', () => {
 
   test('should respect namespace prefixing', async () => {
     const cacheFactory = await import('../../cacheFactory');
-    const redisClients = await import('../../redisClients');
-    const { ioredisClient } = redisClients;
+    await waitForRedisClients();
     const cache1 = cacheFactory.violationCache('namespace1');
     const cache2 = cacheFactory.violationCache('namespace2');
 
-    // Wait for Redis connection to be ready
-    if (ioredisClient && ioredisClient.status !== 'ready') {
-      await new Promise<void>((resolve) => {
-        ioredisClient.once('ready', resolve);
-      });
-    }
-
     const testKey = 'shared-key';
     const value1: ViolationData = { namespace: 1 };
     const value2: ViolationData = { namespace: 2 };
@@ -146,18 +148,10 @@ describe('violationCache', () => {
 
   test('should respect TTL settings', async () => {
     const cacheFactory = await import('../../cacheFactory');
-    const redisClients = await import('../../redisClients');
-    const { ioredisClient } = redisClients;
+    await waitForRedisClients();
    const ttl = 1000; // 1 second TTL
     const cache = cacheFactory.violationCache('ttl-test', ttl);
 
-    // Wait for Redis connection to be ready
-    if (ioredisClient && ioredisClient.status !== 'ready') {
-      await new Promise<void>((resolve) => {
-        ioredisClient.once('ready', resolve);
-      });
-    }
-
     const testKey = 'ttl-key';
     const testValue: ViolationData = { data: 'expires soon' };
 
@@ -178,17 +172,9 @@ describe('violationCache', () => {
 
   test('should handle complex violation data structures', async () => {
     const cacheFactory = await import('../../cacheFactory');
-    const redisClients = await import('../../redisClients');
-    const { ioredisClient } = redisClients;
+    await waitForRedisClients();
     const cache = cacheFactory.violationCache('complex-violations');
 
-    // Wait for Redis connection to be ready
-    if (ioredisClient && ioredisClient.status !== 'ready') {
-      await new Promise<void>((resolve) => {
-        ioredisClient.once('ready', resolve);
-      });
-    }
-
     const complexData: ViolationData = {
       userId: 'user123',
       violations: [
RedisJobStore integration tests:

@@ -880,6 +880,67 @@ describe('RedisJobStore Integration Tests', () => {
     });
   });
 
+  describe('Race Condition: updateJob after deleteJob', () => {
+    test('should not re-create job hash when updateJob runs after deleteJob', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const streamId = `race-condition-${Date.now()}`;
+      await store.createJob(streamId, 'user-1', streamId);
+
+      const jobKey = `stream:{${streamId}}:job`;
+      const ttlBefore = await ioredisClient.ttl(jobKey);
+      expect(ttlBefore).toBeGreaterThan(0);
+
+      await store.deleteJob(streamId);
+
+      const afterDelete = await ioredisClient.exists(jobKey);
+      expect(afterDelete).toBe(0);
+
+      await store.updateJob(streamId, { finalEvent: JSON.stringify({ final: true }) });
+
+      const afterUpdate = await ioredisClient.exists(jobKey);
+      expect(afterUpdate).toBe(0);
+
+      await store.destroy();
+    });
+
+    test('should not leave orphan keys from concurrent emitDone and deleteJob', async () => {
+      if (!ioredisClient) {
+        return;
+      }
+
+      const { RedisJobStore } = await import('../implementations/RedisJobStore');
+      const store = new RedisJobStore(ioredisClient);
+      await store.initialize();
+
+      const streamId = `concurrent-race-${Date.now()}`;
+      await store.createJob(streamId, 'user-1', streamId);
+
+      const jobKey = `stream:{${streamId}}:job`;
+
+      await Promise.all([
+        store.updateJob(streamId, { finalEvent: JSON.stringify({ final: true }) }),
+        store.deleteJob(streamId),
+      ]);
+
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      const exists = await ioredisClient.exists(jobKey);
+      const ttl = exists ? await ioredisClient.ttl(jobKey) : -2;
+
+      expect(ttl === -2 || ttl > 0).toBe(true);
+      expect(ttl).not.toBe(-1);
+
+      await store.destroy();
+    });
+  });
+
   describe('Local Graph Cache Optimization', () => {
     test('should use local cache when available', async () => {
       if (!ioredisClient) {
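A note on the assertions in the concurrent test above: Redis TTL returns -2 when a key does not exist, -1 when it exists without an expiry, and a positive number of seconds otherwise, so -1 is exactly the signature of an orphaned hash. The test therefore accepts either "key deleted" or "key still expiring" but rejects a key that would linger forever.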
RedisJobStore implementation:

@@ -156,13 +156,13 @@ export class RedisJobStore implements IJobStore {
     // For cluster mode, we can't pipeline keys on different slots
     // The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots
     if (this.isCluster) {
-      await this.redis.hmset(key, this.serializeJob(job));
+      await this.redis.hset(key, this.serializeJob(job));
       await this.redis.expire(key, this.ttl.running);
       await this.redis.sadd(KEYS.runningJobs, streamId);
       await this.redis.sadd(userJobsKey, streamId);
     } else {
       const pipeline = this.redis.pipeline();
-      pipeline.hmset(key, this.serializeJob(job));
+      pipeline.hset(key, this.serializeJob(job));
       pipeline.expire(key, this.ttl.running);
       pipeline.sadd(KEYS.runningJobs, streamId);
       pipeline.sadd(userJobsKey, streamId);
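A note on the `hmset` → `hset` swap above: HMSET has been deprecated since Redis 4.0 because HSET became variadic and covers the same multi-field case, and ioredis accepts an object of field-value pairs for either command, so the call shape stays the same. A minimal sketch, assuming an ioredis client; the key and field values are illustrative:

import Redis from 'ioredis';

async function example(): Promise<void> {
  const redis = new Redis();
  // HSET takes multiple field-value pairs (or, with ioredis, a plain object),
  // so it is a drop-in replacement for the deprecated HMSET here.
  await redis.hset('stream:{example}:job', { status: 'running', userId: 'user-1' });
  await redis.quit();
}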
@@ -183,17 +183,23 @@ export class RedisJobStore implements IJobStore {
 
   async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
     const key = KEYS.job(streamId);
-    const exists = await this.redis.exists(key);
-    if (!exists) {
-      return;
-    }
 
     const serialized = this.serializeJob(updates as SerializableJobData);
     if (Object.keys(serialized).length === 0) {
       return;
     }
 
-    await this.redis.hmset(key, serialized);
+    const fields = Object.entries(serialized).flat();
+    const updated = await this.redis.eval(
+      'if redis.call("EXISTS", KEYS[1]) == 1 then redis.call("HSET", KEYS[1], unpack(ARGV)) return 1 else return 0 end',
+      1,
+      key,
+      ...fields,
+    );
+
+    if (updated === 0) {
+      return;
+    }
 
     // If status changed to complete/error/aborted, update TTL and remove from running set
     // Note: userJobs cleanup is handled lazily via self-healing in getActiveJobIdsByUser