When merge_and_push failed at the push step, the local defaultBranch ref was left pointing at the merge commit. This made the three-dot diff (defaultBranch...initiativeBranch) return empty because main already contained all changes — causing the review tab to show "no changes." Now mergeBranch returns the previous ref, and approveInitiative restores it on push failure. Also repaired the corrupted clone state.
729 lines
28 KiB
TypeScript
729 lines
28 KiB
TypeScript
/**
 * Execution Orchestrator
 *
 * Subscribes to task:completed events and orchestrates the post-completion
 * merge workflow:
 * - Task branch → Phase branch (on task completion)
 * - Phase branch → Initiative branch (when all phase tasks done)
 *
 * Supports two execution modes:
 * - YOLO: auto-merge everything
 * - Review per-phase: pause after each phase for diff review
 */
|
|
|
|
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, AgentCrashedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
|
|
import type { BranchManager } from '../git/branch-manager.js';
|
|
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
|
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
|
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
|
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
|
|
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
|
|
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
|
|
import { ensureProjectClone } from '../git/project-clones.js';
|
|
import { createModuleLogger } from '../logger/index.js';
|
|
|
|
// Module-scoped structured logger; every entry from this file is tagged 'execution-orchestrator'.
const log = createModuleLogger('execution-orchestrator');
|
|
|
|
/**
 * Maximum number of automatic retries for a crashed task.
 * Once a task's retry count would exceed this value, handleAgentCrashed stops
 * re-queuing it and leaves the task in its current `in_progress` state.
 */
|
|
const MAX_TASK_RETRIES = 3;
|
|
|
|
/**
 * Coordinates dispatch and the post-completion merge workflow.
 *
 * Reacts to `task:completed`, `phase:queued`, `agent:stopped` and
 * `agent:crashed` events (wired up in `start()`), and exposes the review
 * actions `approveAndMergePhase`, `requestChangesOnPhase`,
 * `requestChangesOnInitiative` and `approveInitiative`.
 */
export class ExecutionOrchestrator {
  /**
   * Tail of the merge chain per phase id. Serializes merges per phase to avoid
   * concurrent merge conflicts; an entry is removed once its chain has drained.
   */
  private phaseMergeLocks: Map<string, Promise<void>> = new Map();

  /** Coalesced dispatch scheduling state */
  private dispatchPending = false;
  private dispatchRunning = false;

  constructor(
    private branchManager: BranchManager,
    private phaseRepository: PhaseRepository,
    private taskRepository: TaskRepository,
    private initiativeRepository: InitiativeRepository,
    private projectRepository: ProjectRepository,
    private phaseDispatchManager: PhaseDispatchManager,
    private dispatchManager: DispatchManager,
    private conflictResolutionService: ConflictResolutionService,
    private eventBus: EventBus,
    private workspaceRoot: string,
    private agentRepository?: AgentRepository,
  ) {}

  /**
   * Start listening for events that drive execution.
   *
   * Handler failures are logged rather than propagated, so a rejected handler
   * never surfaces as an unhandled rejection on the event bus.
   */
  start(): void {
    this.eventBus.on<TaskCompletedEvent>('task:completed', (event) => {
      this.handleTaskCompleted(event).catch((err) => {
        log.error({ err: errorMessage(err) }, 'error handling task:completed');
      });
    });

    // Kick off dispatch when phases are queued (e.g. user clicks Execute)
    this.eventBus.on<PhaseQueuedEvent>('phase:queued', () => this.scheduleDispatch());

    // Auto-complete task + re-dispatch when an agent finishes
    this.eventBus.on<AgentStoppedEvent>('agent:stopped', (event) => {
      this.handleAgentStopped(event).catch((err) => {
        log.error({ err: errorMessage(err) }, 'error handling agent:stopped');
      });
    });

    // Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES)
    this.eventBus.on<AgentCrashedEvent>('agent:crashed', (event) => {
      this.handleAgentCrashed(event).catch((err) => {
        log.error({ err: errorMessage(err) }, 'error handling agent:crashed');
      });
    });

    // Recover in-memory dispatch queues from DB state (survives server restarts)
    this.recoverDispatchQueues().catch((err) => {
      log.error({ err: errorMessage(err) }, 'dispatch queue recovery failed');
    });

    log.info('execution orchestrator started');
  }

  // ---------------------------------------------------------------------------
  // Coalesced dispatch scheduler
  // ---------------------------------------------------------------------------

  /**
   * Schedule a dispatch cycle. Coalesces multiple rapid calls (e.g. from
   * synchronous EventEmitter) into a single async run: while a cycle is
   * running, further calls only set a flag that makes the cycle loop once more.
   */
  private scheduleDispatch(): void {
    if (this.dispatchRunning) {
      this.dispatchPending = true;
      return;
    }
    this.runDispatchCycle().catch((err) =>
      log.error({ err: errorMessage(err) }, 'dispatch cycle error'),
    );
  }

  /**
   * Handle an agent:stopped event: auto-complete the agent's task (unless the
   * stop was user-requested) and schedule a dispatch cycle.
   */
  private async handleAgentStopped(event: AgentStoppedEvent): Promise<void> {
    const { taskId, reason, agentId } = event.payload;

    // Auto-complete task for successful agent completions, not manual stops
    if (taskId && reason !== 'user_requested') {
      try {
        await this.dispatchManager.completeTask(taskId, agentId);
        log.info({ taskId, agentId, reason }, 'task auto-completed on agent stop');
      } catch (err) {
        log.warn(
          { taskId, agentId, reason, err: errorMessage(err) },
          'failed to auto-complete task on agent stop',
        );
      }
    }

    this.scheduleDispatch();
  }

  /**
   * Handle an agent:crashed event: re-queue the crashed agent's task with an
   * incremented retry count, up to MAX_TASK_RETRIES. Tasks that are not
   * currently `in_progress` are ignored.
   */
  private async handleAgentCrashed(event: AgentCrashedEvent): Promise<void> {
    const { taskId, agentId, error } = event.payload;
    if (!taskId) return;

    const task = await this.taskRepository.findById(taskId);
    if (!task || task.status !== 'in_progress') return;

    const retryCount = (task.retryCount ?? 0) + 1;
    if (retryCount > MAX_TASK_RETRIES) {
      log.warn({ taskId, agentId, retryCount, error }, 'task exceeded max retries, leaving in_progress');
      return;
    }

    // Reset task for re-dispatch with incremented retry count
    await this.taskRepository.update(taskId, { status: 'pending', retryCount });
    await this.dispatchManager.queue(taskId);
    log.info({ taskId, agentId, retryCount, error }, 'crashed task re-queued for retry');

    this.scheduleDispatch();
  }

  /**
   * Run dispatch until nothing more can be dispatched. Repeats as long as
   * `scheduleDispatch()` was called while the cycle was running.
   */
  private async runDispatchCycle(): Promise<void> {
    this.dispatchRunning = true;
    try {
      do {
        this.dispatchPending = false;
        // Dispatch all ready phases (each queues its tasks internally)
        while (true) {
          const result = await this.phaseDispatchManager.dispatchNextPhase();
          if (!result.success) break;
          log.info({ phaseId: result.phaseId }, 'auto-dispatched phase');
        }
        // Dispatch all ready tasks to idle agents
        while (true) {
          const result = await this.dispatchManager.dispatchNext();
          if (!result.success) break;
          log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task');
        }
      } while (this.dispatchPending);
    } finally {
      this.dispatchRunning = false;
    }
  }

  /**
   * Handle a task:completed event.
   * Merges the task branch into the phase branch, then checks if all phase tasks are done.
   * A failed merge is logged and does not prevent the phase-completion check.
   */
  private async handleTaskCompleted(event: TaskCompletedEvent): Promise<void> {
    const { taskId } = event.payload;

    const task = await this.taskRepository.findById(taskId);
    if (!task?.phaseId || !task.initiativeId) return;
    // Narrowed copy so closures below do not need a non-null assertion.
    const phaseId = task.phaseId;

    const initiative = await this.initiativeRepository.findById(task.initiativeId);
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    // Merge task branch into phase branch (only when branches exist)
    if (initiative?.branch && task.category !== 'merge' && task.category !== 'review') {
      try {
        const initBranch = initiative.branch;
        const phBranch = phaseBranchName(initBranch, phase.name);
        const tBranch = taskBranchName(initBranch, task.id);

        await this.runSerializedPhaseMerge(phaseId, () =>
          this.mergeTaskIntoPhase(taskId, phaseId, tBranch, phBranch),
        );
      } catch (err) {
        log.error({ taskId, err: errorMessage(err) }, 'task merge failed, still checking phase completion');
      }
    }

    // Check if all phase tasks are done — always, regardless of branch/merge status
    const phaseTasks = await this.taskRepository.findByPhaseId(phaseId);
    const allDone = phaseTasks.every((t) => t.status === 'completed');
    if (allDone) {
      await this.handlePhaseAllTasksDone(phaseId);
    }

    // Fill freed agent slot with next queued task
    this.scheduleDispatch();
  }

  /**
   * Run `merge` after every merge previously queued for the same phase has
   * settled. Rejects with the merge's own error; earlier failures in the chain
   * do not block later merges.
   */
  private async runSerializedPhaseMerge(phaseId: string, merge: () => Promise<void>): Promise<void> {
    const previous = this.phaseMergeLocks.get(phaseId) ?? Promise.resolve();
    const mergeOp = previous.then(merge);

    // The stored tail never rejects. Once it settles it removes itself from the
    // map, unless a later merge has already replaced it, so the map does not
    // accumulate one entry per phase forever.
    const tail: Promise<void> = mergeOp
      .catch(() => {})
      .then(() => {
        if (this.phaseMergeLocks.get(phaseId) === tail) {
          this.phaseMergeLocks.delete(phaseId);
        }
      });
    this.phaseMergeLocks.set(phaseId, tail);

    await mergeOp;
  }

  /**
   * Merge a task branch into the phase branch for all linked projects.
   *
   * Projects where the task branch does not exist are skipped. On a merge
   * conflict the conflict is handed to the conflict resolution service and the
   * method returns without emitting `task:merged`.
   *
   * @throws Error when a merge fails without reporting conflicts.
   */
  private async mergeTaskIntoPhase(
    taskId: string,
    phaseId: string,
    taskBranch: string,
    phaseBranch: string,
  ): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);

      // Only merge if the task branch actually exists (agents may not have pushed)
      const exists = await this.branchManager.branchExists(clonePath, taskBranch);
      if (!exists) {
        log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge');
        continue;
      }

      const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch);

      if (!result.success) {
        if (result.conflicts) {
          log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
          await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
            sourceBranch: taskBranch,
            targetBranch: phaseBranch,
          });
          return;
        }
        // A failure without conflicts must not be reported as a successful merge.
        throw new Error(
          `Failed to merge ${taskBranch} into ${phaseBranch} for project ${project.name}: ${result.message}`,
        );
      }

      log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch');
    }

    // Emit task:merged event
    const mergedEvent: TaskMergedEvent = {
      type: 'task:merged',
      timestamp: new Date(),
      payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch },
    };
    this.eventBus.emit(mergedEvent);
  }

  /**
   * Handle all tasks in a phase being done.
   * YOLO mode: auto-merge phase → initiative.
   * Review mode: set phase to pending_review.
   */
  private async handlePhaseAllTasksDone(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative) return;

    if (initiative.executionMode === 'yolo') {
      // Merge phase branch into initiative branch (only when branches exist)
      if (initiative.branch) {
        await this.mergePhaseIntoInitiative(phaseId);
      }
      await this.completePhaseAndAdvance(phaseId, phase.initiativeId);
      return;
    }

    // review_per_phase
    await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });

    const event: PhasePendingReviewEvent = {
      type: 'phase:pending_review',
      timestamp: new Date(),
      payload: { phaseId, initiativeId: phase.initiativeId },
    };
    this.eventBus.emit(event);

    log.info({ phaseId, initiativeId: phase.initiativeId }, 'phase pending review');
  }

  /**
   * Shared tail of the YOLO and approve paths: mark the phase completed,
   * re-queue approved phases, dispatch the next phase, and check initiative
   * completion when no further phase could be dispatched.
   */
  private async completePhaseAndAdvance(phaseId: string, initiativeId: string): Promise<void> {
    await this.phaseDispatchManager.completePhase(phaseId);

    // Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
    await this.requeueApprovedPhases(initiativeId);

    // Check if this was the last phase — if so, trigger initiative review
    const dispatched = await this.phaseDispatchManager.dispatchNextPhase();
    if (!dispatched.success) {
      await this.checkInitiativeCompletion(initiativeId);
    }
    this.scheduleDispatch();
  }

  /**
   * Merge phase branch into initiative branch for all linked projects.
   *
   * Before merging, stores the merge base of the two branches (taken from the
   * first project where it can be computed) on the phase.
   *
   * @throws Error when the merge fails in any project.
   */
  private async mergePhaseIntoInitiative(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) return;

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative?.branch) return;

    const initBranch = initiative.branch;
    const phBranch = phaseBranchName(initBranch, phase.name);

    const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    // Store merge base before merging so we can reconstruct diffs for completed phases
    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      try {
        const mergeBase = await this.branchManager.getMergeBase(clonePath, initBranch, phBranch);
        await this.phaseRepository.update(phaseId, { mergeBase });
        break; // Only need one merge base (first project)
      } catch {
        // Phase branch may not exist in this project clone
      }
    }

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch);

      if (!result.success) {
        log.error({ phaseId, phBranch, initBranch, conflicts: result.conflicts }, 'phase merge conflict');
        throw new Error(`Phase merge conflict: ${result.message}`);
      }

      log.info({ phaseId, phBranch, initBranch, project: project.name }, 'phase branch merged into initiative branch');
    }

    // Emit phase:merged event
    const mergedEvent: PhaseMergedEvent = {
      type: 'phase:merged',
      timestamp: new Date(),
      payload: { phaseId, initiativeId: phase.initiativeId, sourceBranch: phBranch, targetBranch: initBranch },
    };
    this.eventBus.emit(mergedEvent);
  }

  /**
   * Approve and merge a phase that's pending review.
   * Called from tRPC when user clicks approve in ReviewTab.
   *
   * @throws Error when the phase does not exist, is not pending review, or the merge fails.
   */
  async approveAndMergePhase(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) throw new Error(`Phase not found: ${phaseId}`);
    if (phase.status !== 'pending_review') {
      throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
    }

    await this.mergePhaseIntoInitiative(phaseId);
    await this.completePhaseAndAdvance(phaseId, phase.initiativeId);

    log.info({ phaseId }, 'phase review approved and merged');
  }

  /**
   * Request changes on a phase that's pending review.
   * Creates a revision task, resets the phase to in_progress, and dispatches.
   * If a pending or in-progress review task already exists for the phase, its
   * id is returned and nothing else is changed.
   *
   * @param phaseId Phase under review.
   * @param unresolvedThreads Review comment threads to include in the task description.
   * @param summary Optional free-text summary placed at the top of the description.
   * @returns The id of the created (or already active) review task.
   * @throws Error when the phase or its initiative is missing, or the phase is not pending review.
   */
  async requestChangesOnPhase(
    phaseId: string,
    unresolvedThreads: Array<{
      id: string;
      filePath: string;
      lineNumber: number;
      body: string;
      author: string;
      replies: Array<{ id: string; body: string; author: string }>;
    }>,
    summary?: string,
  ): Promise<{ taskId: string }> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) throw new Error(`Phase not found: ${phaseId}`);
    if (phase.status !== 'pending_review') {
      throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
    }

    const initiative = await this.initiativeRepository.findById(phase.initiativeId);
    if (!initiative) throw new Error(`Initiative not found: ${phase.initiativeId}`);

    // Guard: don't create duplicate review tasks
    const existingTasks = await this.taskRepository.findByPhaseId(phaseId);
    const activeReview = existingTasks.find(
      (t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
    );
    if (activeReview) {
      return { taskId: activeReview.id };
    }

    // Build revision task description from threaded comments + summary
    const description = buildRevisionDescription(unresolvedThreads, summary);

    // Create revision task
    const task = await this.taskRepository.create({
      phaseId,
      initiativeId: phase.initiativeId,
      name: `Address review feedback: ${phase.name}`,
      description,
      category: 'review',
      priority: 'high',
    });

    // Reset phase status back to in_progress
    await this.phaseRepository.update(phaseId, { status: 'in_progress' as any });

    // Queue task for dispatch
    await this.dispatchManager.queue(task.id);

    // Emit event
    const event: PhaseChangesRequestedEvent = {
      type: 'phase:changes_requested',
      timestamp: new Date(),
      payload: {
        phaseId,
        initiativeId: phase.initiativeId,
        taskId: task.id,
        commentCount: unresolvedThreads.length,
      },
    };
    this.eventBus.emit(event);

    log.info({ phaseId, taskId: task.id, commentCount: unresolvedThreads.length }, 'changes requested on phase');

    // Kick off dispatch
    this.scheduleDispatch();

    return { taskId: task.id };
  }

  /**
   * Request changes on an initiative that's pending review.
   * Creates/reuses a "Finalization" phase and adds a review task to it, then
   * sets the initiative back to `active`.
   *
   * @returns The id of the created (or already active) review task.
   * @throws Error when the initiative is missing or not pending review.
   */
  async requestChangesOnInitiative(
    initiativeId: string,
    summary: string,
  ): Promise<{ taskId: string }> {
    const initiative = await this.initiativeRepository.findById(initiativeId);
    if (!initiative) throw new Error(`Initiative not found: ${initiativeId}`);
    if (initiative.status !== 'pending_review') {
      throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
    }

    // Find or create a "Finalization" phase
    const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
    let finalizationPhase = phases.find((p) => p.name === 'Finalization');

    if (!finalizationPhase) {
      finalizationPhase = await this.phaseRepository.create({
        initiativeId,
        name: 'Finalization',
        status: 'in_progress',
      });
    } else if (finalizationPhase.status === 'completed' || finalizationPhase.status === 'pending_review') {
      await this.phaseRepository.update(finalizationPhase.id, { status: 'in_progress' as any });
    }

    // Guard: don't create duplicate review tasks
    const existingTasks = await this.taskRepository.findByPhaseId(finalizationPhase.id);
    const activeReview = existingTasks.find(
      (t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
    );
    if (activeReview) {
      // Still reset initiative to active
      await this.initiativeRepository.update(initiativeId, { status: 'active' as any });
      this.scheduleDispatch();
      return { taskId: activeReview.id };
    }

    // Create review task
    const task = await this.taskRepository.create({
      phaseId: finalizationPhase.id,
      initiativeId,
      name: `Address initiative review feedback`,
      description: `## Summary\n\n${summary}`,
      category: 'review',
      priority: 'high',
    });

    // Reset initiative status to active
    await this.initiativeRepository.update(initiativeId, { status: 'active' as any });

    // Queue task for dispatch
    await this.dispatchManager.queue(task.id);

    // Emit event
    const event: InitiativeChangesRequestedEvent = {
      type: 'initiative:changes_requested',
      timestamp: new Date(),
      payload: {
        initiativeId,
        phaseId: finalizationPhase.id,
        taskId: task.id,
      },
    };
    this.eventBus.emit(event);

    log.info({ initiativeId, phaseId: finalizationPhase.id, taskId: task.id }, 'changes requested on initiative');

    this.scheduleDispatch();

    return { taskId: task.id };
  }

  /**
   * Re-queue approved phases for an initiative into the in-memory dispatch queue.
   * Self-healing: ensures phases aren't lost if the server restarted since the
   * initial queueAllPhases() call. Queueing failures are best-effort and only
   * logged at debug level.
   */
  private async requeueApprovedPhases(initiativeId: string): Promise<void> {
    const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
    for (const p of phases) {
      if (p.status !== 'approved') continue;
      try {
        await this.phaseDispatchManager.queuePhase(p.id);
        log.info({ phaseId: p.id }, 're-queued approved phase');
      } catch (err) {
        // Already queued or status changed — safe to ignore
        log.debug({ phaseId: p.id, err: errorMessage(err) }, 'approved phase not re-queued');
      }
    }
  }

  /**
   * Recover in-memory dispatch queues from DB state on server startup.
   * Re-queues approved phases and pending tasks for in_progress phases, and
   * resets in_progress tasks whose agent is no longer alive.
   */
  private async recoverDispatchQueues(): Promise<void> {
    const initiatives = await this.initiativeRepository.findByStatus('active');
    let phasesRecovered = 0;
    let tasksRecovered = 0;

    for (const initiative of initiatives) {
      const phases = await this.phaseRepository.findByInitiativeId(initiative.id);

      for (const phase of phases) {
        // Re-queue approved phases into the phase dispatch queue
        if (phase.status === 'approved') {
          try {
            await this.phaseDispatchManager.queuePhase(phase.id);
            phasesRecovered++;
          } catch (err) {
            // Already queued or status changed
            log.debug({ phaseId: phase.id, err: errorMessage(err) }, 'approved phase not re-queued during recovery');
          }
        }

        // Re-queue pending tasks and recover stuck in_progress tasks for in_progress phases
        if (phase.status === 'in_progress') {
          tasksRecovered += await this.recoverPhaseTasks(phase.id);
        }
      }
    }

    if (phasesRecovered > 0 || tasksRecovered > 0) {
      log.info({ phasesRecovered, tasksRecovered }, 'recovered dispatch queues from DB state');
      this.scheduleDispatch();
    }
  }

  /**
   * Re-queue the recoverable tasks of one in_progress phase and return how many
   * were re-queued. Each task is handled independently so one failure does not
   * abort recovery of the remaining tasks.
   */
  private async recoverPhaseTasks(phaseId: string): Promise<number> {
    const tasks = await this.taskRepository.findByPhaseId(phaseId);
    let recovered = 0;

    for (const task of tasks) {
      if (task.status === 'pending') {
        try {
          await this.dispatchManager.queue(task.id);
          recovered++;
        } catch (err) {
          // Already queued or task issue
          log.debug({ taskId: task.id, err: errorMessage(err) }, 'pending task not re-queued during recovery');
        }
      } else if (task.status === 'in_progress' && this.agentRepository) {
        try {
          // Check if the assigned agent is still alive
          const agent = await this.agentRepository.findByTaskId(task.id);
          const isAlive = agent && (agent.status === 'running' || agent.status === 'waiting_for_input');
          if (isAlive) continue;

          // Agent is dead — reset task for re-dispatch
          await this.taskRepository.update(task.id, { status: 'pending' });
          await this.dispatchManager.queue(task.id);
          recovered++;
          log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
        } catch (err) {
          log.warn({ taskId: task.id, err: errorMessage(err) }, 'failed to recover stuck in_progress task');
        }
      }
    }

    return recovered;
  }

  /**
   * Check if all phases for an initiative are completed.
   * If so, set initiative to pending_review and emit event. Does nothing when
   * the initiative has no phases, no branch, or is not `active`.
   */
  private async checkInitiativeCompletion(initiativeId: string): Promise<void> {
    const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
    if (phases.length === 0) return;

    const allCompleted = phases.every((p) => p.status === 'completed');
    if (!allCompleted) return;

    const initiative = await this.initiativeRepository.findById(initiativeId);
    if (!initiative?.branch) return;
    if (initiative.status !== 'active') return;

    await this.initiativeRepository.update(initiativeId, { status: 'pending_review' as any });

    const event: InitiativePendingReviewEvent = {
      type: 'initiative:pending_review',
      timestamp: new Date(),
      payload: { initiativeId, branch: initiative.branch },
    };
    this.eventBus.emit(event);

    log.info({ initiativeId, branch: initiative.branch }, 'initiative pending review — all phases completed');
  }

  /**
   * Approve an initiative review.
   * Either pushes the initiative branch or merges into default + pushes.
   * Projects where the initiative branch does not exist are skipped.
   *
   * @param initiativeId Initiative to approve; must be pending review and have a branch.
   * @param strategy `push_branch` pushes the initiative branch as-is;
   *   `merge_and_push` merges it into each project's default branch and pushes that.
   * @throws Error when the initiative is missing, not pending review, has no
   *   branch, or a merge fails. A push failure is rethrown after the local merge
   *   has been rolled back where possible.
   */
  async approveInitiative(
    initiativeId: string,
    strategy: 'push_branch' | 'merge_and_push',
  ): Promise<void> {
    const initiative = await this.initiativeRepository.findById(initiativeId);
    if (!initiative) throw new Error(`Initiative not found: ${initiativeId}`);
    if (initiative.status !== 'pending_review') {
      throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
    }
    if (!initiative.branch) {
      throw new Error(`Initiative ${initiativeId} has no branch configured`);
    }

    const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      const branchExists = await this.branchManager.branchExists(clonePath, initiative.branch);
      if (!branchExists) {
        log.warn({ initiativeId, branch: initiative.branch, project: project.name }, 'initiative branch does not exist in project, skipping');
        continue;
      }

      // Fetch remote so local branches are up-to-date before merge/push
      await this.branchManager.fetchRemote(clonePath);

      if (strategy === 'merge_and_push') {
        // Fast-forward local defaultBranch to match origin before merging
        try {
          await this.branchManager.fastForwardBranch(clonePath, project.defaultBranch);
        } catch (ffErr) {
          log.warn({ project: project.name, err: errorMessage(ffErr) }, 'fast-forward of default branch failed — attempting merge anyway');
        }
        const result = await this.branchManager.mergeBranch(clonePath, initiative.branch, project.defaultBranch);
        if (!result.success) {
          throw new Error(`Failed to merge ${initiative.branch} into ${project.defaultBranch} for project ${project.name}: ${result.message}`);
        }
        try {
          await this.branchManager.pushBranch(clonePath, project.defaultBranch);
        } catch (pushErr) {
          // Roll back the merge so the diff doesn't disappear from the review tab.
          // Without rollback, defaultBranch includes the initiative changes and the
          // three-dot diff (defaultBranch...initiativeBranch) becomes empty.
          if (result.previousRef) {
            log.warn({ project: project.name, previousRef: result.previousRef }, 'push failed — rolling back merge');
            try {
              await this.branchManager.updateRef(clonePath, project.defaultBranch, result.previousRef);
            } catch (rollbackErr) {
              // Log the rollback failure but keep the push error as the one reported to the caller.
              log.error(
                { project: project.name, previousRef: result.previousRef, err: errorMessage(rollbackErr) },
                'rollback of default branch failed after push failure',
              );
            }
          }
          throw pushErr;
        }
        log.info({ initiativeId, project: project.name }, 'initiative branch merged into default and pushed');
      } else {
        await this.branchManager.pushBranch(clonePath, initiative.branch);
        log.info({ initiativeId, project: project.name }, 'initiative branch pushed to remote');
      }
    }

    await this.initiativeRepository.update(initiativeId, { status: 'completed' as any });

    const event: InitiativeReviewApprovedEvent = {
      type: 'initiative:review_approved',
      timestamp: new Date(),
      payload: { initiativeId, branch: initiative.branch, strategy },
    };
    this.eventBus.emit(event);

    log.info({ initiativeId, strategy }, 'initiative review approved');
  }
}

/** Render a caught value as a log-friendly message string. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Build the markdown description of a revision task: an optional summary
 * section followed by the review comments grouped by file (in first-seen
 * order). Falls back to a generic sentence when there is nothing to render.
 */
function buildRevisionDescription(
  unresolvedThreads: Parameters<ExecutionOrchestrator['requestChangesOnPhase']>[1],
  summary?: string,
): string {
  const lines: string[] = [];
  if (summary) {
    lines.push(`## Summary\n\n${summary}\n`);
  }
  if (unresolvedThreads.length > 0) {
    lines.push('## Review Comments\n');
    // Group comments by file
    const byFile = new Map<string, typeof unresolvedThreads>();
    for (const thread of unresolvedThreads) {
      const group = byFile.get(thread.filePath) ?? [];
      group.push(thread);
      byFile.set(thread.filePath, group);
    }
    for (const [filePath, fileComments] of byFile) {
      lines.push(`### ${filePath}\n`);
      for (const c of fileComments) {
        lines.push(`#### Line ${c.lineNumber} [comment:${c.id}]`);
        lines.push(`**${c.author}**: ${c.body}`);
        for (const r of c.replies) {
          lines.push(`> **${r.author}**: ${r.body}`);
        }
        lines.push('');
      }
    }
  }

  return lines.join('\n') || 'Address review feedback.';
}
|