diff --git a/api/db/indexSync.js b/api/db/indexSync.js index 090c32cca8..c86598d108 100644 --- a/api/db/indexSync.js +++ b/api/db/indexSync.js @@ -183,95 +183,111 @@ async function ensureFilterableAttributes(client) { /** * Performs the actual sync operations for messages and conversations + * @param {FlowStateManager} flowManager - Flow state manager instance + * @param {string} flowId - Flow identifier + * @param {string} flowType - Flow type */ -async function performSync() { - const client = MeiliSearchClient.getInstance(); +async function performSync(flowManager, flowId, flowType) { + try { + const client = MeiliSearchClient.getInstance(); - const { status } = await client.health(); - if (status !== 'available') { - throw new Error('Meilisearch not available'); - } - - if (indexingDisabled === true) { - logger.info('[indexSync] Indexing is disabled, skipping...'); - return { messagesSync: false, convosSync: false }; - } - - /** Ensures indexes have proper filterable attributes configured */ - const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } = - await ensureFilterableAttributes(client); - - let messagesSync = false; - let convosSync = false; - - // Only reset flags if settings were actually updated (not just for orphaned doc cleanup) - if (settingsUpdated) { - logger.info( - '[indexSync] Settings updated. Forcing full re-sync to reindex with new configuration...', - ); - - // Reset sync flags to force full re-sync - await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } }); - await Conversation.collection.updateMany( - { _meiliIndex: true }, - { $set: { _meiliIndex: false } }, - ); - } - - // Check if we need to sync messages - const messageProgress = await Message.getSyncProgress(); - if (!messageProgress.isComplete || settingsUpdated) { - logger.info( - `[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`, - ); - - // Check if we should do a full sync or incremental - const messageCount = await Message.countDocuments(); - const messagesIndexed = messageProgress.totalProcessed; - const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10); - - if (messageCount - messagesIndexed > syncThreshold) { - logger.info('[indexSync] Starting full message sync due to large difference'); - await Message.syncWithMeili(); - messagesSync = true; - } else if (messageCount !== messagesIndexed) { - logger.warn('[indexSync] Messages out of sync, performing incremental sync'); - await Message.syncWithMeili(); - messagesSync = true; + const { status } = await client.health(); + if (status !== 'available') { + throw new Error('Meilisearch not available'); } - } else { - logger.info( - `[indexSync] Messages are fully synced: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments}`, - ); - } - // Check if we need to sync conversations - const convoProgress = await Conversation.getSyncProgress(); - if (!convoProgress.isComplete || settingsUpdated) { - logger.info( - `[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`, - ); - - const convoCount = await Conversation.countDocuments(); - const convosIndexed = convoProgress.totalProcessed; - const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10); - - if (convoCount - convosIndexed > syncThreshold) { - logger.info('[indexSync] Starting full conversation sync due to large difference'); - await Conversation.syncWithMeili(); - convosSync = true; - } else if (convoCount !== convosIndexed) { - logger.warn('[indexSync] Convos out of sync, performing incremental sync'); - await Conversation.syncWithMeili(); - convosSync = true; + if (indexingDisabled === true) { + logger.info('[indexSync] Indexing is disabled, skipping...'); + return { messagesSync: false, convosSync: false }; } - } else { - logger.info( - `[indexSync] Conversations are fully synced: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments}`, - ); - } - return { messagesSync, convosSync }; + /** Ensures indexes have proper filterable attributes configured */ + const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } = + await ensureFilterableAttributes(client); + + let messagesSync = false; + let convosSync = false; + + // Only reset flags if settings were actually updated (not just for orphaned doc cleanup) + if (settingsUpdated) { + logger.info( + '[indexSync] Settings updated. Forcing full re-sync to reindex with new configuration...', + ); + + // Reset sync flags to force full re-sync + await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } }); + await Conversation.collection.updateMany( + { _meiliIndex: true }, + { $set: { _meiliIndex: false } }, + ); + } + + // Check if we need to sync messages + const messageProgress = await Message.getSyncProgress(); + if (!messageProgress.isComplete || settingsUpdated) { + logger.info( + `[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`, + ); + + // Check if we should do a full sync or incremental + const messageCount = await Message.countDocuments(); + const messagesIndexed = messageProgress.totalProcessed; + const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10); + + if (messageCount - messagesIndexed > syncThreshold) { + logger.info('[indexSync] Starting full message sync due to large difference'); + await Message.syncWithMeili(); + messagesSync = true; + } else if (messageCount !== messagesIndexed) { + logger.warn('[indexSync] Messages out of sync, performing incremental sync'); + await Message.syncWithMeili(); + messagesSync = true; + } + } else { + logger.info( + `[indexSync] Messages are fully synced: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments}`, + ); + } + + // Check if we need to sync conversations + const convoProgress = await Conversation.getSyncProgress(); + if (!convoProgress.isComplete || settingsUpdated) { + logger.info( + `[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`, + ); + + const convoCount = await Conversation.countDocuments(); + const convosIndexed = convoProgress.totalProcessed; + const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10); + + if (convoCount - convosIndexed > syncThreshold) { + logger.info('[indexSync] Starting full conversation sync due to large difference'); + await Conversation.syncWithMeili(); + convosSync = true; + } else if (convoCount !== convosIndexed) { + logger.warn('[indexSync] Convos out of sync, performing incremental sync'); + await Conversation.syncWithMeili(); + convosSync = true; + } + } else { + logger.info( + `[indexSync] Conversations are fully synced: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments}`, + ); + } + + return { messagesSync, convosSync }; + } finally { + if (indexingDisabled === true) { + logger.info('[indexSync] Indexing is disabled, skipping cleanup...'); + } else if (flowManager && flowId && flowType) { + try { + await flowManager.deleteFlow(flowId, flowType); + logger.debug('[indexSync] Flow state cleaned up'); + } catch (cleanupErr) { + logger.debug('[indexSync] Could not clean up flow state:', cleanupErr.message); + } + } + } } /** @@ -288,7 +304,7 @@ async function indexSync() { const flowsCache = getLogStores(CacheKeys.FLOWS); if (!flowsCache) { logger.warn('[indexSync] Flows cache not available, falling back to direct sync'); - return await performSync(); + return await performSync(null, null, null); } const flowManager = new FlowStateManager(flowsCache, { @@ -301,7 +317,9 @@ async function indexSync() { try { // This will only execute the handler if no other instance is running the sync - const result = await flowManager.createFlowWithHandler(flowId, flowType, performSync); + const result = await flowManager.createFlowWithHandler(flowId, flowType, () => + performSync(flowManager, flowId, flowType), + ); if (result.messagesSync || result.convosSync) { logger.info('[indexSync] Sync completed successfully'); @@ -331,18 +349,6 @@ async function indexSync() { } else { logger.error('[indexSync] error', err); } - } finally { - // Always clean up flow state after completion or error - // Skip cleanup only if flow already exists (another instance is handling it) - try { - const flowState = await flowManager.getFlowState(flowId, flowType); - if (flowState) { - await flowManager.deleteFlow(flowId, flowType); - logger.debug('[indexSync] Flow state cleaned up'); - } - } catch (cleanupErr) { - logger.debug('[indexSync] Could not clean up flow state:', cleanupErr.message); - } } }