🛠️ fix: improved retry logic during meili sync & improved batching (#11373)

* 🛠️ fix: unreliable retry logic during meili sync in case of interruption

🛠️ fix: exclude temporary documents from the count on startup for meili sync

🛠️ refactor: improved meili index cleanup before sync

* fix: don't swallow the exception, to prevent an indefinite sync loop

fix: update log messages for clarity

fix: add more test coverage for exception handling
Andrei Blizorukov 2026-01-16 16:30:00 +01:00, committed by GitHub
parent c378e777ef
commit 02d75b24a4
2 changed files with 441 additions and 115 deletions

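The retry behavior referenced in the commit message lives in addObjectToMeili, which the new tests below exercise but whose implementation is not part of the hunks shown here. As a rough sketch of the pattern those tests expect (two failures followed by a success yields exactly three addDocuments calls), assuming a small fixed attempt budget with exponential backoff — withRetries, maxAttempts, and baseDelayMs are illustrative names, not taken from the PR:

// Minimal sketch of retry-with-backoff; not the PR's actual implementation.
async function withRetries<T>(
  operation: () => Promise<T>,
  maxAttempts = 3, // assumed attempt budget, consistent with the fail-fail-succeed test
  baseDelayMs = 10, // assumed base delay between attempts
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Exponential backoff between attempts: 10ms, 20ms, 40ms, ...
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
  }
  throw lastError;
}

Wrapped as withRetries(() => index.addDocuments([doc])), a call that fails twice and succeeds on the third attempt resolves normally, which is what the 'addObjectToMeili retries on failure' test asserts.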

@@ -6,10 +6,20 @@ import { createMessageModel } from '~/models/message';
 import { SchemaWithMeiliMethods } from '~/models/plugins/mongoMeili';
 const mockAddDocuments = jest.fn();
+const mockAddDocumentsInBatches = jest.fn();
 const mockUpdateDocuments = jest.fn();
 const mockDeleteDocument = jest.fn();
+const mockDeleteDocuments = jest.fn();
+const mockGetDocument = jest.fn();
 const mockIndex = jest.fn().mockReturnValue({
   getRawInfo: jest.fn(),
   updateSettings: jest.fn(),
   addDocuments: mockAddDocuments,
+  addDocumentsInBatches: mockAddDocumentsInBatches,
   updateDocuments: mockUpdateDocuments,
   deleteDocument: mockDeleteDocument,
+  deleteDocuments: mockDeleteDocuments,
+  getDocument: mockGetDocument,
   getDocuments: jest.fn().mockReturnValue({ results: [] }),
 });
 jest.mock('meilisearch', () => {
@@ -42,6 +52,11 @@ describe('Meilisearch Mongoose plugin', () => {
   beforeEach(() => {
     mockAddDocuments.mockClear();
+    mockAddDocumentsInBatches.mockClear();
+    mockUpdateDocuments.mockClear();
+    mockDeleteDocument.mockClear();
+    mockDeleteDocuments.mockClear();
+    mockGetDocument.mockClear();
   });
   afterAll(async () => {
@@ -264,4 +279,350 @@ describe('Meilisearch Mongoose plugin', () => {
       expect(indexedCount).toBe(2);
     });
   });
+  describe('New batch processing and retry functionality', () => {
+    test('processSyncBatch uses addDocumentsInBatches', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+      mockAddDocuments.mockClear();
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Conversation',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      // Run sync which should call processSyncBatch internally
+      await conversationModel.syncWithMeili();
+      // Verify addDocumentsInBatches was called (new batch method)
+      expect(mockAddDocumentsInBatches).toHaveBeenCalled();
+    });
+    test('addObjectToMeili retries on failure', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      // Mock addDocuments to fail twice then succeed
+      mockAddDocuments
+        .mockRejectedValueOnce(new Error('Network error'))
+        .mockRejectedValueOnce(new Error('Network error'))
+        .mockResolvedValueOnce({});
+      // Create a document which triggers addObjectToMeili
+      await conversationModel.create({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Retry',
+        endpoint: EModelEndpoint.openAI,
+      });
+      // Wait for async operations to complete
+      await new Promise((resolve) => setTimeout(resolve, 100));
+      // Verify addDocuments was called multiple times due to retries
+      expect(mockAddDocuments).toHaveBeenCalledTimes(3);
+    });
+    test('getSyncProgress returns accurate progress information', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      // Insert documents directly to control the _meiliIndex flag
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Not Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      const progress = await conversationModel.getSyncProgress();
+      expect(progress.totalDocuments).toBe(2);
+      expect(progress.totalProcessed).toBe(1);
+      expect(progress.isComplete).toBe(false);
+    });
+    test('getSyncProgress excludes TTL documents from counts', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      // Insert syncable documents (expiredAt: null)
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Syncable Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Syncable Not Indexed',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      // Insert TTL documents (expiredAt set) - these should NOT be counted
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'TTL Document 1',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: true,
+        expiredAt: new Date(),
+      });
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'TTL Document 2',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: new Date(),
+      });
+      const progress = await conversationModel.getSyncProgress();
+      // Only syncable documents should be counted (2 total, 1 indexed)
+      expect(progress.totalDocuments).toBe(2);
+      expect(progress.totalProcessed).toBe(1);
+      expect(progress.isComplete).toBe(false);
+    });
+    test('getSyncProgress shows completion when all syncable documents are indexed', async () => {
+      const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods;
+      await messageModel.deleteMany({});
+      // All syncable documents are indexed
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: false,
+        _meiliIndex: true,
+        expiredAt: null,
+      });
+      // Add TTL document - should not affect completion status
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: false,
+        expiredAt: new Date(),
+      });
+      const progress = await messageModel.getSyncProgress();
+      expect(progress.totalDocuments).toBe(2);
+      expect(progress.totalProcessed).toBe(2);
+      expect(progress.isComplete).toBe(true);
+    });
+  });
+  describe('Error handling in processSyncBatch', () => {
+    test('syncWithMeili fails when processSyncBatch encounters addDocumentsInBatches error', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+      // Insert a document to sync
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Conversation',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      // Mock addDocumentsInBatches to fail
+      mockAddDocumentsInBatches.mockRejectedValueOnce(new Error('MeiliSearch connection error'));
+      // Sync should throw the error
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow(
+        'MeiliSearch connection error',
+      );
+      // Verify the error was logged
+      expect(mockAddDocumentsInBatches).toHaveBeenCalled();
+      // Document should NOT be marked as indexed since sync failed
+      // Note: direct collection.insertOne doesn't set default values, so _meiliIndex may be undefined
+      const doc = await conversationModel.findOne({});
+      expect(doc?._meiliIndex).not.toBe(true);
+    });
+    test('syncWithMeili fails when processSyncBatch encounters updateMany error', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+      // Insert a document
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test Conversation',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      // Mock addDocumentsInBatches to succeed but simulate updateMany failure
+      mockAddDocumentsInBatches.mockResolvedValueOnce({});
+      // Spy on updateMany and make it fail
+      const updateManySpy = jest
+        .spyOn(conversationModel, 'updateMany')
+        .mockRejectedValueOnce(new Error('Database connection error'));
+      // Sync should throw the error
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow('Database connection error');
+      expect(updateManySpy).toHaveBeenCalled();
+      // Restore original implementation
+      updateManySpy.mockRestore();
+    });
+    test('processSyncBatch logs error and throws when addDocumentsInBatches fails', async () => {
+      const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods;
+      await messageModel.deleteMany({});
+      mockAddDocumentsInBatches.mockRejectedValueOnce(new Error('Network timeout'));
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      const indexMock = mockIndex();
+      const documents = await messageModel.find({ _meiliIndex: false }).lean();
+      // Should throw the error
+      await expect(messageModel.processSyncBatch(indexMock, documents)).rejects.toThrow(
+        'Network timeout',
+      );
+      expect(mockAddDocumentsInBatches).toHaveBeenCalled();
+    });
+    test('processSyncBatch handles empty document array gracefully', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      const indexMock = mockIndex();
+      // Should not throw with empty array
+      await expect(conversationModel.processSyncBatch(indexMock, [])).resolves.not.toThrow();
+      // Should not call addDocumentsInBatches
+      expect(mockAddDocumentsInBatches).not.toHaveBeenCalled();
+    });
+    test('syncWithMeili stops processing when batch fails and does not process remaining documents', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+      // Create multiple documents
+      for (let i = 0; i < 5; i++) {
+        await conversationModel.collection.insertOne({
+          conversationId: new mongoose.Types.ObjectId(),
+          user: new mongoose.Types.ObjectId(),
+          title: `Test Conversation ${i}`,
+          endpoint: EModelEndpoint.openAI,
+          _meiliIndex: false,
+          expiredAt: null,
+        });
+      }
+      // Mock addDocumentsInBatches to fail on first call
+      mockAddDocumentsInBatches.mockRejectedValueOnce(new Error('First batch failed'));
+      // Sync should fail on the first batch
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow('First batch failed');
+      // Should have attempted only once before failing
+      expect(mockAddDocumentsInBatches).toHaveBeenCalledTimes(1);
+      // No documents should be indexed since sync failed
+      const indexedCount = await conversationModel.countDocuments({ _meiliIndex: true });
+      expect(indexedCount).toBe(0);
+    });
+    test('error in processSyncBatch is properly logged before being thrown', async () => {
+      const messageModel = createMessageModel(mongoose) as SchemaWithMeiliMethods;
+      await messageModel.deleteMany({});
+      const testError = new Error('Test error for logging');
+      mockAddDocumentsInBatches.mockRejectedValueOnce(testError);
+      await messageModel.collection.insertOne({
+        messageId: new mongoose.Types.ObjectId(),
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        isCreatedByUser: true,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      const indexMock = mockIndex();
+      const documents = await messageModel.find({ _meiliIndex: false }).lean();
+      // Should throw the same error that was passed to it
+      await expect(messageModel.processSyncBatch(indexMock, documents)).rejects.toThrow(testError);
+    });
+    test('syncWithMeili properly propagates processSyncBatch errors', async () => {
+      const conversationModel = createConversationModel(mongoose) as SchemaWithMeiliMethods;
+      await conversationModel.deleteMany({});
+      mockAddDocumentsInBatches.mockClear();
+      await conversationModel.collection.insertOne({
+        conversationId: new mongoose.Types.ObjectId(),
+        user: new mongoose.Types.ObjectId(),
+        title: 'Test',
+        endpoint: EModelEndpoint.openAI,
+        _meiliIndex: false,
+        expiredAt: null,
+      });
+      const customError = new Error('Custom sync error');
+      mockAddDocumentsInBatches.mockRejectedValueOnce(customError);
+      // The error should propagate all the way up
+      await expect(conversationModel.syncWithMeili()).rejects.toThrow('Custom sync error');
+    });
+  });
 });


@@ -50,17 +50,11 @@ interface _DocumentWithMeiliIndex extends Document {
 export type DocumentWithMeiliIndex = _DocumentWithMeiliIndex & IConversation & Partial<IMessage>;
 export interface SchemaWithMeiliMethods extends Model<DocumentWithMeiliIndex> {
-  syncWithMeili(options?: { resumeFromId?: string }): Promise<void>;
+  syncWithMeili(): Promise<void>;
   getSyncProgress(): Promise<SyncProgress>;
   processSyncBatch(
     index: Index<MeiliIndexable>,
     documents: Array<Record<string, unknown>>,
-    updateOps: Array<{
-      updateOne: {
-        filter: Record<string, unknown>;
-        update: { $set: { _meiliIndex: boolean } };
-      };
-    }>,
   ): Promise<void>;
   cleanupMeiliIndex(
     index: Index<MeiliIndexable>,
@@ -156,8 +150,8 @@ const createMeiliMongooseModel = ({
    * Get the current sync progress
    */
  static async getSyncProgress(this: SchemaWithMeiliMethods): Promise<SyncProgress> {
-    const totalDocuments = await this.countDocuments();
-    const indexedDocuments = await this.countDocuments({ _meiliIndex: true });
+    const totalDocuments = await this.countDocuments({ expiredAt: null });
+    const indexedDocuments = await this.countDocuments({ expiredAt: null, _meiliIndex: true });
    return {
      totalProcessed: indexedDocuments,
@@ -167,106 +161,79 @@
  }
  /**
-   * Synchronizes the data between the MongoDB collection and the MeiliSearch index.
-   * Now uses streaming and batching to reduce memory usage.
-   */
-  static async syncWithMeili(
-    this: SchemaWithMeiliMethods,
-    options?: { resumeFromId?: string },
-  ): Promise<void> {
+   * Synchronizes data between the MongoDB collection and the MeiliSearch index by
+   * incrementally indexing only documents where `expiredAt` is `null` and `_meiliIndex` is `false`
+   * (i.e., non-expired documents that have not yet been indexed).
+   */
+  static async syncWithMeili(this: SchemaWithMeiliMethods): Promise<void> {
+    const startTime = Date.now();
+    const { batchSize, delayMs } = syncConfig;
+    const collectionName = primaryKey === 'messageId' ? 'messages' : 'conversations';
+    logger.info(
+      `[syncWithMeili] Starting sync for ${collectionName} with batch size ${batchSize}`,
+    );
+    // Get an approximate total count as a rough estimate; the sync should not exceed this number
+    const approxTotalCount = await this.estimatedDocumentCount();
+    logger.info(
+      `[syncWithMeili] Approximate total number of all ${collectionName}: ${approxTotalCount}`,
+    );
    try {
-      const startTime = Date.now();
-      const { batchSize, delayMs } = syncConfig;
-      logger.info(
-        `[syncWithMeili] Starting sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} with batch size ${batchSize}`,
-      );
-      // Build query with resume capability
-      // Do not sync TTL documents
-      const query: FilterQuery<unknown> = { expiredAt: null };
-      if (options?.resumeFromId) {
-        query._id = { $gt: options.resumeFromId };
-      }
-      // Get approximate total count for progress tracking
-      const approxTotalCount = await this.estimatedDocumentCount();
-      logger.info(`[syncWithMeili] Approximate total number of documents to sync: ${approxTotalCount}`);
-      let processedCount = 0;
-      // First, handle documents that need to be removed from Meili
+      logger.info(`[syncWithMeili] Starting cleanup of Meili index ${index.uid} before sync`);
      await this.cleanupMeiliIndex(index, primaryKey, batchSize, delayMs);
-      // Process MongoDB documents in batches using cursor
-      const cursor = this.find(query)
-        .select(attributesToIndex.join(' ') + ' _meiliIndex')
-        .sort({ _id: 1 })
-        .batchSize(batchSize)
-        .cursor();
-      const format = (doc: Record<string, unknown>) =>
-        _.omitBy(_.pick(doc, attributesToIndex), (v, k) => k.startsWith('$'));
-      let documentBatch: Array<Record<string, unknown>> = [];
-      let updateOps: Array<{
-        updateOne: {
-          filter: Record<string, unknown>;
-          update: { $set: { _meiliIndex: boolean } };
-        };
-      }> = [];
-      // Process documents in streaming fashion
-      for await (const doc of cursor) {
-        const typedDoc = doc.toObject() as unknown as Record<string, unknown>;
-        const formatted = format(typedDoc);
-        // Check if document needs indexing
-        if (!typedDoc._meiliIndex) {
-          documentBatch.push(formatted);
-          updateOps.push({
-            updateOne: {
-              filter: { _id: typedDoc._id },
-              update: { $set: { _meiliIndex: true } },
-            },
-          });
-        }
-        processedCount++;
-        // Process batch when it reaches the configured size
-        if (documentBatch.length >= batchSize) {
-          await this.processSyncBatch(index, documentBatch, updateOps);
-          documentBatch = [];
-          updateOps = [];
-          // Log progress
-          // Calculating the percentage from the approximate total count may occasionally exceed 100%;
-          // the difference is very small and acceptable for progress tracking
-          const percent = Math.round((processedCount / approxTotalCount) * 100);
-          const progress = Math.min(percent, 100);
-          logger.info(`[syncWithMeili] Progress: ${progress}% (count: ${processedCount})`);
-          // Add delay to prevent overwhelming resources
-          if (delayMs > 0) {
-            await new Promise((resolve) => setTimeout(resolve, delayMs));
-          }
-        }
-      }
-      // Process remaining documents
-      if (documentBatch.length > 0) {
-        await this.processSyncBatch(index, documentBatch, updateOps);
-      }
-      const duration = Date.now() - startTime;
-      logger.info(
-        `[syncWithMeili] Completed sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} in ${duration}ms`,
-      );
+      logger.info(`[syncWithMeili] Completed cleanup of Meili index: ${index.uid}`);
    } catch (error) {
-      logger.error('[syncWithMeili] Error during sync:', error);
+      logger.error('[syncWithMeili] Error during cleanup Meili before sync:', error);
      throw error;
    }
+    let processedCount = 0;
+    let hasMore = true;
+    while (hasMore) {
+      const query: FilterQuery<unknown> = {
+        expiredAt: null,
+        _meiliIndex: false,
+      };
+      try {
+        const documents = await this.find(query)
+          .select(attributesToIndex.join(' ') + ' _meiliIndex')
+          .limit(batchSize)
+          .lean();
+        // Check if there are more documents to process
+        if (documents.length === 0) {
+          logger.info('[syncWithMeili] No more documents to process');
+          break;
+        }
+        // Process the batch
+        await this.processSyncBatch(index, documents);
+        processedCount += documents.length;
+        logger.info(`[syncWithMeili] Processed: ${processedCount}`);
+        if (documents.length < batchSize) {
+          hasMore = false;
+        }
+        // Add delay to prevent overwhelming resources
+        if (hasMore && delayMs > 0) {
+          await new Promise((resolve) => setTimeout(resolve, delayMs));
+        }
+      } catch (error) {
+        logger.error('[syncWithMeili] Error processing documents batch:', error);
+        throw error;
+      }
+    }
+    const duration = Date.now() - startTime;
+    logger.info(
+      `[syncWithMeili] Completed sync for ${collectionName}. Processed ${processedCount} documents in ${duration}ms`,
+    );
  }
  /**
@@ -276,28 +243,26 @@
    this: SchemaWithMeiliMethods,
    index: Index<MeiliIndexable>,
    documents: Array<Record<string, unknown>>,
-    updateOps: Array<{
-      updateOne: {
-        filter: Record<string, unknown>;
-        update: { $set: { _meiliIndex: boolean } };
-      };
-    }>,
  ): Promise<void> {
    if (documents.length === 0) {
      return;
    }
+    // Format documents for MeiliSearch
+    const formattedDocs = documents.map((doc) =>
+      _.omitBy(_.pick(doc, attributesToIndex), (_v, k) => k.startsWith('$')),
+    );
    try {
      // Add documents to MeiliSearch
-      await index.addDocuments(documents);
+      await index.addDocumentsInBatches(formattedDocs);
      // Update MongoDB to mark documents as indexed
-      if (updateOps.length > 0) {
-        await this.collection.bulkWrite(updateOps);
-      }
+      const docsIds = documents.map((doc) => doc._id);
+      await this.updateMany({ _id: { $in: docsIds } }, { $set: { _meiliIndex: true } });
    } catch (error) {
      logger.error('[processSyncBatch] Error processing batch:', error);
-      // Don't throw - allow sync to continue with other documents
+      throw error;
    }
  }
@@ -336,7 +301,7 @@
    // Delete documents that don't exist in MongoDB
    const toDelete = meiliIds.filter((id) => !existingIds.has(id));
    if (toDelete.length > 0) {
-      await Promise.all(toDelete.map((id) => index.deleteDocument(id as string)));
+      await index.deleteDocuments(toDelete.map(String));
      logger.debug(`[cleanupMeiliIndex] Deleted ${toDelete.length} orphaned documents`);
    }
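Design note: the reliability gain comes from making sync progress queryable rather than held in memory. Each loop iteration re-queries { expiredAt: null, _meiliIndex: false }, and documents are flagged only after MeiliSearch accepts them, so an interrupted or failed sync can simply be re-run and resumes where it stopped. A condensed sketch of that invariant — model, index, and resumableSync are placeholder names, not the PR's API:

import type { Model } from 'mongoose';
import type { Index } from 'meilisearch';

// Sketch: progress lives in the query itself, so re-running the sync resumes it.
async function resumableSync(
  model: Model<Record<string, unknown>>,
  index: Index,
  batchSize = 100,
): Promise<void> {
  for (;;) {
    // Only non-expired, not-yet-indexed documents are ever selected.
    const batch = await model
      .find({ expiredAt: null, _meiliIndex: false })
      .limit(batchSize)
      .lean();
    if (batch.length === 0) {
      break; // nothing left to index: the sync is complete
    }
    // Throws on failure, leaving _meiliIndex false so the batch is retried on the next run.
    await index.addDocumentsInBatches(batch as Record<string, unknown>[]);
    // Flag documents only after MeiliSearch accepted them; a crash between these two
    // steps merely re-indexes the same batch next time (an idempotent upsert in Meili).
    await model.updateMany(
      { _id: { $in: batch.map((d) => d._id) } },
      { $set: { _meiliIndex: true } },
    );
  }
}

The getSyncProgress change above supports the same property: by counting only { expiredAt: null } documents, totalDocuments and totalProcessed converge, and isComplete becomes true exactly when the loop has nothing left to select.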