/**
 * Execution Orchestrator
 *
 * Subscribes to task:completed events and orchestrates the post-completion
 * merge workflow:
 * - Task branch → Phase branch (on task completion)
 * - Phase branch → Initiative branch (when all phase tasks done)
 *
 * Supports two execution modes:
 * - YOLO: auto-merge everything
 * - Review per-phase: pause after each phase for diff review
 */

import type {
  EventBus,
  TaskCompletedEvent,
  PhasePendingReviewEvent,
  PhaseChangesRequestedEvent,
  PhaseMergedEvent,
  TaskMergedEvent,
  PhaseQueuedEvent,
  AgentStoppedEvent,
  AgentCrashedEvent,
  InitiativePendingReviewEvent,
  InitiativeReviewApprovedEvent,
  InitiativeChangesRequestedEvent,
} from '../events/index.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { AgentManager } from '../agent/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
import { ensureProjectClone } from '../git/project-clones.js';
import { createModuleLogger } from '../logger/index.js';
import { phaseMetaCache, fileDiffCache } from '../review/diff-cache.js';
import { shouldRunQualityReview, runQualityReview } from './quality-review.js';

const log = createModuleLogger('execution-orchestrator');

/**
 * Maximum number of automatic retries for a crashed task. Once exceeded the
 * task is no longer re-queued and is left in its current (in_progress) state.
 */
const MAX_TASK_RETRIES = 3;

/** Render an unknown thrown value as a log-friendly string. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

export class ExecutionOrchestrator {
  /**
   * Serialize merges per phase to avoid concurrent merge conflicts.
   * Maps phaseId → tail of that phase's merge chain. The stored promise never
   * rejects; entries are removed once the chain drains.
   */
  private phaseMergeLocks: Map<string, Promise<void>> = new Map();

  /** Coalesced dispatch scheduling state */
  private dispatchPending = false;
  private dispatchRunning = false;

  constructor(
    private branchManager: BranchManager,
    private phaseRepository: PhaseRepository,
    private taskRepository: TaskRepository,
    private initiativeRepository: InitiativeRepository,
    private projectRepository: ProjectRepository,
    private phaseDispatchManager: PhaseDispatchManager,
    private dispatchManager: DispatchManager,
    private conflictResolutionService: ConflictResolutionService,
    private eventBus: EventBus,
    private workspaceRoot: string,
    private agentRepository?: AgentRepository,
    private agentManager?: AgentManager,
  ) {}

  /**
   * Start listening for events that drive execution.
   *
   * Handlers are fire-and-forget: failures are logged, never thrown to the
   * event bus. Also kicks off recovery of the dispatch queues from DB state.
   */
  start(): void {
    this.eventBus.on('task:completed', (event) => {
      this.handleTaskCompleted(event).catch((err) => {
        log.error({ err: errorMessage(err) }, 'error handling task:completed');
      });
    });

    // Kick off dispatch when phases are queued (e.g. user clicks Execute)
    this.eventBus.on('phase:queued', () => this.scheduleDispatch());

    // Auto-complete task + re-dispatch when an agent finishes
    this.eventBus.on('agent:stopped', (event) => {
      this.handleAgentStopped(event).catch((err) => {
        log.error({ err: errorMessage(err) }, 'error handling agent:stopped');
      });
    });

    // Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES)
    this.eventBus.on('agent:crashed', (event) => {
      this.handleAgentCrashed(event).catch((err) => {
        log.error({ err: errorMessage(err) }, 'error handling agent:crashed');
      });
    });

    // Recover in-memory dispatch queues from DB state (survives server restarts)
    this.recoverDispatchQueues().catch((err) => {
      log.error({ err: errorMessage(err) }, 'dispatch queue recovery failed');
    });

    log.info('execution orchestrator started');
  }

  // ---------------------------------------------------------------------------
  // Coalesced dispatch scheduler
  // ---------------------------------------------------------------------------

  /**
   * Schedule a dispatch cycle. Coalesces multiple rapid calls (e.g. from
   * synchronous EventEmitter) into a single async run: if a cycle is already
   * running, it is flagged to loop once more instead of starting a second one.
   */
  private scheduleDispatch(): void {
    if (this.dispatchRunning) {
      this.dispatchPending = true;
      return;
    }
    this.runDispatchCycle().catch((err) =>
      log.error({ err: errorMessage(err) }, 'dispatch cycle error'),
    );
  }

  /**
   * Handle agent:stopped. Unless the stop was user-requested, either starts a
   * quality review or auto-completes the task; always schedules a dispatch.
   */
  private async handleAgentStopped(event: AgentStoppedEvent): Promise<void> {
    const { taskId, reason, agentId } = event.payload;

    if (taskId && reason !== 'user_requested') {
      try {
        const result = await this.tryQualityReview(taskId, agentId, reason);
        if (!result.reviewStarted) {
          await this.dispatchManager.completeTask(taskId, agentId);
          log.info({ taskId, agentId, reason }, 'task auto-completed on agent stop');
        }
      } catch (err) {
        log.warn(
          { taskId, agentId, reason, err: errorMessage(err) },
          'failed to handle agent stop',
        );
      }
    }

    this.scheduleDispatch();
  }

  /**
   * Attempt to run quality review for a stopping agent.
   * Returns { reviewStarted: true } if quality review was initiated (callers must NOT call completeTask).
   * Returns { reviewStarted: false } if no review needed (caller should call completeTask).
   */
  private async tryQualityReview(
    taskId: string,
    agentId: string,
    reason: string,
  ): Promise<{ reviewStarted: boolean }> {
    if (!this.agentRepository || !this.agentManager) {
      return { reviewStarted: false };
    }

    const task = await this.taskRepository.findById(taskId);
    if (!task?.phaseId || !task.initiativeId) {
      return { reviewStarted: false };
    }

    const initiative = await this.initiativeRepository.findById(task.initiativeId);
    if (!initiative?.branch) {
      return { reviewStarted: false };
    }

    const phase = await this.phaseRepository.findById(task.phaseId);
    if (!phase) {
      return { reviewStarted: false };
    }

    const taskBranch = taskBranchName(initiative.branch, taskId);
    const baseBranch = phaseBranchName(initiative.branch, phase.name);

    // The review runs against the first linked project only.
    const [primaryProject] = await this.projectRepository.findProjectsByInitiativeId(task.initiativeId);
    if (!primaryProject) {
      return { reviewStarted: false };
    }
    const repoPath = await ensureProjectClone(primaryProject, this.workspaceRoot);

    const result = await shouldRunQualityReview({
      agentId,
      taskId,
      stopReason: reason,
      repoPath,
      taskBranch,
      baseBranch,
      agentRepository: this.agentRepository,
      taskRepository: this.taskRepository,
      initiativeRepository: this.initiativeRepository,
      branchManager: this.branchManager,
    });

    if (!result.run) {
      return { reviewStarted: false };
    }

    await runQualityReview({
      taskId,
      taskBranch,
      baseBranch,
      initiativeId: task.initiativeId,
      qualifyingFiles: result.qualifyingFiles,
      taskRepository: this.taskRepository,
      agentManager: this.agentManager,
      log,
    });

    return { reviewStarted: true };
  }

  /**
   * Handle agent:crashed. Re-queues the in_progress task with an incremented
   * retry count, up to MAX_TASK_RETRIES; beyond that the task is left as is.
   */
  private async handleAgentCrashed(event: AgentCrashedEvent): Promise<void> {
    const { taskId, agentId, error } = event.payload;
    if (!taskId) return;

    const task = await this.taskRepository.findById(taskId);
    if (!task || task.status !== 'in_progress') return;

    const retryCount = (task.retryCount ?? 0) + 1;
    if (retryCount > MAX_TASK_RETRIES) {
      log.warn({ taskId, agentId, retryCount, error }, 'task exceeded max retries, leaving in_progress');
      return;
    }

    // Reset task for re-dispatch with incremented retry count
    await this.taskRepository.update(taskId, { status: 'pending', retryCount });
    await this.dispatchManager.queue(taskId);
    log.info({ taskId, agentId, retryCount, error }, 'crashed task re-queued for retry');

    this.scheduleDispatch();
  }

  /**
   * Run dispatch until nothing more can be dispatched. Repeats the whole pass
   * whenever scheduleDispatch() was called while a pass was in flight.
   */
  private async runDispatchCycle(): Promise<void> {
    this.dispatchRunning = true;
    try {
      do {
        this.dispatchPending = false;

        // Dispatch all ready phases (each queues its tasks internally)
        while (true) {
          const result = await this.phaseDispatchManager.dispatchNextPhase();
          if (!result.success) break;
          log.info({ phaseId: result.phaseId }, 'auto-dispatched phase');
        }

        // Dispatch all ready tasks to idle agents
        while (true) {
          const result = await this.dispatchManager.dispatchNext();
          if (!result.success) break;
          log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task');
        }
      } while (this.dispatchPending);
    } finally {
      this.dispatchRunning = false;
    }
  }

  /**
   * Handle a task:completed event.
   * Merges the task branch into the phase branch, then checks if all phase tasks are done.
   * A failed merge is logged and does not prevent the phase-completion check.
   */
  private async handleTaskCompleted(event: TaskCompletedEvent): Promise<void> {
    const { taskId } = event.payload;
    const task = await this.taskRepository.findById(taskId);
    if (!task?.phaseId || !task.initiativeId) return;
    const phaseId = task.phaseId;

    const initiative = await this.initiativeRepository.findById(task.initiativeId);
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    // Merge task branch into phase branch (only when branches exist)
    if (initiative?.branch && task.category !== 'merge' && task.category !== 'review') {
      try {
        const initBranch = initiative.branch;
        const phBranch = phaseBranchName(initBranch, phase.name);
        const tBranch = taskBranchName(initBranch, task.id);

        // Serialize merges per phase: chain onto the previous merge's tail.
        const previous = this.phaseMergeLocks.get(phaseId) ?? Promise.resolve();
        const mergeOp = previous.then(() => this.mergeTaskIntoPhase(taskId, phaseId, tBranch, phBranch));
        // The stored tail swallows rejection so one failed merge does not poison the chain.
        const tail = mergeOp.catch(() => {});
        this.phaseMergeLocks.set(phaseId, tail);
        try {
          await mergeOp;
        } finally {
          // Drop the entry once we are the last link, so the map does not grow per phase.
          if (this.phaseMergeLocks.get(phaseId) === tail) {
            this.phaseMergeLocks.delete(phaseId);
          }
        }
      } catch (err) {
        log.error({ taskId, err: errorMessage(err) }, 'task merge failed, still checking phase completion');
      }
    }

    // Check if all phase tasks are done — always, regardless of branch/merge status
    const phaseTasks = await this.taskRepository.findByPhaseId(phaseId);
    const allDone = phaseTasks.every((t) => t.status === 'completed');

    if (allDone) {
      await this.handlePhaseAllTasksDone(phaseId);
    }

    // Fill freed agent slot with next queued task
    this.scheduleDispatch();
  }

  /**
   * Merge a task branch into the phase branch for all linked projects.
   *
   * - Projects where the task branch does not exist are skipped.
   * - On a merge conflict, a conflict-resolution task is requested and queued,
   *   and the method returns without emitting task:merged.
   * - On any other merge failure, throws.
   * The phase's diff caches are invalidated on every exit path, because
   * earlier projects may already have advanced the phase branch.
   *
   * @throws Error when a merge fails without reporting conflicts.
   */
  private async mergeTaskIntoPhase(
    taskId: string,
    phaseId: string,
    taskBranch: string,
    phaseBranch: string,
  ): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    try {
      for (const project of projects) {
        const clonePath = await ensureProjectClone(project, this.workspaceRoot);

        // Only merge if the task branch actually exists (agents may not have pushed)
        const exists = await this.branchManager.branchExists(clonePath, taskBranch);
        if (!exists) {
          log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge');
          continue;
        }

        const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch);

        if (!result.success) {
          if (!result.conflicts) {
            // Non-conflict failure: must not be reported as a successful merge.
            throw new Error(`Task merge failed for project ${project.name}: ${result.message}`);
          }

          log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
          const conflictTask = await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
            sourceBranch: taskBranch,
            targetBranch: phaseBranch,
          });
          if (conflictTask) {
            await this.dispatchManager.queue(conflictTask.id);
            log.info({ taskId: conflictTask.id, originalTaskId: taskId }, 'conflict resolution task queued for dispatch');
          }
          return;
        }

        log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch');
      }
    } finally {
      // Invalidate diff cache — phase branch HEAD may have advanced after merges
      phaseMetaCache.invalidateByPrefix(`${phaseId}:`);
      fileDiffCache.invalidateByPrefix(`${phaseId}:`);
    }

    // Emit task:merged event
    const mergedEvent: TaskMergedEvent = {
      type: 'task:merged',
      timestamp: new Date(),
      payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch },
    };
    this.eventBus.emit(mergedEvent);
  }

  /**
   * Handle all tasks in a phase being done.
   * YOLO mode: auto-merge phase → initiative.
   * Review mode: set phase to pending_review.
   */
  private async handlePhaseAllTasksDone(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative) return;

    if (initiative.executionMode === 'yolo') {
      // Merge phase branch into initiative branch (only when branches exist)
      if (initiative.branch) {
        await this.mergePhaseIntoInitiative(phaseId);
      }
      await this.advanceAfterPhaseMerge(phaseId, phase.initiativeId);
      return;
    }

    // review_per_phase
    await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });

    const event: PhasePendingReviewEvent = {
      type: 'phase:pending_review',
      timestamp: new Date(),
      payload: { phaseId, initiativeId: phase.initiativeId },
    };
    this.eventBus.emit(event);

    log.info({ phaseId, initiativeId: phase.initiativeId }, 'phase pending review');
  }

  /**
   * Shared tail of phase completion (YOLO auto-merge and manual approval):
   * completes the phase, re-queues approved phases, tries to dispatch the next
   * phase, and — if none could be dispatched — checks whether the whole
   * initiative is done. Always schedules a dispatch cycle at the end.
   */
  private async advanceAfterPhaseMerge(phaseId: string, initiativeId: string): Promise<void> {
    await this.phaseDispatchManager.completePhase(phaseId);

    // Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
    await this.requeueApprovedPhases(initiativeId);

    // Check if this was the last phase — if so, trigger initiative review
    const dispatched = await this.phaseDispatchManager.dispatchNextPhase();
    if (!dispatched.success) {
      await this.checkInitiativeCompletion(initiativeId);
    }

    this.scheduleDispatch();
  }

  /**
   * Merge phase branch into initiative branch for all linked projects.
   * Records the merge base (from the first project where it can be computed)
   * before merging, then emits phase:merged.
   *
   * @throws Error when any project's merge does not succeed.
   */
  private async mergePhaseIntoInitiative(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative?.branch) return;

    const initBranch = initiative.branch;
    const phBranch = phaseBranchName(initBranch, phase.name);

    const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    // Store merge base before merging so we can reconstruct diffs for completed phases
    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      try {
        const mergeBase = await this.branchManager.getMergeBase(clonePath, initBranch, phBranch);
        await this.phaseRepository.update(phaseId, { mergeBase });
        break; // Only need one merge base (first project)
      } catch (err) {
        // Phase branch may not exist in this project clone — try the next one
        log.debug({ phaseId, project: project.name, err: errorMessage(err) }, 'merge base unavailable for project');
      }
    }

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch);

      if (!result.success) {
        log.error({ phaseId, phBranch, initBranch, conflicts: result.conflicts }, 'phase merge conflict');
        throw new Error(`Phase merge conflict: ${result.message}`);
      }

      log.info({ phaseId, phBranch, initBranch, project: project.name }, 'phase branch merged into initiative branch');
    }

    // Emit phase:merged event
    const mergedEvent: PhaseMergedEvent = {
      type: 'phase:merged',
      timestamp: new Date(),
      payload: { phaseId, initiativeId: phase.initiativeId, sourceBranch: phBranch, targetBranch: initBranch },
    };
    this.eventBus.emit(mergedEvent);
  }

  /**
   * Approve and merge a phase that's pending review.
   * Called from tRPC when user clicks approve in ReviewTab.
   *
   * @throws Error when the phase is missing, not pending review, or the merge fails.
   */
  async approveAndMergePhase(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) throw new Error(`Phase not found: ${phaseId}`);
    if (phase.status !== 'pending_review') {
      throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
    }

    await this.mergePhaseIntoInitiative(phaseId);
    await this.advanceAfterPhaseMerge(phaseId, phase.initiativeId);

    log.info({ phaseId }, 'phase review approved and merged');
  }

  /**
   * Find a review task on the phase that is still pending or in progress.
   * Used to avoid creating duplicate review tasks.
   */
  private async findActiveReviewTask(phaseId: string) {
    const existingTasks = await this.taskRepository.findByPhaseId(phaseId);
    return existingTasks.find(
      (t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
    );
  }

  /**
   * Request changes on a phase that's pending review.
   * Creates a revision task, resets the phase to in_progress, and dispatches.
   * If an active review task already exists, its id is returned and nothing
   * else is changed.
   *
   * @param phaseId - Phase under review.
   * @param unresolvedThreads - Review threads to include in the task description, grouped by file.
   * @param summary - Optional free-text summary placed at the top of the description.
   * @returns The id of the (new or existing) review task.
   * @throws Error when the phase or its initiative is missing, or the phase is not pending review.
   */
  async requestChangesOnPhase(
    phaseId: string,
    unresolvedThreads: Array<{
      id: string;
      filePath: string;
      lineNumber: number;
      body: string;
      author: string;
      replies: Array<{ id: string; body: string; author: string }>;
    }>,
    summary?: string,
  ): Promise<{ taskId: string }> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) throw new Error(`Phase not found: ${phaseId}`);
    if (phase.status !== 'pending_review') {
      throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
    }

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative) throw new Error(`Initiative not found: ${phase.initiativeId}`);

    // Guard: don't create duplicate review tasks
    const activeReview = await this.findActiveReviewTask(phaseId);
    if (activeReview) {
      return { taskId: activeReview.id };
    }

    // Build revision task description from threaded comments + summary
    const lines: string[] = [];
    if (summary) {
      lines.push(`## Summary\n\n${summary}\n`);
    }
    if (unresolvedThreads.length > 0) {
      lines.push('## Review Comments\n');
      // Group comments by file
      const byFile = new Map<string, typeof unresolvedThreads>();
      for (const c of unresolvedThreads) {
        const arr = byFile.get(c.filePath) ?? [];
        arr.push(c);
        byFile.set(c.filePath, arr);
      }
      for (const [filePath, fileComments] of byFile) {
        lines.push(`### ${filePath}\n`);
        for (const c of fileComments) {
          lines.push(`#### Line ${c.lineNumber} [comment:${c.id}]`);
          lines.push(`**${c.author}**: ${c.body}`);
          for (const r of c.replies) {
            lines.push(`> **${r.author}**: ${r.body}`);
          }
          lines.push('');
        }
      }
    }
    const description = lines.join('\n') || 'Address review feedback.';

    // Create revision task
    const task = await this.taskRepository.create({
      phaseId,
      initiativeId: phase.initiativeId,
      name: `Address review feedback: ${phase.name}`,
      description,
      category: 'review',
      priority: 'high',
    });

    // Reset phase status back to in_progress
    await this.phaseRepository.update(phaseId, { status: 'in_progress' as any });

    // Queue task for dispatch
    await this.dispatchManager.queue(task.id);

    // Emit event
    const event: PhaseChangesRequestedEvent = {
      type: 'phase:changes_requested',
      timestamp: new Date(),
      payload: {
        phaseId,
        initiativeId: phase.initiativeId,
        taskId: task.id,
        commentCount: unresolvedThreads.length,
      },
    };
    this.eventBus.emit(event);

    log.info({ phaseId, taskId: task.id, commentCount: unresolvedThreads.length }, 'changes requested on phase');

    // Kick off dispatch
    this.scheduleDispatch();

    return { taskId: task.id };
  }

  /**
   * Request changes on an initiative that's pending review.
   * Creates/reuses a "Finalization" phase and adds a review task to it.
   * The initiative is set back to active in every non-throwing path.
   *
   * @returns The id of the (new or existing) review task.
   * @throws Error when the initiative is missing or not pending review.
   */
  async requestChangesOnInitiative(
    initiativeId: string,
    summary: string,
  ): Promise<{ taskId: string }> {
    const initiative = await this.initiativeRepository.findById(initiativeId);
    if (!initiative) throw new Error(`Initiative not found: ${initiativeId}`);
    if (initiative.status !== 'pending_review') {
      throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
    }

    // Find or create a "Finalization" phase
    const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
    let finalizationPhase = phases.find((p) => p.name === 'Finalization');

    if (!finalizationPhase) {
      finalizationPhase = await this.phaseRepository.create({
        initiativeId,
        name: 'Finalization',
        status: 'in_progress',
      });
    } else if (finalizationPhase.status === 'completed' || finalizationPhase.status === 'pending_review') {
      await this.phaseRepository.update(finalizationPhase.id, { status: 'in_progress' as any });
    }

    // Guard: don't create duplicate review tasks
    const activeReview = await this.findActiveReviewTask(finalizationPhase.id);
    if (activeReview) {
      // Still reset initiative to active
      await this.initiativeRepository.update(initiativeId, { status: 'active' as any });
      this.scheduleDispatch();
      return { taskId: activeReview.id };
    }

    // Create review task
    const task = await this.taskRepository.create({
      phaseId: finalizationPhase.id,
      initiativeId,
      name: `Address initiative review feedback`,
      description: `## Summary\n\n${summary}`,
      category: 'review',
      priority: 'high',
    });

    // Reset initiative status to active
    await this.initiativeRepository.update(initiativeId, { status: 'active' as any });

    // Queue task for dispatch
    await this.dispatchManager.queue(task.id);

    // Emit event
    const event: InitiativeChangesRequestedEvent = {
      type: 'initiative:changes_requested',
      timestamp: new Date(),
      payload: {
        initiativeId,
        phaseId: finalizationPhase.id,
        taskId: task.id,
      },
    };
    this.eventBus.emit(event);

    log.info({ initiativeId, phaseId: finalizationPhase.id, taskId: task.id }, 'changes requested on initiative');

    this.scheduleDispatch();

    return { taskId: task.id };
  }

  /**
   * Re-queue approved phases for an initiative into the in-memory dispatch queue.
   * Self-healing: ensures phases aren't lost if the server restarted since the
   * initial queueAllPhases() call. Queueing failures are logged at debug level
   * and otherwise ignored.
   */
  private async requeueApprovedPhases(initiativeId: string): Promise<void> {
    const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
    for (const p of phases) {
      if (p.status !== 'approved') continue;
      try {
        await this.phaseDispatchManager.queuePhase(p.id);
        log.info({ phaseId: p.id }, 're-queued approved phase');
      } catch (err) {
        // Already queued or status changed — safe to ignore
        log.debug({ phaseId: p.id, err: errorMessage(err) }, 'approved phase not re-queued');
      }
    }
  }

  /**
   * Recover in-memory dispatch queues from DB state on server startup.
   * Re-queues approved phases and pending tasks for in_progress phases, resets
   * in_progress tasks whose agent is no longer alive, and completes blocked
   * tasks whose agent had already finished.
   *
   * Each phase is recovered independently: a failure in one phase is logged
   * and does not stop recovery of the others.
   */
  private async recoverDispatchQueues(): Promise<void> {
    const initiatives = await this.initiativeRepository.findByStatus('active');
    let phasesRecovered = 0;
    let tasksRecovered = 0;

    for (const initiative of initiatives) {
      const phases = await this.phaseRepository.findByInitiativeId(initiative.id);

      for (const phase of phases) {
        try {
          // Re-queue approved phases into the phase dispatch queue
          if (phase.status === 'approved') {
            try {
              await this.phaseDispatchManager.queuePhase(phase.id);
              phasesRecovered++;
            } catch (err) {
              // Already queued or status changed
              log.debug({ phaseId: phase.id, err: errorMessage(err) }, 'approved phase not re-queued during recovery');
            }
          }

          // Re-queue pending tasks and recover stuck in_progress tasks for in_progress phases
          if (phase.status === 'in_progress') {
            const tasks = await this.taskRepository.findByPhaseId(phase.id);
            for (const task of tasks) {
              if (task.status === 'pending') {
                try {
                  await this.dispatchManager.queue(task.id);
                  tasksRecovered++;
                } catch (err) {
                  // Already queued or task issue
                  log.debug({ taskId: task.id, err: errorMessage(err) }, 'pending task not re-queued during recovery');
                }
              } else if (task.status === 'in_progress' && this.agentRepository) {
                // Check if the assigned agent is still alive
                const agent = await this.agentRepository.findByTaskId(task.id);
                const isAlive = agent && (agent.status === 'running' || agent.status === 'waiting_for_input');
                if (!isAlive) {
                  // Agent is dead — reset task for re-dispatch
                  await this.taskRepository.update(task.id, { status: 'pending' });
                  await this.dispatchManager.queue(task.id);
                  tasksRecovered++;
                  log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
                }
              } else if (task.status === 'blocked' && this.agentRepository) {
                // Task was blocked by merge conflict after agent had already completed.
                // If the agent finished successfully, mark the task completed so the
                // phase can progress.
                const agent = await this.agentRepository.findByTaskId(task.id);
                if (agent && (agent.status === 'idle' || agent.status === 'stopped')) {
                  await this.taskRepository.update(task.id, { status: 'completed' });
                  tasksRecovered++;
                  log.info({ taskId: task.id, agentId: agent.id }, 'recovered blocked task (agent completed, merge conflict)');
                }
              }
            }

            // Re-read tasks after recovery updates and check if phase is now fully done
            const updatedTasks = await this.taskRepository.findByPhaseId(phase.id);
            const allDone = updatedTasks.every((t) => t.status === 'completed');
            if (allDone && updatedTasks.length > 0) {
              log.info({ phaseId: phase.id }, 'all tasks completed in in_progress phase, triggering phase completion');
              await this.handlePhaseAllTasksDone(phase.id);
              phasesRecovered++;
            }
          }
        } catch (err) {
          // Isolate failures so work already re-queued for other phases still gets dispatched.
          log.error({ phaseId: phase.id, err: errorMessage(err) }, 'failed to recover phase during dispatch queue recovery');
        }
      }
    }

    if (phasesRecovered > 0 || tasksRecovered > 0) {
      log.info({ phasesRecovered, tasksRecovered }, 'recovered dispatch queues from DB state');
      this.scheduleDispatch();
    }
  }

  /**
   * Check if all phases for an initiative are completed.
   * If so, set initiative to pending_review and emit event.
   * No-op when the initiative has no phases, no branch, or is not active.
   */
  private async checkInitiativeCompletion(initiativeId: string): Promise<void> {
    const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
    if (phases.length === 0) return;

    const allCompleted = phases.every((p) => p.status === 'completed');
    if (!allCompleted) return;

    const initiative = await this.initiativeRepository.findById(initiativeId);
    if (!initiative?.branch) return;
    if (initiative.status !== 'active') return;

    await this.initiativeRepository.update(initiativeId, { status: 'pending_review' as any });

    const event: InitiativePendingReviewEvent = {
      type: 'initiative:pending_review',
      timestamp: new Date(),
      payload: { initiativeId, branch: initiative.branch },
    };
    this.eventBus.emit(event);

    log.info({ initiativeId, branch: initiative.branch }, 'initiative pending review — all phases completed');
  }

  /**
   * Approve an initiative review.
   * Either pushes the initiative branch or merges into default + pushes.
   * Projects where the initiative branch does not exist are skipped.
   *
   * @param initiativeId - Initiative to approve; must be pending review and have a branch.
   * @param strategy - 'push_branch' pushes the initiative branch; 'merge_and_push'
   *   merges it into each project's default branch and pushes that.
   * @throws Error when the initiative is missing, not pending review, has no
   *   branch, or a merge/push fails.
   */
  async approveInitiative(
    initiativeId: string,
    strategy: 'push_branch' | 'merge_and_push',
  ): Promise<void> {
    const initiative = await this.initiativeRepository.findById(initiativeId);
    if (!initiative) throw new Error(`Initiative not found: ${initiativeId}`);
    if (initiative.status !== 'pending_review') {
      throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
    }
    if (!initiative.branch) {
      throw new Error(`Initiative ${initiativeId} has no branch configured`);
    }

    const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      const branchExists = await this.branchManager.branchExists(clonePath, initiative.branch);
      if (!branchExists) {
        log.warn({ initiativeId, branch: initiative.branch, project: project.name }, 'initiative branch does not exist in project, skipping');
        continue;
      }

      // Fetch remote so local branches are up-to-date before merge/push
      await this.branchManager.fetchRemote(clonePath);

      if (strategy === 'merge_and_push') {
        // Fast-forward local defaultBranch to match origin before merging
        try {
          await this.branchManager.fastForwardBranch(clonePath, project.defaultBranch);
        } catch (ffErr) {
          log.warn({ project: project.name, err: errorMessage(ffErr) }, 'fast-forward of default branch failed — attempting merge anyway');
        }
        const result = await this.branchManager.mergeBranch(clonePath, initiative.branch, project.defaultBranch);
        if (!result.success) {
          throw new Error(`Failed to merge ${initiative.branch} into ${project.defaultBranch} for project ${project.name}: ${result.message}`);
        }
        try {
          await this.branchManager.pushBranch(clonePath, project.defaultBranch);
        } catch (pushErr) {
          // Roll back the merge so the diff doesn't disappear from the review tab.
          // Without rollback, defaultBranch includes the initiative changes and the
          // three-dot diff (defaultBranch...initiativeBranch) becomes empty.
          if (result.previousRef) {
            log.warn({ project: project.name, previousRef: result.previousRef }, 'push failed — rolling back merge');
            await this.branchManager.updateRef(clonePath, project.defaultBranch, result.previousRef);
          }
          throw pushErr;
        }
        log.info({ initiativeId, project: project.name }, 'initiative branch merged into default and pushed');
      } else {
        await this.branchManager.pushBranch(clonePath, initiative.branch);
        log.info({ initiativeId, project: project.name }, 'initiative branch pushed to remote');
      }
    }

    await this.initiativeRepository.update(initiativeId, { status: 'completed' as any });

    const event: InitiativeReviewApprovedEvent = {
      type: 'initiative:review_approved',
      timestamp: new Date(),
      payload: { initiativeId, branch: initiative.branch, strategy },
    };
    this.eventBus.emit(event);

    log.info({ initiativeId, strategy }, 'initiative review approved');
  }
}