mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-02-16 15:38:10 +01:00
🔧 refactor: batching documents on meili index reset (#11165)
* 🔧 refactor: batching documents on meili index reset Update on all documents can be very heavy on weak or low-tier instances 🔧 refactor: check if flag is enabled before calling meilisearch 🔧 fix: adding index to query documents to reset meili-search index status * 🔧 refactor: error handling 🔧 refactor: more unit-test coverage * 🔧 refactor: edge case error handling & tests
This commit is contained in:
parent
791dab8f20
commit
7d136edb40
6 changed files with 648 additions and 24 deletions
|
|
@ -4,6 +4,7 @@ const { logger } = require('@librechat/data-schemas');
|
||||||
const { CacheKeys } = require('librechat-data-provider');
|
const { CacheKeys } = require('librechat-data-provider');
|
||||||
const { isEnabled, FlowStateManager } = require('@librechat/api');
|
const { isEnabled, FlowStateManager } = require('@librechat/api');
|
||||||
const { getLogStores } = require('~/cache');
|
const { getLogStores } = require('~/cache');
|
||||||
|
const { batchResetMeiliFlags } = require('./utils');
|
||||||
|
|
||||||
const Conversation = mongoose.models.Conversation;
|
const Conversation = mongoose.models.Conversation;
|
||||||
const Message = mongoose.models.Message;
|
const Message = mongoose.models.Message;
|
||||||
|
|
@ -189,6 +190,11 @@ async function ensureFilterableAttributes(client) {
|
||||||
*/
|
*/
|
||||||
async function performSync(flowManager, flowId, flowType) {
|
async function performSync(flowManager, flowId, flowType) {
|
||||||
try {
|
try {
|
||||||
|
if (indexingDisabled === true) {
|
||||||
|
logger.info('[indexSync] Indexing is disabled, skipping...');
|
||||||
|
return { messagesSync: false, convosSync: false };
|
||||||
|
}
|
||||||
|
|
||||||
const client = MeiliSearchClient.getInstance();
|
const client = MeiliSearchClient.getInstance();
|
||||||
|
|
||||||
const { status } = await client.health();
|
const { status } = await client.health();
|
||||||
|
|
@ -196,11 +202,6 @@ async function performSync(flowManager, flowId, flowType) {
|
||||||
throw new Error('Meilisearch not available');
|
throw new Error('Meilisearch not available');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (indexingDisabled === true) {
|
|
||||||
logger.info('[indexSync] Indexing is disabled, skipping...');
|
|
||||||
return { messagesSync: false, convosSync: false };
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Ensures indexes have proper filterable attributes configured */
|
/** Ensures indexes have proper filterable attributes configured */
|
||||||
const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } =
|
const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } =
|
||||||
await ensureFilterableAttributes(client);
|
await ensureFilterableAttributes(client);
|
||||||
|
|
@ -215,11 +216,8 @@ async function performSync(flowManager, flowId, flowType) {
|
||||||
);
|
);
|
||||||
|
|
||||||
// Reset sync flags to force full re-sync
|
// Reset sync flags to force full re-sync
|
||||||
await Message.collection.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
|
await batchResetMeiliFlags(Message.collection);
|
||||||
await Conversation.collection.updateMany(
|
await batchResetMeiliFlags(Conversation.collection);
|
||||||
{ _meiliIndex: true },
|
|
||||||
{ $set: { _meiliIndex: false } },
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we need to sync messages
|
// Check if we need to sync messages
|
||||||
|
|
|
||||||
90
api/db/utils.js
Normal file
90
api/db/utils.js
Normal file
|
|
@ -0,0 +1,90 @@
|
||||||
|
const { logger } = require('@librechat/data-schemas');
|
||||||
|
|
||||||
|
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Batch update documents in chunks to avoid timeouts on weak instances
|
||||||
|
* @param {mongoose.Collection} collection - MongoDB collection
|
||||||
|
* @returns {Promise<number>} - Total modified count
|
||||||
|
* @throws {Error} - Throws if database operations fail (e.g., network issues, connection loss, permission problems)
|
||||||
|
*/
|
||||||
|
async function batchResetMeiliFlags(collection) {
|
||||||
|
const DEFAULT_BATCH_SIZE = 1000;
|
||||||
|
|
||||||
|
let BATCH_SIZE = parseEnvInt('MEILI_SYNC_BATCH_SIZE', DEFAULT_BATCH_SIZE);
|
||||||
|
if (BATCH_SIZE === 0) {
|
||||||
|
logger.warn(
|
||||||
|
`[batchResetMeiliFlags] MEILI_SYNC_BATCH_SIZE cannot be 0. Using default: ${DEFAULT_BATCH_SIZE}`,
|
||||||
|
);
|
||||||
|
BATCH_SIZE = DEFAULT_BATCH_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
const BATCH_DELAY_MS = parseEnvInt('MEILI_SYNC_DELAY_MS', 100);
|
||||||
|
let totalModified = 0;
|
||||||
|
let hasMore = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (hasMore) {
|
||||||
|
const docs = await collection
|
||||||
|
.find({ expiredAt: null, _meiliIndex: true }, { projection: { _id: 1 } })
|
||||||
|
.limit(BATCH_SIZE)
|
||||||
|
.toArray();
|
||||||
|
|
||||||
|
if (docs.length === 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ids = docs.map((doc) => doc._id);
|
||||||
|
const result = await collection.updateMany(
|
||||||
|
{ _id: { $in: ids } },
|
||||||
|
{ $set: { _meiliIndex: false } },
|
||||||
|
);
|
||||||
|
|
||||||
|
totalModified += result.modifiedCount;
|
||||||
|
process.stdout.write(
|
||||||
|
`\r Updating ${collection.collectionName}: ${totalModified} documents...`,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (docs.length < BATCH_SIZE) {
|
||||||
|
hasMore = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasMore && BATCH_DELAY_MS > 0) {
|
||||||
|
await sleep(BATCH_DELAY_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalModified;
|
||||||
|
} catch (error) {
|
||||||
|
throw new Error(
|
||||||
|
`Failed to batch reset Meili flags for collection '${collection.collectionName}' after processing ${totalModified} documents: ${error.message}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse and validate an environment variable as a positive integer
|
||||||
|
* @param {string} varName - Environment variable name
|
||||||
|
* @param {number} defaultValue - Default value to use if invalid or missing
|
||||||
|
* @returns {number} - Parsed value or default
|
||||||
|
*/
|
||||||
|
function parseEnvInt(varName, defaultValue) {
|
||||||
|
const value = process.env[varName];
|
||||||
|
if (!value) {
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const parsed = parseInt(value, 10);
|
||||||
|
if (isNaN(parsed) || parsed < 0) {
|
||||||
|
logger.warn(
|
||||||
|
`[batchResetMeiliFlags] Invalid value for ${varName}="${value}". Expected a positive integer. Using default: ${defaultValue}`,
|
||||||
|
);
|
||||||
|
return defaultValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return parsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
batchResetMeiliFlags,
|
||||||
|
};
|
||||||
521
api/db/utils.spec.js
Normal file
521
api/db/utils.spec.js
Normal file
|
|
@ -0,0 +1,521 @@
|
||||||
|
const mongoose = require('mongoose');
|
||||||
|
const { MongoMemoryServer } = require('mongodb-memory-server');
|
||||||
|
const { batchResetMeiliFlags } = require('./utils');
|
||||||
|
|
||||||
|
describe('batchResetMeiliFlags', () => {
|
||||||
|
let mongoServer;
|
||||||
|
let testCollection;
|
||||||
|
const ORIGINAL_BATCH_SIZE = process.env.MEILI_SYNC_BATCH_SIZE;
|
||||||
|
const ORIGINAL_BATCH_DELAY = process.env.MEILI_SYNC_DELAY_MS;
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
mongoServer = await MongoMemoryServer.create();
|
||||||
|
const mongoUri = mongoServer.getUri();
|
||||||
|
await mongoose.connect(mongoUri);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await mongoose.disconnect();
|
||||||
|
await mongoServer.stop();
|
||||||
|
|
||||||
|
// Restore original env variables
|
||||||
|
if (ORIGINAL_BATCH_SIZE !== undefined) {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = ORIGINAL_BATCH_SIZE;
|
||||||
|
} else {
|
||||||
|
delete process.env.MEILI_SYNC_BATCH_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ORIGINAL_BATCH_DELAY !== undefined) {
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = ORIGINAL_BATCH_DELAY;
|
||||||
|
} else {
|
||||||
|
delete process.env.MEILI_SYNC_DELAY_MS;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
// Create a fresh collection for each test
|
||||||
|
testCollection = mongoose.connection.db.collection('test_meili_batch');
|
||||||
|
await testCollection.deleteMany({});
|
||||||
|
|
||||||
|
// Reset env variables to defaults
|
||||||
|
delete process.env.MEILI_SYNC_BATCH_SIZE;
|
||||||
|
delete process.env.MEILI_SYNC_DELAY_MS;
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
if (testCollection) {
|
||||||
|
await testCollection.deleteMany({});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('basic functionality', () => {
|
||||||
|
it('should reset _meiliIndex flag for documents with expiredAt: null and _meiliIndex: true', async () => {
|
||||||
|
// Insert test documents
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true, name: 'doc1' },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true, name: 'doc2' },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true, name: 'doc3' },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(3);
|
||||||
|
|
||||||
|
const updatedDocs = await testCollection.find({ _meiliIndex: false }).toArray();
|
||||||
|
expect(updatedDocs).toHaveLength(3);
|
||||||
|
|
||||||
|
const notUpdatedDocs = await testCollection.find({ _meiliIndex: true }).toArray();
|
||||||
|
expect(notUpdatedDocs).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not modify documents with expiredAt set', async () => {
|
||||||
|
const expiredDate = new Date();
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: expiredDate, _meiliIndex: true },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
|
||||||
|
const expiredDoc = await testCollection.findOne({ expiredAt: expiredDate });
|
||||||
|
expect(expiredDoc._meiliIndex).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not modify documents with _meiliIndex: false', async () => {
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: false },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return 0 when no documents match the criteria', async () => {
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: new Date(), _meiliIndex: true },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: false },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return 0 when collection is empty', async () => {
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('batch processing', () => {
|
||||||
|
it('should process documents in batches according to MEILI_SYNC_BATCH_SIZE', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '2';
|
||||||
|
|
||||||
|
const docs = [];
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
docs.push({
|
||||||
|
_id: new mongoose.Types.ObjectId(),
|
||||||
|
expiredAt: null,
|
||||||
|
_meiliIndex: true,
|
||||||
|
name: `doc${i}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await testCollection.insertMany(docs);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(5);
|
||||||
|
|
||||||
|
const updatedDocs = await testCollection.find({ _meiliIndex: false }).toArray();
|
||||||
|
expect(updatedDocs).toHaveLength(5);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle large datasets with small batch sizes', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '10';
|
||||||
|
|
||||||
|
const docs = [];
|
||||||
|
for (let i = 0; i < 25; i++) {
|
||||||
|
docs.push({
|
||||||
|
_id: new mongoose.Types.ObjectId(),
|
||||||
|
expiredAt: null,
|
||||||
|
_meiliIndex: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await testCollection.insertMany(docs);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(25);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should use default batch size of 1000 when env variable is not set', async () => {
|
||||||
|
// Create exactly 1000 documents to verify default batch behavior
|
||||||
|
const docs = [];
|
||||||
|
for (let i = 0; i < 1000; i++) {
|
||||||
|
docs.push({
|
||||||
|
_id: new mongoose.Types.ObjectId(),
|
||||||
|
expiredAt: null,
|
||||||
|
_meiliIndex: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await testCollection.insertMany(docs);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1000);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('return value', () => {
|
||||||
|
it('should return correct modified count', async () => {
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
await expect(batchResetMeiliFlags(testCollection)).resolves.toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('batch delay', () => {
|
||||||
|
it('should respect MEILI_SYNC_DELAY_MS between batches', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '2';
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = '50';
|
||||||
|
|
||||||
|
const docs = [];
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
docs.push({
|
||||||
|
_id: new mongoose.Types.ObjectId(),
|
||||||
|
expiredAt: null,
|
||||||
|
_meiliIndex: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await testCollection.insertMany(docs);
|
||||||
|
|
||||||
|
const startTime = Date.now();
|
||||||
|
await batchResetMeiliFlags(testCollection);
|
||||||
|
const endTime = Date.now();
|
||||||
|
|
||||||
|
// With 5 documents and batch size 2, we need 3 batches
|
||||||
|
// That means 2 delays between batches (not after the last one)
|
||||||
|
// So minimum time should be around 100ms (2 * 50ms)
|
||||||
|
// Using a slightly lower threshold to account for timing variations
|
||||||
|
const elapsed = endTime - startTime;
|
||||||
|
expect(elapsed).toBeGreaterThanOrEqual(80);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not delay when MEILI_SYNC_DELAY_MS is 0', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '2';
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = '0';
|
||||||
|
|
||||||
|
const docs = [];
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
docs.push({
|
||||||
|
_id: new mongoose.Types.ObjectId(),
|
||||||
|
expiredAt: null,
|
||||||
|
_meiliIndex: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
await testCollection.insertMany(docs);
|
||||||
|
|
||||||
|
const startTime = Date.now();
|
||||||
|
await batchResetMeiliFlags(testCollection);
|
||||||
|
const endTime = Date.now();
|
||||||
|
|
||||||
|
const elapsed = endTime - startTime;
|
||||||
|
// Should complete without intentional delays, but database operations still take time
|
||||||
|
// Just verify it completes and returns the correct count
|
||||||
|
expect(elapsed).toBeLessThan(1000); // More reasonable upper bound
|
||||||
|
|
||||||
|
const result = await testCollection.countDocuments({ _meiliIndex: false });
|
||||||
|
expect(result).toBe(5);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not delay after the last batch', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '3';
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = '100';
|
||||||
|
|
||||||
|
// Exactly 3 documents - should fit in one batch, no delay
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
// Verify all 3 documents were processed in a single batch
|
||||||
|
expect(result).toBe(3);
|
||||||
|
|
||||||
|
const updatedDocs = await testCollection.countDocuments({ _meiliIndex: false });
|
||||||
|
expect(updatedDocs).toBe(3);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('edge cases', () => {
|
||||||
|
it('should handle documents without _meiliIndex field', async () => {
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
// Only one document has _meiliIndex: true
|
||||||
|
expect(result).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle mixed document states correctly', async () => {
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: false },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: new Date(), _meiliIndex: true },
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(2);
|
||||||
|
|
||||||
|
const flaggedDocs = await testCollection
|
||||||
|
.find({ expiredAt: null, _meiliIndex: false })
|
||||||
|
.toArray();
|
||||||
|
expect(flaggedDocs).toHaveLength(3); // 2 were updated, 1 was already false
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('error handling', () => {
|
||||||
|
it('should throw error with context when find operation fails', async () => {
|
||||||
|
const mockCollection = {
|
||||||
|
collectionName: 'test_meili_batch',
|
||||||
|
find: jest.fn().mockReturnValue({
|
||||||
|
limit: jest.fn().mockReturnValue({
|
||||||
|
toArray: jest.fn().mockRejectedValue(new Error('Network error')),
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
await expect(batchResetMeiliFlags(mockCollection)).rejects.toThrow(
|
||||||
|
"Failed to batch reset Meili flags for collection 'test_meili_batch' after processing 0 documents: Network error",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw error with context when updateMany operation fails', async () => {
|
||||||
|
const mockCollection = {
|
||||||
|
collectionName: 'test_meili_batch',
|
||||||
|
find: jest.fn().mockReturnValue({
|
||||||
|
limit: jest.fn().mockReturnValue({
|
||||||
|
toArray: jest
|
||||||
|
.fn()
|
||||||
|
.mockResolvedValue([
|
||||||
|
{ _id: new mongoose.Types.ObjectId() },
|
||||||
|
{ _id: new mongoose.Types.ObjectId() },
|
||||||
|
]),
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
updateMany: jest.fn().mockRejectedValue(new Error('Connection lost')),
|
||||||
|
};
|
||||||
|
|
||||||
|
await expect(batchResetMeiliFlags(mockCollection)).rejects.toThrow(
|
||||||
|
"Failed to batch reset Meili flags for collection 'test_meili_batch' after processing 0 documents: Connection lost",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should include documents processed count in error when failure occurs mid-batch', async () => {
|
||||||
|
// Set batch size to 2 to force multiple batches
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '2';
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = '0'; // No delay for faster test
|
||||||
|
|
||||||
|
let findCallCount = 0;
|
||||||
|
let updateCallCount = 0;
|
||||||
|
|
||||||
|
const mockCollection = {
|
||||||
|
collectionName: 'test_meili_batch',
|
||||||
|
find: jest.fn().mockReturnValue({
|
||||||
|
limit: jest.fn().mockReturnValue({
|
||||||
|
toArray: jest.fn().mockImplementation(() => {
|
||||||
|
findCallCount++;
|
||||||
|
// Return 2 documents for first two calls (to keep loop going)
|
||||||
|
// Return 2 documents for third call (to trigger third update which will fail)
|
||||||
|
if (findCallCount <= 3) {
|
||||||
|
return Promise.resolve([
|
||||||
|
{ _id: new mongoose.Types.ObjectId() },
|
||||||
|
{ _id: new mongoose.Types.ObjectId() },
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
// Should not reach here due to error
|
||||||
|
return Promise.resolve([]);
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
updateMany: jest.fn().mockImplementation(() => {
|
||||||
|
updateCallCount++;
|
||||||
|
if (updateCallCount === 1) {
|
||||||
|
return Promise.resolve({ modifiedCount: 2 });
|
||||||
|
} else if (updateCallCount === 2) {
|
||||||
|
return Promise.resolve({ modifiedCount: 2 });
|
||||||
|
} else {
|
||||||
|
return Promise.reject(new Error('Database timeout'));
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
await expect(batchResetMeiliFlags(mockCollection)).rejects.toThrow(
|
||||||
|
"Failed to batch reset Meili flags for collection 'test_meili_batch' after processing 4 documents: Database timeout",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should use collection.collectionName in error messages', async () => {
|
||||||
|
const mockCollection = {
|
||||||
|
collectionName: 'messages',
|
||||||
|
find: jest.fn().mockReturnValue({
|
||||||
|
limit: jest.fn().mockReturnValue({
|
||||||
|
toArray: jest.fn().mockRejectedValue(new Error('Permission denied')),
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
await expect(batchResetMeiliFlags(mockCollection)).rejects.toThrow(
|
||||||
|
"Failed to batch reset Meili flags for collection 'messages' after processing 0 documents: Permission denied",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('environment variable validation', () => {
|
||||||
|
let warnSpy;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// Mock logger.warn to track warning calls
|
||||||
|
const { logger } = require('@librechat/data-schemas');
|
||||||
|
warnSpy = jest.spyOn(logger, 'warn').mockImplementation(() => {});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
if (warnSpy) {
|
||||||
|
warnSpy.mockRestore();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should log warning and use default when MEILI_SYNC_BATCH_SIZE is not a number', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = 'abc';
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining('Invalid value for MEILI_SYNC_BATCH_SIZE="abc"'),
|
||||||
|
);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('Using default: 1000'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should log warning and use default when MEILI_SYNC_DELAY_MS is not a number', async () => {
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = 'xyz';
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining('Invalid value for MEILI_SYNC_DELAY_MS="xyz"'),
|
||||||
|
);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('Using default: 100'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should log warning and use default when MEILI_SYNC_BATCH_SIZE is negative', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '-50';
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining('Invalid value for MEILI_SYNC_BATCH_SIZE="-50"'),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should log warning and use default when MEILI_SYNC_DELAY_MS is negative', async () => {
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = '-100';
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining('Invalid value for MEILI_SYNC_DELAY_MS="-100"'),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should accept valid positive integer values without warnings', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '500';
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = '50';
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should log warning and use default when MEILI_SYNC_BATCH_SIZE is zero', async () => {
|
||||||
|
process.env.MEILI_SYNC_BATCH_SIZE = '0';
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining('MEILI_SYNC_BATCH_SIZE cannot be 0. Using default: 1000'),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should accept zero as a valid value for MEILI_SYNC_DELAY_MS without warnings', async () => {
|
||||||
|
process.env.MEILI_SYNC_DELAY_MS = '0';
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not log warnings when environment variables are not set', async () => {
|
||||||
|
delete process.env.MEILI_SYNC_BATCH_SIZE;
|
||||||
|
delete process.env.MEILI_SYNC_DELAY_MS;
|
||||||
|
|
||||||
|
await testCollection.insertMany([
|
||||||
|
{ _id: new mongoose.Types.ObjectId(), expiredAt: null, _meiliIndex: true },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await batchResetMeiliFlags(testCollection);
|
||||||
|
|
||||||
|
expect(result).toBe(1);
|
||||||
|
expect(warnSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
@ -1,8 +1,10 @@
|
||||||
const path = require('path');
|
const path = require('path');
|
||||||
const mongoose = require('mongoose');
|
const mongoose = require('mongoose');
|
||||||
require('module-alias')({ base: path.resolve(__dirname, '..', 'api') });
|
require('module-alias')({ base: path.resolve(__dirname, '..', 'api') });
|
||||||
|
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
|
||||||
const { askQuestion, silentExit } = require('./helpers');
|
const { askQuestion, silentExit } = require('./helpers');
|
||||||
const connect = require('./connect');
|
const connect = require('./connect');
|
||||||
|
const { batchResetMeiliFlags } = require('~/db/utils');
|
||||||
|
|
||||||
(async () => {
|
(async () => {
|
||||||
await connect();
|
await connect();
|
||||||
|
|
@ -24,32 +26,39 @@ const connect = require('./connect');
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
const clearProgress = () => process.stdout.write('\r' + ' '.repeat(70) + '\r');
|
||||||
|
|
||||||
// Reset _meiliIndex flags for messages
|
// Reset _meiliIndex flags for messages
|
||||||
console.cyan('\nResetting message sync flags...');
|
console.cyan('\nResetting message sync flags...');
|
||||||
const messageResult = await mongoose.connection.db
|
const messages = mongoose.connection.db.collection('messages');
|
||||||
.collection('messages')
|
const messageModifiedCount = await batchResetMeiliFlags(messages);
|
||||||
.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
|
clearProgress();
|
||||||
|
console.green(`✓ Reset ${messageModifiedCount} message sync flags`);
|
||||||
console.green(`✓ Reset ${messageResult.modifiedCount} message sync flags`);
|
|
||||||
|
|
||||||
// Reset _meiliIndex flags for conversations
|
// Reset _meiliIndex flags for conversations
|
||||||
console.cyan('\nResetting conversation sync flags...');
|
console.cyan('\nResetting conversation sync flags...');
|
||||||
const conversationResult = await mongoose.connection.db
|
const conversationsCollection = mongoose.connection.db.collection('conversations');
|
||||||
.collection('conversations')
|
const conversationModifiedCount = await batchResetMeiliFlags(conversationsCollection);
|
||||||
.updateMany({ _meiliIndex: true }, { $set: { _meiliIndex: false } });
|
clearProgress();
|
||||||
|
console.green(`✓ Reset ${conversationModifiedCount} conversation sync flags`);
|
||||||
|
|
||||||
console.green(`✓ Reset ${conversationResult.modifiedCount} conversation sync flags`);
|
// Query to count only non-expired documents that are queued for sync (_meiliIndex: false)
|
||||||
|
// This represents documents that need to be indexed, not the total collection size
|
||||||
|
const queryTotal = { expiredAt: null, _meiliIndex: false };
|
||||||
|
|
||||||
// Get current counts
|
// Get current counts of documents queued for sync
|
||||||
const totalMessages = await mongoose.connection.db.collection('messages').countDocuments();
|
const totalMessages = await mongoose.connection.db
|
||||||
|
.collection('messages')
|
||||||
|
.countDocuments(queryTotal);
|
||||||
const totalConversations = await mongoose.connection.db
|
const totalConversations = await mongoose.connection.db
|
||||||
.collection('conversations')
|
.collection('conversations')
|
||||||
.countDocuments();
|
.countDocuments(queryTotal);
|
||||||
|
|
||||||
console.purple('\n---------------------------------------');
|
console.purple('\n---------------------------------------');
|
||||||
console.green('MeiliSearch sync flags have been reset successfully!');
|
console.green('MeiliSearch sync flags have been reset successfully!');
|
||||||
console.cyan(`\nTotal messages to sync: ${totalMessages}`);
|
console.cyan(`\nDocuments queued for sync:`);
|
||||||
console.cyan(`Total conversations to sync: ${totalConversations}`);
|
console.cyan(`Messages: ${totalMessages}`);
|
||||||
|
console.cyan(`Conversations: ${totalConversations}`);
|
||||||
console.yellow('\nThe next time LibreChat starts or performs a sync check,');
|
console.yellow('\nThe next time LibreChat starts or performs a sync check,');
|
||||||
console.yellow('all data will be re-indexed into MeiliSearch.');
|
console.yellow('all data will be re-indexed into MeiliSearch.');
|
||||||
console.purple('---------------------------------------\n');
|
console.purple('---------------------------------------\n');
|
||||||
|
|
|
||||||
|
|
@ -45,4 +45,7 @@ convoSchema.index({ expiredAt: 1 }, { expireAfterSeconds: 0 });
|
||||||
convoSchema.index({ createdAt: 1, updatedAt: 1 });
|
convoSchema.index({ createdAt: 1, updatedAt: 1 });
|
||||||
convoSchema.index({ conversationId: 1, user: 1 }, { unique: true });
|
convoSchema.index({ conversationId: 1, user: 1 }, { unique: true });
|
||||||
|
|
||||||
|
// index for MeiliSearch sync operations
|
||||||
|
convoSchema.index({ _meiliIndex: 1, expiredAt: 1 });
|
||||||
|
|
||||||
export default convoSchema;
|
export default convoSchema;
|
||||||
|
|
|
||||||
|
|
@ -148,4 +148,7 @@ messageSchema.index({ expiredAt: 1 }, { expireAfterSeconds: 0 });
|
||||||
messageSchema.index({ createdAt: 1 });
|
messageSchema.index({ createdAt: 1 });
|
||||||
messageSchema.index({ messageId: 1, user: 1 }, { unique: true });
|
messageSchema.index({ messageId: 1, user: 1 }, { unique: true });
|
||||||
|
|
||||||
|
// index for MeiliSearch sync operations
|
||||||
|
messageSchema.index({ _meiliIndex: 1, expiredAt: 1 });
|
||||||
|
|
||||||
export default messageSchema;
|
export default messageSchema;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue