⚛️ refactor: Redis Scalability Improvements for High-Throughput Deployments (#11840)

* fix: Redis scalability improvements for high-throughput deployments

  Replace INCR+check+DECR race in concurrency middleware with atomic Lua
  scripts. The old approach allowed 3-4 concurrent requests through a
  limit of 2 at 300 req/s because another request could slip between the
  INCR returning and the DECR executing. The Lua scripts run atomically
  on the Redis server, eliminating the race window entirely.

  Add exponential backoff with jitter to all three Redis retry strategies
  (ioredis single-node, cluster, keyv). Previously all instances retried
  at the same millisecond after an outage, causing a connection storm.

  Batch the RedisJobStore cleanup loop into parallel chunks of 50. With
  1000 stale jobs, this reduces cleanup from ~20s of sequential calls to
  ~2s. Also pipeline appendChunk (xadd + expire) into a single round-trip
  and refresh TTL on every chunk instead of only the first, preventing
  TTL expiry during long-running streams.

  Propagate publish errors in RedisEventTransport.emitDone and emitError
  so callers can detect dropped completion/error events. emitChunk is left
  as swallow-and-log because its callers fire-and-forget without await.

  Add jest.config.js for the API package with babel TypeScript support and
  path alias resolution. Fix existing stream integration tests that were
  silently broken due to missing USE_REDIS_CLUSTER=false env var.

* chore: Migrate Jest configuration from jest.config.js to jest.config.mjs

Removed the old jest.config.js file and integrated the Jest configuration into jest.config.mjs, adding Babel TypeScript support and path alias resolution. This change streamlines the configuration for the API package.

* fix: Ensure Redis retry delays do not exceed maximum configured delay

Updated the delay calculation in Redis retry strategies to enforce a maximum delay defined in the configuration. This change prevents excessive delays during reconnection attempts, improving overall connection stability and performance.

* fix: Update RedisJobStore cleanup to handle job failures gracefully

Changed the cleanup process in RedisJobStore to use Promise.allSettled instead of Promise.all, allowing for individual job failures to be logged without interrupting the entire cleanup operation. This enhances error handling and provides better visibility into issues during job cleanup.
This commit is contained in:
Danny Avila 2026-02-18 00:04:33 -05:00 committed by GitHub
parent 5ea59ecb2b
commit 3fa94e843c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 696 additions and 70 deletions

View file

@ -10,6 +10,17 @@ export default {
],
coverageReporters: ['text', 'cobertura'],
testResultsProcessor: 'jest-junit',
transform: {
'\\.[jt]sx?$': [
'babel-jest',
{
presets: [
['@babel/preset-env', { targets: { node: 'current' } }],
'@babel/preset-typescript',
],
},
],
},
moduleNameMapper: {
'^@src/(.*)$': '<rootDir>/src/$1',
'~/(.*)': '<rootDir>/src/$1',

View file

@ -120,7 +120,9 @@ export const limiterCache = (prefix: string): RedisStore | undefined => {
if (!cacheConfig.USE_REDIS) {
return undefined;
}
// TODO: The prefix is not actually applied. Also needs to account for global prefix.
// Note: The `prefix` is applied by RedisStore internally to its key operations.
// The global REDIS_KEY_PREFIX is applied by ioredisClient's keyPrefix setting.
// Combined key format: `{REDIS_KEY_PREFIX}::{prefix}{identifier}`
prefix = prefix.endsWith(':') ? prefix : `${prefix}:`;
try {

View file

@ -29,7 +29,9 @@ if (cacheConfig.USE_REDIS) {
);
return null;
}
const delay = Math.min(times * 50, cacheConfig.REDIS_RETRY_MAX_DELAY);
const base = Math.min(Math.pow(2, times) * 50, cacheConfig.REDIS_RETRY_MAX_DELAY);
const jitter = Math.floor(Math.random() * Math.min(base, 1000));
const delay = Math.min(base + jitter, cacheConfig.REDIS_RETRY_MAX_DELAY);
logger.info(`ioredis reconnecting... attempt ${times}, delay ${delay}ms`);
return delay;
},
@ -71,7 +73,9 @@ if (cacheConfig.USE_REDIS) {
);
return null;
}
const delay = Math.min(times * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
const base = Math.min(Math.pow(2, times) * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
const jitter = Math.floor(Math.random() * Math.min(base, 1000));
const delay = Math.min(base + jitter, cacheConfig.REDIS_RETRY_MAX_DELAY);
logger.info(`ioredis cluster reconnecting... attempt ${times}, delay ${delay}ms`);
return delay;
},
@ -149,7 +153,9 @@ if (cacheConfig.USE_REDIS) {
);
return new Error('Max reconnection attempts reached');
}
const delay = Math.min(retries * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
const base = Math.min(Math.pow(2, retries) * 100, cacheConfig.REDIS_RETRY_MAX_DELAY);
const jitter = Math.floor(Math.random() * Math.min(base, 1000));
const delay = Math.min(base + jitter, cacheConfig.REDIS_RETRY_MAX_DELAY);
logger.info(`@keyv/redis reconnecting... attempt ${retries}, delay ${delay}ms`);
return delay;
},

View file

@ -0,0 +1,258 @@
import type { Redis, Cluster } from 'ioredis';
/**
* Integration tests for concurrency middleware atomic Lua scripts.
*
* Tests that the Lua-based check-and-increment / decrement operations
* are truly atomic and eliminate the INCR+check+DECR race window.
*
* Run with: USE_REDIS=true npx jest --config packages/api/jest.config.js concurrency.cache_integration
*/
describe('Concurrency Middleware Integration Tests', () => {
let originalEnv: NodeJS.ProcessEnv;
let ioredisClient: Redis | Cluster | null = null;
let checkAndIncrementPendingRequest: (
userId: string,
) => Promise<{ allowed: boolean; pendingRequests: number; limit: number }>;
let decrementPendingRequest: (userId: string) => Promise<void>;
const testPrefix = 'Concurrency-Integration-Test';
beforeAll(async () => {
originalEnv = { ...process.env };
process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'false';
process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
process.env.REDIS_KEY_PREFIX = testPrefix;
process.env.REDIS_PING_INTERVAL = '0';
process.env.REDIS_RETRY_MAX_ATTEMPTS = '5';
process.env.LIMIT_CONCURRENT_MESSAGES = 'true';
process.env.CONCURRENT_MESSAGE_MAX = '2';
jest.resetModules();
const { ioredisClient: client } = await import('../../cache/redisClients');
ioredisClient = client;
if (!ioredisClient) {
console.warn('Redis not available, skipping integration tests');
return;
}
// Import concurrency module after Redis client is available
const concurrency = await import('../concurrency');
checkAndIncrementPendingRequest = concurrency.checkAndIncrementPendingRequest;
decrementPendingRequest = concurrency.decrementPendingRequest;
});
afterEach(async () => {
if (!ioredisClient) {
return;
}
try {
const keys = await ioredisClient.keys(`${testPrefix}*`);
if (keys.length > 0) {
await Promise.all(keys.map((key) => ioredisClient!.del(key)));
}
} catch (error) {
console.warn('Error cleaning up test keys:', error);
}
});
afterAll(async () => {
if (ioredisClient) {
try {
await ioredisClient.quit();
} catch {
try {
ioredisClient.disconnect();
} catch {
// Ignore
}
}
}
process.env = originalEnv;
});
describe('Atomic Check and Increment', () => {
test('should allow requests within the concurrency limit', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-allow-${Date.now()}`;
// First request - should be allowed (count = 1, limit = 2)
const result1 = await checkAndIncrementPendingRequest(userId);
expect(result1.allowed).toBe(true);
expect(result1.pendingRequests).toBe(1);
expect(result1.limit).toBe(2);
// Second request - should be allowed (count = 2, limit = 2)
const result2 = await checkAndIncrementPendingRequest(userId);
expect(result2.allowed).toBe(true);
expect(result2.pendingRequests).toBe(2);
});
test('should reject requests over the concurrency limit', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-reject-${Date.now()}`;
// Fill up to the limit
await checkAndIncrementPendingRequest(userId);
await checkAndIncrementPendingRequest(userId);
// Third request - should be rejected (count would be 3, limit = 2)
const result = await checkAndIncrementPendingRequest(userId);
expect(result.allowed).toBe(false);
expect(result.pendingRequests).toBe(3); // Reports the count that was over-limit
});
test('should not leave stale counter after rejection (atomic rollback)', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-rollback-${Date.now()}`;
// Fill up to the limit
await checkAndIncrementPendingRequest(userId);
await checkAndIncrementPendingRequest(userId);
// Attempt over-limit (should be rejected and atomically rolled back)
const rejected = await checkAndIncrementPendingRequest(userId);
expect(rejected.allowed).toBe(false);
// The key value should still be 2, not 3 — verify the Lua script decremented back
const key = `PENDING_REQ:${userId}`;
const rawValue = await ioredisClient.get(key);
expect(rawValue).toBe('2');
});
test('should handle concurrent requests atomically (no over-admission)', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-concurrent-${Date.now()}`;
// Fire 20 concurrent requests for the same user (limit = 2)
const results = await Promise.all(
Array.from({ length: 20 }, () => checkAndIncrementPendingRequest(userId)),
);
const allowed = results.filter((r) => r.allowed);
const rejected = results.filter((r) => !r.allowed);
// Exactly 2 should be allowed (the concurrency limit)
expect(allowed.length).toBe(2);
expect(rejected.length).toBe(18);
// The key value should be exactly 2 after all atomic operations
const key = `PENDING_REQ:${userId}`;
const rawValue = await ioredisClient.get(key);
expect(rawValue).toBe('2');
// Clean up
await decrementPendingRequest(userId);
await decrementPendingRequest(userId);
});
});
describe('Atomic Decrement', () => {
test('should decrement pending requests', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-decrement-${Date.now()}`;
await checkAndIncrementPendingRequest(userId);
await checkAndIncrementPendingRequest(userId);
// Decrement once
await decrementPendingRequest(userId);
const key = `PENDING_REQ:${userId}`;
const rawValue = await ioredisClient.get(key);
expect(rawValue).toBe('1');
});
test('should clean up key when count reaches zero', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-cleanup-${Date.now()}`;
await checkAndIncrementPendingRequest(userId);
await decrementPendingRequest(userId);
// Key should be deleted (not left as "0")
const key = `PENDING_REQ:${userId}`;
const exists = await ioredisClient.exists(key);
expect(exists).toBe(0);
});
test('should clean up key on double-decrement (negative protection)', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-double-decr-${Date.now()}`;
await checkAndIncrementPendingRequest(userId);
await decrementPendingRequest(userId);
await decrementPendingRequest(userId); // Double-decrement
// Key should be deleted, not negative
const key = `PENDING_REQ:${userId}`;
const exists = await ioredisClient.exists(key);
expect(exists).toBe(0);
});
test('should allow new requests after decrement frees a slot', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-free-slot-${Date.now()}`;
// Fill to limit
await checkAndIncrementPendingRequest(userId);
await checkAndIncrementPendingRequest(userId);
// Verify at limit
const atLimit = await checkAndIncrementPendingRequest(userId);
expect(atLimit.allowed).toBe(false);
// Free a slot
await decrementPendingRequest(userId);
// Should now be allowed again
const allowed = await checkAndIncrementPendingRequest(userId);
expect(allowed.allowed).toBe(true);
expect(allowed.pendingRequests).toBe(2);
});
});
describe('TTL Behavior', () => {
test('should set TTL on the concurrency key', async () => {
if (!ioredisClient) {
return;
}
const userId = `user-ttl-${Date.now()}`;
await checkAndIncrementPendingRequest(userId);
const key = `PENDING_REQ:${userId}`;
const ttl = await ioredisClient.ttl(key);
expect(ttl).toBeGreaterThan(0);
expect(ttl).toBeLessThanOrEqual(60);
});
});
});

View file

@ -9,6 +9,40 @@ const LIMIT_CONCURRENT_MESSAGES = process.env.LIMIT_CONCURRENT_MESSAGES;
const CONCURRENT_MESSAGE_MAX = math(process.env.CONCURRENT_MESSAGE_MAX, 2);
const CONCURRENT_VIOLATION_SCORE = math(process.env.CONCURRENT_VIOLATION_SCORE, 1);
/**
* Lua script for atomic check-and-increment.
* Increments the key, sets TTL, and if over limit decrements back.
* Returns positive count if allowed, negative count if rejected.
* Single round-trip, fully atomic eliminates the INCR/check/DECR race window.
*/
const CHECK_AND_INCREMENT_SCRIPT = `
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local ttl = tonumber(ARGV[2])
local current = redis.call('INCR', key)
redis.call('EXPIRE', key, ttl)
if current > limit then
redis.call('DECR', key)
return -current
end
return current
`;
/**
* Lua script for atomic decrement-and-cleanup.
* Decrements the key and deletes it if the count reaches zero or below.
* Eliminates the DECR-then-DEL race window.
*/
const DECREMENT_SCRIPT = `
local key = KEYS[1]
local current = redis.call('DECR', key)
if current <= 0 then
redis.call('DEL', key)
return 0
end
return current
`;
/** Lazily initialized cache for pending requests (used only for in-memory fallback) */
let pendingReqCache: ReturnType<typeof standardCache> | null = null;
@ -80,36 +114,28 @@ export async function checkAndIncrementPendingRequest(
return { allowed: true, pendingRequests: 0, limit };
}
// Use atomic Redis INCR when available to prevent race conditions
// Use atomic Lua script when Redis is available to prevent race conditions.
// A single EVAL round-trip atomically increments, checks, and decrements if over-limit.
if (USE_REDIS && ioredisClient) {
const key = buildKey(userId);
try {
// Pipeline ensures INCR and EXPIRE execute atomically in one round-trip
// This prevents edge cases where crash between operations leaves key without TTL
const pipeline = ioredisClient.pipeline();
pipeline.incr(key);
pipeline.expire(key, 60);
const results = await pipeline.exec();
const result = (await ioredisClient.eval(
CHECK_AND_INCREMENT_SCRIPT,
1,
key,
limit,
60,
)) as number;
if (!results || results[0][0]) {
throw new Error('Pipeline execution failed');
if (result < 0) {
// Negative return means over-limit (absolute value is the count before decrement)
const count = -result;
logger.debug(`[concurrency] User ${userId} exceeded concurrent limit: ${count}/${limit}`);
return { allowed: false, pendingRequests: count, limit };
}
const newCount = results[0][1] as number;
if (newCount > limit) {
// Over limit - decrement back and reject
await ioredisClient.decr(key);
logger.debug(
`[concurrency] User ${userId} exceeded concurrent limit: ${newCount}/${limit}`,
);
return { allowed: false, pendingRequests: newCount, limit };
}
logger.debug(
`[concurrency] User ${userId} incremented pending requests: ${newCount}/${limit}`,
);
return { allowed: true, pendingRequests: newCount, limit };
logger.debug(`[concurrency] User ${userId} incremented pending requests: ${result}/${limit}`);
return { allowed: true, pendingRequests: result, limit };
} catch (error) {
logger.error('[concurrency] Redis atomic increment failed:', error);
// On Redis error, allow the request to proceed (fail-open)
@ -164,18 +190,12 @@ export async function decrementPendingRequest(userId: string): Promise<void> {
return;
}
// Use atomic Redis DECR when available
// Use atomic Lua script to decrement and clean up zero/negative keys in one round-trip
if (USE_REDIS && ioredisClient) {
const key = buildKey(userId);
try {
const newCount = await ioredisClient.decr(key);
if (newCount < 0) {
// Counter went negative - reset to 0 and delete
await ioredisClient.del(key);
logger.debug(`[concurrency] User ${userId} pending requests cleared (was negative)`);
} else if (newCount === 0) {
// Clean up zero-value keys
await ioredisClient.del(key);
const newCount = (await ioredisClient.eval(DECREMENT_SCRIPT, 1, key)) as number;
if (newCount === 0) {
logger.debug(`[concurrency] User ${userId} pending requests cleared`);
} else {
logger.debug(`[concurrency] User ${userId} decremented pending requests: ${newCount}`);

View file

@ -19,8 +19,11 @@ describe('RedisEventTransport Integration Tests', () => {
originalEnv = { ...process.env };
process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'false';
process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
process.env.REDIS_KEY_PREFIX = testPrefix;
process.env.REDIS_PING_INTERVAL = '0';
process.env.REDIS_RETRY_MAX_ATTEMPTS = '5';
jest.resetModules();
@ -890,4 +893,121 @@ describe('RedisEventTransport Integration Tests', () => {
subscriber.disconnect();
});
});
describe('Publish Error Propagation', () => {
test('should swallow emitChunk publish errors (callers fire-and-forget)', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
};
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = `error-prop-chunk-${Date.now()}`;
// emitChunk swallows errors because callers often fire-and-forget (no await).
// Throwing would cause unhandled promise rejections.
await expect(transport.emitChunk(streamId, { data: 'test' })).resolves.toBeUndefined();
transport.destroy();
});
test('should throw when emitDone publish fails', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
};
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = `error-prop-done-${Date.now()}`;
await expect(transport.emitDone(streamId, { finished: true })).rejects.toThrow(
'Redis connection lost',
);
transport.destroy();
});
test('should throw when emitError publish fails', async () => {
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const mockPublisher = {
publish: jest.fn().mockRejectedValue(new Error('Redis connection lost')),
};
const mockSubscriber = {
on: jest.fn(),
subscribe: jest.fn().mockResolvedValue(undefined),
unsubscribe: jest.fn().mockResolvedValue(undefined),
};
const transport = new RedisEventTransport(
mockPublisher as unknown as Redis,
mockSubscriber as unknown as Redis,
);
const streamId = `error-prop-error-${Date.now()}`;
await expect(transport.emitError(streamId, 'some error')).rejects.toThrow(
'Redis connection lost',
);
transport.destroy();
});
test('should still deliver events successfully when publish succeeds', async () => {
if (!ioredisClient) {
console.warn('Redis not available, skipping test');
return;
}
const { RedisEventTransport } = await import('../implementations/RedisEventTransport');
const subscriber = (ioredisClient as Redis).duplicate();
const transport = new RedisEventTransport(ioredisClient, subscriber);
const streamId = `error-prop-success-${Date.now()}`;
const receivedChunks: unknown[] = [];
let doneEvent: unknown = null;
transport.subscribe(streamId, {
onChunk: (event) => receivedChunks.push(event),
onDone: (event) => {
doneEvent = event;
},
});
await new Promise((resolve) => setTimeout(resolve, 200));
// These should NOT throw
await transport.emitChunk(streamId, { text: 'hello' });
await transport.emitDone(streamId, { finished: true });
await new Promise((resolve) => setTimeout(resolve, 200));
expect(receivedChunks.length).toBe(1);
expect(doneEvent).toEqual({ finished: true });
transport.destroy();
subscriber.disconnect();
});
});
});

View file

@ -24,8 +24,11 @@ describe('RedisJobStore Integration Tests', () => {
// Set up test environment
process.env.USE_REDIS = process.env.USE_REDIS ?? 'true';
process.env.USE_REDIS_CLUSTER = process.env.USE_REDIS_CLUSTER ?? 'false';
process.env.REDIS_URI = process.env.REDIS_URI ?? 'redis://127.0.0.1:6379';
process.env.REDIS_KEY_PREFIX = testPrefix;
process.env.REDIS_PING_INTERVAL = '0';
process.env.REDIS_RETRY_MAX_ATTEMPTS = '5';
jest.resetModules();
@ -1033,4 +1036,196 @@ describe('RedisJobStore Integration Tests', () => {
await instance2.destroy();
});
});
describe('Batched Cleanup', () => {
test('should clean up many stale jobs in parallel batches', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
// Very short TTL so jobs are immediately stale
const store = new RedisJobStore(ioredisClient, { runningTtl: 1 });
await store.initialize();
const jobCount = 75; // More than one batch of 50
const veryOldTimestamp = Date.now() - 10000; // 10 seconds ago
// Create many stale jobs directly in Redis
for (let i = 0; i < jobCount; i++) {
const streamId = `batch-cleanup-${Date.now()}-${i}`;
const jobKey = `stream:{${streamId}}:job`;
await ioredisClient.hmset(jobKey, {
streamId,
userId: 'batch-user',
status: 'running',
createdAt: veryOldTimestamp.toString(),
syncSent: '0',
});
await ioredisClient.sadd('stream:running', streamId);
}
// Verify jobs are in the running set
const runningBefore = await ioredisClient.scard('stream:running');
expect(runningBefore).toBeGreaterThanOrEqual(jobCount);
// Run cleanup - should process in batches of 50
const cleaned = await store.cleanup();
expect(cleaned).toBeGreaterThanOrEqual(jobCount);
await store.destroy();
});
test('should not clean up valid running jobs during batch cleanup', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient, { runningTtl: 1200 });
await store.initialize();
// Create a mix of valid and stale jobs
const validStreamId = `valid-job-${Date.now()}`;
await store.createJob(validStreamId, 'user-1', validStreamId);
const staleStreamId = `stale-job-${Date.now()}`;
const jobKey = `stream:{${staleStreamId}}:job`;
await ioredisClient.hmset(jobKey, {
streamId: staleStreamId,
userId: 'user-1',
status: 'running',
createdAt: (Date.now() - 2000000).toString(), // Very old
syncSent: '0',
});
await ioredisClient.sadd('stream:running', staleStreamId);
const cleaned = await store.cleanup();
expect(cleaned).toBeGreaterThanOrEqual(1);
// Valid job should still exist
const validJob = await store.getJob(validStreamId);
expect(validJob).not.toBeNull();
expect(validJob?.status).toBe('running');
await store.destroy();
});
});
describe('appendChunk TTL Refresh', () => {
test('should set TTL on the chunk stream', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient, { runningTtl: 120 });
await store.initialize();
const streamId = `append-ttl-${Date.now()}`;
await store.createJob(streamId, 'user-1', streamId);
await store.appendChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', type: 'text', text: 'first' },
});
const chunkKey = `stream:{${streamId}}:chunks`;
const ttl = await ioredisClient.ttl(chunkKey);
expect(ttl).toBeGreaterThan(0);
expect(ttl).toBeLessThanOrEqual(120);
await store.destroy();
});
test('should refresh TTL on subsequent chunks (not just first)', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient, { runningTtl: 120 });
await store.initialize();
const streamId = `append-refresh-${Date.now()}`;
await store.createJob(streamId, 'user-1', streamId);
// Append first chunk
await store.appendChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', type: 'text', text: 'first' },
});
const chunkKey = `stream:{${streamId}}:chunks`;
const ttl1 = await ioredisClient.ttl(chunkKey);
expect(ttl1).toBeGreaterThan(0);
// Manually reduce TTL to simulate time passing
await ioredisClient.expire(chunkKey, 30);
const reducedTtl = await ioredisClient.ttl(chunkKey);
expect(reducedTtl).toBeLessThanOrEqual(30);
// Append another chunk - TTL should be refreshed back to running TTL
await store.appendChunk(streamId, {
event: 'on_message_delta',
data: { id: 'step-1', type: 'text', text: 'second' },
});
const ttl2 = await ioredisClient.ttl(chunkKey);
// Should be refreshed to ~120, not still ~30
expect(ttl2).toBeGreaterThan(30);
expect(ttl2).toBeLessThanOrEqual(120);
await store.destroy();
});
test('should store chunks correctly via pipeline', async () => {
if (!ioredisClient) {
return;
}
const { RedisJobStore } = await import('../implementations/RedisJobStore');
const store = new RedisJobStore(ioredisClient);
await store.initialize();
const streamId = `append-pipeline-${Date.now()}`;
await store.createJob(streamId, 'user-1', streamId);
const chunks = [
{
event: 'on_run_step',
data: {
id: 'step-1',
runId: 'run-1',
index: 0,
stepDetails: { type: 'message_creation' },
},
},
{
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: 'Hello ' } } },
},
{
event: 'on_message_delta',
data: { id: 'step-1', delta: { content: { type: 'text', text: 'world!' } } },
},
];
for (const chunk of chunks) {
await store.appendChunk(streamId, chunk);
}
// Verify all chunks were stored
const chunkKey = `stream:{${streamId}}:chunks`;
const len = await ioredisClient.xlen(chunkKey);
expect(len).toBe(3);
// Verify content can be reconstructed
const content = await store.getContentParts(streamId);
expect(content).not.toBeNull();
expect(content!.content.length).toBeGreaterThan(0);
await store.destroy();
});
});
});

View file

@ -461,6 +461,7 @@ export class RedisEventTransport implements IEventTransport {
await this.publisher.publish(channel, JSON.stringify(message));
} catch (err) {
logger.error(`[RedisEventTransport] Failed to publish done:`, err);
throw err;
}
}
@ -477,6 +478,7 @@ export class RedisEventTransport implements IEventTransport {
await this.publisher.publish(channel, JSON.stringify(message));
} catch (err) {
logger.error(`[RedisEventTransport] Failed to publish error:`, err);
throw err;
}
}

View file

@ -302,32 +302,46 @@ export class RedisJobStore implements IJobStore {
}
}
for (const streamId of streamIds) {
const job = await this.getJob(streamId);
// Process in batches of 50 to avoid sequential per-job round-trips
const BATCH_SIZE = 50;
for (let i = 0; i < streamIds.length; i += BATCH_SIZE) {
const batch = streamIds.slice(i, i + BATCH_SIZE);
const results = await Promise.allSettled(
batch.map(async (streamId) => {
const job = await this.getJob(streamId);
// Job no longer exists (TTL expired) - remove from set
if (!job) {
await this.redis.srem(KEYS.runningJobs, streamId);
this.localGraphCache.delete(streamId);
this.localCollectedUsageCache.delete(streamId);
cleaned++;
continue;
}
// Job no longer exists (TTL expired) - remove from set
if (!job) {
await this.redis.srem(KEYS.runningJobs, streamId);
this.localGraphCache.delete(streamId);
this.localCollectedUsageCache.delete(streamId);
return 1;
}
// Job completed but still in running set (shouldn't happen, but handle it)
if (job.status !== 'running') {
await this.redis.srem(KEYS.runningJobs, streamId);
this.localGraphCache.delete(streamId);
this.localCollectedUsageCache.delete(streamId);
cleaned++;
continue;
}
// Job completed but still in running set (shouldn't happen, but handle it)
if (job.status !== 'running') {
await this.redis.srem(KEYS.runningJobs, streamId);
this.localGraphCache.delete(streamId);
this.localCollectedUsageCache.delete(streamId);
return 1;
}
// Stale running job (failsafe - running for > configured TTL)
if (now - job.createdAt > this.ttl.running * 1000) {
logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
await this.deleteJob(streamId);
cleaned++;
// Stale running job (failsafe - running for > configured TTL)
if (now - job.createdAt > this.ttl.running * 1000) {
logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
await this.deleteJob(streamId);
return 1;
}
return 0;
}),
);
for (const result of results) {
if (result.status === 'fulfilled') {
cleaned += result.value;
} else {
logger.warn(`[RedisJobStore] Cleanup failed for a job:`, result.reason);
}
}
}
@ -592,16 +606,14 @@ export class RedisJobStore implements IJobStore {
*/
async appendChunk(streamId: string, event: unknown): Promise<void> {
const key = KEYS.chunks(streamId);
const added = await this.redis.xadd(key, '*', 'event', JSON.stringify(event));
// Set TTL on first chunk (when stream is created)
// Subsequent chunks inherit the stream's TTL
if (added) {
const len = await this.redis.xlen(key);
if (len === 1) {
await this.redis.expire(key, this.ttl.running);
}
}
// Pipeline XADD + EXPIRE in a single round-trip.
// EXPIRE is O(1) and idempotent — refreshing TTL on every chunk is better than
// only setting it once, since the original approach could let the TTL expire
// during long-running streams.
const pipeline = this.redis.pipeline();
pipeline.xadd(key, '*', 'event', JSON.stringify(event));
pipeline.expire(key, this.ttl.running);
await pipeline.exec();
}
/**