From cc931bcf51a4e88cb0825648a5338249907d0b60 Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Thu, 18 Dec 2025 19:25:46 -0500 Subject: [PATCH] fix(flow): add immediate abort handling and fix intervalId initialization - Add immediate abort handler that responds instantly to abort signal - Declare intervalId before cleanup function to prevent 'Cannot access before initialization' error - Consolidate cleanup logic into single function to avoid duplicate cleanup - Properly remove abort event listener on cleanup --- packages/api/src/flow/manager.ts | 58 ++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/packages/api/src/flow/manager.ts b/packages/api/src/flow/manager.ts index 96bd88167b..2e2731a2d4 100644 --- a/packages/api/src/flow/manager.ts +++ b/packages/api/src/flow/manager.ts @@ -129,22 +129,61 @@ export class FlowStateManager { return new Promise((resolve, reject) => { const checkInterval = 2000; let elapsedTime = 0; + let isCleanedUp = false; + let intervalId: NodeJS.Timeout | null = null; + + // Cleanup function to avoid duplicate cleanup + const cleanup = () => { + if (isCleanedUp) return; + isCleanedUp = true; + if (intervalId) { + clearInterval(intervalId); + this.intervals.delete(intervalId); + } + if (signal && abortHandler) { + signal.removeEventListener('abort', abortHandler); + } + }; + + // Immediate abort handler - responds instantly to abort signal + const abortHandler = async () => { + cleanup(); + logger.warn(`[${flowKey}] Flow aborted (immediate)`); + const message = `${type} flow aborted`; + try { + await this.keyv.delete(flowKey); + } catch { + // Ignore delete errors during abort + } + reject(new Error(message)); + }; + + // Register abort handler immediately if signal provided + if (signal) { + if (signal.aborted) { + // Already aborted, reject immediately + cleanup(); + reject(new Error(`${type} flow aborted`)); + return; + } + signal.addEventListener('abort', abortHandler, { once: true }); + } + + intervalId = setInterval(async () => { + if (isCleanedUp) return; - const intervalId = setInterval(async () => { try { const flowState = (await this.keyv.get(flowKey)) as FlowState | undefined; if (!flowState) { - clearInterval(intervalId); - this.intervals.delete(intervalId); + cleanup(); logger.error(`[${flowKey}] Flow state not found`); reject(new Error(`${type} Flow state not found`)); return; } if (signal?.aborted) { - clearInterval(intervalId); - this.intervals.delete(intervalId); + cleanup(); logger.warn(`[${flowKey}] Flow aborted`); const message = `${type} flow aborted`; await this.keyv.delete(flowKey); @@ -153,8 +192,7 @@ export class FlowStateManager { } if (flowState.status !== 'PENDING') { - clearInterval(intervalId); - this.intervals.delete(intervalId); + cleanup(); logger.debug(`[${flowKey}] Flow completed`); if (flowState.status === 'COMPLETED' && flowState.result !== undefined) { @@ -168,8 +206,7 @@ export class FlowStateManager { elapsedTime += checkInterval; if (elapsedTime >= this.ttl) { - clearInterval(intervalId); - this.intervals.delete(intervalId); + cleanup(); logger.error( `[${flowKey}] Flow timed out | Elapsed time: ${elapsedTime} | TTL: ${this.ttl}`, ); @@ -179,8 +216,7 @@ export class FlowStateManager { logger.debug(`[${flowKey}] Flow state elapsed time: ${elapsedTime}, checking again...`); } catch (error) { logger.error(`[${flowKey}] Error checking flow state:`, error); - clearInterval(intervalId); - this.intervals.delete(intervalId); + cleanup(); reject(error); } }, checkInterval);