fix(flow): add immediate abort handling and fix intervalId initialization

- Add immediate abort handler that responds instantly to abort signal
- Declare intervalId before cleanup function to prevent 'Cannot access before initialization' error
- Consolidate cleanup logic into single function to avoid duplicate cleanup
- Properly remove abort event listener on cleanup
This commit is contained in:
Danny Avila 2025-12-18 19:25:46 -05:00
parent 228562c9f0
commit cc931bcf51
No known key found for this signature in database
GPG key ID: BF31EEB2C5CA0956

View file

@ -129,22 +129,61 @@ export class FlowStateManager<T = unknown> {
return new Promise<T>((resolve, reject) => {
const checkInterval = 2000;
let elapsedTime = 0;
let isCleanedUp = false;
let intervalId: NodeJS.Timeout | null = null;
// Cleanup function to avoid duplicate cleanup
const cleanup = () => {
if (isCleanedUp) return;
isCleanedUp = true;
if (intervalId) {
clearInterval(intervalId);
this.intervals.delete(intervalId);
}
if (signal && abortHandler) {
signal.removeEventListener('abort', abortHandler);
}
};
// Immediate abort handler - responds instantly to abort signal
const abortHandler = async () => {
cleanup();
logger.warn(`[${flowKey}] Flow aborted (immediate)`);
const message = `${type} flow aborted`;
try {
await this.keyv.delete(flowKey);
} catch {
// Ignore delete errors during abort
}
reject(new Error(message));
};
// Register abort handler immediately if signal provided
if (signal) {
if (signal.aborted) {
// Already aborted, reject immediately
cleanup();
reject(new Error(`${type} flow aborted`));
return;
}
signal.addEventListener('abort', abortHandler, { once: true });
}
intervalId = setInterval(async () => {
if (isCleanedUp) return;
const intervalId = setInterval(async () => {
try {
const flowState = (await this.keyv.get(flowKey)) as FlowState<T> | undefined;
if (!flowState) {
clearInterval(intervalId);
this.intervals.delete(intervalId);
cleanup();
logger.error(`[${flowKey}] Flow state not found`);
reject(new Error(`${type} Flow state not found`));
return;
}
if (signal?.aborted) {
clearInterval(intervalId);
this.intervals.delete(intervalId);
cleanup();
logger.warn(`[${flowKey}] Flow aborted`);
const message = `${type} flow aborted`;
await this.keyv.delete(flowKey);
@ -153,8 +192,7 @@ export class FlowStateManager<T = unknown> {
}
if (flowState.status !== 'PENDING') {
clearInterval(intervalId);
this.intervals.delete(intervalId);
cleanup();
logger.debug(`[${flowKey}] Flow completed`);
if (flowState.status === 'COMPLETED' && flowState.result !== undefined) {
@ -168,8 +206,7 @@ export class FlowStateManager<T = unknown> {
elapsedTime += checkInterval;
if (elapsedTime >= this.ttl) {
clearInterval(intervalId);
this.intervals.delete(intervalId);
cleanup();
logger.error(
`[${flowKey}] Flow timed out | Elapsed time: ${elapsedTime} | TTL: ${this.ttl}`,
);
@ -179,8 +216,7 @@ export class FlowStateManager<T = unknown> {
logger.debug(`[${flowKey}] Flow state elapsed time: ${elapsedTime}, checking again...`);
} catch (error) {
logger.error(`[${flowKey}] Error checking flow state:`, error);
clearInterval(intervalId);
this.intervals.delete(intervalId);
cleanup();
reject(error);
}
}, checkInterval);