fix: Recover in-memory dispatch queues from DB on server startup
Both phaseQueue and taskQueue are in-memory Maps lost on restart. Now the orchestrator's start() method scans active initiatives and: - Re-queues approved phases into the phase dispatch queue - Re-queues pending tasks for in_progress phases into the task dispatch queue - Triggers a dispatch cycle if anything was recovered This fixes stuck phases/tasks after server restarts.
This commit is contained in:
@@ -66,6 +66,11 @@ export class ExecutionOrchestrator {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Recover in-memory dispatch queues from DB state (survives server restarts)
|
||||||
|
this.recoverDispatchQueues().catch((err) => {
|
||||||
|
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed');
|
||||||
|
});
|
||||||
|
|
||||||
log.info('execution orchestrator started');
|
log.info('execution orchestrator started');
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -420,6 +425,52 @@ export class ExecutionOrchestrator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recover in-memory dispatch queues from DB state on server startup.
|
||||||
|
* Re-queues approved phases and pending tasks for in_progress phases.
|
||||||
|
*/
|
||||||
|
private async recoverDispatchQueues(): Promise<void> {
|
||||||
|
const initiatives = await this.initiativeRepository.findByStatus('active');
|
||||||
|
let phasesRecovered = 0;
|
||||||
|
let tasksRecovered = 0;
|
||||||
|
|
||||||
|
for (const initiative of initiatives) {
|
||||||
|
const phases = await this.phaseRepository.findByInitiativeId(initiative.id);
|
||||||
|
|
||||||
|
for (const phase of phases) {
|
||||||
|
// Re-queue approved phases into the phase dispatch queue
|
||||||
|
if (phase.status === 'approved') {
|
||||||
|
try {
|
||||||
|
await this.phaseDispatchManager.queuePhase(phase.id);
|
||||||
|
phasesRecovered++;
|
||||||
|
} catch {
|
||||||
|
// Already queued or status changed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-queue pending tasks for in_progress phases into the task dispatch queue
|
||||||
|
if (phase.status === 'in_progress') {
|
||||||
|
const tasks = await this.taskRepository.findByPhaseId(phase.id);
|
||||||
|
for (const task of tasks) {
|
||||||
|
if (task.status === 'pending') {
|
||||||
|
try {
|
||||||
|
await this.dispatchManager.queue(task.id);
|
||||||
|
tasksRecovered++;
|
||||||
|
} catch {
|
||||||
|
// Already queued or task issue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (phasesRecovered > 0 || tasksRecovered > 0) {
|
||||||
|
log.info({ phasesRecovered, tasksRecovered }, 'recovered dispatch queues from DB state');
|
||||||
|
this.scheduleDispatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if all phases for an initiative are completed.
|
* Check if all phases for an initiative are completed.
|
||||||
* If so, set initiative to pending_review and emit event.
|
* If so, set initiative to pending_review and emit event.
|
||||||
|
|||||||
Reference in New Issue
Block a user