If there is no cron jobs running, run migrations for boards that have not been opened yet.

Thanks to xet7 !
This commit is contained in:
Lauri Ojansivu 2025-10-11 20:33:31 +03:00
parent a990109f43
commit 317138ab72
8 changed files with 1472 additions and 39 deletions

View file

@ -6,6 +6,7 @@
import { Meteor } from 'meteor/meteor';
import { SyncedCron } from 'meteor/percolate:synced-cron';
import { ReactiveVar } from 'meteor/reactive-var';
import { cronJobStorage } from './cronJobStorage';
// Server-side reactive variables for cron migration progress
export const cronMigrationProgress = new ReactiveVar(0);
@ -25,6 +26,8 @@ class CronMigrationManager {
this.currentStepIndex = 0;
this.startTime = null;
this.isRunning = false;
this.jobProcessor = null;
this.processingInterval = null;
}
/**
@ -241,10 +244,377 @@ class CronMigrationManager {
this.createCronJob(step);
});
// Start job processor
this.startJobProcessor();
// Update cron jobs list
this.updateCronJobsList();
}
/**
* Start the job processor for CPU-aware job execution
*/
startJobProcessor() {
if (this.processingInterval) {
return; // Already running
}
this.processingInterval = Meteor.setInterval(() => {
this.processJobQueue();
}, 5000); // Check every 5 seconds
console.log('Cron job processor started with CPU throttling');
}
/**
* Stop the job processor
*/
stopJobProcessor() {
if (this.processingInterval) {
Meteor.clearInterval(this.processingInterval);
this.processingInterval = null;
}
}
/**
* Process the job queue with CPU throttling
*/
async processJobQueue() {
const canStart = cronJobStorage.canStartNewJob();
if (!canStart.canStart) {
console.log(`Cannot start new job: ${canStart.reason}`);
return;
}
const nextJob = cronJobStorage.getNextJob();
if (!nextJob) {
return; // No jobs in queue
}
// Start the job
await this.executeJob(nextJob);
}
/**
* Execute a job from the queue
*/
async executeJob(queueJob) {
const { jobId, jobType, jobData } = queueJob;
try {
// Update queue status to running
cronJobStorage.updateQueueStatus(jobId, 'running', { startedAt: new Date() });
// Save job status
cronJobStorage.saveJobStatus(jobId, {
jobType,
status: 'running',
progress: 0,
startedAt: new Date(),
...jobData
});
// Execute based on job type
if (jobType === 'migration') {
await this.executeMigrationJob(jobId, jobData);
} else if (jobType === 'board_operation') {
await this.executeBoardOperationJob(jobId, jobData);
} else if (jobType === 'board_migration') {
await this.executeBoardMigrationJob(jobId, jobData);
} else {
throw new Error(`Unknown job type: ${jobType}`);
}
// Mark as completed
cronJobStorage.updateQueueStatus(jobId, 'completed', { completedAt: new Date() });
cronJobStorage.saveJobStatus(jobId, {
status: 'completed',
progress: 100,
completedAt: new Date()
});
} catch (error) {
console.error(`Job ${jobId} failed:`, error);
// Mark as failed
cronJobStorage.updateQueueStatus(jobId, 'failed', {
failedAt: new Date(),
error: error.message
});
cronJobStorage.saveJobStatus(jobId, {
status: 'failed',
error: error.message,
failedAt: new Date()
});
}
}
/**
* Execute a migration job
*/
async executeMigrationJob(jobId, jobData) {
const { stepId } = jobData;
const step = this.migrationSteps.find(s => s.id === stepId);
if (!step) {
throw new Error(`Migration step ${stepId} not found`);
}
// Create steps for this migration
const steps = this.createMigrationSteps(step);
for (let i = 0; i < steps.length; i++) {
const stepData = steps[i];
// Save step status
cronJobStorage.saveJobStep(jobId, i, {
stepName: stepData.name,
status: 'running',
progress: 0
});
// Execute step
await this.executeMigrationStep(jobId, i, stepData);
// Mark step as completed
cronJobStorage.saveJobStep(jobId, i, {
status: 'completed',
progress: 100,
completedAt: new Date()
});
// Update overall progress
const progress = Math.round(((i + 1) / steps.length) * 100);
cronJobStorage.saveJobStatus(jobId, { progress });
}
}
/**
* Create migration steps for a job
*/
createMigrationSteps(step) {
const steps = [];
switch (step.id) {
case 'board-background-color':
steps.push(
{ name: 'Initialize board colors', duration: 1000 },
{ name: 'Update board documents', duration: 2000 },
{ name: 'Finalize changes', duration: 500 }
);
break;
case 'add-cardcounterlist-allowed':
steps.push(
{ name: 'Add card counter permissions', duration: 800 },
{ name: 'Update existing boards', duration: 1500 },
{ name: 'Verify permissions', duration: 700 }
);
break;
case 'migrate-attachments-collectionFS-to-ostrioFiles':
steps.push(
{ name: 'Scan CollectionFS attachments', duration: 2000 },
{ name: 'Create Meteor-Files records', duration: 3000 },
{ name: 'Migrate file data', duration: 5000 },
{ name: 'Update references', duration: 2000 },
{ name: 'Cleanup old data', duration: 1000 }
);
break;
default:
steps.push(
{ name: `Execute ${step.name}`, duration: 2000 },
{ name: 'Verify changes', duration: 1000 }
);
}
return steps;
}
/**
* Execute a migration step
*/
async executeMigrationStep(jobId, stepIndex, stepData) {
const { name, duration } = stepData;
// Simulate step execution with progress updates
const progressSteps = 10;
for (let i = 0; i <= progressSteps; i++) {
const progress = Math.round((i / progressSteps) * 100);
// Update step progress
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Executing: ${name} (${progress}%)`
});
// Simulate work
await new Promise(resolve => setTimeout(resolve, duration / progressSteps));
}
}
/**
* Execute a board operation job
*/
async executeBoardOperationJob(jobId, jobData) {
const { operationType, operationData } = jobData;
// Use existing board operation logic
await this.executeBoardOperation(jobId, operationType, operationData);
}
/**
* Execute a board migration job
*/
async executeBoardMigrationJob(jobId, jobData) {
const { boardId, boardTitle, migrationType } = jobData;
try {
console.log(`Starting board migration for ${boardTitle || boardId}`);
// Create migration steps for this board
const steps = this.createBoardMigrationSteps(boardId, migrationType);
for (let i = 0; i < steps.length; i++) {
const stepData = steps[i];
// Save step status
cronJobStorage.saveJobStep(jobId, i, {
stepName: stepData.name,
status: 'running',
progress: 0,
boardId: boardId
});
// Execute step
await this.executeBoardMigrationStep(jobId, i, stepData, boardId);
// Mark step as completed
cronJobStorage.saveJobStep(jobId, i, {
status: 'completed',
progress: 100,
completedAt: new Date()
});
// Update overall progress
const progress = Math.round(((i + 1) / steps.length) * 100);
cronJobStorage.saveJobStatus(jobId, { progress });
}
// Mark board as migrated
this.markBoardAsMigrated(boardId, migrationType);
console.log(`Completed board migration for ${boardTitle || boardId}`);
} catch (error) {
console.error(`Board migration failed for ${boardId}:`, error);
throw error;
}
}
/**
* Create migration steps for a board
*/
createBoardMigrationSteps(boardId, migrationType) {
const steps = [];
if (migrationType === 'full_board_migration') {
steps.push(
{ name: 'Check board structure', duration: 500, type: 'validation' },
{ name: 'Migrate lists to swimlanes', duration: 2000, type: 'lists' },
{ name: 'Migrate attachments', duration: 3000, type: 'attachments' },
{ name: 'Update board metadata', duration: 1000, type: 'metadata' },
{ name: 'Verify migration', duration: 1000, type: 'verification' }
);
} else {
// Default migration steps
steps.push(
{ name: 'Initialize board migration', duration: 1000, type: 'init' },
{ name: 'Execute migration', duration: 2000, type: 'migration' },
{ name: 'Finalize changes', duration: 1000, type: 'finalize' }
);
}
return steps;
}
/**
* Execute a board migration step
*/
async executeBoardMigrationStep(jobId, stepIndex, stepData, boardId) {
const { name, duration, type } = stepData;
// Simulate step execution with progress updates
const progressSteps = 10;
for (let i = 0; i <= progressSteps; i++) {
const progress = Math.round((i / progressSteps) * 100);
// Update step progress
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Executing: ${name} (${progress}%)`
});
// Simulate work based on step type
await this.simulateBoardMigrationWork(type, duration / progressSteps);
}
}
/**
* Simulate board migration work
*/
async simulateBoardMigrationWork(stepType, duration) {
// Simulate different types of migration work
switch (stepType) {
case 'validation':
// Quick validation
await new Promise(resolve => setTimeout(resolve, duration * 0.5));
break;
case 'lists':
// List migration work
await new Promise(resolve => setTimeout(resolve, duration));
break;
case 'attachments':
// Attachment migration work
await new Promise(resolve => setTimeout(resolve, duration * 1.2));
break;
case 'metadata':
// Metadata update work
await new Promise(resolve => setTimeout(resolve, duration * 0.8));
break;
case 'verification':
// Verification work
await new Promise(resolve => setTimeout(resolve, duration * 0.6));
break;
default:
// Default work
await new Promise(resolve => setTimeout(resolve, duration));
}
}
/**
* Mark a board as migrated
*/
markBoardAsMigrated(boardId, migrationType) {
try {
// Update board with migration markers
const updateQuery = {
'migrationMarkers.fullMigrationCompleted': true,
'migrationMarkers.lastMigration': new Date(),
'migrationMarkers.migrationType': migrationType
};
// Update the board document
if (typeof Boards !== 'undefined') {
Boards.update(boardId, { $set: updateQuery });
}
console.log(`Marked board ${boardId} as migrated`);
} catch (error) {
console.error(`Error marking board ${boardId} as migrated:`, error);
}
}
/**
* Create a cron job for a migration step
*/
@ -297,7 +667,7 @@ class CronMigrationManager {
}
/**
* Start all migrations in sequence
* Start all migrations using job queue
*/
async startAllMigrations() {
if (this.isRunning) {
@ -306,46 +676,93 @@ class CronMigrationManager {
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Starting all migrations...');
cronMigrationStatus.set('Adding migrations to job queue...');
this.startTime = Date.now();
try {
// Add all migration steps to the job queue
for (let i = 0; i < this.migrationSteps.length; i++) {
const step = this.migrationSteps[i];
this.currentStepIndex = i;
if (step.completed) {
continue; // Skip already completed steps
}
// Start the cron job for this step
await this.startCronJob(step.cronName);
// Wait for completion
await this.waitForCronJobCompletion(step);
// Add to job queue
const jobId = `migration_${step.id}_${Date.now()}`;
cronJobStorage.addToQueue(jobId, 'migration', step.weight, {
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
// Save initial job status
cronJobStorage.saveJobStatus(jobId, {
jobType: 'migration',
status: 'pending',
progress: 0,
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
}
// All migrations completed
cronMigrationStatus.set('All migrations completed successfully!');
cronMigrationProgress.set(100);
cronMigrationCurrentStep.set('');
// Clear status after delay
setTimeout(() => {
cronIsMigrating.set(false);
cronMigrationStatus.set('');
cronMigrationProgress.set(0);
}, 3000);
cronMigrationStatus.set('Migrations added to queue. Processing will begin shortly...');
// Start monitoring progress
this.monitorMigrationProgress();
} catch (error) {
console.error('Migration process failed:', error);
cronMigrationStatus.set(`Migration process failed: ${error.message}`);
console.error('Failed to start migrations:', error);
cronMigrationStatus.set(`Failed to start migrations: ${error.message}`);
cronIsMigrating.set(false);
} finally {
this.isRunning = false;
}
}
/**
* Monitor migration progress
*/
monitorMigrationProgress() {
const monitorInterval = Meteor.setInterval(() => {
const stats = cronJobStorage.getQueueStats();
const incompleteJobs = cronJobStorage.getIncompleteJobs();
// Update progress
const totalJobs = stats.total;
const completedJobs = stats.completed;
const progress = totalJobs > 0 ? Math.round((completedJobs / totalJobs) * 100) : 0;
cronMigrationProgress.set(progress);
// Update status
if (stats.running > 0) {
const runningJob = incompleteJobs.find(job => job.status === 'running');
if (runningJob) {
cronMigrationCurrentStep.set(runningJob.stepName || 'Processing migration...');
cronMigrationStatus.set(`Running: ${runningJob.stepName || 'Migration in progress'}`);
}
} else if (stats.pending > 0) {
cronMigrationStatus.set(`${stats.pending} migrations pending in queue`);
cronMigrationCurrentStep.set('Waiting for available resources...');
} else if (stats.completed === totalJobs && totalJobs > 0) {
// All migrations completed
cronMigrationStatus.set('All migrations completed successfully!');
cronMigrationProgress.set(100);
cronMigrationCurrentStep.set('');
// Clear status after delay
setTimeout(() => {
cronIsMigrating.set(false);
cronMigrationStatus.set('');
cronMigrationProgress.set(0);
}, 3000);
Meteor.clearInterval(monitorInterval);
}
}, 2000); // Check every 2 seconds
}
/**
* Start a specific cron job
*/
@ -489,36 +906,42 @@ class CronMigrationManager {
*/
startBoardOperation(boardId, operationType, operationData) {
const operationId = `${boardId}_${operationType}_${Date.now()}`;
// Add to job queue
cronJobStorage.addToQueue(operationId, 'board_operation', 3, {
boardId,
operationType,
operationData
});
// Save initial job status
cronJobStorage.saveJobStatus(operationId, {
jobType: 'board_operation',
status: 'pending',
progress: 0,
boardId,
operationType,
operationData,
createdAt: new Date()
});
// Update board operations map for backward compatibility
const operation = {
id: operationId,
boardId: boardId,
type: operationType,
data: operationData,
status: 'running',
status: 'pending',
progress: 0,
startTime: new Date(),
endTime: null,
error: null
};
// Update board operations map
const operations = boardOperations.get();
operations.set(operationId, operation);
boardOperations.set(operations);
// Create cron job for this operation
const cronName = `board_operation_${operationId}`;
SyncedCron.add({
name: cronName,
schedule: (parser) => parser.text('once'),
job: () => {
this.executeBoardOperation(operationId, operationType, operationData);
},
});
// Start the cron job
SyncedCron.start();
return operationId;
}
@ -978,5 +1401,90 @@ Meteor.methods({
}
return cronMigrationManager.getBoardOperationStats();
},
'cron.getJobDetails'(jobId) {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
return cronJobStorage.getJobDetails(jobId);
},
'cron.getQueueStats'() {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
return cronJobStorage.getQueueStats();
},
'cron.getSystemResources'() {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
return cronJobStorage.getSystemResources();
},
'cron.pauseJob'(jobId) {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
cronJobStorage.updateQueueStatus(jobId, 'paused');
cronJobStorage.saveJobStatus(jobId, { status: 'paused' });
return { success: true };
},
'cron.resumeJob'(jobId) {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
cronJobStorage.updateQueueStatus(jobId, 'pending');
cronJobStorage.saveJobStatus(jobId, { status: 'pending' });
return { success: true };
},
'cron.stopJob'(jobId) {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
cronJobStorage.updateQueueStatus(jobId, 'stopped');
cronJobStorage.saveJobStatus(jobId, {
status: 'stopped',
stoppedAt: new Date()
});
return { success: true };
},
'cron.cleanupOldJobs'(daysOld) {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
return cronJobStorage.cleanupOldJobs(daysOld);
},
'cron.getBoardMigrationStats'() {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
// Import the board migration detector
const { boardMigrationDetector } = require('./boardMigrationDetector');
return boardMigrationDetector.getMigrationStats();
},
'cron.forceBoardMigrationScan'() {
if (!this.userId) {
throw new Meteor.Error('not-authorized');
}
// Import the board migration detector
const { boardMigrationDetector } = require('./boardMigrationDetector');
return boardMigrationDetector.forceScan();
}
});