diff --git a/apps/server/execution/orchestrator.ts b/apps/server/execution/orchestrator.ts index aefce3b..a08e2f1 100644 --- a/apps/server/execution/orchestrator.ts +++ b/apps/server/execution/orchestrator.ts @@ -66,6 +66,11 @@ export class ExecutionOrchestrator { }); }); + // Recover in-memory dispatch queues from DB state (survives server restarts) + this.recoverDispatchQueues().catch((err) => { + log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed'); + }); + log.info('execution orchestrator started'); } @@ -420,6 +425,52 @@ export class ExecutionOrchestrator { } } + /** + * Recover in-memory dispatch queues from DB state on server startup. + * Re-queues approved phases and pending tasks for in_progress phases. + */ + private async recoverDispatchQueues(): Promise { + const initiatives = await this.initiativeRepository.findByStatus('active'); + let phasesRecovered = 0; + let tasksRecovered = 0; + + for (const initiative of initiatives) { + const phases = await this.phaseRepository.findByInitiativeId(initiative.id); + + for (const phase of phases) { + // Re-queue approved phases into the phase dispatch queue + if (phase.status === 'approved') { + try { + await this.phaseDispatchManager.queuePhase(phase.id); + phasesRecovered++; + } catch { + // Already queued or status changed + } + } + + // Re-queue pending tasks for in_progress phases into the task dispatch queue + if (phase.status === 'in_progress') { + const tasks = await this.taskRepository.findByPhaseId(phase.id); + for (const task of tasks) { + if (task.status === 'pending') { + try { + await this.dispatchManager.queue(task.id); + tasksRecovered++; + } catch { + // Already queued or task issue + } + } + } + } + } + } + + if (phasesRecovered > 0 || tasksRecovered > 0) { + log.info({ phasesRecovered, tasksRecovered }, 'recovered dispatch queues from DB state'); + this.scheduleDispatch(); + } + } + /** * Check if all phases for an initiative are completed. * If so, set initiative to pending_review and emit event.