mirror of
https://github.com/danny-avila/LibreChat.git
synced 2025-12-17 00:40:14 +01:00
🗑️ feat: Cleanup for Orphaned MeiliSearch Documents (#9980)
- Added a new function `deleteDocumentsWithoutUserField` to remove documents lacking the user field from the specified MeiliSearch index. - Integrated this function into the `ensureFilterableAttributes` method to clean up orphaned documents when existing messages or conversations are found without a user field. 🔧 feat: Enhance Index Synchronization Logic - Updated `ensureFilterableAttributes` to return an object indicating whether settings were updated and if orphaned documents were found. - Integrated orphaned document cleanup directly into the index synchronization process without forcing a full re-sync unless settings were updated. - Improved logging for clarity on index configuration updates and orphaned document handling. 🔧 feat: Improve Flow State Management in Index Synchronization - Refactored flow state management logic to ensure cleanup occurs after synchronization, regardless of success or error. - Enhanced logging for flow state cleanup to provide better visibility into the synchronization process. - Streamlined the structure of the index synchronization function for improved readability.
This commit is contained in:
parent
c9103a1708
commit
857c054a9a
1 changed files with 123 additions and 31 deletions
|
|
@ -29,12 +29,64 @@ class MeiliSearchClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes documents from MeiliSearch index that are missing the user field
|
||||||
|
* @param {import('meilisearch').Index} index - MeiliSearch index instance
|
||||||
|
* @param {string} indexName - Name of the index for logging
|
||||||
|
* @returns {Promise<number>} - Number of documents deleted
|
||||||
|
*/
|
||||||
|
async function deleteDocumentsWithoutUserField(index, indexName) {
|
||||||
|
let deletedCount = 0;
|
||||||
|
let offset = 0;
|
||||||
|
const batchSize = 1000;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const searchResult = await index.search('', {
|
||||||
|
limit: batchSize,
|
||||||
|
offset: offset,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (searchResult.hits.length === 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const idsToDelete = searchResult.hits.filter((hit) => !hit.user).map((hit) => hit.id);
|
||||||
|
|
||||||
|
if (idsToDelete.length > 0) {
|
||||||
|
logger.info(
|
||||||
|
`[indexSync] Deleting ${idsToDelete.length} documents without user field from ${indexName} index`,
|
||||||
|
);
|
||||||
|
await index.deleteDocuments(idsToDelete);
|
||||||
|
deletedCount += idsToDelete.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (searchResult.hits.length < batchSize) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (deletedCount > 0) {
|
||||||
|
logger.info(`[indexSync] Deleted ${deletedCount} orphaned documents from ${indexName} index`);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[indexSync] Error deleting documents from ${indexName}:`, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
return deletedCount;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ensures indexes have proper filterable attributes configured and checks if documents have user field
|
* Ensures indexes have proper filterable attributes configured and checks if documents have user field
|
||||||
* @param {MeiliSearch} client - MeiliSearch client instance
|
* @param {MeiliSearch} client - MeiliSearch client instance
|
||||||
* @returns {Promise<boolean>} - true if configuration was updated or re-sync is needed
|
* @returns {Promise<{settingsUpdated: boolean, orphanedDocsFound: boolean}>} - Status of what was done
|
||||||
*/
|
*/
|
||||||
async function ensureFilterableAttributes(client) {
|
async function ensureFilterableAttributes(client) {
|
||||||
|
let settingsUpdated = false;
|
||||||
|
let hasOrphanedDocs = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Check and update messages index
|
// Check and update messages index
|
||||||
try {
|
try {
|
||||||
|
|
@ -47,16 +99,17 @@ async function ensureFilterableAttributes(client) {
|
||||||
filterableAttributes: ['user'],
|
filterableAttributes: ['user'],
|
||||||
});
|
});
|
||||||
logger.info('[indexSync] Messages index configured for user filtering');
|
logger.info('[indexSync] Messages index configured for user filtering');
|
||||||
logger.info('[indexSync] Index configuration updated. Full re-sync will be triggered.');
|
settingsUpdated = true;
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if existing documents have user field indexed
|
// Check if existing documents have user field indexed
|
||||||
try {
|
try {
|
||||||
const searchResult = await messagesIndex.search('', { limit: 1 });
|
const searchResult = await messagesIndex.search('', { limit: 1 });
|
||||||
if (searchResult.hits.length > 0 && !searchResult.hits[0].user) {
|
if (searchResult.hits.length > 0 && !searchResult.hits[0].user) {
|
||||||
logger.info('[indexSync] Existing messages missing user field, re-sync needed');
|
logger.info(
|
||||||
return true;
|
'[indexSync] Existing messages missing user field, will clean up orphaned documents...',
|
||||||
|
);
|
||||||
|
hasOrphanedDocs = true;
|
||||||
}
|
}
|
||||||
} catch (searchError) {
|
} catch (searchError) {
|
||||||
logger.debug('[indexSync] Could not check message documents:', searchError.message);
|
logger.debug('[indexSync] Could not check message documents:', searchError.message);
|
||||||
|
|
@ -78,16 +131,17 @@ async function ensureFilterableAttributes(client) {
|
||||||
filterableAttributes: ['user'],
|
filterableAttributes: ['user'],
|
||||||
});
|
});
|
||||||
logger.info('[indexSync] Convos index configured for user filtering');
|
logger.info('[indexSync] Convos index configured for user filtering');
|
||||||
logger.info('[indexSync] Index configuration updated. Full re-sync will be triggered.');
|
settingsUpdated = true;
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if existing documents have user field indexed
|
// Check if existing documents have user field indexed
|
||||||
try {
|
try {
|
||||||
const searchResult = await convosIndex.search('', { limit: 1 });
|
const searchResult = await convosIndex.search('', { limit: 1 });
|
||||||
if (searchResult.hits.length > 0 && !searchResult.hits[0].user) {
|
if (searchResult.hits.length > 0 && !searchResult.hits[0].user) {
|
||||||
logger.info('[indexSync] Existing conversations missing user field, re-sync needed');
|
logger.info(
|
||||||
return true;
|
'[indexSync] Existing conversations missing user field, will clean up orphaned documents...',
|
||||||
|
);
|
||||||
|
hasOrphanedDocs = true;
|
||||||
}
|
}
|
||||||
} catch (searchError) {
|
} catch (searchError) {
|
||||||
logger.debug('[indexSync] Could not check conversation documents:', searchError.message);
|
logger.debug('[indexSync] Could not check conversation documents:', searchError.message);
|
||||||
|
|
@ -97,11 +151,34 @@ async function ensureFilterableAttributes(client) {
|
||||||
logger.warn('[indexSync] Could not check/update convos index settings:', error.message);
|
logger.warn('[indexSync] Could not check/update convos index settings:', error.message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If either index has orphaned documents, clean them up (but don't force resync)
|
||||||
|
if (hasOrphanedDocs) {
|
||||||
|
try {
|
||||||
|
const messagesIndex = client.index('messages');
|
||||||
|
await deleteDocumentsWithoutUserField(messagesIndex, 'messages');
|
||||||
|
} catch (error) {
|
||||||
|
logger.debug('[indexSync] Could not clean up messages:', error.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const convosIndex = client.index('convos');
|
||||||
|
await deleteDocumentsWithoutUserField(convosIndex, 'convos');
|
||||||
|
} catch (error) {
|
||||||
|
logger.debug('[indexSync] Could not clean up convos:', error.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info('[indexSync] Orphaned documents cleaned up without forcing resync.');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (settingsUpdated) {
|
||||||
|
logger.info('[indexSync] Index settings updated. Full re-sync will be triggered.');
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('[indexSync] Error ensuring filterable attributes:', error);
|
logger.error('[indexSync] Error ensuring filterable attributes:', error);
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return { settingsUpdated, orphanedDocsFound: hasOrphanedDocs };
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -121,14 +198,17 @@ async function performSync() {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Ensures indexes have proper filterable attributes configured */
|
/** Ensures indexes have proper filterable attributes configured */
|
||||||
const configUpdated = await ensureFilterableAttributes(client);
|
const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } =
|
||||||
|
await ensureFilterableAttributes(client);
|
||||||
|
|
||||||
let messagesSync = false;
|
let messagesSync = false;
|
||||||
let convosSync = false;
|
let convosSync = false;
|
||||||
|
|
||||||
// If configuration was just updated or documents are missing user field, force a full re-sync
|
// Only reset flags if settings were actually updated (not just for orphaned doc cleanup)
|
||||||
if (configUpdated) {
|
if (settingsUpdated) {
|
||||||
logger.info('[indexSync] Forcing full re-sync to ensure user field is properly indexed...');
|
logger.info(
|
||||||
|
'[indexSync] Settings updated. Forcing full re-sync to reindex with new configuration...',
|
||||||
|
);
|
||||||
|
|
||||||
// Reset sync flags to force full re-sync
|
// Reset sync flags to force full re-sync
|
||||||
await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
|
await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
|
||||||
|
|
@ -140,7 +220,7 @@ async function performSync() {
|
||||||
|
|
||||||
// Check if we need to sync messages
|
// Check if we need to sync messages
|
||||||
const messageProgress = await Message.getSyncProgress();
|
const messageProgress = await Message.getSyncProgress();
|
||||||
if (!messageProgress.isComplete || configUpdated) {
|
if (!messageProgress.isComplete || settingsUpdated) {
|
||||||
logger.info(
|
logger.info(
|
||||||
`[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`,
|
`[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`,
|
||||||
);
|
);
|
||||||
|
|
@ -167,7 +247,7 @@ async function performSync() {
|
||||||
|
|
||||||
// Check if we need to sync conversations
|
// Check if we need to sync conversations
|
||||||
const convoProgress = await Conversation.getSyncProgress();
|
const convoProgress = await Conversation.getSyncProgress();
|
||||||
if (!convoProgress.isComplete || configUpdated) {
|
if (!convoProgress.isComplete || settingsUpdated) {
|
||||||
logger.info(
|
logger.info(
|
||||||
`[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`,
|
`[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`,
|
||||||
);
|
);
|
||||||
|
|
@ -204,7 +284,6 @@ async function indexSync() {
|
||||||
|
|
||||||
logger.info('[indexSync] Starting index synchronization check...');
|
logger.info('[indexSync] Starting index synchronization check...');
|
||||||
|
|
||||||
try {
|
|
||||||
// Get or create FlowStateManager instance
|
// Get or create FlowStateManager instance
|
||||||
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
const flowsCache = getLogStores(CacheKeys.FLOWS);
|
||||||
if (!flowsCache) {
|
if (!flowsCache) {
|
||||||
|
|
@ -220,6 +299,7 @@ async function indexSync() {
|
||||||
const flowId = 'meili-index-sync';
|
const flowId = 'meili-index-sync';
|
||||||
const flowType = 'MEILI_SYNC';
|
const flowType = 'MEILI_SYNC';
|
||||||
|
|
||||||
|
try {
|
||||||
// This will only execute the handler if no other instance is running the sync
|
// This will only execute the handler if no other instance is running the sync
|
||||||
const result = await flowManager.createFlowWithHandler(flowId, flowType, performSync);
|
const result = await flowManager.createFlowWithHandler(flowId, flowType, performSync);
|
||||||
|
|
||||||
|
|
@ -251,6 +331,18 @@ async function indexSync() {
|
||||||
} else {
|
} else {
|
||||||
logger.error('[indexSync] error', err);
|
logger.error('[indexSync] error', err);
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
// Always clean up flow state after completion or error
|
||||||
|
// Skip cleanup only if flow already exists (another instance is handling it)
|
||||||
|
try {
|
||||||
|
const flowState = await flowManager.getFlowState(flowId, flowType);
|
||||||
|
if (flowState) {
|
||||||
|
await flowManager.deleteFlow(flowId, flowType);
|
||||||
|
logger.debug('[indexSync] Flow state cleaned up');
|
||||||
|
}
|
||||||
|
} catch (cleanupErr) {
|
||||||
|
logger.debug('[indexSync] Could not clean up flow state:', cleanupErr.message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue