diff --git a/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts b/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts
index 8f4ee87aaf..25e8d54cc1 100644
--- a/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts
+++ b/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts
@@ -6,10 +6,20 @@ import { createMessageModel } from '~/models/message';
 import { SchemaWithMeiliMethods } from '~/models/plugins/mongoMeili';
 
 const mockAddDocuments = jest.fn();
+const mockAddDocumentsInBatches = jest.fn();
+const mockUpdateDocuments = jest.fn();
+const mockDeleteDocument = jest.fn();
+const mockDeleteDocuments = jest.fn();
+const mockGetDocument = jest.fn();
 const mockIndex = jest.fn().mockReturnValue({
   getRawInfo: jest.fn(),
   updateSettings: jest.fn(),
   addDocuments: mockAddDocuments,
+  addDocumentsInBatches: mockAddDocumentsInBatches,
+  updateDocuments: mockUpdateDocuments,
+  deleteDocument: mockDeleteDocument,
+  deleteDocuments: mockDeleteDocuments,
+  getDocument: mockGetDocument,
   getDocuments: jest.fn().mockReturnValue({ results: [] }),
 });
 jest.mock('meilisearch', () => {
@@ -42,6 +52,11 @@ describe('Meilisearch Mongoose plugin', () => {
 
   beforeEach(() => {
     mockAddDocuments.mockClear();
+    mockAddDocumentsInBatches.mockClear();
+    mockUpdateDocuments.mockClear();
+    mockDeleteDocument.mockClear();
+    mockDeleteDocuments.mockClear();
+    mockGetDocument.mockClear();
   });
 
   afterAll(async () => {
@@ -264,4 +279,350 @@ describe('Meilisearch Mongoose plugin', () => {
       expect(indexedCount).toBe(2);
     });
   });
+
+  describe('New batch processing and retry functionality', () => {
+    test('processSyncBatch uses addDocumentsInBatches', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+      mockAddDocuments.mockClear();
+
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Conversation',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      // Run sync which should call processSyncBatch internally
+      await conversationModel.syncWithMeili();
+
+      // Verify addDocumentsInBatches was called (new batch method)
+      expect(mockAddDocumentsInBatches).toHaveBeenCalled();
+    });
+
+    test('addObjectToMeili retries on failure', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+
+      // Mock addDocuments to fail twice then succeed
+      mockAddDocuments
+        .mockRejectedValueOnce(new Error('Network error'))
+        .mockRejectedValueOnce(new Error('Network error'))
+        .mockResolvedValueOnce({});
+
+      // Create a document which triggers addObjectToMeili
+      await conversationModel.create({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Retry',
+        endpoint: EModelEndpoint.openAI,
+      });
+
+      // Wait for async operations to complete
+      await new Promise((resolve) => setTimeout(resolve, 100));
+
+      // Verify addDocuments was called multiple times due to retries
+      expect(mockAddDocuments).toHaveBeenCalledTimes(3);
+    });
+
+    test('getSyncProgress returns accurate progress information', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+
+      // Insert documents directly to control the _meiliIndex flag
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Not Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      const progress = await conversationModel.getSyncProgress();
+
+      expect(progress.totalDocuments).toBe(2);
+      expect(progress.totalProcessed).toBe(1);
+      expect(progress.isComplete).toBe(false);
+    });
+
+    test('getSyncProgress excludes TTL documents from counts', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+
+      // Insert syncable documents (expiredAt: null)
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Syncable Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Syncable Not Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      // Insert TTL documents (expiredAt set) - these should NOT be counted
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'TTL Document 1',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: true,
+        expiredAt: new Date(),
+      });
+
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'TTL Document 2',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: new Date(),
+      });
+
+      const progress = await conversationModel.getSyncProgress();
+
+      // Only syncable documents should be counted (2 total, 1 indexed)
+      expect(progress.totalDocuments).toBe(2);
+      expect(progress.totalProcessed).toBe(1);
+      expect(progress.isComplete).toBe(false);
+    });
+
+    test('getSyncProgress shows completion when all syncable documents are indexed', async () => {
+      const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods;
+      await messageModel.deleteMany({});
+
+      // All syncable documents are indexed
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: false,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+
+      // Add TTL document - should not affect completion status
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: false,
+        expiredAt: new Date(),
+      });
+
+      const progress = await messageModel.getSyncProgress();
+
+      expect(progress.totalDocuments).toBe(2);
+      expect(progress.totalProcessed).toBe(2);
+      expect(progress.isComplete).toBe(true);
+    });
+  });
+
+  describe('Error handling in processSyncBatch', () => {
+    test('syncWithMeili fails when processSyncBatch encounters addDocumentsInBatches error', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+
+      // Insert a document to sync
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Conversation',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      // Mock addDocumentsInBatches to fail
+      mockAddDocumentsInBatches.mockRejectedValueOnce(new Error('MeiliSearch connection error'));
+
+      // Sync should throw the error
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow(
+        'MeiliSearch connection error',
+      );
+
+      // Verify the error was logged
+      expect(mockAddDocumentsInBatches).toHaveBeenCalled();
+
+      // Document should NOT be marked as indexed since sync failed
+      // Note: direct collection.insertOne doesn't set default values, so _meiliIndex may be undefined
+      const doc = await conversationModel.findOne({});
+      expect(doc?._meiliIndex).not.toBe(true);
+    });
+
+    test('syncWithMeili fails when processSyncBatch encounters updateMany error', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+
+      // Insert a document
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Conversation',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      // Mock addDocumentsInBatches to succeed but simulate updateMany failure
+      mockAddDocumentsInBatches.mockResolvedValueOnce({});
+
+      // Spy on updateMany and make it fail
+      const updateManySpy = jest
+        .spyOn(conversationModel, 'updateMany')
+        .mockRejectedValueOnce(new Error('Database connection error'));
+
+      // Sync should throw the error
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow('Database connection error');
+
+      expect(updateManySpy).toHaveBeenCalled();
+
+      // Restore original implementation
+      updateManySpy.mockRestore();
+    });
+
+    test('processSyncBatch logs error and throws when addDocumentsInBatches fails', async () => {
+      const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods;
+      await messageModel.deleteMany({});
+
+      mockAddDocumentsInBatches.mockRejectedValueOnce(new Error('Network timeout'));
+
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      const indexMock = mockIndex();
+      const documents = await messageModel.find({ _meiliIndex: false }).lean();
+
+      // Should throw the error
+      await expect(messageModel.processSyncBatch(indexMock, documents)).rejects.toThrow(
+        'Network timeout',
+      );
+
+      expect(mockAddDocumentsInBatches).toHaveBeenCalled();
+    });
+
+    test('processSyncBatch handles empty document array gracefully', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      const indexMock = mockIndex();
+
+      // Should not throw with empty array
+      await expect(conversationModel.processSyncBatch(indexMock, [])).resolves.not.toThrow();
+
+      // Should not call addDocumentsInBatches
+      expect(mockAddDocumentsInBatches).not.toHaveBeenCalled();
+    });
+
+    test('syncWithMeili stops processing when batch fails and does not process remaining documents', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+
+      // Create multiple documents
+      for (let i = 0; i < 5; i++) {
+        await conversationModel.collection.insertOne({
+          conversationId: new mongoose.Types.ObjectId(),
+          user: new mongoose.Types.ObjectId(),
+          title: `Test Conversation ${i}`,
+          endpoint: EModelEndpoint.openAI,
+          _meiliIndex: false,
+          expiredAt: null,
+        });
+      }
+
+      // Mock addDocumentsInBatches to fail on first call
+      mockAddDocumentsInBatches.mockRejectedValueOnce(new Error('First batch failed'));
+
+      // Sync should fail on the first batch
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow('First batch failed');
+
+      // Should have attempted only once before failing
+      expect(mockAddDocumentsInBatches).toHaveBeenCalledTimes(1);
+
+      // No documents should be indexed since sync failed
+      const indexedCount = await conversationModel.countDocuments({ _meiliIndex: true });
+      expect(indexedCount).toBe(0);
+    });
+
+    test('error in processSyncBatch is properly logged before being thrown', async () => {
+      const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods;
+      await messageModel.deleteMany({});
+
+      const testError = new Error('Test error for logging');
+      mockAddDocumentsInBatches.mockRejectedValueOnce(testError);
+
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      const indexMock = mockIndex();
+      const documents = await messageModel.find({ _meiliIndex: false }).lean();
+
+      // Should throw the same error that was passed to it
+      await expect(messageModel.processSyncBatch(indexMock, documents)).rejects.toThrow(testError);
+    });
+
+    test('syncWithMeili properly propagates processSyncBatch errors', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+
+      const customError = new Error('Custom sync error');
+      mockAddDocumentsInBatches.mockRejectedValueOnce(customError);
+
+      // The error should propagate all the way up
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow('Custom sync error');
+    });
+  });
 });
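Note: the implementation hunks below destructure `batchSize` and `delayMs` from a module-level `syncConfig` that is not part of this diff. For review context, here is a minimal sketch of the shape the new code assumes; the defaults and environment variable names are illustrative assumptions, not taken from this patch:

// Hypothetical sketch of syncConfig — only the { batchSize, delayMs } shape
// is implied by the diff; the names and defaults below are assumptions.
const syncConfig = {
  /** Number of documents fetched and indexed per sync-loop iteration */
  batchSize: parseInt(process.env.MEILI_SYNC_BATCH_SIZE ?? '100', 10),
  /** Pause between batches so the sync does not saturate MongoDB or MeiliSearch */
  delayMs: parseInt(process.env.MEILI_SYNC_DELAY_MS ?? '100', 10),
};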
diff --git a/packages/data-schemas/src/models/plugins/mongoMeili.ts b/packages/data-schemas/src/models/plugins/mongoMeili.ts
index 548a7d2f1a..2551c35d99 100644
--- a/packages/data-schemas/src/models/plugins/mongoMeili.ts
+++ b/packages/data-schemas/src/models/plugins/mongoMeili.ts
@@ -50,17 +50,11 @@ interface _DocumentWithMeiliIndex extends Document {
 export type DocumentWithMeiliIndex = _DocumentWithMeiliIndex & IConversation & Partial<IMessage>;
 
 export interface SchemaWithMeiliMethods extends Model<DocumentWithMeiliIndex> {
-  syncWithMeili(options?: { resumeFromId?: string }): Promise<void>;
+  syncWithMeili(): Promise<void>;
   getSyncProgress(): Promise<{ totalProcessed: number; totalDocuments: number; isComplete: boolean }>;
   processSyncBatch(
     index: Index,
     documents: Array<Record<string, unknown>>,
-    updateOps: Array<{
-      updateOne: {
-        filter: Record<string, unknown>;
-        update: { $set: { _meiliIndex: boolean } };
-      };
-    }>,
   ): Promise<void>;
   cleanupMeiliIndex(
     index: Index,
@@ -156,8 +150,8 @@ const createMeiliMongooseModel = ({
      * Get the current sync progress
      */
     static async getSyncProgress(this: SchemaWithMeiliMethods): Promise<{ totalProcessed: number; totalDocuments: number; isComplete: boolean }> {
-      const totalDocuments = await this.countDocuments();
-      const indexedDocuments = await this.countDocuments({ _meiliIndex: true });
+      const totalDocuments = await this.countDocuments({ expiredAt: null });
+      const indexedDocuments = await this.countDocuments({ expiredAt: null, _meiliIndex: true });
 
       return {
         totalProcessed: indexedDocuments,
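Since getSyncProgress now counts only non-expired documents, its numbers line up with what syncWithMeili will actually process, so a caller can poll it to report progress while a sync runs elsewhere. A minimal sketch, assuming a model the plugin has been applied to (the interval and logging are illustrative):

// Polls getSyncProgress() until the index is caught up.
async function reportSyncProgress(
  model: SchemaWithMeiliMethods,
  intervalMs = 5000,
): Promise<void> {
  let progress = await model.getSyncProgress();
  while (!progress.isComplete) {
    console.log(`[meili] indexed ${progress.totalProcessed}/${progress.totalDocuments}`);
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
    progress = await model.getSyncProgress();
  }
  console.log('[meili] sync complete');
}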
@@ -167,106 +161,79 @@ const createMeiliMongooseModel = ({
     /**
-     * Synchronizes the data between the MongoDB collection and the MeiliSearch index.
-     * Now uses streaming and batching to reduce memory usage.
-     */
-    static async syncWithMeili(
-      this: SchemaWithMeiliMethods,
-      options?: { resumeFromId?: string },
-    ): Promise<void> {
+     * Synchronizes data between the MongoDB collection and the MeiliSearch index by
+     * incrementally indexing only documents where `expiredAt` is `null` and `_meiliIndex` is `false`
+     * (i.e., non-expired documents that have not yet been indexed).
+     */
+    static async syncWithMeili(this: SchemaWithMeiliMethods): Promise<void> {
+      const startTime = Date.now();
+      const { batchSize, delayMs } = syncConfig;
+
+      const collectionName = primaryKey === 'messageId' ? 'messages' : 'conversations';
+      logger.info(
+        `[syncWithMeili] Starting sync for ${collectionName} with batch size ${batchSize}`,
+      );
+
+      // Get an approximate total count as a rough estimate; the sync should not exceed this number
+      const approxTotalCount = await this.estimatedDocumentCount();
+      logger.info(
+        `[syncWithMeili] Approximate total number of ${collectionName}: ${approxTotalCount}`,
+      );
+
       try {
-        const startTime = Date.now();
-        const { batchSize, delayMs } = syncConfig;
-
-        logger.info(
-          `[syncWithMeili] Starting sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} with batch size ${batchSize}`,
-        );
-
-        // Build query with resume capability
-        // Do not sync TTL documents
-        const query: FilterQuery<unknown> = { expiredAt: null };
-        if (options?.resumeFromId) {
-          query._id = { $gt: options.resumeFromId };
-        }
-
-        // Get approximate total count for progress tracking
-        const approxTotalCount = await this.estimatedDocumentCount();
-        logger.info(`[syncWithMeili] Approximate total number of documents to sync: ${approxTotalCount}`);
-
-        let processedCount = 0;
-        // First, handle documents that need to be removed from Meili
+        logger.info(`[syncWithMeili] Starting cleanup of Meili index ${index.uid} before sync`);
         await this.cleanupMeiliIndex(index, primaryKey, batchSize, delayMs);
-
-        // Process MongoDB documents in batches using cursor
-        const cursor = this.find(query)
-          .select(attributesToIndex.join(' ') + ' _meiliIndex')
-          .sort({ _id: 1 })
-          .batchSize(batchSize)
-          .cursor();
-
-        const format = (doc: Record<string, unknown>) =>
-          _.omitBy(_.pick(doc, attributesToIndex), (v, k) => k.startsWith('$'));
-
-        let documentBatch: Array<Record<string, unknown>> = [];
-        let updateOps: Array<{
-          updateOne: {
-            filter: Record<string, unknown>;
-            update: { $set: { _meiliIndex: boolean } };
-          };
-        }> = [];
-
-        // Process documents in streaming fashion
-        for await (const doc of cursor) {
-          const typedDoc = doc.toObject() as unknown as Record<string, unknown>;
-          const formatted = format(typedDoc);
-
-          // Check if document needs indexing
-          if (!typedDoc._meiliIndex) {
-            documentBatch.push(formatted);
-            updateOps.push({
-              updateOne: {
-                filter: { _id: typedDoc._id },
-                update: { $set: { _meiliIndex: true } },
-              },
-            });
-          }
-
-          processedCount++;
-
-          // Process batch when it reaches the configured size
-          if (documentBatch.length >= batchSize) {
-            await this.processSyncBatch(index, documentBatch, updateOps);
-            documentBatch = [];
-            updateOps = [];
-
-            // Log progress
-            // Calculate percentage based on approximate total count sometimes might lead to more than 100%
-            // the difference is very small and acceptable for progress tracking
-            const percent = Math.round((processedCount / approxTotalCount) * 100);
-            const progress = Math.min(percent, 100);
-            logger.info(`[syncWithMeili] Progress: ${progress}% (count: ${processedCount})`);
-
-            // Add delay to prevent overwhelming resources
-            if (delayMs > 0) {
-              await new Promise((resolve) => setTimeout(resolve, delayMs));
-            }
-          }
-        }
-
-        // Process remaining documents
-        if (documentBatch.length > 0) {
-          await this.processSyncBatch(index, documentBatch, updateOps);
-        }
-
-        const duration = Date.now() - startTime;
-        logger.info(
-          `[syncWithMeili] Completed sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} in ${duration}ms`,
-        );
+        logger.info(`[syncWithMeili] Completed cleanup of Meili index: ${index.uid}`);
       } catch (error) {
-        logger.error('[syncWithMeili] Error during sync:', error);
+        logger.error('[syncWithMeili] Error during Meili cleanup before sync:', error);
         throw error;
       }
+
+      let processedCount = 0;
+      let hasMore = true;
+
+      while (hasMore) {
+        const query: FilterQuery<unknown> = {
+          expiredAt: null,
+          _meiliIndex: false,
+        };
+
+        try {
+          const documents = await this.find(query)
+            .select(attributesToIndex.join(' ') + ' _meiliIndex')
+            .limit(batchSize)
+            .lean();
+
+          // Stop when there are no more documents to process
+          if (documents.length === 0) {
+            logger.info('[syncWithMeili] No more documents to process');
+            break;
+          }
+
+          // Process the batch
+          await this.processSyncBatch(index, documents);
+          processedCount += documents.length;
+          logger.info(`[syncWithMeili] Processed: ${processedCount}`);
+
+          if (documents.length < batchSize) {
+            hasMore = false;
+          }
+
+          // Add delay to prevent overwhelming resources
+          if (hasMore && delayMs > 0) {
+            await new Promise((resolve) => setTimeout(resolve, delayMs));
+          }
+        } catch (error) {
+          logger.error('[syncWithMeili] Error processing documents batch:', error);
+          throw error;
+        }
+      }
+
+      const duration = Date.now() - startTime;
+      logger.info(
+        `[syncWithMeili] Completed sync for ${collectionName}. Processed ${processedCount} documents in ${duration}ms`,
+      );
     }
 
     /**
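Design note on the rewritten loop: it re-issues the same { expiredAt: null, _meiliIndex: false } query on every iteration instead of paginating with a cursor or skip/limit. That works because processSyncBatch flips _meiliIndex to true for every batch it indexes, so each pass naturally returns the next unprocessed batch, and an interrupted sync can simply be rerun without resume bookkeeping (which is why the resumeFromId option was dropped). A stripped-down sketch of the pattern, with placeholder names (drainUnprocessed, done) standing in for the real model and flag:

import type { Model } from 'mongoose';

// Flag-based batching: marking each processed batch removes it from the next
// run of the same query, so no cursor or offset bookkeeping is needed.
async function drainUnprocessed(
  model: Model<{ done: boolean }>,
  handleBatch: (docs: Array<Record<string, unknown>>) => Promise<void>,
  batchSize = 100,
): Promise<void> {
  for (;;) {
    const batch = await model.find({ done: false }).limit(batchSize).lean();
    if (batch.length === 0) {
      break; // nothing left to process
    }
    await handleBatch(batch as Array<Record<string, unknown>>); // e.g., index into MeiliSearch
    await model.updateMany(
      { _id: { $in: batch.map((doc) => doc._id) } },
      { $set: { done: true } },
    );
  }
}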
@@ -276,28 +243,26 @@ const createMeiliMongooseModel = ({
       this: SchemaWithMeiliMethods,
       index: Index,
       documents: Array<Record<string, unknown>>,
-      updateOps: Array<{
-        updateOne: {
-          filter: Record<string, unknown>;
-          update: { $set: { _meiliIndex: boolean } };
-        };
-      }>,
     ): Promise<void> {
       if (documents.length === 0) {
         return;
       }
 
+      // Format documents for MeiliSearch
+      const formattedDocs = documents.map((doc) =>
+        _.omitBy(_.pick(doc, attributesToIndex), (_v, k) => k.startsWith('$')),
+      );
+
       try {
         // Add documents to MeiliSearch
-        await index.addDocuments(documents);
+        await index.addDocumentsInBatches(formattedDocs);
 
         // Update MongoDB to mark documents as indexed
-        if (updateOps.length > 0) {
-          await this.collection.bulkWrite(updateOps);
-        }
+        const docsIds = documents.map((doc) => doc._id);
+        await this.updateMany({ _id: { $in: docsIds } }, { $set: { _meiliIndex: true } });
       } catch (error) {
         logger.error('[processSyncBatch] Error processing batch:', error);
-        // Don't throw - allow sync to continue with other documents
+        throw error;
       }
     }
 
@@ -336,7 +301,7 @@ const createMeiliMongooseModel = ({
       // Delete documents that don't exist in MongoDB
       const toDelete = meiliIds.filter((id) => !existingIds.has(id));
       if (toDelete.length > 0) {
-        await Promise.all(toDelete.map((id) => index.deleteDocument(id as string)));
+        await index.deleteDocuments(toDelete.map(String));
         logger.debug(`[cleanupMeiliIndex] Deleted ${toDelete.length} orphaned documents`);
       }
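Finally, the spec above asserts that addObjectToMeili retries failed addDocuments calls (two rejections followed by a success yields exactly three calls), but the retry helper itself lies outside this diff. A plausible shape for such a helper, assuming three attempts with linear backoff — the attempt count and delays are illustrative, not confirmed by this patch:

// Hypothetical retry wrapper matching the behavior the spec exercises.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 50,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Linear backoff between attempts
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * attempt));
      }
    }
  }
  throw lastError;
}

// Usage, e.g. inside addObjectToMeili:
// await withRetry(() => index.addDocuments([formattedDoc]));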