fix: Recover in-memory dispatch queues from DB on server startup

Both phaseQueue and taskQueue are in-memory Maps lost on restart. Now
the orchestrator's start() method scans active initiatives and:
- Re-queues approved phases into the phase dispatch queue
- Re-queues pending tasks for in_progress phases into the task dispatch queue
- Triggers a dispatch cycle if anything was recovered

This fixes stuck phases/tasks after server restarts.
This commit is contained in:
Lukas May
2026-03-05 21:10:32 +01:00
parent 573e1b40d2
commit 47fa924927

View File

@@ -66,6 +66,11 @@ export class ExecutionOrchestrator {
});
});
// Recover in-memory dispatch queues from DB state (survives server restarts)
this.recoverDispatchQueues().catch((err) => {
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed');
});
log.info('execution orchestrator started');
}
@@ -420,6 +425,52 @@ export class ExecutionOrchestrator {
}
}
/**
* Recover in-memory dispatch queues from DB state on server startup.
* Re-queues approved phases and pending tasks for in_progress phases.
*/
private async recoverDispatchQueues(): Promise<void> {
const initiatives = await this.initiativeRepository.findByStatus('active');
let phasesRecovered = 0;
let tasksRecovered = 0;
for (const initiative of initiatives) {
const phases = await this.phaseRepository.findByInitiativeId(initiative.id);
for (const phase of phases) {
// Re-queue approved phases into the phase dispatch queue
if (phase.status === 'approved') {
try {
await this.phaseDispatchManager.queuePhase(phase.id);
phasesRecovered++;
} catch {
// Already queued or status changed
}
}
// Re-queue pending tasks for in_progress phases into the task dispatch queue
if (phase.status === 'in_progress') {
const tasks = await this.taskRepository.findByPhaseId(phase.id);
for (const task of tasks) {
if (task.status === 'pending') {
try {
await this.dispatchManager.queue(task.id);
tasksRecovered++;
} catch {
// Already queued or task issue
}
}
}
}
}
}
if (phasesRecovered > 0 || tasksRecovered > 0) {
log.info({ phasesRecovered, tasksRecovered }, 'recovered dispatch queues from DB state');
this.scheduleDispatch();
}
}
/**
* Check if all phases for an initiative are completed.
* If so, set initiative to pending_review and emit event.