fix: Re-queue approved phases before dispatch to survive server restarts
The in-memory phaseQueue (Map) in DefaultPhaseDispatchManager is lost on server restart. After approving a phase review, dispatchNextPhase() found nothing in the empty queue, so the next unblocked phase never started. Now the orchestrator re-queues all approved phases for the initiative from the DB before attempting to dispatch, making the queue self-healing.
This commit is contained in:
@@ -234,6 +234,9 @@ export class ExecutionOrchestrator {
|
|||||||
await this.mergePhaseIntoInitiative(phaseId);
|
await this.mergePhaseIntoInitiative(phaseId);
|
||||||
await this.phaseDispatchManager.completePhase(phaseId);
|
await this.phaseDispatchManager.completePhase(phaseId);
|
||||||
|
|
||||||
|
// Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
|
||||||
|
await this.requeueApprovedPhases(phase.initiativeId);
|
||||||
|
|
||||||
// Check if this was the last phase — if so, trigger initiative review
|
// Check if this was the last phase — if so, trigger initiative review
|
||||||
const dispatched = await this.phaseDispatchManager.dispatchNextPhase();
|
const dispatched = await this.phaseDispatchManager.dispatchNextPhase();
|
||||||
if (!dispatched.success) {
|
if (!dispatched.success) {
|
||||||
@@ -305,6 +308,9 @@ export class ExecutionOrchestrator {
|
|||||||
await this.mergePhaseIntoInitiative(phaseId);
|
await this.mergePhaseIntoInitiative(phaseId);
|
||||||
await this.phaseDispatchManager.completePhase(phaseId);
|
await this.phaseDispatchManager.completePhase(phaseId);
|
||||||
|
|
||||||
|
// Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
|
||||||
|
await this.requeueApprovedPhases(phase.initiativeId);
|
||||||
|
|
||||||
// Check if this was the last phase — if so, trigger initiative review
|
// Check if this was the last phase — if so, trigger initiative review
|
||||||
const dispatched = await this.phaseDispatchManager.dispatchNextPhase();
|
const dispatched = await this.phaseDispatchManager.dispatchNextPhase();
|
||||||
if (!dispatched.success) {
|
if (!dispatched.success) {
|
||||||
@@ -395,6 +401,25 @@ export class ExecutionOrchestrator {
|
|||||||
return { taskId: task.id };
|
return { taskId: task.id };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Re-queue approved phases for an initiative into the in-memory dispatch queue.
|
||||||
|
* Self-healing: ensures phases aren't lost if the server restarted since the
|
||||||
|
* initial queueAllPhases() call.
|
||||||
|
*/
|
||||||
|
private async requeueApprovedPhases(initiativeId: string): Promise<void> {
|
||||||
|
const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
|
||||||
|
for (const p of phases) {
|
||||||
|
if (p.status === 'approved') {
|
||||||
|
try {
|
||||||
|
await this.phaseDispatchManager.queuePhase(p.id);
|
||||||
|
log.info({ phaseId: p.id }, 're-queued approved phase');
|
||||||
|
} catch {
|
||||||
|
// Already queued or status changed — safe to ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if all phases for an initiative are completed.
|
* Check if all phases for an initiative are completed.
|
||||||
* If so, set initiative to pending_review and emit event.
|
* If so, set initiative to pending_review and emit event.
|
||||||
|
|||||||
Reference in New Issue
Block a user