From 857c054a9a40abf85ae974b37593ecfb72d20ded Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Sun, 5 Oct 2025 13:48:41 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=97=91=EF=B8=8F=20feat:=20Cleanup=20for?= =?UTF-8?q?=20Orphaned=20MeiliSearch=20Documents=20(#9980)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added a new function `deleteDocumentsWithoutUserField` to remove documents lacking the user field from the specified MeiliSearch index. - Integrated this function into the `ensureFilterableAttributes` method to clean up orphaned documents when existing messages or conversations are found without a user field. 🔧 feat: Enhance Index Synchronization Logic - Updated `ensureFilterableAttributes` to return an object indicating whether settings were updated and if orphaned documents were found. - Integrated orphaned document cleanup directly into the index synchronization process without forcing a full re-sync unless settings were updated. - Improved logging for clarity on index configuration updates and orphaned document handling. 🔧 feat: Improve Flow State Management in Index Synchronization - Refactored flow state management logic to ensure cleanup occurs after synchronization, regardless of success or error. - Enhanced logging for flow state cleanup to provide better visibility into the synchronization process. - Streamlined the structure of the index synchronization function for improved readability. --- api/db/indexSync.js | 154 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 123 insertions(+), 31 deletions(-) diff --git a/api/db/indexSync.js b/api/db/indexSync.js index 9a9ed9507a..090c32cca8 100644 --- a/api/db/indexSync.js +++ b/api/db/indexSync.js @@ -29,12 +29,64 @@ class MeiliSearchClient { } } +/** + * Deletes documents from MeiliSearch index that are missing the user field + * @param {import('meilisearch').Index} index - MeiliSearch index instance + * @param {string} indexName - Name of the index for logging + * @returns {Promise} - Number of documents deleted + */ +async function deleteDocumentsWithoutUserField(index, indexName) { + let deletedCount = 0; + let offset = 0; + const batchSize = 1000; + + try { + while (true) { + const searchResult = await index.search('', { + limit: batchSize, + offset: offset, + }); + + if (searchResult.hits.length === 0) { + break; + } + + const idsToDelete = searchResult.hits.filter((hit) => !hit.user).map((hit) => hit.id); + + if (idsToDelete.length > 0) { + logger.info( + `[indexSync] Deleting ${idsToDelete.length} documents without user field from ${indexName} index`, + ); + await index.deleteDocuments(idsToDelete); + deletedCount += idsToDelete.length; + } + + if (searchResult.hits.length < batchSize) { + break; + } + + offset += batchSize; + } + + if (deletedCount > 0) { + logger.info(`[indexSync] Deleted ${deletedCount} orphaned documents from ${indexName} index`); + } + } catch (error) { + logger.error(`[indexSync] Error deleting documents from ${indexName}:`, error); + } + + return deletedCount; +} + /** * Ensures indexes have proper filterable attributes configured and checks if documents have user field * @param {MeiliSearch} client - MeiliSearch client instance - * @returns {Promise} - true if configuration was updated or re-sync is needed + * @returns {Promise<{settingsUpdated: boolean, orphanedDocsFound: boolean}>} - Status of what was done */ async function ensureFilterableAttributes(client) { + let settingsUpdated = false; + let hasOrphanedDocs = false; + try { // Check and update messages index try { @@ -47,16 +99,17 @@ async function ensureFilterableAttributes(client) { filterableAttributes: ['user'], }); logger.info('[indexSync] Messages index configured for user filtering'); - logger.info('[indexSync] Index configuration updated. Full re-sync will be triggered.'); - return true; + settingsUpdated = true; } // Check if existing documents have user field indexed try { const searchResult = await messagesIndex.search('', { limit: 1 }); if (searchResult.hits.length > 0 && !searchResult.hits[0].user) { - logger.info('[indexSync] Existing messages missing user field, re-sync needed'); - return true; + logger.info( + '[indexSync] Existing messages missing user field, will clean up orphaned documents...', + ); + hasOrphanedDocs = true; } } catch (searchError) { logger.debug('[indexSync] Could not check message documents:', searchError.message); @@ -78,16 +131,17 @@ async function ensureFilterableAttributes(client) { filterableAttributes: ['user'], }); logger.info('[indexSync] Convos index configured for user filtering'); - logger.info('[indexSync] Index configuration updated. Full re-sync will be triggered.'); - return true; + settingsUpdated = true; } // Check if existing documents have user field indexed try { const searchResult = await convosIndex.search('', { limit: 1 }); if (searchResult.hits.length > 0 && !searchResult.hits[0].user) { - logger.info('[indexSync] Existing conversations missing user field, re-sync needed'); - return true; + logger.info( + '[indexSync] Existing conversations missing user field, will clean up orphaned documents...', + ); + hasOrphanedDocs = true; } } catch (searchError) { logger.debug('[indexSync] Could not check conversation documents:', searchError.message); @@ -97,11 +151,34 @@ async function ensureFilterableAttributes(client) { logger.warn('[indexSync] Could not check/update convos index settings:', error.message); } } + + // If either index has orphaned documents, clean them up (but don't force resync) + if (hasOrphanedDocs) { + try { + const messagesIndex = client.index('messages'); + await deleteDocumentsWithoutUserField(messagesIndex, 'messages'); + } catch (error) { + logger.debug('[indexSync] Could not clean up messages:', error.message); + } + + try { + const convosIndex = client.index('convos'); + await deleteDocumentsWithoutUserField(convosIndex, 'convos'); + } catch (error) { + logger.debug('[indexSync] Could not clean up convos:', error.message); + } + + logger.info('[indexSync] Orphaned documents cleaned up without forcing resync.'); + } + + if (settingsUpdated) { + logger.info('[indexSync] Index settings updated. Full re-sync will be triggered.'); + } } catch (error) { logger.error('[indexSync] Error ensuring filterable attributes:', error); } - return false; + return { settingsUpdated, orphanedDocsFound: hasOrphanedDocs }; } /** @@ -121,14 +198,17 @@ async function performSync() { } /** Ensures indexes have proper filterable attributes configured */ - const configUpdated = await ensureFilterableAttributes(client); + const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } = + await ensureFilterableAttributes(client); let messagesSync = false; let convosSync = false; - // If configuration was just updated or documents are missing user field, force a full re-sync - if (configUpdated) { - logger.info('[indexSync] Forcing full re-sync to ensure user field is properly indexed...'); + // Only reset flags if settings were actually updated (not just for orphaned doc cleanup) + if (settingsUpdated) { + logger.info( + '[indexSync] Settings updated. Forcing full re-sync to reindex with new configuration...', + ); // Reset sync flags to force full re-sync await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } }); @@ -140,7 +220,7 @@ async function performSync() { // Check if we need to sync messages const messageProgress = await Message.getSyncProgress(); - if (!messageProgress.isComplete || configUpdated) { + if (!messageProgress.isComplete || settingsUpdated) { logger.info( `[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`, ); @@ -167,7 +247,7 @@ async function performSync() { // Check if we need to sync conversations const convoProgress = await Conversation.getSyncProgress(); - if (!convoProgress.isComplete || configUpdated) { + if (!convoProgress.isComplete || settingsUpdated) { logger.info( `[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`, ); @@ -204,22 +284,22 @@ async function indexSync() { logger.info('[indexSync] Starting index synchronization check...'); + // Get or create FlowStateManager instance + const flowsCache = getLogStores(CacheKeys.FLOWS); + if (!flowsCache) { + logger.warn('[indexSync] Flows cache not available, falling back to direct sync'); + return await performSync(); + } + + const flowManager = new FlowStateManager(flowsCache, { + ttl: 60000 * 10, // 10 minutes TTL for sync operations + }); + + // Use a unique flow ID for the sync operation + const flowId = 'meili-index-sync'; + const flowType = 'MEILI_SYNC'; + try { - // Get or create FlowStateManager instance - const flowsCache = getLogStores(CacheKeys.FLOWS); - if (!flowsCache) { - logger.warn('[indexSync] Flows cache not available, falling back to direct sync'); - return await performSync(); - } - - const flowManager = new FlowStateManager(flowsCache, { - ttl: 60000 * 10, // 10 minutes TTL for sync operations - }); - - // Use a unique flow ID for the sync operation - const flowId = 'meili-index-sync'; - const flowType = 'MEILI_SYNC'; - // This will only execute the handler if no other instance is running the sync const result = await flowManager.createFlowWithHandler(flowId, flowType, performSync); @@ -251,6 +331,18 @@ async function indexSync() { } else { logger.error('[indexSync] error', err); } + } finally { + // Always clean up flow state after completion or error + // Skip cleanup only if flow already exists (another instance is handling it) + try { + const flowState = await flowManager.getFlowState(flowId, flowType); + if (flowState) { + await flowManager.deleteFlow(flowId, flowType); + logger.debug('[indexSync] Flow state cleaned up'); + } + } catch (cleanupErr) { + logger.debug('[indexSync] Could not clean up flow state:', cleanupErr.message); + } } }