🔃 refactor: Optimize MeiliSearch Sync Processing and Tracking (#7994)

Danny Avila, 2025-06-20 18:05:19 -04:00, committed by GitHub
parent fa54c9ae90
commit 72cd159a37
2 changed files with 404 additions and 130 deletions


@@ -1,8 +1,11 @@
const mongoose = require('mongoose');
const { MeiliSearch } = require('meilisearch');
const { logger } = require('@librechat/data-schemas');
const { FlowStateManager } = require('@librechat/api');
const { CacheKeys } = require('librechat-data-provider');
const { isEnabled } = require('~/server/utils');
const { getLogStores } = require('~/cache');
const Conversation = mongoose.models.Conversation;
const Message = mongoose.models.Message;
@@ -28,43 +31,123 @@ class MeiliSearchClient {
}
}
/**
* Performs the actual sync operations for messages and conversations
*/
async function performSync() {
const client = MeiliSearchClient.getInstance();
const { status } = await client.health();
if (status !== 'available') {
throw new Error('Meilisearch not available');
}
if (indexingDisabled === true) {
logger.info('[indexSync] Indexing is disabled, skipping...');
return { messagesSync: false, convosSync: false };
}
let messagesSync = false;
let convosSync = false;
// Check if we need to sync messages
const messageProgress = await Message.getSyncProgress();
if (!messageProgress.isComplete) {
logger.info(
`[indexSync] Messages need syncing: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments} indexed`,
);
// Check if we should do a full sync or incremental
const messageCount = await Message.countDocuments();
const messagesIndexed = messageProgress.totalProcessed;
const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10);
if (messageCount - messagesIndexed > syncThreshold) {
logger.info('[indexSync] Starting full message sync due to large difference');
await Message.syncWithMeili();
messagesSync = true;
} else if (messageCount !== messagesIndexed) {
logger.warn('[indexSync] Messages out of sync, performing incremental sync');
await Message.syncWithMeili();
messagesSync = true;
}
} else {
logger.info(
`[indexSync] Messages are fully synced: ${messageProgress.totalProcessed}/${messageProgress.totalDocuments}`,
);
}
// Check if we need to sync conversations
const convoProgress = await Conversation.getSyncProgress();
if (!convoProgress.isComplete) {
logger.info(
`[indexSync] Conversations need syncing: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments} indexed`,
);
const convoCount = await Conversation.countDocuments();
const convosIndexed = convoProgress.totalProcessed;
const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10);
if (convoCount - convosIndexed > syncThreshold) {
logger.info('[indexSync] Starting full conversation sync due to large difference');
await Conversation.syncWithMeili();
convosSync = true;
} else if (convoCount !== convosIndexed) {
logger.warn('[indexSync] Convos out of sync, performing incremental sync');
await Conversation.syncWithMeili();
convosSync = true;
}
} else {
logger.info(
`[indexSync] Conversations are fully synced: ${convoProgress.totalProcessed}/${convoProgress.totalDocuments}`,
);
}
return { messagesSync, convosSync };
}
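To make the threshold concrete: with, say, 10,000 messages in MongoDB and 8,500 flagged as indexed, the gap of 1,500 exceeds the default MEILI_SYNC_THRESHOLD of 1,000 and the full-sync branch is taken. Note that both branches currently call the same Message.syncWithMeili(), so the threshold only changes what gets logged. A minimal sketch with hypothetical counts:
// Hypothetical counts; same env parsing as performSync above.
const syncThreshold = parseInt(process.env.MEILI_SYNC_THRESHOLD || '1000', 10);
const messageCount = 10000;   // total documents in MongoDB (example value)
const messagesIndexed = 8500; // documents with _meiliIndex: true (example value)
if (messageCount - messagesIndexed > syncThreshold) {
  // gap of 1500 > 1000: logged as a full sync
} else if (messageCount !== messagesIndexed) {
  // smaller non-zero gap: logged as an incremental sync (same call)
}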
/**
* Main index sync function that uses FlowStateManager to prevent concurrent execution
*/
async function indexSync() {
if (!searchEnabled) {
return;
}
try {
const client = MeiliSearchClient.getInstance();
const { status } = await client.health();
if (status !== 'available') {
throw new Error('Meilisearch not available');
logger.info('[indexSync] Starting index synchronization check...');
try {
// Get or create FlowStateManager instance
const flowsCache = getLogStores(CacheKeys.FLOWS);
if (!flowsCache) {
logger.warn('[indexSync] Flows cache not available, falling back to direct sync');
return await performSync();
}
if (indexingDisabled === true) {
logger.info('[indexSync] Indexing is disabled, skipping...');
const flowManager = new FlowStateManager(flowsCache, {
ttl: 60000 * 10, // 10 minutes TTL for sync operations
});
// Use a unique flow ID for the sync operation
const flowId = 'meili-index-sync';
const flowType = 'MEILI_SYNC';
// This will only execute the handler if no other instance is running the sync
const result = await flowManager.createFlowWithHandler(flowId, flowType, performSync);
if (result.messagesSync || result.convosSync) {
logger.info('[indexSync] Sync completed successfully');
} else {
logger.debug('[indexSync] No sync was needed');
}
return result;
} catch (err) {
if (err.message.includes('flow already exists')) {
logger.info('[indexSync] Sync already running on another instance');
return;
}
const messageCount = await Message.countDocuments();
const convoCount = await Conversation.countDocuments();
const messages = await client.index('messages').getStats();
const convos = await client.index('convos').getStats();
const messagesIndexed = messages.numberOfDocuments;
const convosIndexed = convos.numberOfDocuments;
logger.debug(`[indexSync] There are ${messageCount} messages and ${messagesIndexed} indexed`);
logger.debug(`[indexSync] There are ${convoCount} convos and ${convosIndexed} indexed`);
if (messageCount !== messagesIndexed) {
logger.debug('[indexSync] Messages out of sync, indexing');
Message.syncWithMeili();
}
if (convoCount !== convosIndexed) {
logger.debug('[indexSync] Convos out of sync, indexing');
Conversation.syncWithMeili();
}
} catch (err) {
if (err.message.includes('not found')) {
logger.debug('[indexSync] Creating indices...');
currentTimeout = setTimeout(async () => {
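The FlowStateManager guard above is what prevents a thundering herd when several instances boot at once: only one caller executes performSync for the shared flow ID. A rough sketch of the expected semantics, assuming performSync from this file and flowsCache obtained from getLogStores(CacheKeys.FLOWS) as above:
import { FlowStateManager } from '@librechat/api';
// Illustrative: two callers racing on the same flow id (e.g., two app instances).
const flowManager = new FlowStateManager(flowsCache, { ttl: 60000 * 10 });
const run = () => flowManager.createFlowWithHandler('meili-index-sync', 'MEILI_SYNC', performSync);
const [first, second] = await Promise.allSettled([run(), run()]);
// Expectation: performSync executes once; the losing caller either resolves
// with the shared result or rejects with a "flow already exists" error,
// which indexSync() maps to "Sync already running on another instance".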


@@ -19,6 +19,8 @@ interface MongoMeiliOptions {
indexName: string;
primaryKey: string;
mongoose: typeof import('mongoose');
syncBatchSize?: number;
syncDelayMs?: number;
}
interface MeiliIndexable {
@@ -31,6 +33,13 @@ interface ContentItem {
text?: string;
}
interface SyncProgress {
lastSyncedId?: string;
totalProcessed: number;
totalDocuments: number;
isComplete: boolean;
}
interface _DocumentWithMeiliIndex extends Document {
_meiliIndex?: boolean;
preprocessObjectForIndex?: () => Record<string, unknown>;
@@ -45,7 +54,24 @@ interface _DocumentWithMeiliIndex extends Document {
export type DocumentWithMeiliIndex = _DocumentWithMeiliIndex & IConversation & Partial<IMessage>;
export interface SchemaWithMeiliMethods extends Model<DocumentWithMeiliIndex> {
syncWithMeili(): Promise<void>;
syncWithMeili(options?: { resumeFromId?: string }): Promise<void>;
getSyncProgress(): Promise<SyncProgress>;
processSyncBatch(
index: Index<MeiliIndexable>,
documents: Array<Record<string, unknown>>,
updateOps: Array<{
updateOne: {
filter: Record<string, unknown>;
update: { $set: { _meiliIndex: boolean } };
};
}>,
): Promise<void>;
cleanupMeiliIndex(
index: Index<MeiliIndexable>,
primaryKey: string,
batchSize: number,
delayMs: number,
): Promise<void>;
setMeiliIndexSettings(settings: Record<string, unknown>): Promise<unknown>;
meiliSearch(
q: string,
@@ -66,6 +92,14 @@ const searchEnabled = process.env.SEARCH != null && process.env.SEARCH.toLowerCa
const meiliEnabled =
process.env.MEILI_HOST != null && process.env.MEILI_MASTER_KEY != null && searchEnabled;
/**
* Get sync configuration from environment variables
*/
const getSyncConfig = () => ({
batchSize: parseInt(process.env.MEILI_SYNC_BATCH_SIZE || '100', 10),
delayMs: parseInt(process.env.MEILI_SYNC_DELAY_MS || '100', 10),
});
/**
* Local implementation of parseTextParts to avoid dependency on librechat-data-provider
* Extracts text content from an array of content items
@@ -101,6 +135,26 @@ const validateOptions = (options: Partial<MongoMeiliOptions>): void => {
});
};
/**
* Helper function to process documents in batches with rate limiting
*/
const processBatch = async <T>(
items: T[],
batchSize: number,
delayMs: number,
processor: (batch: T[]) => Promise<void>,
): Promise<void> => {
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
await processor(batch);
// Add delay between batches to prevent overwhelming resources
if (i + batchSize < items.length && delayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
};
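Since processBatch is generic, it can rate-limit any bulk MeiliSearch operation. A usage sketch, with hypothetical IDs and connection values (deleteDocument is the standard meilisearch-js call), run inside an async context:
import { MeiliSearch } from 'meilisearch';
const client = new MeiliSearch({ host: 'http://localhost:7700', apiKey: 'masterKey' });
const demoIndex = client.index('messages');
const staleIds = ['id-1', 'id-2', 'id-3']; // hypothetical ids to remove
// Delete in chunks of 100, pausing 100ms between chunks.
await processBatch(staleIds, 100, 100, async (batch) => {
  await Promise.all(batch.map((id) => demoIndex.deleteDocument(id)));
});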
/**
* Factory function to create a MeiliMongooseModel class which extends a Mongoose model.
* This class contains static and instance methods to synchronize and manage the MeiliSearch index
@@ -109,127 +163,213 @@ const validateOptions = (options: Partial<MongoMeiliOptions>): void => {
* @param config - Configuration object.
* @param config.index - The MeiliSearch index object.
* @param config.attributesToIndex - List of attributes to index.
* @param config.syncOptions - Sync configuration options.
* @returns A class definition that will be loaded into the Mongoose schema.
*/
const createMeiliMongooseModel = ({
index,
attributesToIndex,
syncOptions,
}: {
index: Index<MeiliIndexable>;
attributesToIndex: string[];
syncOptions: { batchSize: number; delayMs: number };
}) => {
const primaryKey = attributesToIndex[0];
const syncConfig = { ...getSyncConfig(), ...syncOptions };
class MeiliMongooseModel {
/**
* Synchronizes the data between the MongoDB collection and the MeiliSearch index.
*
* The synchronization process involves:
* 1. Fetching all documents from the MongoDB collection and MeiliSearch index.
* 2. Comparing documents from both sources.
* 3. Deleting documents from MeiliSearch that no longer exist in MongoDB.
* 4. Adding documents to MeiliSearch that exist in MongoDB but not in the index.
* 5. Updating documents in MeiliSearch if key fields (such as `text` or `title`) differ.
* 6. Updating the `_meiliIndex` field in MongoDB to indicate the indexing status.
*
* Note: The function processes documents in batches because MeiliSearch's
* `index.getDocuments` requires an exact limit and `index.addDocuments` does not handle
* partial failures in a batch.
*
* @returns {Promise<void>} Resolves when the synchronization is complete.
* Get the current sync progress
*/
static async syncWithMeili(this: SchemaWithMeiliMethods): Promise<void> {
static async getSyncProgress(this: SchemaWithMeiliMethods): Promise<SyncProgress> {
const totalDocuments = await this.countDocuments();
const indexedDocuments = await this.countDocuments({ _meiliIndex: true });
return {
totalProcessed: indexedDocuments,
totalDocuments,
isComplete: indexedDocuments === totalDocuments,
};
}
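performSync in the first file consumes this as a cheap pre-check; note that the optional lastSyncedId field of SyncProgress is not populated by this implementation. A usage sketch, assuming a Message model compiled with this plugin:
// Sketch: skip work when the index is already complete.
const progress = await Message.getSyncProgress();
if (!progress.isComplete) {
  console.log(`${progress.totalProcessed}/${progress.totalDocuments} documents indexed`);
  await Message.syncWithMeili();
}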
/**
* Synchronizes the data between the MongoDB collection and the MeiliSearch index.
* Now uses streaming and batching to reduce memory usage.
*/
static async syncWithMeili(
this: SchemaWithMeiliMethods,
options?: { resumeFromId?: string },
): Promise<void> {
try {
let moreDocuments = true;
const mongoDocuments = await this.find().lean();
const startTime = Date.now();
const { batchSize, delayMs } = syncConfig;
logger.info(
`[syncWithMeili] Starting sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} with batch size ${batchSize}`,
);
// Build query with resume capability
const query: FilterQuery<unknown> = {};
if (options?.resumeFromId) {
query._id = { $gt: options.resumeFromId };
}
// Get total count for progress tracking
const totalCount = await this.countDocuments(query);
let processedCount = 0;
// First, handle documents that need to be removed from Meili
await this.cleanupMeiliIndex(index, primaryKey, batchSize, delayMs);
// Process MongoDB documents in batches using cursor
const cursor = this.find(query)
.select(attributesToIndex.join(' ') + ' _meiliIndex')
.sort({ _id: 1 })
.batchSize(batchSize)
.cursor();
const format = (doc: Record<string, unknown>) =>
_.omitBy(_.pick(doc, attributesToIndex), (v, k) => k.startsWith('$'));
const mongoMap = new Map(
mongoDocuments.map((doc) => {
const typedDoc = doc as Record<string, unknown>;
return [typedDoc[primaryKey], format(typedDoc)];
}),
);
const indexMap = new Map<unknown, Record<string, unknown>>();
let offset = 0;
const batchSize = 1000;
while (moreDocuments) {
const batch = await index.getDocuments({ limit: batchSize, offset });
if (batch.results.length === 0) {
moreDocuments = false;
}
for (const doc of batch.results) {
indexMap.set(doc[primaryKey], format(doc));
}
offset += batchSize;
}
logger.debug('[syncWithMeili]', { indexMap: indexMap.size, mongoMap: mongoMap.size });
const updateOps: Array<{
let documentBatch: Array<Record<string, unknown>> = [];
let updateOps: Array<{
updateOne: {
filter: Record<string, unknown>;
update: { $set: { _meiliIndex: boolean } };
};
}> = [];
// Process documents present in the MeiliSearch index
for (const [id, doc] of indexMap) {
const update: Record<string, unknown> = {};
update[primaryKey] = id;
if (mongoMap.has(id)) {
const mongoDoc = mongoMap.get(id);
if (
(doc.text && doc.text !== mongoDoc?.text) ||
(doc.title && doc.title !== mongoDoc?.title)
) {
logger.debug(
`[syncWithMeili] ${id} had document discrepancy in ${
doc.text ? 'text' : 'title'
} field`,
);
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: true } } },
});
await index.addDocuments([doc]);
// Process documents in streaming fashion
for await (const doc of cursor) {
const typedDoc = doc.toObject() as unknown as Record<string, unknown>;
const formatted = format(typedDoc);
// Check if document needs indexing
if (!typedDoc._meiliIndex) {
documentBatch.push(formatted);
updateOps.push({
updateOne: {
filter: { _id: typedDoc._id },
update: { $set: { _meiliIndex: true } },
},
});
}
processedCount++;
// Process batch when it reaches the configured size
if (documentBatch.length >= batchSize) {
await this.processSyncBatch(index, documentBatch, updateOps);
documentBatch = [];
updateOps = [];
// Log progress
const progress = Math.round((processedCount / totalCount) * 100);
logger.info(`[syncWithMeili] Progress: ${progress}% (${processedCount}/${totalCount})`);
// Add delay to prevent overwhelming resources
if (delayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
} else {
await index.deleteDocument(id as string);
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: false } } },
});
}
}
// Process documents present in MongoDB
for (const [id, doc] of mongoMap) {
const update: Record<string, unknown> = {};
update[primaryKey] = id;
if (!indexMap.has(id)) {
await index.addDocuments([doc]);
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: true } } },
});
} else if (doc._meiliIndex === false) {
updateOps.push({
updateOne: { filter: update, update: { $set: { _meiliIndex: true } } },
});
}
// Process remaining documents
if (documentBatch.length > 0) {
await this.processSyncBatch(index, documentBatch, updateOps);
}
const duration = Date.now() - startTime;
logger.info(
`[syncWithMeili] Completed sync for ${primaryKey === 'messageId' ? 'messages' : 'conversations'} in ${duration}ms`,
);
} catch (error) {
logger.error('[syncWithMeili] Error during sync:', error);
throw error;
}
}
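Because the cursor walks documents in ascending _id order, an interrupted sync can be resumed from a saved checkpoint via resumeFromId. A sketch; nothing in this diff persists lastSyncedId yet, so loadCheckpoint below is a hypothetical helper:
// Resume from a previously saved _id (loadCheckpoint is hypothetical).
const lastSeenId = await loadCheckpoint('messages-meili-sync');
await Message.syncWithMeili(lastSeenId ? { resumeFromId: lastSeenId } : undefined);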
/**
* Process a batch of documents for syncing
*/
static async processSyncBatch(
this: SchemaWithMeiliMethods,
index: Index<MeiliIndexable>,
documents: Array<Record<string, unknown>>,
updateOps: Array<{
updateOne: {
filter: Record<string, unknown>;
update: { $set: { _meiliIndex: boolean } };
};
}>,
): Promise<void> {
if (documents.length === 0) {
return;
}
try {
// Add documents to MeiliSearch
await index.addDocuments(documents);
// Update MongoDB to mark documents as indexed
if (updateOps.length > 0) {
await this.collection.bulkWrite(updateOps);
logger.debug(
`[syncWithMeili] Finished indexing ${
primaryKey === 'messageId' ? 'messages' : 'conversations'
}`,
);
}
} catch (error) {
logger.error('[syncWithMeili] Error adding document to Meili:', error);
logger.error('[processSyncBatch] Error processing batch:', error);
// Don't throw - allow sync to continue with other documents
}
}
/**
* Clean up documents in MeiliSearch that no longer exist in MongoDB
*/
static async cleanupMeiliIndex(
this: SchemaWithMeiliMethods,
index: Index<MeiliIndexable>,
primaryKey: string,
batchSize: number,
delayMs: number,
): Promise<void> {
try {
let offset = 0;
let moreDocuments = true;
while (moreDocuments) {
const batch = await index.getDocuments({ limit: batchSize, offset });
if (batch.results.length === 0) {
moreDocuments = false;
break;
}
const meiliIds = batch.results.map((doc) => doc[primaryKey]);
const query: Record<string, unknown> = {};
query[primaryKey] = { $in: meiliIds };
// Find which documents exist in MongoDB
const existingDocs = await this.find(query).select(primaryKey).lean();
const existingIds = new Set(
existingDocs.map((doc: Record<string, unknown>) => doc[primaryKey]),
);
// Delete documents that don't exist in MongoDB
const toDelete = meiliIds.filter((id) => !existingIds.has(id));
if (toDelete.length > 0) {
await Promise.all(toDelete.map((id) => index.deleteDocument(id as string)));
logger.debug(`[cleanupMeiliIndex] Deleted ${toDelete.length} orphaned documents`);
}
offset += batchSize;
// Add delay between batches
if (delayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
} catch (error) {
logger.error('[cleanupMeiliIndex] Error during cleanup:', error);
}
}
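The orphan detection above reduces to a set difference per page of index results; in isolation, with hypothetical values:
const meiliIds = ['a', 'b', 'c'];        // one page of ids from MeiliSearch
const existingIds = new Set(['a', 'c']); // ids still present in MongoDB
const toDelete = meiliIds.filter((id) => !existingIds.has(id)); // ['b'] is orphaned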
@@ -313,18 +453,29 @@ const createMeiliMongooseModel = ({
}
/**
* Adds the current document to the MeiliSearch index
* Adds the current document to the MeiliSearch index with retry logic
*/
async addObjectToMeili(
this: DocumentWithMeiliIndex,
next: CallbackWithoutResultAndOptionalError,
): Promise<void> {
const object = this.preprocessObjectForIndex!();
try {
await index.addDocuments([object]);
} catch (error) {
logger.error('[addObjectToMeili] Error adding document to Meili:', error);
return next();
const maxRetries = 3;
let retryCount = 0;
while (retryCount < maxRetries) {
try {
await index.addDocuments([object]);
break;
} catch (error) {
retryCount++;
if (retryCount >= maxRetries) {
logger.error('[addObjectToMeili] Error adding document to Meili after retries:', error);
return next();
}
// Exponential backoff
await new Promise((resolve) => setTimeout(resolve, Math.pow(2, retryCount) * 1000));
}
}
try {
@@ -445,6 +596,8 @@ const createMeiliMongooseModel = ({
* @param options.apiKey - The MeiliSearch API key.
* @param options.indexName - The name of the MeiliSearch index.
* @param options.primaryKey - The primary key field for indexing.
* @param options.syncBatchSize - Batch size for sync operations.
* @param options.syncDelayMs - Delay between batches in milliseconds.
*/
export default function mongoMeili(schema: Schema, options: MongoMeiliOptions): void {
const mongoose = options.mongoose;
@@ -461,11 +614,38 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions):
});
const { host, apiKey, indexName, primaryKey } = options;
const syncOptions = {
batchSize: options.syncBatchSize || getSyncConfig().batchSize,
delayMs: options.syncDelayMs || getSyncConfig().delayMs,
};
const client = new MeiliSearch({ host, apiKey });
client.createIndex(indexName, { primaryKey });
/** Create index only if it doesn't exist */
const index = client.index<MeiliIndexable>(indexName);
// Check if index exists and create if needed
(async () => {
try {
await index.getRawInfo();
logger.debug(`[mongoMeili] Index ${indexName} already exists`);
} catch (error) {
const errorCode = (error as { code?: string })?.code;
if (errorCode === 'index_not_found') {
try {
logger.info(`[mongoMeili] Creating new index: ${indexName}`);
await client.createIndex(indexName, { primaryKey });
logger.info(`[mongoMeili] Successfully created index: ${indexName}`);
} catch (createError) {
// Index might have been created by another instance
logger.debug(`[mongoMeili] Index ${indexName} may already exist:`, createError);
}
} else {
logger.error(`[mongoMeili] Error checking index ${indexName}:`, error);
}
}
})();
// Collect attributes from the schema that should be indexed
const attributesToIndex: string[] = [
...Object.entries(schema.obj).reduce<string[]>((results, [key, value]) => {
@@ -474,7 +654,7 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions):
}, []),
];
schema.loadClass(createMeiliMongooseModel({ index, attributesToIndex }));
schema.loadClass(createMeiliMongooseModel({ index, attributesToIndex, syncOptions }));
// Register Mongoose hooks
schema.post('save', function (doc: DocumentWithMeiliIndex, next) {
@@ -497,17 +677,23 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions):
try {
const conditions = (this as Query<unknown, unknown>).getQuery();
const { batchSize, delayMs } = syncOptions;
if (Object.prototype.hasOwnProperty.call(schema.obj, 'messages')) {
const convoIndex = client.index('convos');
const deletedConvos = await mongoose
.model('Conversation')
.find(conditions as FilterQuery<unknown>)
.select('conversationId')
.lean();
const promises = deletedConvos.map((convo: Record<string, unknown>) =>
convoIndex.deleteDocument(convo.conversationId as string),
);
await Promise.all(promises);
// Process deletions in batches
await processBatch(deletedConvos, batchSize, delayMs, async (batch) => {
const promises = batch.map((convo: Record<string, unknown>) =>
convoIndex.deleteDocument(convo.conversationId as string),
);
await Promise.all(promises);
});
}
if (Object.prototype.hasOwnProperty.call(schema.obj, 'messageId')) {
@@ -515,11 +701,16 @@ export default function mongoMeili(schema: Schema, options: MongoMeiliOptions):
const deletedMessages = await mongoose
.model('Message')
.find(conditions as FilterQuery<unknown>)
.select('messageId')
.lean();
const promises = deletedMessages.map((message: Record<string, unknown>) =>
messageIndex.deleteDocument(message.messageId as string),
);
await Promise.all(promises);
// Process deletions in batches
await processBatch(deletedMessages, batchSize, delayMs, async (batch) => {
const promises = batch.map((message: Record<string, unknown>) =>
messageIndex.deleteDocument(message.messageId as string),
);
await Promise.all(promises);
});
}
return next();
} catch (error) {
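For context on how all of this is wired up, a schema registers the plugin roughly as below. This is a sketch, assuming fields flagged with meiliIndex: true are what the attribute-collection reduce above picks up; connection values are illustrative:
import mongoose, { Schema } from 'mongoose';
import mongoMeili from './mongoMeili'; // path is illustrative
const messageSchema = new Schema({
  messageId: { type: String, meiliIndex: true }, // first indexed attribute doubles as the primary key
  text: { type: String, meiliIndex: true },
  _meiliIndex: { type: Boolean, default: false }, // tracking flag used by the sync methods
});
messageSchema.plugin(mongoMeili, {
  host: process.env.MEILI_HOST ?? 'http://localhost:7700',
  apiKey: process.env.MEILI_MASTER_KEY ?? 'masterKey',
  indexName: 'messages',
  primaryKey: 'messageId',
  mongoose,
  syncBatchSize: 100, // optional; falls back to MEILI_SYNC_BATCH_SIZE (default 100)
  syncDelayMs: 100,   // optional; falls back to MEILI_SYNC_DELAY_MS (default 100)
});
export const Message = mongoose.model('Message', messageSchema);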