/** * Execution Orchestrator * * Subscribes to task:completed events and orchestrates the post-completion * merge workflow: * - Task branch → Phase branch (on task completion) * - Phase branch → Initiative branch (when all phase tasks done) * * Supports two execution modes: * - YOLO: auto-merge everything * - Review per-phase: pause after each phase for diff review */ import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent } from '../events/index.js'; import type { BranchManager } from '../git/branch-manager.js'; import type { PhaseRepository } from '../db/repositories/phase-repository.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { InitiativeRepository } from '../db/repositories/initiative-repository.js'; import type { ProjectRepository } from '../db/repositories/project-repository.js'; import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js'; import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js'; import { phaseBranchName, taskBranchName } from '../git/branch-naming.js'; import { ensureProjectClone } from '../git/project-clones.js'; import { createModuleLogger } from '../logger/index.js'; const log = createModuleLogger('execution-orchestrator'); export class ExecutionOrchestrator { /** Serialize merges per phase to avoid concurrent merge conflicts */ private phaseMergeLocks: Map> = new Map(); /** Coalesced dispatch scheduling state */ private dispatchPending = false; private dispatchRunning = false; constructor( private branchManager: BranchManager, private phaseRepository: PhaseRepository, private taskRepository: TaskRepository, private initiativeRepository: InitiativeRepository, private projectRepository: ProjectRepository, private phaseDispatchManager: PhaseDispatchManager, private dispatchManager: DispatchManager, private conflictResolutionService: ConflictResolutionService, private eventBus: EventBus, private workspaceRoot: string, ) {} /** * Start listening for events that drive execution. */ start(): void { this.eventBus.on('task:completed', (event) => { this.handleTaskCompleted(event).catch((err) => { log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling task:completed'); }); }); // Kick off dispatch when phases are queued (e.g. user clicks Execute) this.eventBus.on('phase:queued', () => this.scheduleDispatch()); // Auto-complete task + re-dispatch when an agent finishes this.eventBus.on('agent:stopped', (event) => { this.handleAgentStopped(event).catch((err) => { log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling agent:stopped'); }); }); log.info('execution orchestrator started'); } // --------------------------------------------------------------------------- // Coalesced dispatch scheduler // --------------------------------------------------------------------------- /** * Schedule a dispatch cycle. Coalesces multiple rapid calls (e.g. from * synchronous EventEmitter) into a single async run. */ private scheduleDispatch(): void { if (this.dispatchRunning) { this.dispatchPending = true; return; } this.runDispatchCycle().catch((err) => log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch cycle error'), ); } private async handleAgentStopped(event: AgentStoppedEvent): Promise { const { taskId, reason, agentId } = event.payload; // Auto-complete task for successful agent completions, not manual stops if (taskId && reason !== 'user_requested') { try { await this.dispatchManager.completeTask(taskId, agentId); log.info({ taskId, agentId, reason }, 'task auto-completed on agent stop'); } catch (err) { log.warn( { taskId, agentId, reason, err: err instanceof Error ? err.message : String(err) }, 'failed to auto-complete task on agent stop', ); } } this.scheduleDispatch(); } private async runDispatchCycle(): Promise { this.dispatchRunning = true; try { do { this.dispatchPending = false; // Dispatch all ready phases (each queues its tasks internally) while (true) { const result = await this.phaseDispatchManager.dispatchNextPhase(); if (!result.success) break; log.info({ phaseId: result.phaseId }, 'auto-dispatched phase'); } // Dispatch all ready tasks to idle agents while (true) { const result = await this.dispatchManager.dispatchNext(); if (!result.success) break; log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task'); } } while (this.dispatchPending); } finally { this.dispatchRunning = false; } } /** * Handle a task:completed event. * Merges the task branch into the phase branch, then checks if all phase tasks are done. */ private async handleTaskCompleted(event: TaskCompletedEvent): Promise { const { taskId } = event.payload; const task = await this.taskRepository.findById(taskId); if (!task?.phaseId || !task.initiativeId) return; const initiative = await this.initiativeRepository.findById(task.initiativeId); if (!initiative?.branch) return; const phase = await this.phaseRepository.findById(task.phaseId); if (!phase) return; // Skip merge tasks — they already work on the phase branch directly if (task.category === 'merge') return; const initBranch = initiative.branch; const phBranch = phaseBranchName(initBranch, phase.name); const tBranch = taskBranchName(initBranch, task.id); // Serialize merges per phase const lock = this.phaseMergeLocks.get(task.phaseId) ?? Promise.resolve(); const mergeOp = lock.then(async () => { await this.mergeTaskIntoPhase(taskId, task.phaseId!, tBranch, phBranch); }); this.phaseMergeLocks.set(task.phaseId, mergeOp.catch(() => {})); await mergeOp; // Check if all phase tasks are done const phaseTasks = await this.taskRepository.findByPhaseId(task.phaseId); const allDone = phaseTasks.every((t) => t.status === 'completed'); if (allDone) { await this.handlePhaseAllTasksDone(task.phaseId); } // Fill freed agent slot with next queued task this.scheduleDispatch(); } /** * Merge a task branch into the phase branch for all linked projects. */ private async mergeTaskIntoPhase( taskId: string, phaseId: string, taskBranch: string, phaseBranch: string, ): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) return; const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId); for (const project of projects) { const clonePath = await ensureProjectClone(project, this.workspaceRoot); // Only merge if the task branch actually exists (agents may not have pushed) const exists = await this.branchManager.branchExists(clonePath, taskBranch); if (!exists) { log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge'); continue; } const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch); if (!result.success && result.conflicts) { log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict'); await this.conflictResolutionService.handleConflict(taskId, result.conflicts, { sourceBranch: taskBranch, targetBranch: phaseBranch, }); return; } log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch'); } // Emit task:merged event const mergedEvent: TaskMergedEvent = { type: 'task:merged', timestamp: new Date(), payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch }, }; this.eventBus.emit(mergedEvent); } /** * Handle all tasks in a phase being done. * YOLO mode: auto-merge phase → initiative. * Review mode: set phase to pending_review. */ private async handlePhaseAllTasksDone(phaseId: string): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) return; const initiative = await this.initiativeRepository.findById(phase.initiativeId); if (!initiative?.branch) return; if (initiative.executionMode === 'yolo') { await this.mergePhaseIntoInitiative(phaseId); await this.phaseDispatchManager.completePhase(phaseId); await this.phaseDispatchManager.dispatchNextPhase(); this.scheduleDispatch(); } else { // review_per_phase await this.phaseRepository.update(phaseId, { status: 'pending_review' as any }); const event: PhasePendingReviewEvent = { type: 'phase:pending_review', timestamp: new Date(), payload: { phaseId, initiativeId: phase.initiativeId }, }; this.eventBus.emit(event); log.info({ phaseId, initiativeId: phase.initiativeId }, 'phase pending review'); } } /** * Merge phase branch into initiative branch for all linked projects. */ private async mergePhaseIntoInitiative(phaseId: string): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) return; const initiative = await this.initiativeRepository.findById(phase.initiativeId); if (!initiative?.branch) return; const initBranch = initiative.branch; const phBranch = phaseBranchName(initBranch, phase.name); const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId); for (const project of projects) { const clonePath = await ensureProjectClone(project, this.workspaceRoot); const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch); if (!result.success) { log.error({ phaseId, phBranch, initBranch, conflicts: result.conflicts }, 'phase merge conflict'); throw new Error(`Phase merge conflict: ${result.message}`); } log.info({ phaseId, phBranch, initBranch, project: project.name }, 'phase branch merged into initiative branch'); } // Emit phase:merged event const mergedEvent: PhaseMergedEvent = { type: 'phase:merged', timestamp: new Date(), payload: { phaseId, initiativeId: phase.initiativeId, sourceBranch: phBranch, targetBranch: initBranch }, }; this.eventBus.emit(mergedEvent); } /** * Approve and merge a phase that's pending review. * Called from tRPC when user clicks approve in ReviewTab. */ async approveAndMergePhase(phaseId: string): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) throw new Error(`Phase not found: ${phaseId}`); if (phase.status !== 'pending_review') { throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`); } await this.mergePhaseIntoInitiative(phaseId); await this.phaseDispatchManager.completePhase(phaseId); await this.phaseDispatchManager.dispatchNextPhase(); this.scheduleDispatch(); log.info({ phaseId }, 'phase review approved and merged'); } }