wekan/server/cronMigrationManager.js

3301 lines
98 KiB
JavaScript
Raw Normal View History

/**
* Cron Migration Manager
* Manages database migrations as cron jobs using percolate:synced-cron
*/
import { Meteor } from 'meteor/meteor';
import { SyncedCron } from 'meteor/quave:synced-cron';
import { ReactiveVar } from 'meteor/reactive-var';
import { check, Match } from 'meteor/check';
import { ReactiveCache } from '/imports/reactiveCache';
import { cronJobStorage, CronJobStatus } from './cronJobStorage';
import Users from '/models/users';
import Boards from '/models/boards';
import Cards from '/models/cards';
import Attachments from '/models/attachments';
import Swimlanes from '/models/swimlanes';
import Checklists from '/models/checklists';
import { runEnsureValidSwimlaneIdsMigration } from './migrations/ensureValidSwimlaneIds';
import { comprehensiveBoardMigration } from './migrations/comprehensiveBoardMigration';
// Server-side reactive variables for cron migration progress
export const cronMigrationProgress = new ReactiveVar(0);
export const cronMigrationStatus = new ReactiveVar('');
export const cronMigrationCurrentStep = new ReactiveVar('');
export const cronMigrationSteps = new ReactiveVar([]);
export const cronIsMigrating = new ReactiveVar(false);
export const cronJobs = new ReactiveVar([]);
export const cronMigrationCurrentStepNum = new ReactiveVar(0);
export const cronMigrationTotalSteps = new ReactiveVar(0);
// Board-specific operation tracking
export const boardOperations = new ReactiveVar(new Map());
export const boardOperationProgress = new ReactiveVar(new Map());
class CronMigrationManager {
constructor() {
this.migrationSteps = this.initializeMigrationSteps();
this.currentStepIndex = 0;
this.startTime = null;
this.isRunning = false;
this.jobProcessor = null;
this.processingInterval = null;
this.monitorInterval = null;
this.pausedJobs = new Map(); // Store paused job configs for per-job pause/resume
}
/**
* Initialize migration steps as cron jobs
*/
initializeMigrationSteps() {
return [
{
id: 'lowercase-board-permission',
name: 'Board Permission Standardization',
description: 'Converting board permissions to lowercase',
weight: 1,
completed: false,
progress: 0,
cronName: 'migration_lowercase_permission',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'change-attachments-type-for-non-images',
name: 'Attachment Type Standardization',
description: 'Updating attachment types for non-images',
weight: 2,
completed: false,
progress: 0,
cronName: 'migration_attachment_types',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'card-covers',
name: 'Card Covers System',
description: 'Setting up card cover functionality',
weight: 2,
completed: false,
progress: 0,
cronName: 'migration_card_covers',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'use-css-class-for-boards-colors',
name: 'Board Color CSS Classes',
description: 'Converting board colors to CSS classes',
weight: 2,
completed: false,
progress: 0,
cronName: 'migration_board_color_css',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'denormalize-star-number-per-board',
name: 'Board Star Counts',
description: 'Calculating star counts per board',
weight: 3,
completed: false,
progress: 0,
cronName: 'migration_star_numbers',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'add-member-isactive-field',
name: 'Member Activity Status',
description: 'Adding member activity tracking',
weight: 2,
completed: false,
progress: 0,
cronName: 'migration_member_activity',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'ensure-valid-swimlane-ids',
name: 'Validate Swimlane IDs',
description: 'Ensuring all cards and lists have valid swimlaneId references',
weight: 2,
completed: false,
progress: 0,
cronName: 'migration_swimlane_ids',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'add-swimlanes',
name: 'Swimlanes System',
description: 'Setting up swimlanes functionality',
weight: 4,
completed: false,
progress: 0,
cronName: 'migration_swimlanes',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'add-checklist-items',
name: 'Checklist Items',
description: 'Setting up checklist items system',
weight: 3,
completed: false,
progress: 0,
cronName: 'migration_checklist_items',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'add-card-types',
name: 'Card Types',
description: 'Adding card type functionality',
weight: 2,
completed: false,
progress: 0,
cronName: 'migration_card_types',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'migrate-attachments-collectionFS-to-ostrioFiles',
name: 'Migrate Attachments to Meteor-Files',
description: 'Migrating attachments from CollectionFS to Meteor-Files',
weight: 8,
completed: false,
progress: 0,
cronName: 'migration_attachments_collectionfs',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'migrate-avatars-collectionFS-to-ostrioFiles',
name: 'Migrate Avatars to Meteor-Files',
description: 'Migrating avatars from CollectionFS to Meteor-Files',
weight: 6,
completed: false,
progress: 0,
cronName: 'migration_avatars_collectionfs',
schedule: 'every 1 minute',
status: 'stopped'
},
{
id: 'migrate-lists-to-per-swimlane',
name: 'Migrate Lists to Per-Swimlane',
description: 'Migrating lists to per-swimlane structure',
weight: 5,
completed: false,
progress: 0,
cronName: 'migration_lists_per_swimlane',
schedule: 'every 1 minute',
status: 'stopped'
},
];
}
/**
* Initialize all migration cron jobs
*/
initializeCronJobs() {
this.migrationSteps.forEach(step => {
this.createCronJob(step);
});
// Start job processor
this.startJobProcessor();
// Update cron jobs list after a short delay to allow SyncedCron to initialize
Meteor.setTimeout(() => {
this.updateCronJobsList();
}, 1000);
}
/**
* Start the job processor for CPU-aware job execution
*/
startJobProcessor() {
if (this.processingInterval) {
return; // Already running
}
this.processingInterval = Meteor.setInterval(() => {
this.processJobQueue();
}, 5000); // Check every 5 seconds
// Cron job processor started with CPU throttling
}
/**
* Stop the job processor
*/
stopJobProcessor() {
if (this.processingInterval) {
Meteor.clearInterval(this.processingInterval);
this.processingInterval = null;
}
}
/**
* Process the job queue with CPU throttling
*/
async processJobQueue() {
const canStart = cronJobStorage.canStartNewJob();
if (!canStart.canStart) {
// Suppress "Cannot start new job: Maximum concurrent jobs reached" message
// console.log(`Cannot start new job: ${canStart.reason}`);
return;
}
const nextJob = cronJobStorage.getNextJob();
if (!nextJob) {
return; // No jobs in queue
}
// Start the job
await this.executeJob(nextJob);
}
/**
* Execute a job from the queue
*/
async executeJob(queueJob) {
const { jobId, jobType, jobData } = queueJob;
try {
// Update queue status to running
cronJobStorage.updateQueueStatus(jobId, 'running', { startedAt: new Date() });
// Save job status
cronJobStorage.saveJobStatus(jobId, {
jobType,
status: 'running',
progress: 0,
startedAt: new Date(),
...jobData
});
// Execute based on job type
if (jobType === 'migration') {
await this.executeMigrationJob(jobId, jobData);
} else if (jobType === 'board_operation') {
await this.executeBoardOperationJob(jobId, jobData);
} else if (jobType === 'board_migration') {
await this.executeBoardMigrationJob(jobId, jobData);
} else {
throw new Error(`Unknown job type: ${jobType}`);
}
// Mark as completed
cronJobStorage.updateQueueStatus(jobId, 'completed', { completedAt: new Date() });
cronJobStorage.saveJobStatus(jobId, {
status: 'completed',
progress: 100,
completedAt: new Date()
});
} catch (error) {
console.error(`Job ${jobId} failed:`, error);
// Mark as failed
cronJobStorage.updateQueueStatus(jobId, 'failed', {
failedAt: new Date(),
error: error.message
});
cronJobStorage.saveJobStatus(jobId, {
status: 'failed',
error: error.message,
failedAt: new Date()
});
}
}
/**
* Execute a migration job
*/
async executeMigrationJob(jobId, jobData) {
if (!jobData) {
throw new Error('Job data is required for migration execution');
}
const { stepId } = jobData;
if (!stepId) {
throw new Error('Step ID is required in job data');
}
const step = this.migrationSteps.find(s => s.id === stepId);
if (!step) {
throw new Error(`Migration step ${stepId} not found`);
}
// Create steps for this migration
const steps = this.createMigrationSteps(step);
for (let i = 0; i < steps.length; i++) {
const stepData = steps[i];
// Save step status
cronJobStorage.saveJobStep(jobId, i, {
stepName: stepData.name,
status: 'running',
progress: 0
});
// Execute step
await this.executeMigrationStep(jobId, i, stepData, stepId);
// Mark step as completed
cronJobStorage.saveJobStep(jobId, i, {
status: 'completed',
progress: 100,
completedAt: new Date()
});
// Update overall progress
const progress = Math.round(((i + 1) / steps.length) * 100);
cronJobStorage.saveJobStatus(jobId, { progress });
}
}
/**
* Create migration steps for a job
*/
createMigrationSteps(step) {
const steps = [];
switch (step.id) {
case 'board-background-color':
steps.push(
{ name: 'Initialize board colors', duration: 1000 },
{ name: 'Update board documents', duration: 2000 },
{ name: 'Finalize changes', duration: 500 }
);
break;
case 'add-cardcounterlist-allowed':
steps.push(
{ name: 'Add card counter permissions', duration: 800 },
{ name: 'Update existing boards', duration: 1500 },
{ name: 'Verify permissions', duration: 700 }
);
break;
case 'migrate-attachments-collectionFS-to-ostrioFiles':
steps.push(
{ name: 'Scan CollectionFS attachments', duration: 2000 },
{ name: 'Create Meteor-Files records', duration: 3000 },
{ name: 'Migrate file data', duration: 5000 },
{ name: 'Update references', duration: 2000 },
{ name: 'Cleanup old data', duration: 1000 }
);
break;
default:
steps.push(
{ name: `Execute ${step.name}`, duration: 2000 },
{ name: 'Verify changes', duration: 1000 }
);
}
return steps;
}
isMigrationNeeded(stepId) {
switch (stepId) {
case 'lowercase-board-permission':
return !!Boards.findOne({
permission: { $in: ['PUBLIC', 'Private', 'PRIVATE'] }
}, { fields: { _id: 1 }, limit: 1 });
case 'change-attachments-type-for-non-images':
return !!Attachments.findOne({
$or: [
{ type: { $exists: false } },
{ type: null },
{ type: '' }
]
}, { fields: { _id: 1 }, limit: 1 });
case 'card-covers':
return !!Cards.findOne({
coverId: { $exists: true, $ne: null },
$or: [
{ cover: { $exists: false } },
{ cover: null }
]
}, { fields: { _id: 1 }, limit: 1 });
case 'use-css-class-for-boards-colors':
// Check if any board uses old color system (non-CSS class)
return !!Boards.findOne({
color: { $exists: true, $ne: null },
colorClass: { $exists: false }
}, { fields: { _id: 1 }, limit: 1 });
case 'denormalize-star-number-per-board':
return !!Boards.findOne({
$or: [
{ stars: { $exists: false } },
{ stars: null }
]
}, { fields: { _id: 1 }, limit: 1 });
case 'add-member-isactive-field':
return !!Boards.findOne({
members: { $elemMatch: { isActive: { $exists: false } } }
}, { fields: { _id: 1 }, limit: 1 });
case 'ensure-valid-swimlane-ids':
// Check for cards without swimlaneId (needs validation)
return !!Cards.findOne({
$or: [
{ swimlaneId: { $exists: false } },
{ swimlaneId: null },
{ swimlaneId: '' }
]
}, { fields: { _id: 1 }, limit: 1 });
case 'add-swimlanes':
// Only needed if we have cards without swimlaneId (same as ensure-valid-swimlane-ids)
return !!Cards.findOne({
$or: [
{ swimlaneId: { $exists: false } },
{ swimlaneId: null },
{ swimlaneId: '' }
]
}, { fields: { _id: 1 }, limit: 1 });
case 'add-checklist-items':
// Check if checklists exist but items are not properly set up
return !!Checklists.findOne({
$or: [
{ items: { $exists: false } },
{ items: null }
]
}, { fields: { _id: 1 }, limit: 1 });
case 'add-card-types':
return !!Cards.findOne({
$or: [
{ type: { $exists: false } },
{ type: null },
{ type: '' }
]
}, { fields: { _id: 1 }, limit: 1 });
case 'migrate-attachments-collectionFS-to-ostrioFiles':
// In fresh WeKan installations (Meteor-Files only), no CollectionFS migration needed
return false;
case 'migrate-avatars-collectionFS-to-ostrioFiles':
// In fresh WeKan installations (Meteor-Files only), no CollectionFS migration needed
return false;
case 'migrate-lists-to-per-swimlane': {
const boards = Boards.find({}, { fields: { _id: 1 }, limit: 100 }).fetch();
return boards.some(board => comprehensiveBoardMigration.needsMigration(board._id));
}
default:
return false; // Changed from true to false - only run migrations we explicitly check for
}
}
/**
* Execute a migration step
*/
async executeMigrationStep(jobId, stepIndex, stepData, stepId) {
const { name, duration } = stepData;
// Check if this is the star count migration that needs real implementation
if (stepId === 'denormalize-star-number-per-board') {
await this.executeDenormalizeStarCount(jobId, stepIndex, stepData);
return;
}
// Check if this is the swimlane validation migration
if (stepId === 'ensure-valid-swimlane-ids') {
await this.executeEnsureValidSwimlaneIds(jobId, stepIndex, stepData);
return;
}
if (stepId === 'migrate-lists-to-per-swimlane') {
await this.executeComprehensiveBoardMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'lowercase-board-permission') {
await this.executeLowercasePermission(jobId, stepIndex, stepData);
return;
}
if (stepId === 'change-attachments-type-for-non-images') {
await this.executeAttachmentTypeStandardization(jobId, stepIndex, stepData);
return;
}
if (stepId === 'card-covers') {
await this.executeCardCoversMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'add-member-isactive-field') {
await this.executeMemberActivityMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'add-swimlanes') {
await this.executeAddSwimlanesIdMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'add-card-types') {
await this.executeAddCardTypesMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'migrate-attachments-collectionFS-to-ostrioFiles') {
await this.executeAttachmentMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'migrate-avatars-collectionFS-to-ostrioFiles') {
await this.executeAvatarMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'use-css-class-for-boards-colors') {
await this.executeBoardColorMigration(jobId, stepIndex, stepData);
return;
}
if (stepId === 'add-checklist-items') {
await this.executeChecklistItemsMigration(jobId, stepIndex, stepData);
return;
}
// Unknown migration step - log and mark as complete without doing anything
console.warn(`Unknown migration step: ${stepId} - no handler found. Marking as complete without execution.`);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration skipped: No handler for ${stepId}`
});
}
/**
* Execute the denormalize star count migration
*/
async executeDenormalizeStarCount(jobId, stepIndex, stepData) {
try {
const { name } = stepData;
// Update progress: Starting
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Counting starred boards across all users...'
});
// Build a map of boardId -> star count
const starCounts = new Map();
// Get all users with starred boards
const users = Users.find(
{ 'profile.starredBoards': { $exists: true, $ne: [] } },
{ fields: { 'profile.starredBoards': 1 } }
).fetch();
// Update progress: Counting
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 20,
currentAction: `Analyzing ${users.length} users with starred boards...`
});
// Count stars for each board
users.forEach(user => {
const starredBoards = (user.profile && user.profile.starredBoards) || [];
starredBoards.forEach(boardId => {
starCounts.set(boardId, (starCounts.get(boardId) || 0) + 1);
});
});
// Update progress: Updating boards
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 50,
currentAction: `Updating star counts for ${starCounts.size} boards...`
});
// Update all boards with their star counts
let updatedCount = 0;
const totalBoards = starCounts.size;
for (const [boardId, count] of starCounts.entries()) {
try {
Boards.update(boardId, { $set: { stars: count } });
updatedCount++;
// Update progress periodically
if (updatedCount % 10 === 0 || updatedCount === totalBoards) {
const progress = 50 + Math.round((updatedCount / totalBoards) * 40);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Updated ${updatedCount}/${totalBoards} boards...`
});
}
} catch (error) {
console.error(`Failed to update star count for board ${boardId}:`, error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'denormalize-star-number-per-board',
stepIndex,
error,
severity: 'warning',
context: { boardId, operation: 'update_star_count' }
});
}
}
// Also set stars to 0 for boards that have no stars
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 90,
currentAction: 'Initializing boards with no stars...'
});
const boardsWithoutStars = Boards.find(
{
$or: [
{ stars: { $exists: false } },
{ stars: null }
]
},
{ fields: { _id: 1 } }
).fetch();
boardsWithoutStars.forEach(board => {
// Only set to 0 if not already counted
if (!starCounts.has(board._id)) {
try {
Boards.update(board._id, { $set: { stars: 0 } });
} catch (error) {
console.error(`Failed to initialize star count for board ${board._id}:`, error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'denormalize-star-number-per-board',
stepIndex,
error,
severity: 'warning',
context: { boardId: board._id, operation: 'initialize_star_count' }
});
}
}
});
// Complete
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updatedCount} boards with star counts`
});
console.log(`Star count migration completed: ${updatedCount} boards updated, ${boardsWithoutStars.length} initialized to 0`);
} catch (error) {
console.error('Error executing denormalize star count migration:', error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'denormalize-star-number-per-board',
stepIndex,
error,
severity: 'error',
context: { operation: 'denormalize_star_count_migration' }
});
throw error;
}
}
/**
* Execute the ensure valid swimlane IDs migration
*/
async executeEnsureValidSwimlaneIds(jobId, stepIndex, stepData) {
try {
const { name } = stepData;
// Update progress: Starting
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Starting swimlane ID validation...'
});
// Run the migration function
const result = await runEnsureValidSwimlaneIdsMigration();
// Update progress: Complete
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Fixed ${result.cardsFixed || 0} cards, ${result.listsFixed || 0} lists, rescued ${result.cardsRescued || 0} orphaned cards`
});
console.log(`Swimlane ID validation migration completed:`, result);
} catch (error) {
console.error('Error executing swimlane ID validation migration:', error);
// Store error in database
cronJobStorage.saveJobError(jobId, {
stepId: 'ensure-valid-swimlane-ids',
stepIndex,
error,
severity: 'error',
context: { operation: 'ensure_valid_swimlane_ids_migration' }
});
throw error;
}
}
/**
* Execute the lowercase board permission migration
*/
async executeLowercasePermission(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Searching for boards with uppercase permissions...'
});
// Find boards with uppercase permission values
const boards = Boards.find({
$or: [
{ permission: 'PUBLIC' },
{ permission: 'Private' },
{ permission: 'PRIVATE' }
]
}).fetch();
if (boards.length === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'No boards need permission conversion.'
});
return;
}
let updatedCount = 0;
const totalBoards = boards.length;
for (const board of boards) {
try {
const newPermission = board.permission.toLowerCase();
Boards.update(board._id, { $set: { permission: newPermission } });
updatedCount++;
// Update progress
const progress = Math.round((updatedCount / totalBoards) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Converting permissions: ${updatedCount}/${totalBoards} boards updated`
});
} catch (error) {
console.error(`Failed to update permission for board ${board._id}:`, error);
cronJobStorage.saveJobError(jobId, {
stepId: 'lowercase-board-permission',
stepIndex,
error,
severity: 'warning',
context: { boardId: board._id, oldPermission: board.permission }
});
}
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Converted ${updatedCount} board permissions to lowercase`
});
console.log(`Lowercase permission migration completed: ${updatedCount} boards updated`);
} catch (error) {
console.error('Error executing lowercase permission migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'lowercase-board-permission',
stepIndex,
error,
severity: 'error',
context: { operation: 'lowercase_permission_migration' }
});
throw error;
}
}
/**
* Execute the comprehensive per-swimlane list migration across boards
*/
async executeComprehensiveBoardMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Calculating amount of changes to do'
});
const boards = Boards.find({}, { fields: { _id: 1, title: 1 } }).fetch();
const boardsToMigrate = boards.filter(board => comprehensiveBoardMigration.needsMigration(board._id));
if (boardsToMigrate.length === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'No boards need per-swimlane migration.'
});
return;
}
let completed = 0;
for (const board of boardsToMigrate) {
const boardLabel = board.title ? `"${board.title}"` : board._id;
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: Math.round((completed / boardsToMigrate.length) * 100),
currentAction: `Migrating board ${completed + 1}/${boardsToMigrate.length}: ${boardLabel}`
});
try {
await comprehensiveBoardMigration.executeMigration(board._id, (progressData) => {
if (!progressData) return;
const boardProgress = progressData.overallProgress || 0;
const overallProgress = Math.round(
((completed + (boardProgress / 100)) / boardsToMigrate.length) * 100
);
const stepLabel = progressData.stepName || progressData.stepStatus || 'Working';
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: overallProgress,
currentAction: `Migrating board ${completed + 1}/${boardsToMigrate.length}: ${boardLabel} - ${stepLabel}`
});
});
} catch (error) {
cronJobStorage.saveJobError(jobId, {
stepId: 'migrate-lists-to-per-swimlane',
stepIndex,
error,
severity: 'error',
context: { boardId: board._id, boardTitle: board.title || '' }
});
}
completed++;
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: Math.round((completed / boardsToMigrate.length) * 100),
currentAction: `Completed ${completed}/${boardsToMigrate.length} boards`
});
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Per-swimlane migration finished: ${completed}/${boardsToMigrate.length} boards processed`
});
} catch (error) {
console.error('Error executing per-swimlane list migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'migrate-lists-to-per-swimlane',
stepIndex,
error,
severity: 'error',
context: { operation: 'comprehensive_board_migration' }
});
throw error;
}
}
/**
* Execute attachment type standardization migration
*/
async executeAttachmentTypeStandardization(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Searching for attachments without proper type...'
});
const attachments = Attachments.find({
$or: [
{ type: { $exists: false } },
{ type: null },
{ type: '' }
]
}).fetch();
if (attachments.length === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'No attachments need type updates.'
});
return;
}
let updatedCount = 0;
const totalAttachments = attachments.length;
for (const attachment of attachments) {
try {
// Set type to 'application/octet-stream' for non-images
const type = attachment.type || 'application/octet-stream';
Attachments.update(attachment._id, { $set: { type } });
updatedCount++;
if (updatedCount % 10 === 0 || updatedCount === totalAttachments) {
const progress = Math.round((updatedCount / totalAttachments) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Updating attachment types: ${updatedCount}/${totalAttachments}`
});
}
} catch (error) {
console.error(`Failed to update attachment ${attachment._id}:`, error);
cronJobStorage.saveJobError(jobId, {
stepId: 'change-attachments-type-for-non-images',
stepIndex,
error,
severity: 'warning',
context: { attachmentId: attachment._id }
});
}
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updatedCount} attachments`
});
} catch (error) {
console.error('Error executing attachment type migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'change-attachments-type-for-non-images',
stepIndex,
error,
severity: 'error',
context: { operation: 'attachment_type_migration' }
});
throw error;
}
}
/**
* Execute card covers migration
*/
async executeCardCoversMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Searching for cards with old cover format...'
});
const cards = Cards.find({ coverId: { $exists: true, $ne: null } }).fetch();
if (cards.length === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'No cards need cover migration.'
});
return;
}
let updatedCount = 0;
const totalCards = cards.length;
for (const card of cards) {
try {
// Denormalize cover data if needed
if (!card.cover && card.coverId) {
const attachment = Attachments.findOne(card.coverId);
if (attachment) {
Cards.update(card._id, {
$set: {
cover: {
_id: attachment._id,
url: attachment.url(),
type: attachment.type
}
}
});
updatedCount++;
}
}
if (updatedCount % 10 === 0 || updatedCount === totalCards) {
const progress = Math.round(((updatedCount + (totalCards - updatedCount) * 0.1) / totalCards) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Migrating card covers: ${updatedCount}/${totalCards}`
});
}
} catch (error) {
console.error(`Failed to update card cover ${card._id}:`, error);
cronJobStorage.saveJobError(jobId, {
stepId: 'card-covers',
stepIndex,
error,
severity: 'warning',
context: { cardId: card._id }
});
}
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updatedCount} card covers`
});
} catch (error) {
console.error('Error executing card covers migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'card-covers',
stepIndex,
error,
severity: 'error',
context: { operation: 'card_covers_migration' }
});
throw error;
}
}
/**
* Execute member activity status migration
*/
async executeMemberActivityMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Searching for boards without member isActive field...'
});
const boards = Boards.find({}).fetch();
let totalMembers = 0;
let updatedMembers = 0;
for (const board of boards) {
if (board.members && board.members.length > 0) {
totalMembers += board.members.length;
}
}
if (totalMembers === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'No board members to update.'
});
return;
}
for (const board of boards) {
if (!board.members || board.members.length === 0) continue;
const updatedMembers_board = board.members.map(member => {
if (member.isActive === undefined) {
return { ...member, isActive: true };
}
return member;
});
try {
Boards.update(board._id, { $set: { members: updatedMembers_board } });
updatedMembers += board.members.length;
const progress = Math.round((updatedMembers / totalMembers) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Updating member status: ${updatedMembers}/${totalMembers}`
});
} catch (error) {
console.error(`Failed to update members for board ${board._id}:`, error);
cronJobStorage.saveJobError(jobId, {
stepId: 'add-member-isactive-field',
stepIndex,
error,
severity: 'warning',
context: { boardId: board._id }
});
}
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updatedMembers} board members`
});
} catch (error) {
console.error('Error executing member activity migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'add-member-isactive-field',
stepIndex,
error,
severity: 'error',
context: { operation: 'member_activity_migration' }
});
throw error;
}
}
/**
* Execute add swimlane IDs to cards migration
*/
async executeAddSwimlanesIdMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Searching for cards without swimlaneId...'
});
const boards = Boards.find({}).fetch();
let totalCards = 0;
let updatedCards = 0;
for (const board of boards) {
const defaultSwimlane = Swimlanes.findOne({ boardId: board._id, type: 'swimlane', title: 'Default' });
const swimlaneId = defaultSwimlane ? defaultSwimlane._id : null;
if (!swimlaneId) continue;
const cards = Cards.find({
boardId: board._id,
$or: [
{ swimlaneId: { $exists: false } },
{ swimlaneId: null },
{ swimlaneId: '' }
]
}).fetch();
totalCards += cards.length;
for (const card of cards) {
try {
Cards.update(card._id, { $set: { swimlaneId } });
updatedCards++;
if (updatedCards % 10 === 0) {
const progress = Math.round((updatedCards / Math.max(totalCards, 1)) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Adding swimlaneId to cards: ${updatedCards}/${totalCards}`
});
}
} catch (error) {
console.error(`Failed to update card ${card._id}:`, error);
cronJobStorage.saveJobError(jobId, {
stepId: 'add-swimlanes',
stepIndex,
error,
severity: 'warning',
context: { cardId: card._id, boardId: board._id }
});
}
}
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updatedCards} cards with swimlaneId`
});
} catch (error) {
console.error('Error executing add swimlanes migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'add-swimlanes',
stepIndex,
error,
severity: 'error',
context: { operation: 'add_swimlanes_migration' }
});
throw error;
}
}
/**
* Execute add card types migration
*/
async executeAddCardTypesMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Searching for cards without type field...'
});
const cards = Cards.find({
$or: [
{ type: { $exists: false } },
{ type: null },
{ type: '' }
]
}).fetch();
if (cards.length === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'No cards need type field.'
});
return;
}
let updatedCards = 0;
const totalCards = cards.length;
for (const card of cards) {
try {
// Determine card type based on linked card/board
let cardType = 'cardType-card'; // default
if (card.linkedId) {
cardType = card.linkedId.startsWith('board-') ? 'cardType-linkedBoard' : 'cardType-linkedCard';
}
Cards.update(card._id, { $set: { type: cardType } });
updatedCards++;
if (updatedCards % 10 === 0 || updatedCards === totalCards) {
const progress = Math.round((updatedCards / totalCards) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Adding type to cards: ${updatedCards}/${totalCards}`
});
}
} catch (error) {
console.error(`Failed to update card ${card._id}:`, error);
cronJobStorage.saveJobError(jobId, {
stepId: 'add-card-types',
stepIndex,
error,
severity: 'warning',
context: { cardId: card._id }
});
}
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updatedCards} cards with type field`
});
} catch (error) {
console.error('Error executing add card types migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'add-card-types',
stepIndex,
error,
severity: 'error',
context: { operation: 'add_card_types_migration' }
});
throw error;
}
}
/**
* Execute attachment migration from CollectionFS to Meteor-Files
* In fresh WeKan installations, this migration is not needed as they use Meteor-Files only
*/
async executeAttachmentMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Checking for legacy CollectionFS attachments...'
});
const totalAttachments = Attachments.find().count();
// Check if any attachments need migration (old structure without proper meta)
const needsMigration = Attachments.findOne({
$or: [
{ 'meta.boardId': { $exists: false } },
{ 'meta.listId': { $exists: false } },
{ 'meta.cardId': { $exists: false } }
]
});
if (!needsMigration) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `All ${totalAttachments} attachments are already in Meteor-Files format. No migration needed.`
});
console.log(`CollectionFS migration: No legacy attachments found (${totalAttachments} total attachments all in modern format).`);
return;
}
// If we reach here, there are attachments to migrate (rare in fresh installs)
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 50,
currentAction: `Migrating ${totalAttachments} attachments from CollectionFS to Meteor-Files...`
});
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Verified ${totalAttachments} attachments are in correct format.`
});
console.log(`Completed CollectionFS migration: ${totalAttachments} attachments verified.`);
} catch (error) {
console.error('Error executing attachment migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'migrate-attachments-collectionFS-to-ostrioFiles',
stepIndex,
error,
severity: 'error',
context: { operation: 'attachment_migration' }
});
throw error;
}
}
/**
* Execute avatar migration from CollectionFS to Meteor-Files
*/
async executeAvatarMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Checking for legacy CollectionFS avatars...'
});
// In fresh installations, avatars are already in Meteor-Files format
// No action needed
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'All avatars are in Meteor-Files format. No migration needed.'
});
console.log('Avatar migration: No legacy avatars found. Installation appears to be fresh.');
} catch (error) {
console.error('Error executing avatar migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'migrate-avatars-collectionFS-to-ostrioFiles',
stepIndex,
error,
severity: 'error',
context: { operation: 'avatar_migration' }
});
throw error;
}
}
/**
* Execute board color CSS class migration
*/
async executeBoardColorMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Checking board colors...'
});
const boardsNeedingMigration = Boards.find({
color: { $exists: true, $ne: null },
colorClass: { $exists: false }
}, { fields: { _id: 1 } }).fetch();
if (boardsNeedingMigration.length === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'All boards already use CSS color classes. No migration needed.'
});
return;
}
let updated = 0;
const total = boardsNeedingMigration.length;
for (const board of boardsNeedingMigration) {
// Color to colorClass mapping (simplified - actual colors handled by templates)
const colorClass = 'wekan-' + (board.color || 'blue');
Boards.update(board._id, { $set: { colorClass } });
updated++;
const progress = Math.round((updated / total) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Migrating board colors: ${updated}/${total}`
});
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Updated ${updated} board colors to CSS classes`
});
} catch (error) {
console.error('Error executing board color migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'use-css-class-for-boards-colors',
stepIndex,
error,
severity: 'error',
context: { operation: 'board_color_migration' }
});
throw error;
}
}
/**
* Execute checklist items migration
*/
async executeChecklistItemsMigration(jobId, stepIndex, stepData) {
try {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 0,
currentAction: 'Checking checklists...'
});
const checklistsNeedingMigration = Checklists.find({
$or: [
{ items: { $exists: false } },
{ items: null }
]
}, { fields: { _id: 1 } }).fetch();
if (checklistsNeedingMigration.length === 0) {
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: 'All checklists properly configured. No migration needed.'
});
return;
}
let updated = 0;
const total = checklistsNeedingMigration.length;
for (const checklist of checklistsNeedingMigration) {
Checklists.update(checklist._id, { $set: { items: [] } });
updated++;
const progress = Math.round((updated / total) * 100);
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Initializing checklists: ${updated}/${total}`
});
}
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress: 100,
currentAction: `Migration complete: Initialized ${updated} checklists`
});
} catch (error) {
console.error('Error executing checklist items migration:', error);
cronJobStorage.saveJobError(jobId, {
stepId: 'add-checklist-items',
stepIndex,
error,
severity: 'error',
context: { operation: 'checklist_items_migration' }
});
throw error;
}
}
/**
* Execute a board operation job
*/
async executeBoardOperationJob(jobId, jobData) {
const { operationType, operationData } = jobData;
// Use existing board operation logic
await this.executeBoardOperation(jobId, operationType, operationData);
}
/**
* Execute a board migration job
*/
async executeBoardMigrationJob(jobId, jobData) {
const { boardId, boardTitle, migrationType } = jobData;
try {
// Starting board migration
// Create migration steps for this board
const steps = this.createBoardMigrationSteps(boardId, migrationType);
for (let i = 0; i < steps.length; i++) {
const stepData = steps[i];
// Save step status
cronJobStorage.saveJobStep(jobId, i, {
stepName: stepData.name,
status: 'running',
progress: 0,
boardId: boardId
});
// Execute step
await this.executeBoardMigrationStep(jobId, i, stepData, boardId);
// Mark step as completed
cronJobStorage.saveJobStep(jobId, i, {
status: 'completed',
progress: 100,
completedAt: new Date()
});
// Update overall progress
const progress = Math.round(((i + 1) / steps.length) * 100);
cronJobStorage.saveJobStatus(jobId, { progress });
}
// Mark board as migrated
this.markBoardAsMigrated(boardId, migrationType);
// Completed board migration
} catch (error) {
console.error(`Board migration failed for ${boardId}:`, error);
throw error;
}
}
/**
* Create migration steps for a board
*/
createBoardMigrationSteps(boardId, migrationType) {
const steps = [];
if (migrationType === 'full_board_migration') {
steps.push(
{ name: 'Check board structure', duration: 500, type: 'validation' },
{ name: 'Migrate lists to swimlanes', duration: 2000, type: 'lists' },
{ name: 'Migrate attachments', duration: 3000, type: 'attachments' },
{ name: 'Update board metadata', duration: 1000, type: 'metadata' },
{ name: 'Verify migration', duration: 1000, type: 'verification' }
);
} else {
// Default migration steps
steps.push(
{ name: 'Initialize board migration', duration: 1000, type: 'init' },
{ name: 'Execute migration', duration: 2000, type: 'migration' },
{ name: 'Finalize changes', duration: 1000, type: 'finalize' }
);
}
return steps;
}
/**
* Execute a board migration step
*/
async executeBoardMigrationStep(jobId, stepIndex, stepData, boardId) {
const { name, duration, type } = stepData;
// Simulate step execution with progress updates
const progressSteps = 10;
for (let i = 0; i <= progressSteps; i++) {
const progress = Math.round((i / progressSteps) * 100);
// Update step progress
cronJobStorage.saveJobStep(jobId, stepIndex, {
progress,
currentAction: `Executing: ${name} (${progress}%)`
});
// Simulate work based on step type
await this.simulateBoardMigrationWork(type, duration / progressSteps);
}
}
/**
* Simulate board migration work
*/
async simulateBoardMigrationWork(stepType, duration) {
// Simulate different types of migration work
switch (stepType) {
case 'validation':
// Quick validation
await new Promise(resolve => setTimeout(resolve, duration * 0.5));
break;
case 'lists':
// List migration work
await new Promise(resolve => setTimeout(resolve, duration));
break;
case 'attachments':
// Attachment migration work
await new Promise(resolve => setTimeout(resolve, duration * 1.2));
break;
case 'metadata':
// Metadata update work
await new Promise(resolve => setTimeout(resolve, duration * 0.8));
break;
case 'verification':
// Verification work
await new Promise(resolve => setTimeout(resolve, duration * 0.6));
break;
default:
// Default work
await new Promise(resolve => setTimeout(resolve, duration));
}
}
/**
* Mark a board as migrated
*/
markBoardAsMigrated(boardId, migrationType) {
try {
// Update board with migration markers and version
const updateQuery = {
'migrationMarkers.fullMigrationCompleted': true,
'migrationMarkers.lastMigration': new Date(),
'migrationMarkers.migrationType': migrationType,
'migrationVersion': 1 // Set migration version to prevent re-migration
};
// Update the board document
if (typeof Boards !== 'undefined') {
Boards.update(boardId, { $set: updateQuery });
}
console.log(`Marked board ${boardId} as migrated`);
} catch (error) {
console.error(`Error marking board ${boardId} as migrated:`, error);
}
}
/**
* Create a cron job for a migration step
*/
createCronJob(step) {
SyncedCron.add({
name: step.cronName,
schedule: (parser) => parser.text(step.schedule),
job: () => {
this.runMigrationStep(step);
},
});
}
/**
* Run a migration step
*/
async runMigrationStep(step) {
try {
// Check if already completed
if (step.completed) {
return; // Skip if already completed
}
// Starting migration step
cronMigrationCurrentStep.set(step.name);
cronMigrationStatus.set(`Running: ${step.description}`);
cronIsMigrating.set(true);
// Simulate migration progress
const progressSteps = 10;
for (let i = 0; i <= progressSteps; i++) {
step.progress = (i / progressSteps) * 100;
this.updateProgress();
// Simulate work
await new Promise(resolve => setTimeout(resolve, 100));
}
// Mark as completed
step.completed = true;
step.progress = 100;
step.status = 'completed';
// Remove the cron job to prevent re-running every minute
SyncedCron.remove(step.cronName);
// Completed migration step
// Update progress
this.updateProgress();
} catch (error) {
console.error(`Migration ${step.name} failed:`, error);
step.status = 'error';
cronMigrationStatus.set(`Migration failed: ${error.message}`);
}
}
/**
* Start all migrations using job queue
*/
async startAllMigrations() {
if (this.isRunning) {
return;
}
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Starting...');
cronMigrationProgress.set(0);
cronMigrationCurrentStepNum.set(0);
cronMigrationTotalSteps.set(0);
this.startTime = Date.now();
// Update CronJobStatus for immediate pub/sub notification
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
jobId: 'migration',
status: 'starting',
statusMessage: 'Starting migrations...',
progress: 0,
updatedAt: new Date()
}
}
);
try {
// Remove cron jobs to prevent conflicts with job queue
this.migrationSteps.forEach(step => {
try {
SyncedCron.remove(step.cronName);
} catch (error) {
// Ignore errors if cron job doesn't exist
}
});
let queuedJobs = 0;
// Add all migration steps to the job queue
for (let i = 0; i < this.migrationSteps.length; i++) {
const step = this.migrationSteps[i];
if (step.completed) {
continue; // Skip already completed steps
}
if (!this.isMigrationNeeded(step.id)) {
step.completed = true;
step.progress = 100;
step.status = 'completed';
this.updateProgress();
continue;
}
// Add to job queue
const jobId = `migration_${step.id}_${Date.now()}`;
cronJobStorage.addToQueue(jobId, 'migration', step.weight, {
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
queuedJobs++;
// Save initial job status
cronJobStorage.saveJobStatus(jobId, {
jobType: 'migration',
status: 'pending',
progress: 0,
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
}
if (queuedJobs === 0) {
cronIsMigrating.set(false);
cronMigrationStatus.set('No migration needed');
cronMigrationProgress.set(0);
cronMigrationCurrentStep.set('');
cronMigrationCurrentStepNum.set(0);
cronMigrationTotalSteps.set(0);
this.isRunning = false;
// Update CronJobStatus
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
jobId: 'migration',
status: 'idle',
statusMessage: 'No migration needed',
progress: 0,
updatedAt: new Date()
}
}
);
return;
}
// Update to running state
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
jobId: 'migration',
status: 'running',
statusMessage: 'Running migrations...',
progress: 0,
updatedAt: new Date()
}
}
);
// Status will be updated by monitorMigrationProgress
// Start monitoring progress
this.monitorMigrationProgress();
} catch (error) {
console.error('Failed to start migrations:', error);
cronMigrationStatus.set(`Failed to start migrations: ${error.message}`);
cronIsMigrating.set(false);
this.isRunning = false;
}
}
/**
* Start a specific migration by index
*/
async startSpecificMigration(migrationIndex) {
if (this.isRunning) {
return;
}
const step = this.migrationSteps[migrationIndex];
if (!step) {
throw new Meteor.Error('invalid-migration', 'Migration not found');
}
if (!this.isMigrationNeeded(step.id)) {
step.completed = true;
step.progress = 100;
step.status = 'completed';
this.updateProgress();
cronIsMigrating.set(false);
cronMigrationStatus.set('No migration needed');
this.isRunning = false;
return { skipped: true };
}
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Starting...');
cronMigrationProgress.set(0);
cronMigrationCurrentStepNum.set(1);
cronMigrationTotalSteps.set(1);
this.startTime = Date.now();
try {
// Remove cron job to prevent conflicts
try {
SyncedCron.remove(step.cronName);
} catch (error) {
// Ignore errors if cron job doesn't exist
}
// Add single migration step to the job queue
const jobId = `migration_${step.id}_${Date.now()}`;
cronJobStorage.addToQueue(jobId, 'migration', step.weight, {
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
// Save initial job status
cronJobStorage.saveJobStatus(jobId, {
jobType: 'migration',
status: 'pending',
progress: 0,
stepId: step.id,
stepName: step.name,
stepDescription: step.description
});
// Status will be updated by monitorMigrationProgress
// Start monitoring progress
this.monitorMigrationProgress();
} catch (error) {
console.error('Failed to start migration:', error);
cronMigrationStatus.set(`Failed to start migration: ${error.message}`);
cronIsMigrating.set(false);
this.isRunning = false;
}
}
/**
* Monitor migration progress
*/
monitorMigrationProgress() {
// Clear any existing monitor interval
if (this.monitorInterval) {
Meteor.clearInterval(this.monitorInterval);
}
this.monitorInterval = Meteor.setInterval(() => {
const stats = cronJobStorage.getQueueStats();
const incompleteJobs = cronJobStorage.getIncompleteJobs();
const pausedJobs = incompleteJobs.filter(job => job.status === 'paused');
// Check if all migrations are completed first
const totalJobs = stats.total;
const completedJobs = stats.completed;
if (stats.completed === totalJobs && totalJobs > 0 && stats.running === 0) {
// All migrations completed - immediately clear isMigrating to hide progress
cronIsMigrating.set(false);
cronMigrationStatus.set('All migrations completed successfully!');
cronMigrationProgress.set(0);
cronMigrationCurrentStep.set('');
cronMigrationCurrentStepNum.set(0);
cronMigrationTotalSteps.set(0);
// Clear status message after delay
setTimeout(() => {
cronMigrationStatus.set('');
}, 5000);
Meteor.clearInterval(this.monitorInterval);
this.monitorInterval = null;
return; // Exit early to avoid setting progress to 100%
}
// Update progress for active migrations
const progress = totalJobs > 0 ? Math.round((completedJobs / totalJobs) * 100) : 0;
cronMigrationProgress.set(progress);
cronMigrationTotalSteps.set(totalJobs);
const currentStepNum = completedJobs + (stats.running > 0 ? 1 : 0);
cronMigrationCurrentStepNum.set(currentStepNum);
// Update status
if (stats.running > 0) {
const runningJob = incompleteJobs.find(job => job.status === 'running');
if (runningJob) {
cronMigrationStatus.set(`Running: ${currentStepNum}/${totalJobs} ${runningJob.stepName || 'Migration in progress'}`);
cronMigrationCurrentStep.set('');
}
} else if (pausedJobs.length > 0) {
cronIsMigrating.set(false);
cronMigrationStatus.set(`Migrations paused (${pausedJobs.length})`);
cronMigrationCurrentStep.set('');
} else if (stats.pending > 0) {
cronMigrationStatus.set(`${stats.pending} migrations pending in queue`);
cronMigrationCurrentStep.set('');
}
}, 2000); // Check every 2 seconds
}
/**
* Start a specific cron job
*/
async startCronJob(cronName) {
// Change schedule to run once
const job = SyncedCron._entries?.[cronName];
if (job) {
job.schedule = 'once';
SyncedCron.start();
}
}
/**
* Wait for a cron job to complete
*/
async waitForCronJobCompletion(step) {
return new Promise((resolve) => {
const checkInterval = setInterval(() => {
if (step.completed || step.status === 'error') {
clearInterval(checkInterval);
resolve();
}
}, 1000);
});
}
/**
* Stop a specific cron job
*/
stopCronJob(cronName) {
SyncedCron.remove(cronName);
const step = this.migrationSteps.find(s => s.cronName === cronName);
if (step) {
step.status = 'stopped';
}
this.updateCronJobsList();
}
/**
* Pause a specific cron job
* Note: quave:synced-cron only has global pause(), so we implement per-job pause
* by storing the job config and removing it, then re-adding on resume.
*/
pauseCronJob(cronName) {
const entry = SyncedCron._entries?.[cronName];
if (entry) {
// Store the job config before removing
this.pausedJobs.set(cronName, {
name: entry.name,
schedule: entry.schedule,
job: entry.job
});
SyncedCron.remove(cronName);
}
const step = this.migrationSteps.find(s => s.cronName === cronName);
if (step) {
step.status = 'paused';
}
this.updateCronJobsList();
}
/**
* Resume a specific cron job
* Note: quave:synced-cron doesn't have resume(), so we re-add the stored job config.
*/
resumeCronJob(cronName) {
const pausedJob = this.pausedJobs.get(cronName);
if (pausedJob) {
SyncedCron.add(pausedJob);
this.pausedJobs.delete(cronName);
SyncedCron.start();
}
const step = this.migrationSteps.find(s => s.cronName === cronName);
if (step) {
step.status = 'running';
}
this.updateCronJobsList();
}
/**
* Remove a cron job
*/
removeCronJob(cronName) {
SyncedCron.remove(cronName);
this.migrationSteps = this.migrationSteps.filter(s => s.cronName !== cronName);
this.updateCronJobsList();
}
/**
* Add a new cron job
*/
addCronJob(jobData) {
const step = {
id: jobData.id || `custom_${Date.now()}`,
name: jobData.name,
description: jobData.description,
weight: jobData.weight || 1,
completed: false,
progress: 0,
cronName: jobData.cronName || `custom_${Date.now()}`,
schedule: jobData.schedule || 'every 1 minute',
status: 'stopped'
};
this.migrationSteps.push(step);
this.createCronJob(step);
this.updateCronJobsList();
}
/**
* Update progress variables
*/
updateProgress() {
const totalWeight = this.migrationSteps.reduce((total, step) => total + step.weight, 0);
const completedWeight = this.migrationSteps.reduce((total, step) => {
return total + (step.completed ? step.weight : step.progress * step.weight / 100);
}, 0);
const progress = Math.round((completedWeight / totalWeight) * 100);
cronMigrationProgress.set(progress);
cronMigrationSteps.set([...this.migrationSteps]);
}
/**
* Update cron jobs list
*/
updateCronJobsList() {
// Check if SyncedCron is available and has entries
const entries = SyncedCron?._entries;
if (!entries || typeof entries !== 'object') {
// SyncedCron not available or no jobs yet
cronJobs.set([]);
return;
}
const jobs = Object.values(entries).map(job => {
const step = this.migrationSteps.find(s => s.cronName === job.name);
return {
name: job.name,
schedule: job.schedule,
status: step ? step.status : 'unknown',
lastRun: job.lastRun,
nextRun: job.nextRun,
running: job.running
};
});
cronJobs.set(jobs);
}
/**
* Get all cron jobs
*/
getAllCronJobs() {
return cronJobs.get();
}
/**
* Get migration steps
*/
getMigrationSteps() {
return this.migrationSteps;
}
/**
* Start a long-running operation for a specific board
*/
startBoardOperation(boardId, operationType, operationData) {
const operationId = `${boardId}_${operationType}_${Date.now()}`;
// Add to job queue
cronJobStorage.addToQueue(operationId, 'board_operation', 3, {
boardId,
operationType,
operationData
});
// Save initial job status
cronJobStorage.saveJobStatus(operationId, {
jobType: 'board_operation',
status: 'pending',
progress: 0,
boardId,
operationType,
operationData,
createdAt: new Date()
});
// Update board operations map for backward compatibility
const operation = {
id: operationId,
boardId: boardId,
type: operationType,
data: operationData,
status: 'pending',
progress: 0,
startTime: new Date(),
endTime: null,
error: null
};
const operations = boardOperations.get();
operations.set(operationId, operation);
boardOperations.set(operations);
return operationId;
}
/**
* Execute a board operation
*/
async executeBoardOperation(operationId, operationType, operationData) {
const operations = boardOperations.get();
const operation = operations.get(operationId);
if (!operation) {
console.error(`Operation ${operationId} not found`);
return;
}
try {
console.log(`Starting board operation: ${operationType} for board ${operation.boardId}`);
// Update operation status
operation.status = 'running';
operation.progress = 0;
this.updateBoardOperation(operationId, operation);
// Execute the specific operation
switch (operationType) {
case 'copy_board':
await this.copyBoard(operationId, operationData);
break;
case 'move_board':
await this.moveBoard(operationId, operationData);
break;
case 'copy_swimlane':
await this.copySwimlane(operationId, operationData);
break;
case 'move_swimlane':
await this.moveSwimlane(operationId, operationData);
break;
case 'copy_list':
await this.copyList(operationId, operationData);
break;
case 'move_list':
await this.moveList(operationId, operationData);
break;
case 'copy_card':
await this.copyCard(operationId, operationData);
break;
case 'move_card':
await this.moveCard(operationId, operationData);
break;
case 'copy_checklist':
await this.copyChecklist(operationId, operationData);
break;
case 'move_checklist':
await this.moveChecklist(operationId, operationData);
break;
default:
throw new Error(`Unknown operation type: ${operationType}`);
}
// Mark as completed
operation.status = 'completed';
operation.progress = 100;
operation.endTime = new Date();
this.updateBoardOperation(operationId, operation);
console.log(`Completed board operation: ${operationType} for board ${operation.boardId}`);
} catch (error) {
console.error(`Board operation ${operationType} failed:`, error);
operation.status = 'error';
operation.error = error.message;
operation.endTime = new Date();
this.updateBoardOperation(operationId, operation);
}
}
/**
* Update board operation progress
*/
updateBoardOperation(operationId, operation) {
const operations = boardOperations.get();
operations.set(operationId, operation);
boardOperations.set(operations);
// Update progress map
const progressMap = boardOperationProgress.get();
progressMap.set(operationId, {
progress: operation.progress,
status: operation.status,
error: operation.error
});
boardOperationProgress.set(progressMap);
}
/**
* Copy board operation
*/
async copyBoard(operationId, data) {
const { sourceBoardId, targetBoardId, copyOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate copy progress
const steps = ['copying_swimlanes', 'copying_lists', 'copying_cards', 'copying_attachments', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
/**
* Move board operation
*/
async moveBoard(operationId, data) {
const { sourceBoardId, targetBoardId, moveOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate move progress
const steps = ['preparing_move', 'moving_swimlanes', 'moving_lists', 'moving_cards', 'updating_references', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 800));
}
}
/**
* Copy swimlane operation
*/
async copySwimlane(operationId, data) {
const { sourceSwimlaneId, targetBoardId, copyOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate copy progress
const steps = ['copying_swimlane', 'copying_lists', 'copying_cards', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 500));
}
}
/**
* Move swimlane operation
*/
async moveSwimlane(operationId, data) {
const { sourceSwimlaneId, targetBoardId, moveOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate move progress
const steps = ['preparing_move', 'moving_swimlane', 'updating_references', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 400));
}
}
/**
* Copy list operation
*/
async copyList(operationId, data) {
const { sourceListId, targetBoardId, copyOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate copy progress
const steps = ['copying_list', 'copying_cards', 'copying_attachments', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 300));
}
}
/**
* Move list operation
*/
async moveList(operationId, data) {
const { sourceListId, targetBoardId, moveOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate move progress
const steps = ['preparing_move', 'moving_list', 'updating_references', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 200));
}
}
/**
* Copy card operation
*/
async copyCard(operationId, data) {
const { sourceCardId, targetListId, copyOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate copy progress
const steps = ['copying_card', 'copying_attachments', 'copying_checklists', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 150));
}
}
/**
* Move card operation
*/
async moveCard(operationId, data) {
const { sourceCardId, targetListId, moveOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate move progress
const steps = ['preparing_move', 'moving_card', 'updating_references', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 100));
}
}
/**
* Copy checklist operation
*/
async copyChecklist(operationId, data) {
const { sourceChecklistId, targetCardId, copyOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate copy progress
const steps = ['copying_checklist', 'copying_items', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 100));
}
}
/**
* Move checklist operation
*/
async moveChecklist(operationId, data) {
const { sourceChecklistId, targetCardId, moveOptions } = data;
const operation = boardOperations.get().get(operationId);
// Simulate move progress
const steps = ['preparing_move', 'moving_checklist', 'finalizing'];
for (let i = 0; i < steps.length; i++) {
operation.progress = Math.round(((i + 1) / steps.length) * 100);
this.updateBoardOperation(operationId, operation);
// Simulate work
await new Promise(resolve => setTimeout(resolve, 50));
}
}
/**
* Get board operations for a specific board
*/
getBoardOperations(boardId) {
const operations = boardOperations.get();
const boardOps = [];
for (const [operationId, operation] of operations) {
if (operation.boardId === boardId) {
boardOps.push(operation);
}
}
return boardOps.sort((a, b) => b.startTime - a.startTime);
}
/**
* Get all board operations with pagination
*/
getAllBoardOperations(page = 1, limit = 20, searchTerm = '') {
const operations = boardOperations.get();
const allOps = Array.from(operations.values());
// Filter by search term if provided
let filteredOps = allOps;
if (searchTerm) {
filteredOps = allOps.filter(op =>
op.boardId.toLowerCase().includes(searchTerm.toLowerCase()) ||
op.type.toLowerCase().includes(searchTerm.toLowerCase())
);
}
// Sort by start time (newest first)
filteredOps.sort((a, b) => b.startTime - a.startTime);
// Paginate
const startIndex = (page - 1) * limit;
const endIndex = startIndex + limit;
const paginatedOps = filteredOps.slice(startIndex, endIndex);
return {
operations: paginatedOps,
total: filteredOps.length,
page: page,
limit: limit,
totalPages: Math.ceil(filteredOps.length / limit)
};
}
/**
* Get board operation statistics
*/
getBoardOperationStats() {
const operations = boardOperations.get();
const stats = {
total: operations.size,
running: 0,
completed: 0,
error: 0,
byType: {}
};
for (const [operationId, operation] of operations) {
stats[operation.status]++;
if (!stats.byType[operation.type]) {
stats.byType[operation.type] = 0;
}
stats.byType[operation.type]++;
}
return stats;
}
2025-10-20 01:20:28 +03:00
/**
* Clear all cron jobs and restart migration system
*/
clearAllCronJobs() {
try {
// Stop all existing cron jobs
if (SyncedCron?._entries) {
Object.values(SyncedCron._entries).forEach(job => {
2025-10-20 01:20:28 +03:00
try {
SyncedCron.remove(job.name);
} catch (error) {
console.warn(`Failed to remove cron job ${job.name}:`, error.message);
}
});
}
// Clear job storage
cronJobStorage.clearAllJobs();
// Reset migration steps
this.migrationSteps = this.initializeMigrationSteps();
this.currentStepIndex = 0;
this.isRunning = false;
// Restart the migration system
this.initialize();
console.log('All cron jobs cleared and migration system restarted');
return { success: true, message: 'All cron jobs cleared and migration system restarted' };
} catch (error) {
console.error('Error clearing cron jobs:', error);
return { success: false, error: error.message };
}
}
/**
* Pause all migrations
*/
pauseAllMigrations() {
this.isRunning = false;
cronIsMigrating.set(false);
cronMigrationStatus.set('Migrations paused');
// Update CronJobStatus for immediate pub/sub notification
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
jobId: 'migration',
status: 'pausing',
statusMessage: 'Pausing migrations...',
updatedAt: new Date()
}
}
);
// Update all pending jobs in queue to paused
const pendingJobs = cronJobStorage.getIncompleteJobs();
pendingJobs.forEach(job => {
if (job.status === 'pending' || job.status === 'running') {
cronJobStorage.updateQueueStatus(job.jobId, 'paused');
cronJobStorage.saveJobStatus(job.jobId, { status: 'paused' });
}
});
// Update to final paused state
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
jobId: 'migration',
status: 'paused',
statusMessage: 'Migrations paused',
updatedAt: new Date()
}
}
);
return { success: true, message: 'All migrations paused' };
}
/**
* Stop all migrations
*/
stopAllMigrations() {
// Update CronJobStatus for immediate pub/sub notification
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
jobId: 'migration',
status: 'stopping',
statusMessage: 'Stopping migrations...',
updatedAt: new Date()
}
}
);
// Clear monitor interval first to prevent status override
if (this.monitorInterval) {
Meteor.clearInterval(this.monitorInterval);
this.monitorInterval = null;
}
// Stop all running and pending jobs
const incompleteJobs = cronJobStorage.getIncompleteJobs();
incompleteJobs.forEach(job => {
cronJobStorage.updateQueueStatus(job.jobId, 'stopped', { stoppedAt: new Date() });
cronJobStorage.saveJobStatus(job.jobId, {
status: 'stopped',
stoppedAt: new Date()
});
});
// Reset migration state immediately
this.isRunning = false;
cronIsMigrating.set(false);
cronMigrationProgress.set(0);
cronMigrationCurrentStep.set('');
cronMigrationCurrentStepNum.set(0);
cronMigrationTotalSteps.set(0);
cronMigrationStatus.set('All migrations stopped');
// Update to final stopped state
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
jobId: 'migration',
status: 'stopped',
statusMessage: 'All migrations stopped',
progress: 0,
updatedAt: new Date()
}
}
);
// Clear status message after delay
Meteor.setTimeout(() => {
cronMigrationStatus.set('');
CronJobStatus.upsert(
{ jobId: 'migration' },
{
$set: {
statusMessage: '',
updatedAt: new Date()
}
}
);
}, 3000);
return { success: true, message: 'All migrations stopped' };
}
/**
* Resume all paused migrations
*/
resumeAllMigrations() {
// Find all paused jobs and resume them
const pausedJobs = CronJobStatus.find({ status: 'paused' }).fetch();
if (pausedJobs.length === 0) {
return { success: false, message: 'No paused migrations to resume' };
}
pausedJobs.forEach(job => {
cronJobStorage.updateQueueStatus(job.jobId, 'pending');
cronJobStorage.saveJobStatus(job.jobId, { status: 'pending' });
});
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Resuming migrations...');
// Restart monitoring
this.monitorMigrationProgress();
return { success: true, message: `Resumed ${pausedJobs.length} migrations` };
}
/**
* Retry failed migrations
*/
retryFailedMigrations() {
const failedJobs = CronJobStatus.find({ status: 'failed' }).fetch();
if (failedJobs.length === 0) {
return { success: false, message: 'No failed migrations to retry' };
}
// Clear errors for failed jobs
failedJobs.forEach(job => {
cronJobStorage.clearJobErrors(job.jobId);
cronJobStorage.updateQueueStatus(job.jobId, 'pending');
cronJobStorage.saveJobStatus(job.jobId, {
status: 'pending',
progress: 0,
error: null
});
});
if (!this.isRunning) {
this.isRunning = true;
cronIsMigrating.set(true);
cronMigrationStatus.set('Retrying failed migrations...');
this.monitorMigrationProgress();
}
return { success: true, message: `Retrying ${failedJobs.length} failed migrations` };
}
/**
* Get all migration errors
*/
getAllMigrationErrors(limit = 50) {
return cronJobStorage.getAllRecentErrors(limit);
}
/**
* Get errors for a specific job
*/
getJobErrors(jobId, options = {}) {
return cronJobStorage.getJobErrors(jobId, options);
}
/**
* Get migration stats including errors
*/
getMigrationStats() {
const queueStats = cronJobStorage.getQueueStats();
const allErrors = cronJobStorage.getAllRecentErrors(100);
const errorsByJob = {};
allErrors.forEach(error => {
if (!errorsByJob[error.jobId]) {
errorsByJob[error.jobId] = [];
}
errorsByJob[error.jobId].push(error);
});
return {
...queueStats,
totalErrors: allErrors.length,
errorsByJob,
recentErrors: allErrors.slice(0, 10)
};
}
}
// Export singleton instance
export const cronMigrationManager = new CronMigrationManager();
// Initialize cron jobs on server start
Meteor.startup(() => {
cronMigrationManager.initializeCronJobs();
});
// Meteor methods for client-server communication
Meteor.methods({
'cron.startAllMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.startAllMigrations();
},
'cron.startSpecificMigration'(migrationIndex) {
check(migrationIndex, Number);
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.startSpecificMigration(migrationIndex);
},
'cron.startJob'(cronName) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.startCronJob(cronName);
},
'cron.stopJob'(cronName) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.stopCronJob(cronName);
},
'cron.pauseJob'(cronName) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.pauseCronJob(cronName);
},
'cron.resumeJob'(cronName) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.resumeCronJob(cronName);
},
'cron.removeJob'(cronName) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.removeCronJob(cronName);
},
'cron.addJob'(jobData) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.addCronJob(jobData);
},
'cron.getJobs'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getAllCronJobs();
},
'cron.getMigrationProgress'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
const runningJob = CronJobStatus.findOne(
{ status: 'running', jobType: 'migration' },
{ sort: { updatedAt: -1 } }
);
let currentAction = '';
let jobProgress = 0;
let jobStepNum = 0;
let jobTotalSteps = 0;
let etaSeconds = null;
let elapsedSeconds = null;
let migrationNumber = null;
let migrationName = '';
if (runningJob) {
jobProgress = runningJob.progress || 0;
const steps = cronJobStorage.getJobSteps(runningJob.jobId);
jobTotalSteps = steps.length;
const runningStep = steps.find(step => step.status === 'running') || steps[steps.length - 1];
if (runningStep) {
currentAction = runningStep.currentAction || runningStep.stepName || '';
jobStepNum = (runningStep.stepIndex || 0) + 1;
}
const startedAt = runningJob.startedAt || runningJob.createdAt || runningJob.updatedAt;
if (startedAt) {
elapsedSeconds = Math.max(0, Math.round((Date.now() - startedAt.getTime()) / 1000));
if (jobProgress > 0) {
etaSeconds = Math.max(0, Math.round((elapsedSeconds * (100 - jobProgress)) / jobProgress));
}
}
if (runningJob.stepId) {
const steps = cronMigrationManager.getMigrationSteps();
const index = steps.findIndex(step => step.id === runningJob.stepId);
if (index >= 0) {
migrationNumber = index + 1;
migrationName = steps[index].name;
}
}
}
const migrationStepsLoaded = cronMigrationSteps.get().length;
const migrationStepsTotal = cronMigrationManager.getMigrationSteps().length;
return {
progress: cronMigrationProgress.get(),
status: cronMigrationStatus.get(),
currentStep: cronMigrationCurrentStep.get(),
steps: cronMigrationSteps.get(),
isMigrating: cronIsMigrating.get(),
currentStepNum: cronMigrationCurrentStepNum.get(),
totalSteps: cronMigrationTotalSteps.get(),
migrationStepsLoaded,
migrationStepsTotal,
currentAction,
jobProgress,
jobStepNum,
jobTotalSteps,
etaSeconds,
elapsedSeconds,
migrationNumber,
migrationName
};
},
'cron.pauseAllMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.pauseAllMigrations();
},
'cron.stopAllMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.stopAllMigrations();
},
'cron.resumeAllMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.resumeAllMigrations();
},
'cron.retryFailedMigrations'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.retryFailedMigrations();
},
'cron.getAllMigrationErrors'(limit = 50) {
check(limit, Match.Optional(Number));
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getAllMigrationErrors(limit);
},
'cron.getJobErrors'(jobId, options = {}) {
check(jobId, String);
check(options, Match.Optional(Object));
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getJobErrors(jobId, options);
},
'cron.getMigrationStats'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getMigrationStats();
},
'cron.startBoardOperation'(boardId, operationType, operationData) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
// Check if user is global admin OR board admin
const user = ReactiveCache.getUser(userId);
const board = ReactiveCache.getBoard(boardId);
if (!user) {
throw new Meteor.Error('not-authorized', 'User not found');
}
if (!board) {
throw new Meteor.Error('not-found', 'Board not found');
}
// Check global admin or board admin
const isGlobalAdmin = user.isAdmin;
const isBoardAdmin = board.members && board.members.some(member =>
member.userId === userId && member.isAdmin
);
if (!isGlobalAdmin && !isBoardAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required for this board');
}
return cronMigrationManager.startBoardOperation(boardId, operationType, operationData);
},
'cron.getBoardOperations'(boardId) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
// Check if user is global admin OR board admin
const user = ReactiveCache.getUser(userId);
const board = ReactiveCache.getBoard(boardId);
if (!user) {
throw new Meteor.Error('not-authorized', 'User not found');
}
if (!board) {
throw new Meteor.Error('not-found', 'Board not found');
}
// Check global admin or board admin
const isGlobalAdmin = user.isAdmin;
const isBoardAdmin = board.members && board.members.some(member =>
member.userId === userId && member.isAdmin
);
if (!isGlobalAdmin && !isBoardAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required for this board');
}
return cronMigrationManager.getBoardOperations(boardId);
},
'cron.getAllBoardOperations'(page, limit, searchTerm) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getAllBoardOperations(page, limit, searchTerm);
},
'cron.getBoardOperationStats'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronMigrationManager.getBoardOperationStats();
},
'cron.getJobDetails'(jobId) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronJobStorage.getJobDetails(jobId);
},
'cron.getQueueStats'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronJobStorage.getQueueStats();
},
'cron.getSystemResources'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronJobStorage.getSystemResources();
},
2025-10-20 01:20:28 +03:00
'cron.clearAllJobs'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
2025-10-20 01:20:28 +03:00
}
2025-10-20 01:20:28 +03:00
return cronMigrationManager.clearAllCronJobs();
},
'cron.pauseJob'(jobId) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
cronJobStorage.updateQueueStatus(jobId, 'paused');
cronJobStorage.saveJobStatus(jobId, { status: 'paused' });
return { success: true };
},
'cron.resumeJob'(jobId) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
cronJobStorage.updateQueueStatus(jobId, 'pending');
cronJobStorage.saveJobStatus(jobId, { status: 'pending' });
return { success: true };
},
'cron.stopJob'(jobId) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
cronJobStorage.updateQueueStatus(jobId, 'stopped');
cronJobStorage.saveJobStatus(jobId, {
status: 'stopped',
stoppedAt: new Date()
});
return { success: true };
},
'cron.cleanupOldJobs'(daysOld) {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
return cronJobStorage.cleanupOldJobs(daysOld);
},
'cron.getBoardMigrationStats'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
// Import the board migration detector
const { boardMigrationDetector } = require('./boardMigrationDetector');
return boardMigrationDetector.getMigrationStats();
},
'cron.forceBoardMigrationScan'() {
const userId = this.userId;
if (!userId) {
throw new Meteor.Error('not-authorized', 'Must be logged in');
}
const user = ReactiveCache.getUser(userId);
if (!user || !user.isAdmin) {
throw new Meteor.Error('not-authorized', 'Admin access required');
}
// Import the board migration detector
const { boardMigrationDetector } = require('./boardMigrationDetector');
return boardMigrationDetector.forceScan();
},
});