ExecutionOrchestrator now listens for phase:queued and agent:stopped events to drive the dispatch cycle, closing the gap between queueing phases (Execute button) and actually spawning agents. Coalesced scheduling prevents reentrancy with synchronous EventEmitter.
285 lines
11 KiB
TypeScript
285 lines
11 KiB
TypeScript
/**
|
|
* Execution Orchestrator
|
|
*
|
|
* Subscribes to task:completed events and orchestrates the post-completion
|
|
* merge workflow:
|
|
* - Task branch → Phase branch (on task completion)
|
|
* - Phase branch → Initiative branch (when all phase tasks done)
|
|
*
|
|
* Supports two execution modes:
|
|
* - YOLO: auto-merge everything
|
|
* - Review per-phase: pause after each phase for diff review
|
|
*/
|
|
|
|
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent } from '../events/index.js';
|
|
import type { BranchManager } from '../git/branch-manager.js';
|
|
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
|
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
|
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
|
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
|
|
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
|
|
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
|
|
import { ensureProjectClone } from '../git/project-clones.js';
|
|
import { createModuleLogger } from '../logger/index.js';
|
|
|
|
const log = createModuleLogger('execution-orchestrator');
|
|
|
|
export class ExecutionOrchestrator {
|
|
/** Serialize merges per phase to avoid concurrent merge conflicts */
|
|
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
|
|
|
|
/** Coalesced dispatch scheduling state */
|
|
private dispatchPending = false;
|
|
private dispatchRunning = false;
|
|
|
|
constructor(
|
|
private branchManager: BranchManager,
|
|
private phaseRepository: PhaseRepository,
|
|
private taskRepository: TaskRepository,
|
|
private initiativeRepository: InitiativeRepository,
|
|
private projectRepository: ProjectRepository,
|
|
private phaseDispatchManager: PhaseDispatchManager,
|
|
private dispatchManager: DispatchManager,
|
|
private conflictResolutionService: ConflictResolutionService,
|
|
private eventBus: EventBus,
|
|
private workspaceRoot: string,
|
|
) {}
|
|
|
|
/**
|
|
* Start listening for events that drive execution.
|
|
*/
|
|
start(): void {
|
|
this.eventBus.on<TaskCompletedEvent>('task:completed', (event) => {
|
|
this.handleTaskCompleted(event).catch((err) => {
|
|
log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling task:completed');
|
|
});
|
|
});
|
|
|
|
// Kick off dispatch when phases are queued (e.g. user clicks Execute)
|
|
this.eventBus.on<PhaseQueuedEvent>('phase:queued', () => this.scheduleDispatch());
|
|
|
|
// Re-dispatch queued tasks when an agent finishes and frees a slot
|
|
this.eventBus.on<AgentStoppedEvent>('agent:stopped', () => this.scheduleDispatch());
|
|
|
|
log.info('execution orchestrator started');
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Coalesced dispatch scheduler
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
|
|
* Schedule a dispatch cycle. Coalesces multiple rapid calls (e.g. from
|
|
* synchronous EventEmitter) into a single async run.
|
|
*/
|
|
private scheduleDispatch(): void {
|
|
if (this.dispatchRunning) {
|
|
this.dispatchPending = true;
|
|
return;
|
|
}
|
|
this.runDispatchCycle().catch((err) =>
|
|
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch cycle error'),
|
|
);
|
|
}
|
|
|
|
private async runDispatchCycle(): Promise<void> {
|
|
this.dispatchRunning = true;
|
|
try {
|
|
do {
|
|
this.dispatchPending = false;
|
|
// Dispatch all ready phases (each queues its tasks internally)
|
|
while (true) {
|
|
const result = await this.phaseDispatchManager.dispatchNextPhase();
|
|
if (!result.success) break;
|
|
log.info({ phaseId: result.phaseId }, 'auto-dispatched phase');
|
|
}
|
|
// Dispatch all ready tasks to idle agents
|
|
while (true) {
|
|
const result = await this.dispatchManager.dispatchNext();
|
|
if (!result.success) break;
|
|
log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task');
|
|
}
|
|
} while (this.dispatchPending);
|
|
} finally {
|
|
this.dispatchRunning = false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle a task:completed event.
|
|
* Merges the task branch into the phase branch, then checks if all phase tasks are done.
|
|
*/
|
|
private async handleTaskCompleted(event: TaskCompletedEvent): Promise<void> {
|
|
const { taskId } = event.payload;
|
|
|
|
const task = await this.taskRepository.findById(taskId);
|
|
if (!task?.phaseId || !task.initiativeId) return;
|
|
|
|
const initiative = await this.initiativeRepository.findById(task.initiativeId);
|
|
if (!initiative?.branch) return;
|
|
|
|
const phase = await this.phaseRepository.findById(task.phaseId);
|
|
if (!phase) return;
|
|
|
|
// Skip merge tasks — they already work on the phase branch directly
|
|
if (task.category === 'merge') return;
|
|
|
|
const initBranch = initiative.branch;
|
|
const phBranch = phaseBranchName(initBranch, phase.name);
|
|
const tBranch = taskBranchName(initBranch, task.id);
|
|
|
|
// Serialize merges per phase
|
|
const lock = this.phaseMergeLocks.get(task.phaseId) ?? Promise.resolve();
|
|
const mergeOp = lock.then(async () => {
|
|
await this.mergeTaskIntoPhase(taskId, task.phaseId!, tBranch, phBranch);
|
|
});
|
|
this.phaseMergeLocks.set(task.phaseId, mergeOp.catch(() => {}));
|
|
await mergeOp;
|
|
|
|
// Check if all phase tasks are done
|
|
const phaseTasks = await this.taskRepository.findByPhaseId(task.phaseId);
|
|
const allDone = phaseTasks.every((t) => t.status === 'completed');
|
|
if (allDone) {
|
|
await this.handlePhaseAllTasksDone(task.phaseId);
|
|
}
|
|
|
|
// Fill freed agent slot with next queued task
|
|
this.scheduleDispatch();
|
|
}
|
|
|
|
/**
|
|
* Merge a task branch into the phase branch for all linked projects.
|
|
*/
|
|
private async mergeTaskIntoPhase(
|
|
taskId: string,
|
|
phaseId: string,
|
|
taskBranch: string,
|
|
phaseBranch: string,
|
|
): Promise<void> {
|
|
const phase = await this.phaseRepository.findById(phaseId);
|
|
if (!phase) return;
|
|
|
|
const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);
|
|
|
|
for (const project of projects) {
|
|
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
|
|
|
|
// Only merge if the task branch actually exists (agents may not have pushed)
|
|
const exists = await this.branchManager.branchExists(clonePath, taskBranch);
|
|
if (!exists) {
|
|
log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge');
|
|
continue;
|
|
}
|
|
|
|
const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch);
|
|
|
|
if (!result.success && result.conflicts) {
|
|
log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
|
|
await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
|
|
sourceBranch: taskBranch,
|
|
targetBranch: phaseBranch,
|
|
});
|
|
return;
|
|
}
|
|
|
|
log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch');
|
|
}
|
|
|
|
// Emit task:merged event
|
|
const mergedEvent: TaskMergedEvent = {
|
|
type: 'task:merged',
|
|
timestamp: new Date(),
|
|
payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch },
|
|
};
|
|
this.eventBus.emit(mergedEvent);
|
|
}
|
|
|
|
/**
|
|
* Handle all tasks in a phase being done.
|
|
* YOLO mode: auto-merge phase → initiative.
|
|
* Review mode: set phase to pending_review.
|
|
*/
|
|
private async handlePhaseAllTasksDone(phaseId: string): Promise<void> {
|
|
const phase = await this.phaseRepository.findById(phaseId);
|
|
if (!phase) return;
|
|
|
|
const initiative = await this.initiativeRepository.findById(phase.initiativeId);
|
|
if (!initiative?.branch) return;
|
|
|
|
if (initiative.executionMode === 'yolo') {
|
|
await this.mergePhaseIntoInitiative(phaseId);
|
|
await this.phaseDispatchManager.completePhase(phaseId);
|
|
await this.phaseDispatchManager.dispatchNextPhase();
|
|
this.scheduleDispatch();
|
|
} else {
|
|
// review_per_phase
|
|
await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });
|
|
|
|
const event: PhasePendingReviewEvent = {
|
|
type: 'phase:pending_review',
|
|
timestamp: new Date(),
|
|
payload: { phaseId, initiativeId: phase.initiativeId },
|
|
};
|
|
this.eventBus.emit(event);
|
|
|
|
log.info({ phaseId, initiativeId: phase.initiativeId }, 'phase pending review');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Merge phase branch into initiative branch for all linked projects.
|
|
*/
|
|
private async mergePhaseIntoInitiative(phaseId: string): Promise<void> {
|
|
const phase = await this.phaseRepository.findById(phaseId);
|
|
if (!phase) return;
|
|
|
|
const initiative = await this.initiativeRepository.findById(phase.initiativeId);
|
|
if (!initiative?.branch) return;
|
|
|
|
const initBranch = initiative.branch;
|
|
const phBranch = phaseBranchName(initBranch, phase.name);
|
|
|
|
const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);
|
|
|
|
for (const project of projects) {
|
|
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
|
|
const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch);
|
|
|
|
if (!result.success) {
|
|
log.error({ phaseId, phBranch, initBranch, conflicts: result.conflicts }, 'phase merge conflict');
|
|
throw new Error(`Phase merge conflict: ${result.message}`);
|
|
}
|
|
|
|
log.info({ phaseId, phBranch, initBranch, project: project.name }, 'phase branch merged into initiative branch');
|
|
}
|
|
|
|
// Emit phase:merged event
|
|
const mergedEvent: PhaseMergedEvent = {
|
|
type: 'phase:merged',
|
|
timestamp: new Date(),
|
|
payload: { phaseId, initiativeId: phase.initiativeId, sourceBranch: phBranch, targetBranch: initBranch },
|
|
};
|
|
this.eventBus.emit(mergedEvent);
|
|
}
|
|
|
|
/**
|
|
* Approve and merge a phase that's pending review.
|
|
* Called from tRPC when user clicks approve in ReviewTab.
|
|
*/
|
|
async approveAndMergePhase(phaseId: string): Promise<void> {
|
|
const phase = await this.phaseRepository.findById(phaseId);
|
|
if (!phase) throw new Error(`Phase not found: ${phaseId}`);
|
|
if (phase.status !== 'pending_review') {
|
|
throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
|
|
}
|
|
|
|
await this.mergePhaseIntoInitiative(phaseId);
|
|
await this.phaseDispatchManager.completePhase(phaseId);
|
|
await this.phaseDispatchManager.dispatchNextPhase();
|
|
this.scheduleDispatch();
|
|
|
|
log.info({ phaseId }, 'phase review approved and merged');
|
|
}
|
|
}
|