From 72cd159a373fb8c05e41dbf62ef2e174799feeae Mon Sep 17 00:00:00 2001 From: Danny Avila Date: Fri, 20 Jun 2025 18:05:19 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=83=20refactor:=20Optimize=20MeiliSear?= =?UTF-8?q?ch=20Sync=20Processing=20and=20Tracking=20(#7994)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/db/indexSync.js | 137 ++++-- .../src/models/plugins/mongoMeili.ts | 397 +++++++++++++----- 2 files changed, 404 insertions(+), 130 deletions(-) diff --git a/api/db/indexSync.js b/api/db/indexSync.js index e8bcd55e37..945346a906 100644 --- a/api/db/indexSync.js +++ b/api/db/indexSync.js @@ -1,8 +1,11 @@ const mongoose = require('mongoose'); const { MeiliSearch } = require('meilisearch'); const { logger } = require('@librechat/data-schemas'); +const { FlowStateManager } = require('@librechat/api'); +const { CacheKeys } = require('librechat-data-provider'); const { isEnabled } = require('~/server/utils'); +const { getLogStores } = require('~/cache'); const Conversation = mongoose.models.Conversation; const Message = mongoose.models.Message; @@ -28,43 +31,123 @@ class MeiliSearchClient { } } +/** + * Performs the actual sync operations for messages and conversations + */ +async function performSync() { + const client = MeiliSearchClient.getInstance(); + + const { status } = await client.health(); + if (status !== 'available') { + throw new Error('Meilisearch not available'); + } + + if (indexingDisabled === true) { + logger.info('[indexSync] Indexing is disabled, skipping...'); + return { messagesSync: false, convosSync: false }; + } + + let messagesSync = false; + let convosSync = false; + + // Check if we need to sync messages + const messageProgress = await Message.getSyncProgress(); + if (!messageProgress.isComplete) { + logger.info( + `[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`, + ); + + // Check if we should do a full sync or incremental + const messageCount = await Message.countDocuments(); + const messagesIndexed = messageProgress.totalProcessed; + const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10); + + if (messageCount - messagesIndexed > syncThreshold) { + logger.info('[indexSync] Starting full message sync due to large difference'); + await Message.syncWithMeili(); + messagesSync = true; + } else if (messageCount !== messagesIndexed) { + logger.warn('[indexSync] Messages out of sync, performing incremental sync'); + await Message.syncWithMeili(); + messagesSync = true; + } + } else { + logger.info( + `[indexSync] Messages are fully synced: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments}`, + ); + } + + // Check if we need to sync conversations + const convoProgress = await Conversation.getSyncProgress(); + if (!convoProgress.isComplete) { + logger.info( + `[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`, + ); + + const convoCount = await Conversation.countDocuments(); + const convosIndexed = convoProgress.totalProcessed; + const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10); + + if (convoCount - convosIndexed > syncThreshold) { + logger.info('[indexSync] Starting full conversation sync due to large difference'); + await Conversation.syncWithMeili(); + convosSync = true; + } else if (convoCount !== convosIndexed) { + logger.warn('[indexSync] Convos out of sync, performing incremental sync'); + await Conversation.syncWithMeili(); + convosSync = true; + } + } else { + logger.info( + `[indexSync] Conversations are fully synced: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments}`, + ); + } + + return { messagesSync, convosSync }; +} + +/** + * Main index sync function that uses FlowStateManager to prevent concurrent execution + */ async function indexSync() { if (!searchEnabled) { return; } - try { - const client = MeiliSearchClient.getInstance(); - const { status } = await client.health(); - if (status !== 'available') { - throw new Error('Meilisearch not available'); + logger.info('[indexSync] Starting index synchronization check...'); + + try { + // Get or create FlowStateManager instance + const flowsCache = getLogStores(CacheKeys.FLOWS); + if (!flowsCache) { + logger.warn('[indexSync] Flows cache not available, falling back to direct sync'); + return await performSync(); } - if (indexingDisabled === true) { - logger.info('[indexSync] Indexing is disabled, skipping...'); + const flowManager = new FlowStateManager(flowsCache, { + ttl: 60000 * 10, // 10 minutes TTL for sync operations + }); + + // Use a unique flow ID for the sync operation + const flowId = 'meili-index-sync'; + const flowType = 'MEILI_SYNC'; + + // This will only execute the handler if no other instance is running the sync + const result = await flowManager.createFlowWithHandler(flowId, flowType, performSync); + + if (result.messagesSync || result.convosSync) { + logger.info('[indexSync] Sync completed successfully'); + } else { + logger.debug('[indexSync] No sync was needed'); + } + + return result; + } catch (err) { + if (err.message.includes('flow already exists')) { + logger.info('[indexSync] Sync already running on another instance'); return; } - const messageCount = await Message.countDocuments(); - const convoCount = await Conversation.countDocuments(); - const messages = await client.index('messages').getStats(); - const convos = await client.index('convos').getStats(); - const messagesIndexed = messages.numberOfDocuments; - const convosIndexed = convos.numberOfDocuments; - - logger.debug(`[indexSync] There are ${messageCount} messages and ${messagesIndexed} indexed`); - logger.debug(`[indexSync] There are ${convoCount} convos and ${convosIndexed} indexed`); - - if (messageCount !== messagesIndexed) { - logger.debug('[indexSync] Messages out of sync, indexing'); - Message.syncWithMeili(); - } - - if (convoCount !== convosIndexed) { - logger.debug('[indexSync] Convos out of sync, indexing'); - Conversation.syncWithMeili(); - } - } catch (err) { if (err.message.includes('not found')) { logger.debug('[indexSync] Creating indices...'); currentTimeout = setTimeout(async () => { diff --git a/packages/data-schemas/src/models/plugins/mongoMeili.ts b/packages/data-schemas/src/models/plugins/mongoMeili.ts index d44dfd806b..84a8c7efe8 100644 --- a/packages/data-schemas/src/models/plugins/mongoMeili.ts +++ b/packages/data-schemas/src/models/plugins/mongoMeili.ts @@ -19,6 +19,8 @@ interface MongoMeiliOptions { indexName: string; primaryKey: string; mongoose: typeof import('mongoose'); + syncBatchSize?: number; + syncDelayMs?: number; } interface MeiliIndexable { @@ -31,6 +33,13 @@ interface ContentItem { text?: string; } +interface SyncProgress { + lastSyncedId?: string; + totalProcessed: number; + totalDocuments: number; + isComplete: boolean; +} + interface _DocumentWithMeiliIndex extends Document { _meiliIndex?: boolean; preprocessObjectForIndex?: () => Record; @@ -45,7 +54,24 @@ interface _DocumentWithMeiliIndex extends Document { export type DocumentWithMeiliIndex = _DocumentWithMeiliIndex & IConversation & Partial; export interface SchemaWithMeiliMethods extends Model { - syncWithMeili(): Promise; + syncWithMeili(options?: { resumeFromId?: string }): Promise; + getSyncProgress(): Promise; + processSyncBatch( + index: Index, + documents: Array>, + updateOps: Array<{ + updateOne: { + filter: Record; + update: { $set: { _meiliIndex: boolean } }; + }; + }>, + ): Promise; + cleanupMeiliIndex( + index: Index, + primaryKey: string, + batchSize: number, + delayMs: number, + ): Promise; setMeiliIndexSettings(settings: Record): Promise; meiliSearch( q: string, @@ -66,6 +92,14 @@ const searchEnabled = process.env.SEARCH != null && process.env.SEARCH.toLowerCa const meiliEnabled = process.env.MEILI_HOST != null && process.env.MEILI_MASTER_KEY != null && searchEnabled; +/** + * Get sync configuration from environment variables + */ +const getSyncConfig = () => ({ + batchSize: parseInt(process.env.MEILI_SYNC_BATCH_SIZE || '100', 10), + delayMs: parseInt(process.env.MEILI_SYNC_DELAY_MS || '100', 10), +}); + /** * Local implementation of parseTextParts to avoid dependency on librechat-data-provider * Extracts text content from an array of content items @@ -101,6 +135,26 @@ const validateOptions = (options: Partial): void => { }); }; +/** + * Helper function to process documents in batches with rate limiting + */ +const processBatch = async ( + items: T[], + batchSize: number, + delayMs: number, + processor: (batch: T[]) => Promise, +): Promise => { + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); + await processor(batch); + + // Add delay between batches to prevent overwhelming resources + if (i + batchSize < items.length && delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } +}; + /** * Factory function to create a MeiliMongooseModel class which extends a Mongoose model. * This class contains static and instance methods to synchronize and manage the MeiliSearch index @@ -109,127 +163,213 @@ const validateOptions = (options: Partial): void => { * @param config - Configuration object. * @param config.index - The MeiliSearch index object. * @param config.attributesToIndex - List of attributes to index. + * @param config.syncOptions - Sync configuration options. * @returns A class definition that will be loaded into the Mongoose schema. */ const createMeiliMongooseModel = ({ index, attributesToIndex, + syncOptions, }: { index: Index; attributesToIndex: string[]; + syncOptions: { batchSize: number; delayMs: number }; }) => { const primaryKey = attributesToIndex[0]; + const syncConfig = { ...getSyncConfig(), ...syncOptions }; class MeiliMongooseModel { /** - * Synchronizes the data between the MongoDB collection and the MeiliSearch index. - * - * The synchronization process involves: - * 1. Fetching all documents from the MongoDB collection and MeiliSearch index. - * 2. Comparing documents from both sources. - * 3. Deleting documents from MeiliSearch that no longer exist in MongoDB. - * 4. Adding documents to MeiliSearch that exist in MongoDB but not in the index. - * 5. Updating documents in MeiliSearch if key fields (such as `text` or `title`) differ. - * 6. Updating the `_meiliIndex` field in MongoDB to indicate the indexing status. - * - * Note: The function processes documents in batches because MeiliSearch's - * `index.getDocuments` requires an exact limit and `index.addDocuments` does not handle - * partial failures in a batch. - * - * @returns {Promise} Resolves when the synchronization is complete. + * Get the current sync progress */ - static async syncWithMeili(this: SchemaWithMeiliMethods): Promise { + static async getSyncProgress(this: SchemaWithMeiliMethods): Promise { + const totalDocuments = await this.countDocuments(); + const indexedDocuments = await this.countDocuments({ _meiliIndex: true }); + + return { + totalProcessed: indexedDocuments, + totalDocuments, + isComplete: indexedDocuments === totalDocuments, + }; + } + + /** + * Synchronizes the data between the MongoDB collection and the MeiliSearch index. + * Now uses streaming and batching to reduce memory usage. + */ + static async syncWithMeili( + this: SchemaWithMeiliMethods, + options?: { resumeFromId?: string }, + ): Promise { try { - let moreDocuments = true; - const mongoDocuments = await this.find().lean(); + const startTime = Date.now(); + const { batchSize, delayMs } = syncConfig; + + logger.info( + `[syncWithMeili] Starting sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} with batch size ${batchSize}`, + ); + + // Build query with resume capability + const query: FilterQuery = {}; + if (options?.resumeFromId) { + query._id = { $gt: options.resumeFromId }; + } + + // Get total count for progress tracking + const totalCount = await this.countDocuments(query); + let processedCount = 0; + + // First, handle documents that need to be removed from Meili + await this.cleanupMeiliIndex(index, primaryKey, batchSize, delayMs); + + // Process MongoDB documents in batches using cursor + const cursor = this.find(query) + .select(attributesToIndex.join(' ') + ' _meiliIndex') + .sort({ _id: 1 }) + .batchSize(batchSize) + .cursor(); const format = (doc: Record) => _.omitBy(_.pick(doc, attributesToIndex), (v, k) => k.startsWith('$')); - const mongoMap = new Map( - mongoDocuments.map((doc) => { - const typedDoc = doc as Record; - return [typedDoc[primaryKey], format(typedDoc)]; - }), - ); - const indexMap = new Map>(); - let offset = 0; - const batchSize = 1000; - - while (moreDocuments) { - const batch = await index.getDocuments({ limit: batchSize, offset }); - if (batch.results.length === 0) { - moreDocuments = false; - } - for (const doc of batch.results) { - indexMap.set(doc[primaryKey], format(doc)); - } - offset += batchSize; - } - - logger.debug('[syncWithMeili]', { indexMap: indexMap.size, mongoMap: mongoMap.size }); - - const updateOps: Array<{ + let documentBatch: Array> = []; + let updateOps: Array<{ updateOne: { filter: Record; update: { $set: { _meiliIndex: boolean } }; }; }> = []; - // Process documents present in the MeiliSearch index - for (const [id, doc] of indexMap) { - const update: Record = {}; - update[primaryKey] = id; - if (mongoMap.has(id)) { - const mongoDoc = mongoMap.get(id); - if ( - (doc.text && doc.text !== mongoDoc?.text) || - (doc.title && doc.title !== mongoDoc?.title) - ) { - logger.debug( - `[syncWithMeili] ${id} had document discrepancy in ${ - doc.text ? 'text' : 'title' - } field`, - ); - updateOps.push({ - updateOne: { filter: update, update: { $set: { _meiliIndex: true } } }, - }); - await index.addDocuments([doc]); + // Process documents in streaming fashion + for await (const doc of cursor) { + const typedDoc = doc.toObject() as unknown as Record; + const formatted = format(typedDoc); + + // Check if document needs indexing + if (!typedDoc._meiliIndex) { + documentBatch.push(formatted); + updateOps.push({ + updateOne: { + filter: { _id: typedDoc._id }, + update: { $set: { _meiliIndex: true } }, + }, + }); + } + + processedCount++; + + // Process batch when it reaches the configured size + if (documentBatch.length >= batchSize) { + await this.processSyncBatch(index, documentBatch, updateOps); + documentBatch = []; + updateOps = []; + + // Log progress + const progress = Math.round((processedCount / totalCount) * 100); + logger.info(`[syncWithMeili] Progress: ${progress}% (${processedCount}/${totalCount})`); + + // Add delay to prevent overwhelming resources + if (delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); } - } else { - await index.deleteDocument(id as string); - updateOps.push({ - updateOne: { filter: update, update: { $set: { _meiliIndex: false } } }, - }); } } - // Process documents present in MongoDB - for (const [id, doc] of mongoMap) { - const update: Record = {}; - update[primaryKey] = id; - if (!indexMap.has(id)) { - await index.addDocuments([doc]); - updateOps.push({ - updateOne: { filter: update, update: { $set: { _meiliIndex: true } } }, - }); - } else if (doc._meiliIndex === false) { - updateOps.push({ - updateOne: { filter: update, update: { $set: { _meiliIndex: true } } }, - }); - } + // Process remaining documents + if (documentBatch.length > 0) { + await this.processSyncBatch(index, documentBatch, updateOps); } + const duration = Date.now() - startTime; + logger.info( + `[syncWithMeili] Completed sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} in ${duration}ms`, + ); + } catch (error) { + logger.error('[syncWithMeili] Error during sync:', error); + throw error; + } + } + + /** + * Process a batch of documents for syncing + */ + static async processSyncBatch( + this: SchemaWithMeiliMethods, + index: Index, + documents: Array>, + updateOps: Array<{ + updateOne: { + filter: Record; + update: { $set: { _meiliIndex: boolean } }; + }; + }>, + ): Promise { + if (documents.length === 0) { + return; + } + + try { + // Add documents to MeiliSearch + await index.addDocuments(documents); + + // Update MongoDB to mark documents as indexed if (updateOps.length > 0) { await this.collection.bulkWrite(updateOps); - logger.debug( - `[syncWithMeili] Finished indexing ${ - primaryKey === 'messageId' ? 'messages' : 'conversations' - }`, - ); } } catch (error) { - logger.error('[syncWithMeili] Error adding document to Meili:', error); + logger.error('[processSyncBatch] Error processing batch:', error); + // Don't throw - allow sync to continue with other documents + } + } + + /** + * Clean up documents in MeiliSearch that no longer exist in MongoDB + */ + static async cleanupMeiliIndex( + this: SchemaWithMeiliMethods, + index: Index, + primaryKey: string, + batchSize: number, + delayMs: number, + ): Promise { + try { + let offset = 0; + let moreDocuments = true; + + while (moreDocuments) { + const batch = await index.getDocuments({ limit: batchSize, offset }); + if (batch.results.length === 0) { + moreDocuments = false; + break; + } + + const meiliIds = batch.results.map((doc) => doc[primaryKey]); + const query: Record = {}; + query[primaryKey] = { $in: meiliIds }; + + // Find which documents exist in MongoDB + const existingDocs = await this.find(query).select(primaryKey).lean(); + + const existingIds = new Set( + existingDocs.map((doc: Record) => doc[primaryKey]), + ); + + // Delete documents that don't exist in MongoDB + const toDelete = meiliIds.filter((id) => !existingIds.has(id)); + if (toDelete.length > 0) { + await Promise.all(toDelete.map((id) => index.deleteDocument(id as string))); + logger.debug(`[cleanupMeiliIndex] Deleted ${toDelete.length} orphaned documents`); + } + + offset += batchSize; + + // Add delay between batches + if (delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + } catch (error) { + logger.error('[cleanupMeiliIndex] Error during cleanup:', error); } } @@ -313,18 +453,29 @@ const createMeiliMongooseModel = ({ } /** - * Adds the current document to the MeiliSearch index + * Adds the current document to the MeiliSearch index with retry logic */ async addObjectToMeili( this: DocumentWithMeiliIndex, next: CallbackWithoutResultAndOptionalError, ): Promise { const object = this.preprocessObjectForIndex!(); - try { - await index.addDocuments([object]); - } catch (error) { - logger.error('[addObjectToMeili] Error adding document to Meili:', error); - return next(); + const maxRetries = 3; + let retryCount = 0; + + while (retryCount < maxRetries) { + try { + await index.addDocuments([object]); + break; + } catch (error) { + retryCount++; + if (retryCount >= maxRetries) { + logger.error('[addObjectToMeili] Error adding document to Meili after retries:', error); + return next(); + } + // Exponential backoff + await new Promise((resolve) => setTimeout(resolve, Math.pow(2, retryCount) * 1000)); + } } try { @@ -445,6 +596,8 @@ const createMeiliMongooseModel = ({ * @param options.apiKey - The MeiliSearch API key. * @param options.indexName - The name of the MeiliSearch index. * @param options.primaryKey - The primary key field for indexing. + * @param options.syncBatchSize - Batch size for sync operations. + * @param options.syncDelayMs - Delay between batches in milliseconds. */ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions): void { const mongoose = options.mongoose; @@ -461,11 +614,38 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions): }); const { host, apiKey, indexName, primaryKey } = options; + const syncOptions = { + batchSize: options.syncBatchSize || getSyncConfig().batchSize, + delayMs: options.syncDelayMs || getSyncConfig().delayMs, + }; const client = new MeiliSearch({ host, apiKey }); - client.createIndex(indexName, { primaryKey }); + + /** Create index only if it doesn't exist */ const index = client.index(indexName); + // Check if index exists and create if needed + (async () => { + try { + await index.getRawInfo(); + logger.debug(`[mongoMeili] Index ${indexName} already exists`); + } catch (error) { + const errorCode = (error as { code?: string })?.code; + if (errorCode === 'index_not_found') { + try { + logger.info(`[mongoMeili] Creating new index: ${indexName}`); + await client.createIndex(indexName, { primaryKey }); + logger.info(`[mongoMeili] Successfully created index: ${indexName}`); + } catch (createError) { + // Index might have been created by another instance + logger.debug(`[mongoMeili] Index ${indexName} may already exist:`, createError); + } + } else { + logger.error(`[mongoMeili] Error checking index ${indexName}:`, error); + } + } + })(); + // Collect attributes from the schema that should be indexed const attributesToIndex: string[] = [ ...Object.entries(schema.obj).reduce((results, [key, value]) => { @@ -474,7 +654,7 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions): }, []), ]; - schema.loadClass(createMeiliMongooseModel({ index, attributesToIndex })); + schema.loadClass(createMeiliMongooseModel({ index, attributesToIndex, syncOptions })); // Register Mongoose hooks schema.post('save', function (doc: DocumentWithMeiliIndex, next) { @@ -497,17 +677,23 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions): try { const conditions = (this as Query).getQuery(); + const { batchSize, delayMs } = syncOptions; if (Object.prototype.hasOwnProperty.call(schema.obj, 'messages')) { const convoIndex = client.index('convos'); const deletedConvos = await mongoose .model('Conversation') .find(conditions as FilterQuery) + .select('conversationId') .lean(); - const promises = deletedConvos.map((convo: Record) => - convoIndex.deleteDocument(convo.conversationId as string), - ); - await Promise.all(promises); + + // Process deletions in batches + await processBatch(deletedConvos, batchSize, delayMs, async (batch) => { + const promises = batch.map((convo: Record) => + convoIndex.deleteDocument(convo.conversationId as string), + ); + await Promise.all(promises); + }); } if (Object.prototype.hasOwnProperty.call(schema.obj, 'messageId')) { @@ -515,11 +701,16 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions): const deletedMessages = await mongoose .model('Message') .find(conditions as FilterQuery) + .select('messageId') .lean(); - const promises = deletedMessages.map((message: Record) => - messageIndex.deleteDocument(message.messageId as string), - ); - await Promise.all(promises); + + // Process deletions in batches + await processBatch(deletedMessages, batchSize, delayMs, async (batch) => { + const promises = batch.map((message: Record) => + messageIndex.deleteDocument(message.messageId as string), + ); + await Promise.all(promises); + }); } return next(); } catch (error) {