diff --git a/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts b/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts index 25e8d54cc1..a289b88fe0 100644 --- a/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts +++ b/packages/data-schemas/src/models/plugins/mongoMeili.spec.ts @@ -625,4 +625,394 @@ describe('Meilisearch Mongoose plugin', () => { await expect(conversationModel.syncWithMeili()).rejects.toThrow('Custom sync error'); }); }); + + describe('cleanupMeiliIndex', () => { + let mockGetDocuments: jest.Mock; + + beforeEach(() => { + mockGetDocuments = jest.fn(); + mockDeleteDocuments.mockClear(); + mockIndex.mockReturnValue({ + getRawInfo: jest.fn(), + updateSettings: jest.fn(), + addDocuments: mockAddDocuments, + addDocumentsInBatches: mockAddDocumentsInBatches, + updateDocuments: mockUpdateDocuments, + deleteDocument: mockDeleteDocument, + deleteDocuments: mockDeleteDocuments, + getDocument: mockGetDocument, + getDocuments: mockGetDocuments, + }); + }); + + test('cleanupMeiliIndex deletes orphaned documents from MeiliSearch', async () => { + const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods; + await conversationModel.deleteMany({}); + + const existingConvoId = new mongoose.Types.ObjectId().toString(); + const orphanedConvoId1 = new mongoose.Types.ObjectId().toString(); + const orphanedConvoId2 = new mongoose.Types.ObjectId().toString(); + + // Create one document in MongoDB + await conversationModel.collection.insertOne({ + conversationId: existingConvoId, + user: new mongoose.Types.ObjectId(), + title: 'Existing Conversation', + endpoint: EModelEndpoint.openAI, + _meiliIndex: true, + expiredAt: null, + }); + + // Mock MeiliSearch to return 3 documents (1 exists in MongoDB, 2 are orphaned) + mockGetDocuments.mockResolvedValueOnce({ + results: [ + { conversationId: existingConvoId }, + { conversationId: orphanedConvoId1 }, + { conversationId: orphanedConvoId2 }, + ], + }); + + const indexMock = mockIndex(); + await conversationModel.cleanupMeiliIndex(indexMock, 'conversationId', 100, 0); + + // Should delete the 2 orphaned documents + expect(mockDeleteDocuments).toHaveBeenCalledWith([orphanedConvoId1, orphanedConvoId2]); + }); + + test('cleanupMeiliIndex handles offset correctly when documents are deleted', async () => { + const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods; + await messageModel.deleteMany({}); + + const existingIds = [ + new mongoose.Types.ObjectId().toString(), + new mongoose.Types.ObjectId().toString(), + new mongoose.Types.ObjectId().toString(), + ]; + + const orphanedIds = [ + new mongoose.Types.ObjectId().toString(), + new mongoose.Types.ObjectId().toString(), + ]; + + // Create existing documents in MongoDB + for (const id of existingIds) { + await messageModel.collection.insertOne({ + messageId: id, + conversationId: new mongoose.Types.ObjectId(), + user: new mongoose.Types.ObjectId(), + isCreatedByUser: true, + _meiliIndex: true, + expiredAt: null, + }); + } + + // Mock MeiliSearch to return batches with mixed existing and orphaned documents + // First batch: 3 documents (1 existing, 2 orphaned) with batchSize=3 + mockGetDocuments + .mockResolvedValueOnce({ + results: [ + { messageId: existingIds[0] }, + { messageId: orphanedIds[0] }, + { messageId: orphanedIds[1] }, + ], + }) + // Second batch: should use offset=1 (3 - 2 deleted = 1) + // results.length=2 < batchSize=3, so loop should stop after this + .mockResolvedValueOnce({ + results: [{ messageId: existingIds[1] }, { messageId: existingIds[2] }], + }); + + const indexMock = mockIndex(); + await messageModel.cleanupMeiliIndex(indexMock, 'messageId', 3, 0); + + // Should have called getDocuments with correct offsets + expect(mockGetDocuments).toHaveBeenCalledTimes(2); + expect(mockGetDocuments).toHaveBeenNthCalledWith(1, { limit: 3, offset: 0 }); + // After deleting 2 documents, offset should be: 0 + (3 - 2) = 1 + expect(mockGetDocuments).toHaveBeenNthCalledWith(2, { limit: 3, offset: 1 }); + + // Should delete only the orphaned documents + expect(mockDeleteDocuments).toHaveBeenCalledWith([orphanedIds[0], orphanedIds[1]]); + }); + + test('cleanupMeiliIndex preserves existing documents', async () => { + const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods; + await conversationModel.deleteMany({}); + + const existingId1 = new mongoose.Types.ObjectId().toString(); + const existingId2 = new mongoose.Types.ObjectId().toString(); + + // Create documents in MongoDB + await conversationModel.collection.insertMany([ + { + conversationId: existingId1, + user: new mongoose.Types.ObjectId(), + title: 'Conversation 1', + endpoint: EModelEndpoint.openAI, + _meiliIndex: true, + expiredAt: null, + }, + { + conversationId: existingId2, + user: new mongoose.Types.ObjectId(), + title: 'Conversation 2', + endpoint: EModelEndpoint.openAI, + _meiliIndex: true, + expiredAt: null, + }, + ]); + + // Mock MeiliSearch to return the same documents + mockGetDocuments.mockResolvedValueOnce({ + results: [{ conversationId: existingId1 }, { conversationId: existingId2 }], + }); + + const indexMock = mockIndex(); + await conversationModel.cleanupMeiliIndex(indexMock, 'conversationId', 100, 0); + + // Should NOT delete any documents + expect(mockDeleteDocuments).not.toHaveBeenCalled(); + }); + + test('cleanupMeiliIndex handles empty MeiliSearch index', async () => { + const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods; + + // Mock empty MeiliSearch index + mockGetDocuments.mockResolvedValueOnce({ + results: [], + }); + + const indexMock = mockIndex(); + await messageModel.cleanupMeiliIndex(indexMock, 'messageId', 100, 0); + + // Should not attempt to delete anything + expect(mockDeleteDocuments).not.toHaveBeenCalled(); + expect(mockGetDocuments).toHaveBeenCalledTimes(1); + }); + + test('cleanupMeiliIndex stops when results.length < batchSize', async () => { + const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods; + await conversationModel.deleteMany({}); + + const id1 = new mongoose.Types.ObjectId().toString(); + const id2 = new mongoose.Types.ObjectId().toString(); + + await conversationModel.collection.insertMany([ + { + conversationId: id1, + user: new mongoose.Types.ObjectId(), + title: 'Conversation 1', + endpoint: EModelEndpoint.openAI, + _meiliIndex: true, + expiredAt: null, + }, + { + conversationId: id2, + user: new mongoose.Types.ObjectId(), + title: 'Conversation 2', + endpoint: EModelEndpoint.openAI, + _meiliIndex: true, + expiredAt: null, + }, + ]); + + // Mock: results.length (2) is less than batchSize (100), should process once and stop + mockGetDocuments.mockResolvedValueOnce({ + results: [{ conversationId: id1 }, { conversationId: id2 }], + }); + + const indexMock = mockIndex(); + await conversationModel.cleanupMeiliIndex(indexMock, 'conversationId', 100, 0); + + // Should only call getDocuments once + expect(mockGetDocuments).toHaveBeenCalledTimes(1); + expect(mockDeleteDocuments).not.toHaveBeenCalled(); + }); + + test('cleanupMeiliIndex handles multiple batches correctly', async () => { + const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods; + await messageModel.deleteMany({}); + + const existingIds = Array.from({ length: 5 }, () => new mongoose.Types.ObjectId().toString()); + const orphanedIds = Array.from({ length: 3 }, () => new mongoose.Types.ObjectId().toString()); + + // Create existing documents in MongoDB + for (const id of existingIds) { + await messageModel.collection.insertOne({ + messageId: id, + conversationId: new mongoose.Types.ObjectId(), + user: new mongoose.Types.ObjectId(), + isCreatedByUser: true, + _meiliIndex: true, + expiredAt: null, + }); + } + + // Mock multiple batches with batchSize=3 + mockGetDocuments + // Batch 1: 2 existing, 1 orphaned + .mockResolvedValueOnce({ + results: [ + { messageId: existingIds[0] }, + { messageId: existingIds[1] }, + { messageId: orphanedIds[0] }, + ], + }) + // Batch 2: offset should be 0 + (3 - 1) = 2 + .mockResolvedValueOnce({ + results: [ + { messageId: existingIds[2] }, + { messageId: orphanedIds[1] }, + { messageId: orphanedIds[2] }, + ], + }) + // Batch 3: offset should be 2 + (3 - 2) = 3 + .mockResolvedValueOnce({ + results: [{ messageId: existingIds[3] }, { messageId: existingIds[4] }], + }); + + const indexMock = mockIndex(); + await messageModel.cleanupMeiliIndex(indexMock, 'messageId', 3, 0); + + expect(mockGetDocuments).toHaveBeenCalledTimes(3); + expect(mockGetDocuments).toHaveBeenNthCalledWith(1, { limit: 3, offset: 0 }); + expect(mockGetDocuments).toHaveBeenNthCalledWith(2, { limit: 3, offset: 2 }); + expect(mockGetDocuments).toHaveBeenNthCalledWith(3, { limit: 3, offset: 3 }); + + // Should have deleted orphaned documents in batches + expect(mockDeleteDocuments).toHaveBeenCalledTimes(2); + expect(mockDeleteDocuments).toHaveBeenNthCalledWith(1, [orphanedIds[0]]); + expect(mockDeleteDocuments).toHaveBeenNthCalledWith(2, [orphanedIds[1], orphanedIds[2]]); + }); + + test('cleanupMeiliIndex handles delay between batches', async () => { + const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods; + await conversationModel.deleteMany({}); + + const id1 = new mongoose.Types.ObjectId().toString(); + const id2 = new mongoose.Types.ObjectId().toString(); + + await conversationModel.collection.insertMany([ + { + conversationId: id1, + user: new mongoose.Types.ObjectId(), + title: 'Conversation 1', + endpoint: EModelEndpoint.openAI, + _meiliIndex: true, + expiredAt: null, + }, + { + conversationId: id2, + user: new mongoose.Types.ObjectId(), + title: 'Conversation 2', + endpoint: EModelEndpoint.openAI, + _meiliIndex: true, + expiredAt: null, + }, + ]); + + mockGetDocuments + .mockResolvedValueOnce({ + results: [{ conversationId: id1 }], + }) + .mockResolvedValueOnce({ + results: [{ conversationId: id2 }], + }) + .mockResolvedValueOnce({ + results: [], + }); + + const indexMock = mockIndex(); + const startTime = Date.now(); + await conversationModel.cleanupMeiliIndex(indexMock, 'conversationId', 1, 100); + const endTime = Date.now(); + + // Should have taken at least 200ms due to delay (2 delays between 3 batches) + expect(endTime - startTime).toBeGreaterThanOrEqual(200); + expect(mockGetDocuments).toHaveBeenCalledTimes(3); + }); + + test('cleanupMeiliIndex handles errors gracefully', async () => { + const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods; + + mockGetDocuments.mockRejectedValueOnce(new Error('MeiliSearch connection error')); + + const indexMock = mockIndex(); + + // Should not throw, errors are caught and logged + await expect( + messageModel.cleanupMeiliIndex(indexMock, 'messageId', 100, 0), + ).resolves.not.toThrow(); + }); + + test('cleanupMeiliIndex with all documents being orphaned', async () => { + const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods; + await conversationModel.deleteMany({}); + + const orphanedId1 = new mongoose.Types.ObjectId().toString(); + const orphanedId2 = new mongoose.Types.ObjectId().toString(); + const orphanedId3 = new mongoose.Types.ObjectId().toString(); + + // MeiliSearch has documents but MongoDB is empty + mockGetDocuments.mockResolvedValueOnce({ + results: [ + { conversationId: orphanedId1 }, + { conversationId: orphanedId2 }, + { conversationId: orphanedId3 }, + ], + }); + + const indexMock = mockIndex(); + await conversationModel.cleanupMeiliIndex(indexMock, 'conversationId', 100, 0); + + // Should delete all documents since none exist in MongoDB + expect(mockDeleteDocuments).toHaveBeenCalledWith([orphanedId1, orphanedId2, orphanedId3]); + }); + + test('cleanupMeiliIndex adjusts offset to 0 when all batch documents are deleted', async () => { + const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods; + await messageModel.deleteMany({}); + + const orphanedIds = Array.from({ length: 3 }, () => new mongoose.Types.ObjectId().toString()); + const existingId = new mongoose.Types.ObjectId().toString(); + + // Create one existing document + await messageModel.collection.insertOne({ + messageId: existingId, + conversationId: new mongoose.Types.ObjectId(), + user: new mongoose.Types.ObjectId(), + isCreatedByUser: true, + _meiliIndex: true, + expiredAt: null, + }); + + mockGetDocuments + // Batch 1: All 3 are orphaned + .mockResolvedValueOnce({ + results: [ + { messageId: orphanedIds[0] }, + { messageId: orphanedIds[1] }, + { messageId: orphanedIds[2] }, + ], + }) + // Batch 2: offset should be 0 + (3 - 3) = 0 + .mockResolvedValueOnce({ + results: [{ messageId: existingId }], + }); + + const indexMock = mockIndex(); + await messageModel.cleanupMeiliIndex(indexMock, 'messageId', 3, 0); + + expect(mockGetDocuments).toHaveBeenCalledTimes(2); + expect(mockGetDocuments).toHaveBeenNthCalledWith(1, { limit: 3, offset: 0 }); + // After deleting all 3, offset remains at 0 + expect(mockGetDocuments).toHaveBeenNthCalledWith(2, { limit: 3, offset: 0 }); + + expect(mockDeleteDocuments).toHaveBeenCalledWith([ + orphanedIds[0], + orphanedIds[1], + orphanedIds[2], + ]); + }); + }); }); diff --git a/packages/data-schemas/src/models/plugins/mongoMeili.ts b/packages/data-schemas/src/models/plugins/mongoMeili.ts index 2551c35d99..92fc5f328c 100644 --- a/packages/data-schemas/src/models/plugins/mongoMeili.ts +++ b/packages/data-schemas/src/models/plugins/mongoMeili.ts @@ -304,8 +304,12 @@ const createMeiliMongooseModel = ({ await index.deleteDocuments(toDelete.map(String)); logger.debug(`[cleanupMeiliIndex] Deleted ${toDelete.length} orphaned documents`); } + // if fetch documents request returns less documents than limit, all documents are processed + if (batch.results.length < batchSize) { + break; + } - offset += batchSize; + offset += batchSize - toDelete.length; // Add delay between batches if (delayMs > 0) {