fix: Recover in-memory dispatch queues from DB on server startup
Both phaseQueue and taskQueue are in-memory Maps lost on restart. Now the orchestrator's start() method scans active initiatives and: - Re-queues approved phases into the phase dispatch queue - Re-queues pending tasks for in_progress phases into the task dispatch queue - Triggers a dispatch cycle if anything was recovered This fixes stuck phases/tasks after server restarts.
This commit is contained in:
@@ -66,6 +66,11 @@ export class ExecutionOrchestrator {
|
||||
});
|
||||
});
|
||||
|
||||
// Recover in-memory dispatch queues from DB state (survives server restarts)
|
||||
this.recoverDispatchQueues().catch((err) => {
|
||||
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed');
|
||||
});
|
||||
|
||||
log.info('execution orchestrator started');
|
||||
}
|
||||
|
||||
@@ -420,6 +425,52 @@ export class ExecutionOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover in-memory dispatch queues from DB state on server startup.
|
||||
* Re-queues approved phases and pending tasks for in_progress phases.
|
||||
*/
|
||||
private async recoverDispatchQueues(): Promise<void> {
|
||||
const initiatives = await this.initiativeRepository.findByStatus('active');
|
||||
let phasesRecovered = 0;
|
||||
let tasksRecovered = 0;
|
||||
|
||||
for (const initiative of initiatives) {
|
||||
const phases = await this.phaseRepository.findByInitiativeId(initiative.id);
|
||||
|
||||
for (const phase of phases) {
|
||||
// Re-queue approved phases into the phase dispatch queue
|
||||
if (phase.status === 'approved') {
|
||||
try {
|
||||
await this.phaseDispatchManager.queuePhase(phase.id);
|
||||
phasesRecovered++;
|
||||
} catch {
|
||||
// Already queued or status changed
|
||||
}
|
||||
}
|
||||
|
||||
// Re-queue pending tasks for in_progress phases into the task dispatch queue
|
||||
if (phase.status === 'in_progress') {
|
||||
const tasks = await this.taskRepository.findByPhaseId(phase.id);
|
||||
for (const task of tasks) {
|
||||
if (task.status === 'pending') {
|
||||
try {
|
||||
await this.dispatchManager.queue(task.id);
|
||||
tasksRecovered++;
|
||||
} catch {
|
||||
// Already queued or task issue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (phasesRecovered > 0 || tasksRecovered > 0) {
|
||||
log.info({ phasesRecovered, tasksRecovered }, 'recovered dispatch queues from DB state');
|
||||
this.scheduleDispatch();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if all phases for an initiative are completed.
|
||||
* If so, set initiative to pending_review and emit event.
|
||||
|
||||
Reference in New Issue
Block a user