Files
Codewalkers/apps/server/execution/orchestrator.ts
Lukas May 573e1b40d2 fix: Re-queue approved phases before dispatch to survive server restarts
The in-memory phaseQueue (Map) in DefaultPhaseDispatchManager is lost on
server restart. After approving a phase review, dispatchNextPhase() found
nothing in the empty queue, so the next unblocked phase never started.

Now the orchestrator re-queues all approved phases for the initiative from
the DB before attempting to dispatch, making the queue self-healing.
2026-03-05 21:04:39 +01:00

502 lines
19 KiB
TypeScript

/**
* Execution Orchestrator
*
* Subscribes to task:completed events and orchestrates the post-completion
* merge workflow:
* - Task branch → Phase branch (on task completion)
* - Phase branch → Initiative branch (when all phase tasks done)
*
* Supports two execution modes:
* - YOLO: auto-merge everything
* - Review per-phase: pause after each phase for diff review
*/
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent } from '../events/index.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
import { ensureProjectClone } from '../git/project-clones.js';
import { createModuleLogger } from '../logger/index.js';
// Module-level logger shared by every method in this file; created with the name 'execution-orchestrator'.
const log = createModuleLogger('execution-orchestrator');
export class ExecutionOrchestrator {
/**
 * Serialize merges per phase to avoid concurrent merge conflicts.
 * Keyed by phase id; the value is the tail of that phase's chain of merge
 * promises, stored with rejections swallowed so one failed merge does not
 * reject the merges chained after it.
 */
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
/**
 * Coalesced dispatch scheduling state.
 * dispatchPending is set by scheduleDispatch() when a cycle is already running,
 * which makes the running cycle perform one more pass before it exits.
 */
private dispatchPending = false;
/** True while runDispatchCycle() is executing (set on entry, cleared in its finally block). */
private dispatchRunning = false;
/**
 * All collaborators are injected and stored as private fields through
 * TypeScript parameter properties; the constructor itself does no work.
 * Event subscriptions are only registered once start() is called.
 */
constructor(
// Used for branchExists / mergeBranch / pushBranch calls on project clones.
private branchManager: BranchManager,
private phaseRepository: PhaseRepository,
private taskRepository: TaskRepository,
private initiativeRepository: InitiativeRepository,
private projectRepository: ProjectRepository,
// Queues, dispatches and completes phases.
private phaseDispatchManager: PhaseDispatchManager,
// Queues, dispatches and completes tasks.
private dispatchManager: DispatchManager,
// Invoked when merging a task branch into its phase branch reports conflicts.
private conflictResolutionService: ConflictResolutionService,
private eventBus: EventBus,
// Passed to ensureProjectClone() together with the project to obtain a clone path.
private workspaceRoot: string,
) {}
/**
 * Register the event-bus subscriptions that drive execution:
 * - task:completed  -> merge workflow (handleTaskCompleted)
 * - phase:queued    -> dispatch cycle
 * - agent:stopped   -> task auto-completion + dispatch cycle (handleAgentStopped)
 *
 * Handler rejections are caught and logged so they never escape into the bus.
 */
start(): void {
  // Turn any thrown value into a loggable string.
  const reason = (failure: unknown): string =>
    failure instanceof Error ? failure.message : String(failure);

  const onTaskCompleted = (completed: TaskCompletedEvent): void => {
    this.handleTaskCompleted(completed).catch((failure) => {
      log.error({ err: reason(failure) }, 'error handling task:completed');
    });
  };

  const onAgentStopped = (stopped: AgentStoppedEvent): void => {
    this.handleAgentStopped(stopped).catch((failure) => {
      log.error({ err: reason(failure) }, 'error handling agent:stopped');
    });
  };

  this.eventBus.on<TaskCompletedEvent>('task:completed', onTaskCompleted);

  // Kick off dispatch when phases are queued (e.g. user clicks Execute)
  this.eventBus.on<PhaseQueuedEvent>('phase:queued', () => {
    this.scheduleDispatch();
  });

  // Auto-complete task + re-dispatch when an agent finishes
  this.eventBus.on<AgentStoppedEvent>('agent:stopped', onAgentStopped);

  log.info('execution orchestrator started');
}
// ---------------------------------------------------------------------------
// Coalesced dispatch scheduler
// ---------------------------------------------------------------------------
/**
 * Request a dispatch cycle.
 *
 * When no cycle is active, one is started immediately (fire-and-forget, with
 * failures logged). When a cycle is already active, the request is only
 * recorded in dispatchPending so the active cycle runs one more pass; this
 * collapses bursts of calls into a single async run.
 */
private scheduleDispatch(): void {
  if (!this.dispatchRunning) {
    this.runDispatchCycle().catch((failure) => {
      log.error(
        { err: failure instanceof Error ? failure.message : String(failure) },
        'dispatch cycle error',
      );
    });
    return;
  }

  // A cycle is in flight: ask it to loop again instead of starting a second one.
  this.dispatchPending = true;
}
/**
 * React to an agent:stopped event.
 *
 * If the agent was working on a task and was not stopped with reason
 * 'user_requested', the task is completed on the agent's behalf. A failure
 * to complete is logged as a warning and otherwise ignored. A dispatch cycle
 * is requested in every case.
 */
private async handleAgentStopped(event: AgentStoppedEvent): Promise<void> {
  const { agentId, reason, taskId } = event.payload;
  const stoppedManually = reason === 'user_requested';

  // Auto-complete task for successful agent completions, not manual stops
  if (!stoppedManually && taskId) {
    try {
      await this.dispatchManager.completeTask(taskId, agentId);
      log.info({ taskId, agentId, reason }, 'task auto-completed on agent stop');
    } catch (failure) {
      const err = failure instanceof Error ? failure.message : String(failure);
      log.warn({ taskId, agentId, reason, err }, 'failed to auto-complete task on agent stop');
    }
  }

  this.scheduleDispatch();
}
/**
 * Run dispatch passes until no further dispatch request is pending.
 *
 * Each pass first asks the phase dispatch manager for the next phase until it
 * reports no success, then asks the dispatch manager for the next task until
 * it reports no success. If scheduleDispatch() was called while the pass was
 * in flight (dispatchPending set), another pass runs before returning.
 */
private async runDispatchCycle(): Promise<void> {
// Mark the cycle active so concurrent scheduleDispatch() calls only set the pending flag.
this.dispatchRunning = true;
try {
do {
// Clear the request flag before doing work; a request arriving during this pass sets it again.
this.dispatchPending = false;
// Dispatch all ready phases (each queues its tasks internally)
while (true) {
const result = await this.phaseDispatchManager.dispatchNextPhase();
// A non-success result ends the phase loop for this pass.
if (!result.success) break;
log.info({ phaseId: result.phaseId }, 'auto-dispatched phase');
}
// Dispatch all ready tasks to idle agents
while (true) {
const result = await this.dispatchManager.dispatchNext();
// A non-success result ends the task loop for this pass.
if (!result.success) break;
log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task');
}
} while (this.dispatchPending);
} finally {
// Always release the running flag, even when a dispatch call throws.
this.dispatchRunning = false;
}
}
/**
 * Handle a task:completed event.
 *
 * Merges the task branch into the phase branch (serialized per phase), then
 * checks whether every task of the phase is completed and, if so, hands over
 * to handlePhaseAllTasksDone(). Finally requests a dispatch cycle.
 *
 * Returns without doing anything when the task has no phase or initiative,
 * when the initiative has no branch, when the phase cannot be found, or when
 * the task category is 'merge' or 'review'.
 *
 * Rejects if the merge step rejects; in that case the completion check and
 * the dispatch request are skipped.
 */
private async handleTaskCompleted(event: TaskCompletedEvent): Promise<void> {
  const { taskId } = event.payload;
  const task = await this.taskRepository.findById(taskId);
  if (!task?.phaseId || !task.initiativeId) return;
  // Capture the narrowed id in a const so it stays non-null inside the closure below.
  const phaseId = task.phaseId;
  const initiative = await this.initiativeRepository.findById(task.initiativeId);
  if (!initiative?.branch) return;
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) return;
  // Skip merge/review tasks — they already work on the phase branch directly
  if (task.category === 'merge' || task.category === 'review') return;
  const initBranch = initiative.branch;
  const phBranch = phaseBranchName(initBranch, phase.name);
  const tBranch = taskBranchName(initBranch, task.id);

  // Serialize merges per phase: chain this merge behind the phase's current tail.
  const previous = this.phaseMergeLocks.get(phaseId) ?? Promise.resolve();
  const mergeOp = previous.then(() => this.mergeTaskIntoPhase(taskId, phaseId, tBranch, phBranch));
  // The stored tail swallows rejections so a failed merge does not reject later merges.
  const tail = mergeOp.catch(() => {});
  this.phaseMergeLocks.set(phaseId, tail);
  try {
    await mergeOp;
  } finally {
    // Drop the entry once the chain is idle; otherwise the map keeps one settled
    // promise per phase for the lifetime of the process. If another merge was
    // chained meanwhile, the entry is no longer ours and that merge cleans up.
    if (this.phaseMergeLocks.get(phaseId) === tail) {
      this.phaseMergeLocks.delete(phaseId);
    }
  }

  // Check if all phase tasks are done
  // NOTE(review): this check runs outside the per-phase merge chain. If a sibling
  // task is already 'completed' but its merge is still queued behind this one,
  // the phase may be treated as done before that merge happens — confirm whether
  // task status is written before task:completed is emitted.
  const phaseTasks = await this.taskRepository.findByPhaseId(phaseId);
  const allDone = phaseTasks.every((t) => t.status === 'completed');
  if (allDone) {
    await this.handlePhaseAllTasksDone(phaseId);
  }
  // Fill freed agent slot with next queued task
  this.scheduleDispatch();
}
/**
 * Merge a task branch into the phase branch for all linked projects.
 *
 * For each project of the phase's initiative the project clone is ensured and,
 * if the task branch exists there, it is merged into the phase branch.
 * Projects where the task branch does not exist are skipped.
 *
 * - On a merge that reports conflicts, the conflict is handed to the conflict
 *   resolution service and the method returns without emitting task:merged
 *   (remaining projects are not processed).
 * - On a merge that fails without reporting conflicts, an error is thrown.
 *   Previously this case fell through and was logged and announced as a
 *   successful merge.
 * - When every project was merged or skipped, a task:merged event is emitted.
 *
 * @param taskId      id of the completed task
 * @param phaseId     id of the phase the task belongs to
 * @param taskBranch  source branch name
 * @param phaseBranch target branch name
 * @throws Error when a merge fails for a reason other than conflicts
 */
private async mergeTaskIntoPhase(
  taskId: string,
  phaseId: string,
  taskBranch: string,
  phaseBranch: string,
): Promise<void> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) return;
  const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);
  for (const project of projects) {
    const clonePath = await ensureProjectClone(project, this.workspaceRoot);
    // Only merge if the task branch actually exists (agents may not have pushed)
    const exists = await this.branchManager.branchExists(clonePath, taskBranch);
    if (!exists) {
      log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge');
      continue;
    }
    const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch);
    if (!result.success) {
      if (result.conflicts) {
        log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
        await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
          sourceBranch: taskBranch,
          targetBranch: phaseBranch,
        });
        return;
      }
      // Failure without conflict details: never report this as a successful merge.
      log.error(
        { taskId, taskBranch, phaseBranch, project: project.name, message: result.message },
        'task merge failed',
      );
      throw new Error(`Task merge failed: ${result.message}`);
    }
    log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch');
  }
  // Emit task:merged event
  const mergedEvent: TaskMergedEvent = {
    type: 'task:merged',
    timestamp: new Date(),
    payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch },
  };
  this.eventBus.emit(mergedEvent);
}
/**
 * Called once every task of a phase is completed.
 *
 * Review mode (any execution mode other than 'yolo'): the phase is set to
 * pending_review and a phase:pending_review event is emitted.
 *
 * YOLO mode: the phase branch is merged into the initiative branch, the phase
 * is completed, approved phases are re-queued, and the next phase is
 * dispatched. If nothing could be dispatched, initiative completion is checked.
 *
 * Does nothing when the phase is missing or its initiative has no branch.
 */
private async handlePhaseAllTasksDone(phaseId: string): Promise<void> {
  const finishedPhase = await this.phaseRepository.findById(phaseId);
  if (!finishedPhase) return;

  const owner = await this.initiativeRepository.findById(finishedPhase.initiativeId);
  if (!owner?.branch) return;

  if (owner.executionMode !== 'yolo') {
    // review_per_phase
    await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });
    const reviewEvent: PhasePendingReviewEvent = {
      type: 'phase:pending_review',
      timestamp: new Date(),
      payload: { phaseId, initiativeId: finishedPhase.initiativeId },
    };
    this.eventBus.emit(reviewEvent);
    log.info({ phaseId, initiativeId: finishedPhase.initiativeId }, 'phase pending review');
    return;
  }

  await this.mergePhaseIntoInitiative(phaseId);
  await this.phaseDispatchManager.completePhase(phaseId);

  // Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
  await this.requeueApprovedPhases(finishedPhase.initiativeId);

  // Check if this was the last phase — if so, trigger initiative review
  const next = await this.phaseDispatchManager.dispatchNextPhase();
  const nothingDispatched = !next.success;
  if (nothingDispatched) {
    await this.checkInitiativeCompletion(finishedPhase.initiativeId);
  }

  this.scheduleDispatch();
}
/**
 * Merge the phase branch into the initiative branch in every project linked
 * to the initiative, then emit phase:merged.
 *
 * Returns silently when the phase is missing or the initiative has no branch.
 *
 * @throws Error on the first project whose merge does not succeed; projects
 *         after it are not processed and no event is emitted.
 */
private async mergePhaseIntoInitiative(phaseId: string): Promise<void> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) return;

  const initiative = await this.initiativeRepository.findById(phase.initiativeId);
  if (!initiative?.branch) return;

  const targetBranch = initiative.branch;
  const sourceBranch = phaseBranchName(targetBranch, phase.name);
  const linkedProjects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

  for (const project of linkedProjects) {
    const clonePath = await ensureProjectClone(project, this.workspaceRoot);
    const outcome = await this.branchManager.mergeBranch(clonePath, sourceBranch, targetBranch);

    if (outcome.success) {
      log.info(
        { phaseId, phBranch: sourceBranch, initBranch: targetBranch, project: project.name },
        'phase branch merged into initiative branch',
      );
      continue;
    }

    log.error(
      { phaseId, phBranch: sourceBranch, initBranch: targetBranch, conflicts: outcome.conflicts },
      'phase merge conflict',
    );
    throw new Error(`Phase merge conflict: ${outcome.message}`);
  }

  // Emit phase:merged event
  const mergedEvent: PhaseMergedEvent = {
    type: 'phase:merged',
    timestamp: new Date(),
    payload: {
      phaseId,
      initiativeId: phase.initiativeId,
      sourceBranch,
      targetBranch,
    },
  };
  this.eventBus.emit(mergedEvent);
}
/**
 * Approve a phase that is pending review and merge it into the initiative.
 * Called from tRPC when user clicks approve in ReviewTab.
 *
 * After the merge the phase is completed, approved phases are re-queued and
 * the next phase is dispatched; if nothing could be dispatched, initiative
 * completion is checked. A dispatch cycle is requested at the end.
 *
 * @throws Error when the phase does not exist or is not in 'pending_review'
 * @throws Error propagated from the merge when it does not succeed
 */
async approveAndMergePhase(phaseId: string): Promise<void> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) {
    throw new Error(`Phase not found: ${phaseId}`);
  }
  const awaitingReview = phase.status === 'pending_review';
  if (!awaitingReview) {
    throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
  }

  await this.mergePhaseIntoInitiative(phaseId);
  await this.phaseDispatchManager.completePhase(phaseId);

  // Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
  await this.requeueApprovedPhases(phase.initiativeId);

  // Check if this was the last phase — if so, trigger initiative review
  const next = await this.phaseDispatchManager.dispatchNextPhase();
  const nothingDispatched = !next.success;
  if (nothingDispatched) {
    await this.checkInitiativeCompletion(phase.initiativeId);
  }

  this.scheduleDispatch();
  log.info({ phaseId }, 'phase review approved and merged');
}
/**
 * Request changes on a phase that is pending review.
 *
 * Builds a markdown description from the optional summary and the unresolved
 * comments (grouped per file, in first-seen order), creates a high-priority
 * 'review' task with it, resets the phase to in_progress, queues the task,
 * emits phase:changes_requested and requests a dispatch cycle.
 *
 * @param phaseId            phase under review
 * @param unresolvedComments review comments still to be addressed
 * @param summary            optional overall review summary
 * @returns the id of the created revision task
 * @throws Error when the phase or its initiative does not exist, or when the
 *         phase is not in 'pending_review'
 */
async requestChangesOnPhase(
  phaseId: string,
  unresolvedComments: Array<{ filePath: string; lineNumber: number; body: string }>,
  summary?: string,
): Promise<{ taskId: string }> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) {
    throw new Error(`Phase not found: ${phaseId}`);
  }
  if (phase.status !== 'pending_review') {
    throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
  }

  const initiative = await this.initiativeRepository.findById(phase.initiativeId);
  if (!initiative) {
    throw new Error(`Initiative not found: ${phase.initiativeId}`);
  }

  // Build revision task description from comments + summary
  const sections: string[] = summary ? [`## Summary\n\n${summary}\n`] : [];
  const commentCount = unresolvedComments.length;
  if (commentCount > 0) {
    sections.push('## Review Comments\n');

    // Group comments by file (Map keeps first-seen file order)
    const grouped = new Map<string, typeof unresolvedComments>();
    for (const comment of unresolvedComments) {
      const bucket = grouped.get(comment.filePath);
      if (bucket) {
        bucket.push(comment);
      } else {
        grouped.set(comment.filePath, [comment]);
      }
    }

    for (const [filePath, fileComments] of grouped) {
      sections.push(
        `### ${filePath}\n`,
        ...fileComments.map((comment) => `- **Line ${comment.lineNumber}**: ${comment.body}`),
        '',
      );
    }
  }
  const rendered = sections.join('\n');
  const description = rendered === '' ? 'Address review feedback.' : rendered;

  // Create revision task
  const revisionTask = await this.taskRepository.create({
    phaseId,
    initiativeId: phase.initiativeId,
    name: `Address review feedback: ${phase.name}`,
    description,
    category: 'review',
    priority: 'high',
  });
  const taskId = revisionTask.id;

  // Reset phase status back to in_progress
  await this.phaseRepository.update(phaseId, { status: 'in_progress' as any });

  // Queue task for dispatch
  await this.dispatchManager.queue(taskId);

  // Emit event
  const changesRequested: PhaseChangesRequestedEvent = {
    type: 'phase:changes_requested',
    timestamp: new Date(),
    payload: {
      phaseId,
      initiativeId: phase.initiativeId,
      taskId,
      commentCount,
    },
  };
  this.eventBus.emit(changesRequested);
  log.info({ phaseId, taskId, commentCount }, 'changes requested on phase');

  // Kick off dispatch
  this.scheduleDispatch();
  return { taskId };
}
/**
 * Re-queue approved phases for an initiative into the in-memory dispatch queue.
 * Self-healing: ensures phases aren't lost if the server restarted since the
 * initial queueAllPhases() call.
 *
 * Best-effort: a failure to queue one phase never aborts the loop and never
 * propagates to the caller. Such failures are expected when the phase is
 * already queued or its status changed, but they are logged at debug level so
 * an unexpected failure remains diagnosable instead of vanishing silently.
 *
 * @param initiativeId initiative whose phases are inspected
 */
private async requeueApprovedPhases(initiativeId: string): Promise<void> {
  const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
  for (const p of phases) {
    if (p.status !== 'approved') continue;
    try {
      await this.phaseDispatchManager.queuePhase(p.id);
      log.info({ phaseId: p.id }, 're-queued approved phase');
    } catch (err) {
      // Already queued or status changed — safe to continue, but keep a trace.
      log.debug(
        { phaseId: p.id, initiativeId, err: err instanceof Error ? err.message : String(err) },
        'approved phase not re-queued',
      );
    }
  }
}
/**
 * Move an initiative to pending_review once all of its phases are completed.
 *
 * Nothing happens when the initiative has no phases, when any phase is not
 * 'completed', when the initiative is missing or has no branch, or when the
 * initiative status is not 'active'. Otherwise the status is updated and an
 * initiative:pending_review event is emitted.
 */
private async checkInitiativeCompletion(initiativeId: string): Promise<void> {
  const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
  const hasUnfinishedPhase = phases.some((p) => p.status !== 'completed');
  if (phases.length === 0 || hasUnfinishedPhase) return;

  const initiative = await this.initiativeRepository.findById(initiativeId);
  if (!initiative?.branch || initiative.status !== 'active') return;
  const branch = initiative.branch;

  await this.initiativeRepository.update(initiativeId, { status: 'pending_review' as any });

  const pendingReview: InitiativePendingReviewEvent = {
    type: 'initiative:pending_review',
    timestamp: new Date(),
    payload: { initiativeId, branch },
  };
  this.eventBus.emit(pendingReview);
  log.info({ initiativeId, branch }, 'initiative pending review — all phases completed');
}
/**
 * Approve an initiative that is pending review.
 *
 * For every linked project in which the initiative branch exists:
 * - 'merge_and_push': merge the initiative branch into the project's default
 *   branch and push the default branch;
 * - otherwise: push the initiative branch itself.
 * Projects without the initiative branch are skipped with a warning.
 * Afterwards the initiative is marked completed and
 * initiative:review_approved is emitted.
 *
 * @throws Error when the initiative is missing, not in 'pending_review', has
 *         no branch, or when a merge into a default branch does not succeed
 */
async approveInitiative(
  initiativeId: string,
  strategy: 'push_branch' | 'merge_and_push',
): Promise<void> {
  const initiative = await this.initiativeRepository.findById(initiativeId);
  if (!initiative) {
    throw new Error(`Initiative not found: ${initiativeId}`);
  }
  if (initiative.status !== 'pending_review') {
    throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
  }
  const branch = initiative.branch;
  if (!branch) {
    throw new Error(`Initiative ${initiativeId} has no branch configured`);
  }

  const linkedProjects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
  for (const project of linkedProjects) {
    const clonePath = await ensureProjectClone(project, this.workspaceRoot);

    const present = await this.branchManager.branchExists(clonePath, branch);
    if (!present) {
      log.warn({ initiativeId, branch, project: project.name }, 'initiative branch does not exist in project, skipping');
      continue;
    }

    if (strategy !== 'merge_and_push') {
      await this.branchManager.pushBranch(clonePath, branch);
      log.info({ initiativeId, project: project.name }, 'initiative branch pushed to remote');
      continue;
    }

    const outcome = await this.branchManager.mergeBranch(clonePath, branch, project.defaultBranch);
    if (!outcome.success) {
      throw new Error(`Failed to merge ${branch} into ${project.defaultBranch} for project ${project.name}: ${outcome.message}`);
    }
    await this.branchManager.pushBranch(clonePath, project.defaultBranch);
    log.info({ initiativeId, project: project.name }, 'initiative branch merged into default and pushed');
  }

  await this.initiativeRepository.update(initiativeId, { status: 'completed' as any });

  const approved: InitiativeReviewApprovedEvent = {
    type: 'initiative:review_approved',
    timestamp: new Date(),
    payload: { initiativeId, branch, strategy },
  };
  this.eventBus.emit(approved);
  log.info({ initiativeId, strategy }, 'initiative review approved');
}
}