Fix DB migration from 8.19 to 8.21 stuck forever.

Thanks to MaccabeeY and xet7 !

Fixes #6078
This commit is contained in:
Lauri Ojansivu 2026-01-21 00:56:42 +02:00
parent e177bf54e2
commit a31a615da6
9 changed files with 869 additions and 71 deletions

View file

@ -6,7 +6,13 @@
import { Meteor } from 'meteor/meteor';
import { SyncedCron } from 'meteor/quave:synced-cron';
import { ReactiveVar } from 'meteor/reactive-var';
import { cronJobStorage } from './cronJobStorage';
import { check, Match } from 'meteor/check';
import { ReactiveCache } from '/imports/reactiveCache';
import { cronJobStorage, CronJobStatus } from './cronJobStorage';
import Users from '/models/users';
import Boards from '/models/boards';
import { runEnsureValidSwimlaneIdsMigration } from './migrations/ensureValidSwimlaneIds';
// Server-side reactive variables for cron migration progress
export const cronMigrationProgress = new ReactiveVar(0);
@ -15,6 +21,8 @@ export const cronMigrationCurrentStep = new ReactiveVar('');
export const cronMigrationSteps = new ReactiveVar([]);
export const cronIsMigrating = new ReactiveVar(false);
export const cronJobs = new ReactiveVar([]);
export const cronMigrationCurrentStepNum = new ReactiveVar(0);
export const cronMigrationTotalSteps = new ReactiveVar(0);
// Board-specific operation tracking
export const boardOperations = new ReactiveVar(new Map());
@ -28,6 +36,7 @@ class CronMigrationManager {
this.isRunning = false;
this.jobProcessor = null;
this.processingInterval = null;
this.monitorInterval = null;
this.pausedJobs = new Map(); // Store paused job configs for per-job pause/resume
}
@ -135,6 +144,17 @@ class CronMigrationManager {
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'ensure-valid-swimlane-ids',
name: 'Validate Swimlane IDs',
description: 'Ensuring all cards and lists have valid swimlaneId references',
weight: 2,
completed: false,
progress: 0,
cronName: 'migration_swimlane_ids',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'add-sort-checklists',
name: 'Checklist Sorting',
@ -447,6 +467,18 @@ class CronMigrationManager {
async executeMigrationStep(jobId, stepIndex, stepData, stepId) {
const { name, duration } = stepData;
// Check if this is the star count migration that needs real implementation
if (stepId === 'denormalize-star-number-per-board') {
await this.executeDenormalizeStarCount(jobId, stepIndex, stepData);
return;
}
// Check if this is the swimlane validation migration
if (stepId === 'ensure-valid-swimlane-ids') {
await this.executeEnsureValidSwimlaneIds(jobId, stepIndex, stepData);
return;
}
// Simulate step execution with progress updates for other migrations
const progressSteps = 10;
for (let i = 0; i <= progressSteps; i++) {
@ -463,6 +495,173 @@ class CronMigrationManager {
}
}
/**
* Execute the denormalize star count migration
*/
async executeDenormalizeStarCount(jobId, stepIndex, stepData) {
try {
const { name } = stepData;
// Update progress: Starting
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Counting starred boards across all users...'
});
// Build a map of boardId -> star count
const starCounts = new Map();
// Get all users with starred boards
const users = Users.find(
{ 'profile.starredBoards': { $exists: true, $ne: [] } },
{ fields: { 'profile.starredBoards': 1 } }
).fetch();
// Update progress: Counting
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 20,
currentAction: `Analyzing ${users.length} users with starred boards...`
});
// Count stars for each board
users.forEach(user => {
const starredBoards = (user.profile && user.profile.starredBoards) || [];
starredBoards.forEach(boardId => {
starCounts.set(boardId, (starCounts.get(boardId) || 0) + 1);
});
});
// Update progress: Updating boards
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 50,
currentAction: `Updating star counts for ${starCounts.size} boards...`
});
// Update all boards with their star counts
let updatedCount = 0;
const totalBoards = starCounts.size;
for (const [boardId, count] of starCounts.entries()) {
try {
Boards.update(boardId, { $set: { stars: count } });
updatedCount++;
// Update progress periodically
if (updatedCount % 10 === 0 || updatedCount === totalBoards) {
const progress = 50 + Math.round((updatedCount / totalBoards) * 40);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Updated ${updatedCount}/${totalBoards} boards...`
});
}
} catch (error) {
console.error(`Failed to update star count for board ${boardId}:`, error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'denormalize-star-number-per-board',
stepIndex,
error,
severity: 'warning',
context: { boardId, operation: 'update_star_count' }
});
}
}
// Also set stars to 0 for boards that have no stars
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 90,
currentAction: 'Initializing boards with no stars...'
});
const boardsWithoutStars = Boards.find(
{
$or: [
{ stars: { $exists: false } },
{ stars: null }
]
},
{ fields: { _id: 1 } }
).fetch();
boardsWithoutStars.forEach(board => {
// Only set to 0 if not already counted
if (!starCounts.has(board._id)) {
try {
Boards.update(board._id, { $set: { stars: 0 } });
} catch (error) {
console.error(`Failed to initialize star count for board ${board._id}:`, error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'denormalize-star-number-per-board',
stepIndex,
error,
severity: 'warning',
context: { boardId: board._id, operation: 'initialize_star_count' }
});
}
}
});
// Complete
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updatedCount} boards with star counts`
});
console.log(`Star count migration completed: ${updatedCount} boards updated, ${boardsWithoutStars.length} initialized to 0`);
} catch (error) {
console.error('Error executing denormalize star count migration:', error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'denormalize-star-number-per-board',
stepIndex,
error,
severity: 'error',
context: { operation: 'denormalize_star_count_migration' }
});
throw error;
}
}
/**
* Execute the ensure valid swimlane IDs migration
*/
async executeEnsureValidSwimlaneIds(jobId, stepIndex, stepData) {
try {
const { name } = stepData;
// Update progress: Starting
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Starting swimlane ID validation...'
});
// Run the migration function
const result = await runEnsureValidSwimlaneIdsMigration();
// Update progress: Complete
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Fixed ${result.cardsFixed || 0} cards, ${result.listsFixed || 0} lists, rescued ${result.cardsRescued || 0} orphaned cards`
});
console.log(`Swimlane ID validation migration completed:`, result);
} catch (error) {
console.error('Error executing swimlane ID validation migration:', error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'ensure-valid-swimlane-ids',
stepIndex,
error,
severity: 'error',
context: { operation: 'ensure_valid_swimlane_ids_migration' }
});
throw error;
}
}
/**
* Execute a board operation job
@ -697,7 +896,10 @@ class CronMigrationManager {
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Adding migrations to job queue...');
cronMigrationStatus.set('Starting...');
cronMigrationProgress.set(0);
cronMigrationCurrentStepNum.set(0);
cronMigrationTotalSteps.set(0);
this.startTime = Date.now();
try {
@ -737,7 +939,7 @@ class CronMigrationManager {
});
}
cronMigrationStatus.set('Migrations added to queue. Processing will begin shortly...');
// Status will be updated by monitorMigrationProgress
// Start monitoring progress
this.monitorMigrationProgress();
@ -750,45 +952,119 @@ class CronMigrationManager {
}
}
/**
* Start a specific migration by index
*/
async startSpecificMigration(migrationIndex) {
if (this.isRunning) {
return;
}
const step = this.migrationSteps[migrationIndex];
if (!step) {
throw new Meteor.Error('invalid-migration', 'Migration not found');
}
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Starting...');
cronMigrationProgress.set(0);
cronMigrationCurrentStepNum.set(1);
cronMigrationTotalSteps.set(1);
this.startTime = Date.now();
try {
// Remove cron job to prevent conflicts
try {
SyncedCron.remove(step.cronName);
} catch (error) {
// Ignore errors if cron job doesn't exist
}
// Add single migration step to the job queue
const jobId = `migration_${step.id}_${Date.now()}`;
cronJobStorage.addToQueue(jobId, 'migration', step.weight, {
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
// Save initial job status
cronJobStorage.saveJobStatus(jobId, {
jobType: 'migration',
status: 'pending',
progress: 0,
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
// Status will be updated by monitorMigrationProgress
// Start monitoring progress
this.monitorMigrationProgress();
} catch (error) {
console.error('Failed to start migration:', error);
cronMigrationStatus.set(`Failed to start migration: ${error.message}`);
cronIsMigrating.set(false);
this.isRunning = false;
}
}
/**
* Monitor migration progress
*/
monitorMigrationProgress() {
const monitorInterval = Meteor.setInterval(() => {
// Clear any existing monitor interval
if (this.monitorInterval) {
Meteor.clearInterval(this.monitorInterval);
}
this.monitorInterval = Meteor.setInterval(() => {
const stats = cronJobStorage.getQueueStats();
const incompleteJobs = cronJobStorage.getIncompleteJobs();
// Update progress
// Check if all migrations are completed first
const totalJobs = stats.total;
const completedJobs = stats.completed;
const progress = totalJobs > 0 ? Math.round((completedJobs / totalJobs) * 100) : 0;
if (stats.completed === totalJobs && totalJobs > 0 && stats.running === 0) {
// All migrations completed - immediately clear isMigrating to hide progress
cronIsMigrating.set(false);
cronMigrationStatus.set('All migrations completed successfully!');
cronMigrationProgress.set(0);
cronMigrationCurrentStep.set('');
cronMigrationCurrentStepNum.set(0);
cronMigrationTotalSteps.set(0);
// Clear status message after delay
setTimeout(() => {
cronMigrationStatus.set('');
}, 5000);
Meteor.clearInterval(this.monitorInterval);
this.monitorInterval = null;
return; // Exit early to avoid setting progress to 100%
}
// Update progress for active migrations
const progress = totalJobs > 0 ? Math.round((completedJobs / totalJobs) * 100) : 0;
cronMigrationProgress.set(progress);
cronMigrationTotalSteps.set(totalJobs);
const currentStepNum = completedJobs + (stats.running > 0 ? 1 : 0);
cronMigrationCurrentStepNum.set(currentStepNum);
// Update status
if (stats.running > 0) {
const runningJob = incompleteJobs.find(job => job.status === 'running');
if (runningJob) {
cronMigrationCurrentStep.set(runningJob.stepName || 'Processing migration...');
cronMigrationStatus.set(`Running: ${runningJob.stepName || 'Migration in progress'}`);
cronMigrationStatus.set(`Running: ${currentStepNum}/${totalJobs} ${runningJob.stepName || 'Migration in progress'}`);
cronMigrationCurrentStep.set('');
}
} else if (stats.pending > 0) {
cronMigrationStatus.set(`${stats.pending} migrations pending in queue`);
cronMigrationCurrentStep.set('Waiting for available resources...');
} else if (stats.completed === totalJobs && totalJobs > 0) {
// All migrations completed
cronMigrationStatus.set('All migrations completed successfully!');
cronMigrationProgress.set(100);
cronMigrationCurrentStep.set('');
// Clear status after delay
setTimeout(() => {
cronIsMigrating.set(false);
cronMigrationStatus.set('');
cronMigrationProgress.set(0);
}, 3000);
Meteor.clearInterval(monitorInterval);
}
}, 2000); // Check every 2 seconds
}
@ -1380,6 +1656,120 @@ class CronMigrationManager {
}
}
/**
* Pause all migrations
*/
pauseAllMigrations() {
this.isRunning = false;
cronIsMigrating.set(false);
cronMigrationStatus.set('Migrations paused');
// Update all pending jobs in queue to paused
const pendingJobs = cronJobStorage.getIncompleteJobs();
pendingJobs.forEach(job => {
if (job.status === 'pending' || job.status === 'running') {
cronJobStorage.updateQueueStatus(job.jobId, 'paused');
cronJobStorage.saveJobStatus(job.jobId, { status: 'paused' });
}
});
return { success: true, message: 'All migrations paused' };
}
/**
* Resume all paused migrations
*/
resumeAllMigrations() {
// Find all paused jobs and resume them
const pausedJobs = CronJobStatus.find({ status: 'paused' }).fetch();
if (pausedJobs.length === 0) {
return { success: false, message: 'No paused migrations to resume' };
}
pausedJobs.forEach(job => {
cronJobStorage.updateQueueStatus(job.jobId, 'pending');
cronJobStorage.saveJobStatus(job.jobId, { status: 'pending' });
});
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Resuming migrations...');
// Restart monitoring
this.monitorMigrationProgress();
return { success: true, message: `Resumed ${pausedJobs.length} migrations` };
}
/**
* Retry failed migrations
*/
retryFailedMigrations() {
const failedJobs = CronJobStatus.find({ status: 'failed' }).fetch();
if (failedJobs.length === 0) {
return { success: false, message: 'No failed migrations to retry' };
}
// Clear errors for failed jobs
failedJobs.forEach(job => {
cronJobStorage.clearJobErrors(job.jobId);
cronJobStorage.updateQueueStatus(job.jobId, 'pending');
cronJobStorage.saveJobStatus(job.jobId, {
status: 'pending',
progress: 0,
error: null
});
});
if (!this.isRunning) {
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Retrying failed migrations...');
this.monitorMigrationProgress();
}
return { success: true, message: `Retrying ${failedJobs.length} failed migrations` };
}
/**
* Get all migration errors
*/
getAllMigrationErrors(limit = 50) {
return cronJobStorage.getAllRecentErrors(limit);
}
/**
* Get errors for a specific job
*/
getJobErrors(jobId, options = {}) {
return cronJobStorage.getJobErrors(jobId, options);
}
/**
* Get migration stats including errors
*/
getMigrationStats() {
const queueStats = cronJobStorage.getQueueStats();
const allErrors = cronJobStorage.getAllRecentErrors(100);
const errorsByJob = {};
allErrors.forEach(error => {
if (!errorsByJob[error.jobId]) {
errorsByJob[error.jobId] = [];
}
errorsByJob[error.jobId].push(error);
});
return {
...queueStats,
totalErrors: allErrors.length,
errorsByJob,
recentErrors: allErrors.slice(0, 10)
};
}
}
// Export singleton instance
@ -1405,6 +1795,20 @@ Meteor.methods({
return cronMigrationManager.startAllMigrations();
},
'cron.startSpecificMigration'(migrationIndex) {
check(migrationIndex, Number);
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.startSpecificMigration(migrationIndex);
},
'cron.startJob'(cronName) {
const userId = this.userId;
if (!userId) {
@ -1511,10 +1915,95 @@ Meteor.methods({
status: cronMigrationStatus.get(),
currentStep: cronMigrationCurrentStep.get(),
steps: cronMigrationSteps.get(),
isMigrating: cronIsMigrating.get()
isMigrating: cronIsMigrating.get(),
currentStepNum: cronMigrationCurrentStepNum.get(),
totalSteps: cronMigrationTotalSteps.get()
};
},
'cron.pauseAllMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.pauseAllMigrations();
},
'cron.resumeAllMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.resumeAllMigrations();
},
'cron.retryFailedMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.retryFailedMigrations();
},
'cron.getAllMigrationErrors'(limit = 50) {
check(limit, Match.Optional(Number));
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getAllMigrationErrors(limit);
},
'cron.getJobErrors'(jobId, options = {}) {
check(jobId, String);
check(options, Match.Optional(Object));
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getJobErrors(jobId, options);
},
'cron.getMigrationStats'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getMigrationStats();
},
'cron.startBoardOperation'(boardId, operationType, operationData) {
const userId = this.userId;
if (!userId) {
@ -1747,6 +2236,12 @@ Meteor.methods({
throw new Meteor.Error('not-authorized', 'Admin access required');
}
// Clear monitor interval first to prevent status override
if (cronMigrationManager.monitorInterval) {
Meteor.clearInterval(cronMigrationManager.monitorInterval);
cronMigrationManager.monitorInterval = null;
}
// Stop all running and pending jobs
const incompleteJobs = cronJobStorage.getIncompleteJobs();
incompleteJobs.forEach(job => {
@ -1757,11 +2252,19 @@ Meteor.methods({
});
});
// Reset migration state
// Reset migration state immediately
cronMigrationManager.isRunning = false;
cronIsMigrating.set(false);
cronMigrationStatus.set('All migrations stopped');
cronMigrationProgress.set(0);
cronMigrationCurrentStep.set('');
cronMigrationCurrentStepNum.set(0);
cronMigrationTotalSteps.set(0);
cronMigrationStatus.set('All migrations stopped');
// Clear status message after delay
setTimeout(() => {
cronMigrationStatus.set('');
}, 3000);
return { success: true, message: 'All migrations stopped' };
},