All files / src/execution orchestrator.ts

1.01% Statements 1/99
0% Branches 0/40
0% Functions 0/12
1.13% Lines 1/88

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227                                                  8x                                                                                                                                                                                                                                                                                                                                                                                                                  
/**
 * Execution Orchestrator
 *
 * Subscribes to task:completed events and orchestrates the post-completion
 * merge workflow:
 * - Task branch → Phase branch (on task completion)
 * - Phase branch → Initiative branch (when all phase tasks done)
 *
 * Supports two execution modes:
 * - YOLO: auto-merge everything
 * - Review per-phase: pause after each phase for diff review
 */
 
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent } from '../events/index.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { PhaseDispatchManager } from '../dispatch/types.js';
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
import { ensureProjectClone } from '../git/project-clones.js';
import { createModuleLogger } from '../logger/index.js';
 
// Logger shared by everything in this file, created under the module name 'execution-orchestrator'.
const log = createModuleLogger('execution-orchestrator');
 
/**
 * Drives the post-completion merge workflow.
 *
 * Listens for `task:completed`, merges the task branch into its phase branch
 * and, once every task of the phase is completed, either merges the phase
 * branch into the initiative branch (execution mode `yolo`) or marks the phase
 * as `pending_review` and waits for {@link ExecutionOrchestrator.approveAndMergePhase}.
 */
export class ExecutionOrchestrator {
  /**
   * Tail of the pending merge chain per phase id.
   *
   * Task merges that target the same phase are chained onto this promise so
   * they run one at a time. The stored promise never rejects, and an entry is
   * removed as soon as its chain has drained so the map does not grow with
   * every phase that was ever processed.
   */
  private phaseMergeLocks: Map<string, Promise<void>> = new Map();

  constructor(
    private branchManager: BranchManager,
    private phaseRepository: PhaseRepository,
    private taskRepository: TaskRepository,
    private initiativeRepository: InitiativeRepository,
    private projectRepository: ProjectRepository,
    private phaseDispatchManager: PhaseDispatchManager,
    private conflictResolutionService: ConflictResolutionService,
    private eventBus: EventBus,
    private workspaceRoot: string,
  ) {}

  /**
   * Start listening for task completion events.
   *
   * Handler failures are logged and never propagate back into the event bus.
   */
  start(): void {
    this.eventBus.on<TaskCompletedEvent>('task:completed', (event) => {
      this.handleTaskCompleted(event).catch((err) => {
        log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling task:completed');
      });
    });
    log.info('execution orchestrator started');
  }

  /**
   * Handle a task:completed event.
   *
   * Merges the task branch into the phase branch (serialized per phase), then
   * checks whether every task of the phase is completed. Events for tasks
   * without a phase or initiative, for initiatives without a branch, or for
   * unknown phases are ignored.
   *
   * @throws If the task merge fails for a reason other than conflicts.
   */
  private async handleTaskCompleted(event: TaskCompletedEvent): Promise<void> {
    const { taskId } = event.payload;

    const task = await this.taskRepository.findById(taskId);
    if (!task?.phaseId || !task.initiativeId) return;
    // Capture the narrowed id so closures below do not need a non-null assertion.
    const phaseId = task.phaseId;

    const initiative = await this.initiativeRepository.findById(task.initiativeId);
    if (!initiative?.branch) return;

    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    // Merge tasks already work on the phase branch directly, so there is no
    // task branch to merge for them. Their completion can still be the one
    // that finishes the phase, so they fall through to the all-done check.
    if (task.category !== 'merge') {
      const initBranch = initiative.branch;
      const phBranch = phaseBranchName(initBranch, phase.name);
      const tBranch = taskBranchName(initBranch, task.id);

      const merged = await this.runSerializedForPhase(phaseId, () =>
        this.mergeTaskIntoPhase(taskId, phaseId, tBranch, phBranch),
      );

      // The merge was handed to conflict resolution: the phase branch does not
      // contain this task's work yet, so the phase must not be finalized now.
      if (!merged) return;
    }

    // Check if all phase tasks are done
    const phaseTasks = await this.taskRepository.findByPhaseId(phaseId);
    const allDone = phaseTasks.every((t) => t.status === 'completed');
    if (allDone) {
      await this.handlePhaseAllTasksDone(phaseId);
    }
  }

  /**
   * Run `operation` after every previously queued operation for the same phase
   * has settled, and drop the phase's queue entry once it is no longer needed.
   *
   * @param phaseId Phase whose merges must not overlap.
   * @param operation Work to run once the phase's queue is free.
   * @returns Whatever `operation` resolves to; its rejection is passed on to the caller.
   */
  private async runSerializedForPhase<T>(phaseId: string, operation: () => Promise<T>): Promise<T> {
    const previous = this.phaseMergeLocks.get(phaseId) ?? Promise.resolve();
    const current = previous.then(operation);

    // The stored tail swallows the outcome so one failed merge does not make
    // every later merge of the same phase reject as well.
    const tail = current.then(
      () => {},
      () => {},
    );
    this.phaseMergeLocks.set(phaseId, tail);

    try {
      return await current;
    } finally {
      // Only the last queued operation may remove the entry; an earlier one
      // would otherwise let a newcomer skip ahead of operations still waiting.
      if (this.phaseMergeLocks.get(phaseId) === tail) {
        this.phaseMergeLocks.delete(phaseId);
      }
    }
  }

  /**
   * Merge a task branch into the phase branch for all linked projects.
   *
   * Projects in which the task branch does not exist are skipped. On the first
   * conflict the conflict is handed to the conflict resolution service and the
   * remaining projects are left untouched.
   *
   * @returns `true` when every existing task branch was merged and `task:merged`
   *   was emitted; `false` when the phase is unknown or a conflict was handed off.
   * @throws If a merge fails without reporting conflicts.
   */
  private async mergeTaskIntoPhase(
    taskId: string,
    phaseId: string,
    taskBranch: string,
    phaseBranch: string,
  ): Promise<boolean> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return false;

    const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);

      // Only merge if the task branch actually exists (agents may not have pushed)
      const exists = await this.branchManager.branchExists(clonePath, taskBranch);
      if (!exists) {
        log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge');
        continue;
      }

      const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch);

      if (!result.success) {
        if (result.conflicts) {
          log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
          await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
            sourceBranch: taskBranch,
            targetBranch: phaseBranch,
          });
          return false;
        }

        // A failure without conflicts is still a failure: never report it as merged.
        log.error(
          { taskId, taskBranch, phaseBranch, project: project.name, message: result.message },
          'task merge failed',
        );
        throw new Error(`Task merge failed: ${result.message}`);
      }

      log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch');
    }

    // Emit task:merged event
    const mergedEvent: TaskMergedEvent = {
      type: 'task:merged',
      timestamp: new Date(),
      payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch },
    };
    this.eventBus.emit(mergedEvent);
    return true;
  }

  /**
   * Handle all tasks in a phase being done.
   * YOLO mode: auto-merge phase → initiative, complete the phase, dispatch the next one.
   * Any other mode (review per phase): set phase to pending_review and emit `phase:pending_review`.
   */
  private async handlePhaseAllTasksDone(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative?.branch) return;

    if (initiative.executionMode === 'yolo') {
      await this.mergeCompleteAndAdvance(phaseId);
      return;
    }

    // review_per_phase
    // NOTE(review): the `as any` is kept from the original; whether the update
    // payload type accepts 'pending_review' cannot be told from this file.
    await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });

    const event: PhasePendingReviewEvent = {
      type: 'phase:pending_review',
      timestamp: new Date(),
      payload: { phaseId, initiativeId: phase.initiativeId },
    };
    this.eventBus.emit(event);

    log.info({ phaseId, initiativeId: phase.initiativeId }, 'phase pending review');
  }

  /**
   * Merge the phase into its initiative branch, mark the phase complete and
   * ask the dispatch manager for the next phase. Shared by the YOLO path and
   * by manual approval so both finalize a phase the same way.
   *
   * @throws If the phase merge fails; the phase is then neither completed nor is a next phase dispatched.
   */
  private async mergeCompleteAndAdvance(phaseId: string): Promise<void> {
    await this.mergePhaseIntoInitiative(phaseId);
    await this.phaseDispatchManager.completePhase(phaseId);
    await this.phaseDispatchManager.dispatchNextPhase();
  }

  /**
   * Merge phase branch into initiative branch for all linked projects.
   *
   * Does nothing when the phase is unknown or its initiative has no branch.
   * Emits `phase:merged` after every project was merged.
   *
   * @throws On the first project whose merge does not succeed.
   */
  private async mergePhaseIntoInitiative(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative?.branch) return;

    const initBranch = initiative.branch;
    const phBranch = phaseBranchName(initBranch, phase.name);

    const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch);

      if (!result.success) {
        log.error({ phaseId, phBranch, initBranch, conflicts: result.conflicts }, 'phase merge conflict');
        throw new Error(`Phase merge conflict: ${result.message}`);
      }

      log.info({ phaseId, phBranch, initBranch, project: project.name }, 'phase branch merged into initiative branch');
    }

    // Emit phase:merged event
    const mergedEvent: PhaseMergedEvent = {
      type: 'phase:merged',
      timestamp: new Date(),
      payload: { phaseId, initiativeId: phase.initiativeId, sourceBranch: phBranch, targetBranch: initBranch },
    };
    this.eventBus.emit(mergedEvent);
  }

  /**
   * Approve and merge a phase that's pending review.
   * Called from tRPC when user clicks approve in ReviewTab.
   *
   * @throws If the phase does not exist, is not in `pending_review`, or its merge fails.
   */
  async approveAndMergePhase(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) throw new Error(`Phase not found: ${phaseId}`);
    if (phase.status !== 'pending_review') {
      throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
    }

    await this.mergeCompleteAndAdvance(phaseId);

    log.info({ phaseId }, 'phase review approved and merged');
  }
}