/** * Execution Orchestrator * * Subscribes to task:completed events and orchestrates the post-completion * merge workflow: * - Task branch → Phase branch (on task completion) * - Phase branch → Initiative branch (when all phase tasks done) * * Supports two execution modes: * - YOLO: auto-merge everything * - Review per-phase: pause after each phase for diff review */ import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent } from '../events/index.js'; import type { BranchManager } from '../git/branch-manager.js'; import type { PhaseRepository } from '../db/repositories/phase-repository.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { InitiativeRepository } from '../db/repositories/initiative-repository.js'; import type { ProjectRepository } from '../db/repositories/project-repository.js'; import type { PhaseDispatchManager } from '../dispatch/types.js'; import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js'; import { phaseBranchName, taskBranchName } from '../git/branch-naming.js'; import { ensureProjectClone } from '../git/project-clones.js'; import { createModuleLogger } from '../logger/index.js'; const log = createModuleLogger('execution-orchestrator'); export class ExecutionOrchestrator { /** Serialize merges per phase to avoid concurrent merge conflicts */ private phaseMergeLocks: Map> = new Map(); constructor( private branchManager: BranchManager, private phaseRepository: PhaseRepository, private taskRepository: TaskRepository, private initiativeRepository: InitiativeRepository, private projectRepository: ProjectRepository, private phaseDispatchManager: PhaseDispatchManager, private conflictResolutionService: ConflictResolutionService, private eventBus: EventBus, private workspaceRoot: string, ) {} /** * Start listening for task completion events. */ start(): void { this.eventBus.on('task:completed', (event) => { this.handleTaskCompleted(event).catch((err) => { log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling task:completed'); }); }); log.info('execution orchestrator started'); } /** * Handle a task:completed event. * Merges the task branch into the phase branch, then checks if all phase tasks are done. */ private async handleTaskCompleted(event: TaskCompletedEvent): Promise { const { taskId } = event.payload; const task = await this.taskRepository.findById(taskId); if (!task?.phaseId || !task.initiativeId) return; const initiative = await this.initiativeRepository.findById(task.initiativeId); if (!initiative?.branch) return; const phase = await this.phaseRepository.findById(task.phaseId); if (!phase) return; // Skip merge tasks — they already work on the phase branch directly if (task.category === 'merge') return; const initBranch = initiative.branch; const phBranch = phaseBranchName(initBranch, phase.name); const tBranch = taskBranchName(initBranch, task.id); // Serialize merges per phase const lock = this.phaseMergeLocks.get(task.phaseId) ?? Promise.resolve(); const mergeOp = lock.then(async () => { await this.mergeTaskIntoPhase(taskId, task.phaseId!, tBranch, phBranch); }); this.phaseMergeLocks.set(task.phaseId, mergeOp.catch(() => {})); await mergeOp; // Check if all phase tasks are done const phaseTasks = await this.taskRepository.findByPhaseId(task.phaseId); const allDone = phaseTasks.every((t) => t.status === 'completed'); if (allDone) { await this.handlePhaseAllTasksDone(task.phaseId); } } /** * Merge a task branch into the phase branch for all linked projects. */ private async mergeTaskIntoPhase( taskId: string, phaseId: string, taskBranch: string, phaseBranch: string, ): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) return; const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId); for (const project of projects) { const clonePath = await ensureProjectClone(project, this.workspaceRoot); // Only merge if the task branch actually exists (agents may not have pushed) const exists = await this.branchManager.branchExists(clonePath, taskBranch); if (!exists) { log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge'); continue; } const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch); if (!result.success && result.conflicts) { log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict'); await this.conflictResolutionService.handleConflict(taskId, result.conflicts, { sourceBranch: taskBranch, targetBranch: phaseBranch, }); return; } log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch'); } // Emit task:merged event const mergedEvent: TaskMergedEvent = { type: 'task:merged', timestamp: new Date(), payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch }, }; this.eventBus.emit(mergedEvent); } /** * Handle all tasks in a phase being done. * YOLO mode: auto-merge phase → initiative. * Review mode: set phase to pending_review. */ private async handlePhaseAllTasksDone(phaseId: string): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) return; const initiative = await this.initiativeRepository.findById(phase.initiativeId); if (!initiative?.branch) return; if (initiative.executionMode === 'yolo') { await this.mergePhaseIntoInitiative(phaseId); await this.phaseDispatchManager.completePhase(phaseId); await this.phaseDispatchManager.dispatchNextPhase(); } else { // review_per_phase await this.phaseRepository.update(phaseId, { status: 'pending_review' as any }); const event: PhasePendingReviewEvent = { type: 'phase:pending_review', timestamp: new Date(), payload: { phaseId, initiativeId: phase.initiativeId }, }; this.eventBus.emit(event); log.info({ phaseId, initiativeId: phase.initiativeId }, 'phase pending review'); } } /** * Merge phase branch into initiative branch for all linked projects. */ private async mergePhaseIntoInitiative(phaseId: string): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) return; const initiative = await this.initiativeRepository.findById(phase.initiativeId); if (!initiative?.branch) return; const initBranch = initiative.branch; const phBranch = phaseBranchName(initBranch, phase.name); const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId); for (const project of projects) { const clonePath = await ensureProjectClone(project, this.workspaceRoot); const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch); if (!result.success) { log.error({ phaseId, phBranch, initBranch, conflicts: result.conflicts }, 'phase merge conflict'); throw new Error(`Phase merge conflict: ${result.message}`); } log.info({ phaseId, phBranch, initBranch, project: project.name }, 'phase branch merged into initiative branch'); } // Emit phase:merged event const mergedEvent: PhaseMergedEvent = { type: 'phase:merged', timestamp: new Date(), payload: { phaseId, initiativeId: phase.initiativeId, sourceBranch: phBranch, targetBranch: initBranch }, }; this.eventBus.emit(mergedEvent); } /** * Approve and merge a phase that's pending review. * Called from tRPC when user clicks approve in ReviewTab. */ async approveAndMergePhase(phaseId: string): Promise { const phase = await this.phaseRepository.findById(phaseId); if (!phase) throw new Error(`Phase not found: ${phaseId}`); if (phase.status !== 'pending_review') { throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`); } await this.mergePhaseIntoInitiative(phaseId); await this.phaseDispatchManager.completePhase(phaseId); await this.phaseDispatchManager.dispatchNextPhase(); log.info({ phaseId }, 'phase review approved and merged'); } }