# Conflicts: # apps/server/drizzle/meta/0037_snapshot.json # apps/server/drizzle/meta/_journal.json
845 lines
33 KiB
TypeScript
845 lines
33 KiB
TypeScript
/**
 * Execution Orchestrator
 *
 * Subscribes to task:completed events and orchestrates the post-completion
 * merge workflow:
 * - Task branch → Phase branch (on task completion)
 * - Phase branch → Initiative branch (when all phase tasks done)
 *
 * Supports two execution modes:
 * - YOLO: auto-merge everything
 * - Review per-phase: pause after each phase for diff review
 */
|
|
|
|
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, AgentCrashedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
|
|
import type { BranchManager } from '../git/branch-manager.js';
|
|
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
|
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
|
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
|
import type { AgentManager } from '../agent/types.js';
|
|
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
|
|
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
|
|
import { phaseBranchName, taskBranchName, isPlanningCategory } from '../git/branch-naming.js';
|
|
import { ensureProjectClone } from '../git/project-clones.js';
|
|
import { createModuleLogger } from '../logger/index.js';
|
|
import { phaseMetaCache, fileDiffCache } from '../review/diff-cache.js';
|
|
import { shouldRunQualityReview, runQualityReview } from './quality-review.js';
|
|
|
|
const log = createModuleLogger('execution-orchestrator');
|
|
|
|
/** Maximum number of automatic retries for a crashed task; once exceeded, the crash handler stops re-queuing it and leaves it in_progress. */
|
|
const MAX_TASK_RETRIES = 3;
|
|
|
|
export class ExecutionOrchestrator {
|
|
/**
 * Tail of the merge promise chain per phase ID. Task merges for the same
 * phase are chained onto this promise so they run one after another.
 */
private readonly phaseMergeLocks: Map<string, Promise<void>> = new Map();

/** Coalesced dispatch scheduling state */
// Set when a dispatch is requested while a cycle is already running.
private dispatchPending = false;
// True for the duration of runDispatchCycle().
private dispatchRunning = false;

/**
 * All collaborators are injected and never reassigned, hence `readonly`.
 *
 * @param workspaceRoot Passed to ensureProjectClone() to locate project clones.
 * @param agentRepository Optional. Without it, recovery of stuck in_progress
 *   and blocked tasks is skipped, and quality review is never attempted.
 * @param agentManager Optional. Without it, quality review is never attempted.
 */
constructor(
  private readonly branchManager: BranchManager,
  private readonly phaseRepository: PhaseRepository,
  private readonly taskRepository: TaskRepository,
  private readonly initiativeRepository: InitiativeRepository,
  private readonly projectRepository: ProjectRepository,
  private readonly phaseDispatchManager: PhaseDispatchManager,
  private readonly dispatchManager: DispatchManager,
  private readonly conflictResolutionService: ConflictResolutionService,
  private readonly eventBus: EventBus,
  private readonly workspaceRoot: string,
  private readonly agentRepository?: AgentRepository,
  private readonly agentManager?: AgentManager,
) {}
|
|
|
|
/**
 * Subscribe to the events that drive execution and kick off recovery of the
 * in-memory dispatch queues. Handler failures are logged, never rethrown.
 */
start(): void {
  // Builds a rejection handler that logs the failure under the given message.
  const logFailure = (message: string) => (err: unknown): void => {
    log.error({ err: err instanceof Error ? err.message : String(err) }, message);
  };

  this.eventBus.on<TaskCompletedEvent>('task:completed', (event) => {
    this.handleTaskCompleted(event).catch(logFailure('error handling task:completed'));
  });

  // Kick off dispatch when phases are queued (e.g. user clicks Execute)
  this.eventBus.on<PhaseQueuedEvent>('phase:queued', () => {
    this.scheduleDispatch();
  });

  // Auto-complete task + re-dispatch when an agent finishes
  this.eventBus.on<AgentStoppedEvent>('agent:stopped', (event) => {
    this.handleAgentStopped(event).catch(logFailure('error handling agent:stopped'));
  });

  // Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES)
  this.eventBus.on<AgentCrashedEvent>('agent:crashed', (event) => {
    this.handleAgentCrashed(event).catch(logFailure('error handling agent:crashed'));
  });

  // Recover in-memory dispatch queues from DB state (survives server restarts).
  // Fire-and-forget: start() does not wait for recovery to finish.
  this.recoverDispatchQueues().catch(logFailure('dispatch queue recovery failed'));

  log.info('execution orchestrator started');
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Coalesced dispatch scheduler
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Request a dispatch cycle. If a cycle is already running, only a flag is
 * set so the running cycle makes one more pass; otherwise a new cycle is
 * started. Cycle errors are logged, not thrown.
 */
private scheduleDispatch(): void {
  if (!this.dispatchRunning) {
    this.runDispatchCycle().catch((cycleErr) => {
      const err = cycleErr instanceof Error ? cycleErr.message : String(cycleErr);
      log.error({ err }, 'dispatch cycle error');
    });
    return;
  }

  // A cycle is in flight: ask it to loop again instead of starting a second one.
  this.dispatchPending = true;
}
|
|
|
|
/**
 * Handle an agent:stopped event.
 *
 * Unless the stop was user-requested (or the agent had no task), the task is
 * either handed to quality review or completed via the dispatch manager.
 * Failures in that step are logged as warnings. A dispatch is always
 * scheduled afterwards.
 */
private async handleAgentStopped(event: AgentStoppedEvent): Promise<void> {
  const { taskId, reason, agentId } = event.payload;

  if (taskId && reason !== 'user_requested') {
    try {
      const { reviewStarted } = await this.tryQualityReview(taskId, agentId, reason);
      // When a review was started, tryQualityReview's contract is that the
      // task must not be completed here.
      if (!reviewStarted) {
        await this.dispatchManager.completeTask(taskId, agentId);
        log.info({ taskId, agentId, reason }, 'task auto-completed on agent stop');
      }
    } catch (stopErr) {
      const err = stopErr instanceof Error ? stopErr.message : String(stopErr);
      log.warn({ taskId, agentId, reason, err }, 'failed to handle agent stop');
    }
  }

  this.scheduleDispatch();
}
|
|
|
|
/**
 * Decide whether a stopping agent's task should go through quality review
 * and, if so, start it.
 *
 * Resolves to `{ reviewStarted: true }` when runQualityReview() was invoked;
 * the caller must then NOT call completeTask. Resolves to
 * `{ reviewStarted: false }` when a prerequisite is missing (agent
 * repository/manager, task phase or initiative, initiative branch, phase,
 * linked projects) or shouldRunQualityReview() declines; the caller should
 * then complete the task.
 */
private async tryQualityReview(taskId: string, agentId: string, reason: string): Promise<{ reviewStarted: boolean }> {
  const notStarted = { reviewStarted: false };

  const { agentRepository, agentManager } = this;
  if (!agentRepository || !agentManager) return notStarted;

  const task = await this.taskRepository.findById(taskId);
  if (!task?.phaseId || !task.initiativeId) return notStarted;
  const { initiativeId } = task;

  const initiative = await this.initiativeRepository.findById(initiativeId);
  if (!initiative?.branch) return notStarted;

  const phase = await this.phaseRepository.findById(task.phaseId);
  if (!phase) return notStarted;

  const taskBranch = taskBranchName(initiative.branch, taskId);
  const baseBranch = phaseBranchName(initiative.branch, phase.name);

  const linkedProjects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
  if (linkedProjects.length === 0) return notStarted;

  // Only the first linked project's clone is used for the review decision.
  const repoPath = await ensureProjectClone(linkedProjects[0], this.workspaceRoot);

  const decision = await shouldRunQualityReview({
    agentId,
    taskId,
    stopReason: reason,
    repoPath,
    taskBranch,
    baseBranch,
    agentRepository,
    taskRepository: this.taskRepository,
    initiativeRepository: this.initiativeRepository,
    branchManager: this.branchManager,
  });
  if (!decision.run) return notStarted;

  await runQualityReview({
    taskId,
    taskBranch,
    baseBranch,
    initiativeId,
    qualifyingFiles: decision.qualifyingFiles,
    taskRepository: this.taskRepository,
    agentManager,
    log,
  });

  return { reviewStarted: true };
}
|
|
|
|
/**
 * Handle an agent:crashed event.
 *
 * If the crashed agent was working on a task that is still in_progress, the
 * task is reset to pending with an incremented retry count and re-queued,
 * up to MAX_TASK_RETRIES attempts. Past that limit the task is left
 * in_progress and only a warning is logged.
 *
 * A dispatch is scheduled on every path (matching handleAgentStopped), so
 * queued work can be picked up even when the crashed task is not retried.
 */
private async handleAgentCrashed(event: AgentCrashedEvent): Promise<void> {
  const { taskId, agentId, error } = event.payload;

  const task = taskId ? await this.taskRepository.findById(taskId) : null;

  // Only tasks that are still marked in_progress are eligible for a retry.
  if (taskId && task && task.status === 'in_progress') {
    const retryCount = (task.retryCount ?? 0) + 1;

    if (retryCount > MAX_TASK_RETRIES) {
      log.warn({ taskId, agentId, retryCount, error }, 'task exceeded max retries, leaving in_progress');
    } else {
      // Reset task for re-dispatch with incremented retry count
      await this.taskRepository.update(taskId, { status: 'pending', retryCount });
      await this.dispatchManager.queue(taskId);
      log.info({ taskId, agentId, retryCount, error }, 'crashed task re-queued for retry');
    }
  }

  this.scheduleDispatch();
}
|
|
|
|
/**
 * Run dispatch passes until no further pass has been requested.
 *
 * Each pass dispatches phases until dispatchNextPhase() reports no success,
 * then tasks until dispatchNext() reports no success. If scheduleDispatch()
 * was called during the pass (dispatchPending set), another pass follows.
 * dispatchRunning is cleared even when a dispatch call throws.
 */
private async runDispatchCycle(): Promise<void> {
  this.dispatchRunning = true;
  try {
    do {
      this.dispatchPending = false;

      // Dispatch all ready phases (each queues its tasks internally)
      let phaseOutcome = await this.phaseDispatchManager.dispatchNextPhase();
      while (phaseOutcome.success) {
        log.info({ phaseId: phaseOutcome.phaseId }, 'auto-dispatched phase');
        phaseOutcome = await this.phaseDispatchManager.dispatchNextPhase();
      }

      // Dispatch all ready tasks to idle agents
      let taskOutcome = await this.dispatchManager.dispatchNext();
      while (taskOutcome.success) {
        log.info({ taskId: taskOutcome.taskId, agentId: taskOutcome.agentId }, 'auto-dispatched task');
        taskOutcome = await this.dispatchManager.dispatchNext();
      }
    } while (this.dispatchPending);
  } finally {
    this.dispatchRunning = false;
  }
}
|
|
|
|
/**
 * Handle a task:completed event.
 *
 * Merges the task branch into the phase branch (skipped for 'merge' and
 * 'review' category tasks and for initiatives without a branch), then checks
 * whether every task of the phase is completed and, if so, triggers phase
 * completion. A dispatch is scheduled at the end.
 *
 * Merge failures are logged and do not prevent the phase-completion check.
 */
private async handleTaskCompleted(event: TaskCompletedEvent): Promise<void> {
  const { taskId } = event.payload;

  const task = await this.taskRepository.findById(taskId);
  if (!task?.phaseId || !task.initiativeId) return;
  // Captured once so closures below see a plain string (no non-null assertion).
  const phaseId = task.phaseId;

  const initiative = await this.initiativeRepository.findById(task.initiativeId);
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) return;

  // Merge task branch into phase branch (only when branches exist)
  if (initiative?.branch && task.category !== 'merge' && task.category !== 'review') {
    let chainTail: Promise<void> | undefined;
    try {
      const initBranch = initiative.branch;
      const phBranch = phaseBranchName(initBranch, phase.name);
      const tBranch = taskBranchName(initBranch, task.id);

      // Serialize merges per phase: chain this merge after the previous one.
      const previous = this.phaseMergeLocks.get(phaseId) ?? Promise.resolve();
      const mergeOp = previous.then(() => this.mergeTaskIntoPhase(taskId, phaseId, tBranch, phBranch));

      // The stored tail never rejects, so one failed merge does not poison
      // the chain for merges queued behind it.
      chainTail = mergeOp.catch(() => {});
      this.phaseMergeLocks.set(phaseId, chainTail);

      await mergeOp;
    } catch (err) {
      log.error({ taskId, err: err instanceof Error ? err.message : String(err) }, 'task merge failed, still checking phase completion');
    } finally {
      // Drop the lock entry if nothing was chained after this merge;
      // otherwise the map would keep one settled promise per phase forever.
      if (chainTail !== undefined && this.phaseMergeLocks.get(phaseId) === chainTail) {
        this.phaseMergeLocks.delete(phaseId);
      }
    }
  }

  // Check if all phase tasks are done — always, regardless of branch/merge status
  const phaseTasks = await this.taskRepository.findByPhaseId(phaseId);
  const allDone = phaseTasks.every((t) => t.status === 'completed');
  if (allDone) {
    await this.handlePhaseAllTasksDone(phaseId);
  }

  // Fill freed agent slot with next queued task
  this.scheduleDispatch();
}
|
|
|
|
/**
 * Merge a task branch into the phase branch for all linked projects.
 *
 * Projects where the task branch does not exist are skipped. On a merge
 * conflict, a conflict-resolution task is requested (and queued if one is
 * returned) and the method returns without emitting task:merged. When all
 * projects are processed without failure, the phase diff caches are
 * invalidated and task:merged is emitted.
 *
 * @throws Error when a merge fails without reporting conflicts; such a
 *   failure must not be reported as a successful merge.
 */
private async mergeTaskIntoPhase(
  taskId: string,
  phaseId: string,
  taskBranch: string,
  phaseBranch: string,
): Promise<void> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) return;

  const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

  // Invalidate diff cache — phase branch HEAD has advanced after merges
  const invalidateDiffCaches = (): void => {
    phaseMetaCache.invalidateByPrefix(`${phaseId}:`);
    fileDiffCache.invalidateByPrefix(`${phaseId}:`);
  };

  let mergedProjects = 0;

  for (const project of projects) {
    const clonePath = await ensureProjectClone(project, this.workspaceRoot);

    // Only merge if the task branch actually exists (agents may not have pushed)
    const exists = await this.branchManager.branchExists(clonePath, taskBranch);
    if (!exists) {
      log.debug({ taskId, taskBranch, project: project.name }, 'task branch does not exist, skipping merge');
      continue;
    }

    const result = await this.branchManager.mergeBranch(clonePath, taskBranch, phaseBranch);

    if (!result.success) {
      // Earlier projects may already have advanced the phase branch, so
      // cached diffs are stale even though this project failed.
      if (mergedProjects > 0) {
        invalidateDiffCaches();
      }

      if (result.conflicts) {
        log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
        const conflictTask = await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
          sourceBranch: taskBranch,
          targetBranch: phaseBranch,
        });
        if (conflictTask) {
          await this.dispatchManager.queue(conflictTask.id);
          log.info({ taskId: conflictTask.id, originalTaskId: taskId }, 'conflict resolution task queued for dispatch');
        }
        return;
      }

      // Failure without a conflict list: surface it to the caller.
      throw new Error(`Task merge failed for project ${project.name}: ${result.message}`);
    }

    mergedProjects++;
    log.info({ taskId, taskBranch, phaseBranch, project: project.name }, 'task branch merged into phase branch');
  }

  invalidateDiffCaches();

  // Emit task:merged event
  const mergedEvent: TaskMergedEvent = {
    type: 'task:merged',
    timestamp: new Date(),
    payload: { taskId, phaseId, sourceBranch: taskBranch, targetBranch: phaseBranch },
  };
  this.eventBus.emit(mergedEvent);
}
|
|
|
|
/**
 * React to every task of a phase being completed.
 *
 * - Execution mode 'yolo': merge phase → initiative (when the initiative has
 *   a branch), complete the phase, re-queue approved phases, try to dispatch
 *   the next phase and, if none was dispatched, check initiative completion.
 * - Any other mode (review per phase): mark the phase pending_review and
 *   emit phase:pending_review.
 *
 * Does nothing when the phase or its initiative cannot be found.
 */
private async handlePhaseAllTasksDone(phaseId: string): Promise<void> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) return;
  const { initiativeId } = phase;

  const initiative = await this.initiativeRepository.findById(initiativeId);
  if (!initiative) return;

  if (initiative.executionMode !== 'yolo') {
    // review_per_phase
    await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });

    const pendingReview: PhasePendingReviewEvent = {
      type: 'phase:pending_review',
      timestamp: new Date(),
      payload: { phaseId, initiativeId },
    };
    this.eventBus.emit(pendingReview);

    log.info({ phaseId, initiativeId }, 'phase pending review');
    return;
  }

  // Merge phase branch into initiative branch (only when branches exist)
  if (initiative.branch) {
    await this.mergePhaseIntoInitiative(phaseId);
  }
  await this.phaseDispatchManager.completePhase(phaseId);

  // Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
  await this.requeueApprovedPhases(initiativeId);

  // Check if this was the last phase — if so, trigger initiative review
  const { success: nextPhaseDispatched } = await this.phaseDispatchManager.dispatchNextPhase();
  if (!nextPhaseDispatched) {
    await this.checkInitiativeCompletion(initiativeId);
  }
  this.scheduleDispatch();
}
|
|
|
|
/**
 * Merge phase branch into initiative branch for all linked projects.
 *
 * Before merging, the merge base of the two branches (taken from the first
 * project where it can be computed) is stored on the phase. Projects whose
 * clone has no phase branch are skipped with a warning, mirroring how
 * missing task and initiative branches are handled elsewhere in this class.
 * Emits phase:merged once all projects are processed.
 *
 * Does nothing when the phase is missing or the initiative has no branch.
 *
 * @throws Error when a merge does not succeed in any project.
 */
private async mergePhaseIntoInitiative(phaseId: string): Promise<void> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) return;

  const initiative = await this.initiativeRepository.findById(phase.initiativeId);
  if (!initiative?.branch) return;

  const initBranch = initiative.branch;
  const phBranch = phaseBranchName(initBranch, phase.name);

  const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);

  // Store merge base before merging so we can reconstruct diffs for completed phases
  for (const project of projects) {
    const clonePath = await ensureProjectClone(project, this.workspaceRoot);
    try {
      const mergeBase = await this.branchManager.getMergeBase(clonePath, initBranch, phBranch);
      await this.phaseRepository.update(phaseId, { mergeBase });
      break; // Only need one merge base (first project)
    } catch (err) {
      // Phase branch may not exist in this project clone — try the next one.
      log.debug(
        { phaseId, phBranch, project: project.name, err: err instanceof Error ? err.message : String(err) },
        'could not determine merge base in project, trying next',
      );
    }
  }

  for (const project of projects) {
    const clonePath = await ensureProjectClone(project, this.workspaceRoot);

    // A project without the phase branch has nothing to merge.
    const phaseBranchExists = await this.branchManager.branchExists(clonePath, phBranch);
    if (!phaseBranchExists) {
      log.warn({ phaseId, phBranch, project: project.name }, 'phase branch does not exist in project, skipping merge');
      continue;
    }

    const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch);

    if (!result.success) {
      log.error({ phaseId, phBranch, initBranch, conflicts: result.conflicts }, 'phase merge conflict');
      throw new Error(`Phase merge conflict: ${result.message}`);
    }

    log.info({ phaseId, phBranch, initBranch, project: project.name }, 'phase branch merged into initiative branch');
  }

  // Emit phase:merged event
  const mergedEvent: PhaseMergedEvent = {
    type: 'phase:merged',
    timestamp: new Date(),
    payload: { phaseId, initiativeId: phase.initiativeId, sourceBranch: phBranch, targetBranch: initBranch },
  };
  this.eventBus.emit(mergedEvent);
}
|
|
|
|
/**
 * Approve a phase that is pending review: merge it into the initiative
 * branch, complete it, and continue execution.
 * Called from tRPC when user clicks approve in ReviewTab.
 *
 * @throws Error when the phase does not exist or is not pending_review.
 */
async approveAndMergePhase(phaseId: string): Promise<void> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) {
    throw new Error(`Phase not found: ${phaseId}`);
  }

  const { status, initiativeId } = phase;
  if (status !== 'pending_review') {
    throw new Error(`Phase ${phaseId} is not pending review (status: ${status})`);
  }

  await this.mergePhaseIntoInitiative(phaseId);
  await this.phaseDispatchManager.completePhase(phaseId);

  // Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
  await this.requeueApprovedPhases(initiativeId);

  // Check if this was the last phase — if so, trigger initiative review
  const { success: nextPhaseDispatched } = await this.phaseDispatchManager.dispatchNextPhase();
  if (!nextPhaseDispatched) {
    await this.checkInitiativeCompletion(initiativeId);
  }
  this.scheduleDispatch();

  log.info({ phaseId }, 'phase review approved and merged');
}
|
|
|
|
/**
 * Request changes on a phase that's pending review.
 *
 * Creates a high-priority 'review' task describing the feedback, resets the
 * phase to in_progress, queues the task, emits phase:changes_requested and
 * schedules a dispatch. If the phase already has a pending or in_progress
 * review task, that task's ID is returned and nothing else is changed.
 *
 * @param phaseId Phase under review.
 * @param unresolvedThreads Review threads to include in the task description.
 * @param summary Optional reviewer summary placed at the top of the description.
 * @returns The ID of the created (or already active) review task.
 * @throws Error when the phase or its initiative does not exist, or the
 *   phase is not pending_review.
 */
async requestChangesOnPhase(
  phaseId: string,
  unresolvedThreads: Array<{
    id: string;
    filePath: string;
    lineNumber: number;
    body: string;
    author: string;
    replies: Array<{ id: string; body: string; author: string }>;
  }>,
  summary?: string,
): Promise<{ taskId: string }> {
  const phase = await this.phaseRepository.findById(phaseId);
  if (!phase) throw new Error(`Phase not found: ${phaseId}`);
  if (phase.status !== 'pending_review') {
    throw new Error(`Phase ${phaseId} is not pending review (status: ${phase.status})`);
  }

  // Existence check only; the initiative record itself is not used below.
  const initiative = await this.initiativeRepository.findById(phase.initiativeId);
  if (!initiative) throw new Error(`Initiative not found: ${phase.initiativeId}`);

  // Guard: don't create duplicate review tasks
  const existingTasks = await this.taskRepository.findByPhaseId(phaseId);
  const activeReview = existingTasks.find(
    (t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
  );
  if (activeReview) {
    return { taskId: activeReview.id };
  }

  // Create revision task
  const task = await this.taskRepository.create({
    phaseId,
    initiativeId: phase.initiativeId,
    name: `Address review feedback: ${phase.name}`,
    description: this.buildRevisionDescription(unresolvedThreads, summary),
    category: 'review',
    priority: 'high',
  });

  // Reset phase status back to in_progress
  await this.phaseRepository.update(phaseId, { status: 'in_progress' as any });

  // Queue task for dispatch
  await this.dispatchManager.queue(task.id);

  // Emit event
  const event: PhaseChangesRequestedEvent = {
    type: 'phase:changes_requested',
    timestamp: new Date(),
    payload: {
      phaseId,
      initiativeId: phase.initiativeId,
      taskId: task.id,
      commentCount: unresolvedThreads.length,
    },
  };
  this.eventBus.emit(event);

  log.info({ phaseId, taskId: task.id, commentCount: unresolvedThreads.length }, 'changes requested on phase');

  // Kick off dispatch
  this.scheduleDispatch();

  return { taskId: task.id };
}

/**
 * Render the markdown description of a revision task: an optional summary
 * section followed by the review threads grouped by file (in first-seen
 * order), each with its replies as quoted lines. Falls back to a generic
 * sentence when there is neither a summary nor any thread.
 */
private buildRevisionDescription(
  unresolvedThreads: Parameters<ExecutionOrchestrator['requestChangesOnPhase']>[1],
  summary?: string,
): string {
  const lines: string[] = [];

  if (summary) {
    lines.push(`## Summary\n\n${summary}\n`);
  }

  if (unresolvedThreads.length > 0) {
    lines.push('## Review Comments\n');

    // Group comments by file
    const byFile = new Map<string, typeof unresolvedThreads>();
    for (const thread of unresolvedThreads) {
      const bucket = byFile.get(thread.filePath);
      if (bucket) {
        bucket.push(thread);
      } else {
        byFile.set(thread.filePath, [thread]);
      }
    }

    for (const [filePath, fileThreads] of byFile) {
      lines.push(`### ${filePath}\n`);
      for (const thread of fileThreads) {
        lines.push(`#### Line ${thread.lineNumber} [comment:${thread.id}]`);
        lines.push(`**${thread.author}**: ${thread.body}`);
        for (const reply of thread.replies) {
          lines.push(`> **${reply.author}**: ${reply.body}`);
        }
        lines.push('');
      }
    }
  }

  return lines.join('\n') || 'Address review feedback.';
}
|
|
|
|
/**
 * Request changes on an initiative that's pending review.
 *
 * Finds the phase named "Finalization" (creating it as in_progress if
 * absent, or resetting it to in_progress when it is completed or
 * pending_review), then adds a high-priority 'review' task to it unless a
 * pending/in_progress review task already exists there. In both cases the
 * initiative is set back to active and a dispatch is scheduled.
 *
 * @returns The ID of the created (or already active) review task.
 * @throws Error when the initiative does not exist or is not pending_review.
 */
async requestChangesOnInitiative(
  initiativeId: string,
  summary: string,
): Promise<{ taskId: string }> {
  const initiative = await this.initiativeRepository.findById(initiativeId);
  if (!initiative) {
    throw new Error(`Initiative not found: ${initiativeId}`);
  }
  if (initiative.status !== 'pending_review') {
    throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
  }

  // Find or create a "Finalization" phase
  const allPhases = await this.phaseRepository.findByInitiativeId(initiativeId);
  const existingPhase = allPhases.find((p) => p.name === 'Finalization');
  const finalizationPhase =
    existingPhase ??
    (await this.phaseRepository.create({
      initiativeId,
      name: 'Finalization',
      status: 'in_progress',
    }));

  // A reused phase that already finished (or awaits review) is reopened.
  if (existingPhase && (existingPhase.status === 'completed' || existingPhase.status === 'pending_review')) {
    await this.phaseRepository.update(existingPhase.id, { status: 'in_progress' as any });
  }

  // Guard: don't create duplicate review tasks
  const phaseTasks = await this.taskRepository.findByPhaseId(finalizationPhase.id);
  const activeReview = phaseTasks.find((t) => {
    const isOpen = t.status === 'pending' || t.status === 'in_progress';
    return t.category === 'review' && isOpen;
  });
  if (activeReview) {
    // Still reset initiative to active
    await this.initiativeRepository.update(initiativeId, { status: 'active' as any });
    this.scheduleDispatch();
    return { taskId: activeReview.id };
  }

  // Create review task
  const reviewTask = await this.taskRepository.create({
    phaseId: finalizationPhase.id,
    initiativeId,
    name: 'Address initiative review feedback',
    description: `## Summary\n\n${summary}`,
    category: 'review',
    priority: 'high',
  });

  // Reset initiative status to active
  await this.initiativeRepository.update(initiativeId, { status: 'active' as any });

  // Queue task for dispatch
  await this.dispatchManager.queue(reviewTask.id);

  // Emit event
  const changesRequested: InitiativeChangesRequestedEvent = {
    type: 'initiative:changes_requested',
    timestamp: new Date(),
    payload: {
      initiativeId,
      phaseId: finalizationPhase.id,
      taskId: reviewTask.id,
    },
  };
  this.eventBus.emit(changesRequested);

  log.info({ initiativeId, phaseId: finalizationPhase.id, taskId: reviewTask.id }, 'changes requested on initiative');

  this.scheduleDispatch();

  return { taskId: reviewTask.id };
}
|
|
|
|
/**
 * Put every phase of the initiative whose status is 'approved' back into the
 * phase dispatch queue.
 * Self-healing: ensures phases aren't lost if the server restarted since the
 * initial queueAllPhases() call. Failures to queue an individual phase are
 * ignored on purpose.
 */
private async requeueApprovedPhases(initiativeId: string): Promise<void> {
  const allPhases = await this.phaseRepository.findByInitiativeId(initiativeId);
  const approved = allPhases.filter((candidate) => candidate.status === 'approved');

  for (const { id: phaseId } of approved) {
    try {
      await this.phaseDispatchManager.queuePhase(phaseId);
      log.info({ phaseId }, 're-queued approved phase');
    } catch {
      // Already queued or status changed — safe to ignore
    }
  }
}
|
|
|
|
/**
 * Recover in-memory dispatch queues from DB state on server startup.
 *
 * For every active initiative: approved phases are re-queued, and
 * in_progress phases have their tasks recovered (see
 * recoverInProgressPhase). A failure while recovering one in_progress phase
 * is logged and does not stop recovery of the remaining phases and
 * initiatives. If anything was recovered, a dispatch is scheduled.
 */
private async recoverDispatchQueues(): Promise<void> {
  const initiatives = await this.initiativeRepository.findByStatus('active');
  // Shared counters; the helper updates them as it goes so that work done
  // before a mid-phase failure is still counted.
  const recovered = { phases: 0, tasks: 0 };

  for (const initiative of initiatives) {
    const phases = await this.phaseRepository.findByInitiativeId(initiative.id);

    for (const phase of phases) {
      if (phase.status === 'approved') {
        // Re-queue approved phases into the phase dispatch queue
        try {
          await this.phaseDispatchManager.queuePhase(phase.id);
          recovered.phases++;
        } catch {
          // Already queued or status changed
        }
      } else if (phase.status === 'in_progress') {
        try {
          await this.recoverInProgressPhase(phase.id, recovered);
        } catch (err) {
          log.error(
            { phaseId: phase.id, err: err instanceof Error ? err.message : String(err) },
            'failed to recover in_progress phase, continuing with remaining phases',
          );
        }
      }
    }
  }

  if (recovered.phases > 0 || recovered.tasks > 0) {
    log.info({ phasesRecovered: recovered.phases, tasksRecovered: recovered.tasks }, 'recovered dispatch queues from DB state');
    this.scheduleDispatch();
  }
}

/**
 * Recover the tasks of one in_progress phase and finish the phase if all of
 * its tasks turn out to be completed.
 *
 * - pending tasks are re-queued (queue failures ignored);
 * - in_progress tasks whose agent is not running / waiting_for_input are
 *   reset to pending and re-queued;
 * - blocked tasks whose agent is idle or stopped are marked completed;
 * - pending planning tasks that have a completed sibling of the same
 *   category are marked completed as superseded.
 *
 * The agent-dependent steps are skipped when no agent repository is set.
 *
 * @param recovered Counters incremented in place.
 */
private async recoverInProgressPhase(
  phaseId: string,
  recovered: { phases: number; tasks: number },
): Promise<void> {
  const tasks = await this.taskRepository.findByPhaseId(phaseId);

  for (const task of tasks) {
    if (task.status === 'pending') {
      try {
        await this.dispatchManager.queue(task.id);
        recovered.tasks++;
      } catch {
        // Already queued or task issue
      }
    } else if (task.status === 'in_progress' && this.agentRepository) {
      // Check if the assigned agent is still alive
      const agent = await this.agentRepository.findByTaskId(task.id);
      const isAlive = agent && (agent.status === 'running' || agent.status === 'waiting_for_input');
      if (!isAlive) {
        // Agent is dead — reset task for re-dispatch
        await this.taskRepository.update(task.id, { status: 'pending' });
        await this.dispatchManager.queue(task.id);
        recovered.tasks++;
        log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
      }
    } else if (task.status === 'blocked' && this.agentRepository) {
      // Task was blocked by merge conflict after agent had already completed.
      // If the agent finished successfully, mark the task completed so the
      // phase can progress.
      const agent = await this.agentRepository.findByTaskId(task.id);
      if (agent && (agent.status === 'idle' || agent.status === 'stopped')) {
        await this.taskRepository.update(task.id, { status: 'completed' });
        recovered.tasks++;
        log.info({ taskId: task.id, agentId: agent.id }, 'recovered blocked task (agent completed, merge conflict)');
      }
    }
  }

  // Clean up stale duplicate planning tasks (e.g. a crashed detail task
  // that was reset to pending, then a new detail task was created and completed).
  const tasksAfterRecovery = await this.taskRepository.findByPhaseId(phaseId);
  const completedPlanningKeys = new Set<string>();
  for (const t of tasksAfterRecovery) {
    if (isPlanningCategory(t.category) && t.status === 'completed') {
      completedPlanningKeys.add(`${t.category}:${t.phaseId}`);
    }
  }
  for (const t of tasksAfterRecovery) {
    if (isPlanningCategory(t.category) && t.status === 'pending' && completedPlanningKeys.has(`${t.category}:${t.phaseId}`)) {
      await this.taskRepository.update(t.id, { status: 'completed', summary: 'Superseded by retry' });
      recovered.tasks++;
      log.info({ taskId: t.id, category: t.category }, 'recovered stale duplicate planning task');
    }
  }

  // Re-read tasks after recovery updates and check if phase is now fully done
  const updatedTasks = await this.taskRepository.findByPhaseId(phaseId);
  const allDone = updatedTasks.every((t) => t.status === 'completed');
  if (allDone && updatedTasks.length > 0) {
    log.info({ phaseId }, 'all tasks completed in in_progress phase, triggering phase completion');
    await this.handlePhaseAllTasksDone(phaseId);
    recovered.phases++;
  }
}
|
|
|
|
/**
 * Move an initiative to pending_review once all of its phases are completed.
 *
 * Nothing happens when the initiative has no phases, any phase is not
 * completed, the initiative has no branch, or it is not currently active.
 * Otherwise the status is updated and initiative:pending_review is emitted.
 */
private async checkInitiativeCompletion(initiativeId: string): Promise<void> {
  const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
  const hasUnfinishedPhase = phases.some((p) => p.status !== 'completed');
  if (phases.length === 0 || hasUnfinishedPhase) return;

  const initiative = await this.initiativeRepository.findById(initiativeId);
  const branch = initiative?.branch;
  if (!initiative || !branch || initiative.status !== 'active') return;

  await this.initiativeRepository.update(initiativeId, { status: 'pending_review' as any });

  const pendingReview: InitiativePendingReviewEvent = {
    type: 'initiative:pending_review',
    timestamp: new Date(),
    payload: { initiativeId, branch },
  };
  this.eventBus.emit(pendingReview);

  log.info({ initiativeId, branch }, 'initiative pending review — all phases completed');
}
|
|
|
|
/**
 * Approve an initiative review.
 *
 * For each linked project that has the initiative branch:
 * - 'push_branch': push the initiative branch.
 * - 'merge_and_push': fast-forward the project's default branch
 *   (best-effort), merge the initiative branch into it and push it. If the
 *   push fails and the merge result carries a previous ref, the default
 *   branch is reset to that ref before the push error is rethrown.
 *
 * Afterwards the initiative is marked completed and
 * initiative:review_approved is emitted.
 *
 * @throws Error when the initiative is missing, not pending_review, has no
 *   branch, or a merge fails; push errors are rethrown as-is.
 */
async approveInitiative(
  initiativeId: string,
  strategy: 'push_branch' | 'merge_and_push',
): Promise<void> {
  const initiative = await this.initiativeRepository.findById(initiativeId);
  if (!initiative) throw new Error(`Initiative not found: ${initiativeId}`);
  if (initiative.status !== 'pending_review') {
    throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
  }
  if (!initiative.branch) {
    throw new Error(`Initiative ${initiativeId} has no branch configured`);
  }
  const branch = initiative.branch;

  const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);

  for (const project of projects) {
    const clonePath = await ensureProjectClone(project, this.workspaceRoot);
    const branchExists = await this.branchManager.branchExists(clonePath, branch);
    if (!branchExists) {
      log.warn({ initiativeId, branch, project: project.name }, 'initiative branch does not exist in project, skipping');
      continue;
    }

    // Fetch remote so local branches are up-to-date before merge/push
    await this.branchManager.fetchRemote(clonePath);

    if (strategy === 'merge_and_push') {
      // Fast-forward local defaultBranch to match origin before merging
      try {
        await this.branchManager.fastForwardBranch(clonePath, project.defaultBranch);
      } catch (ffErr) {
        log.warn(
          { project: project.name, err: ffErr instanceof Error ? ffErr.message : String(ffErr) },
          'fast-forward of default branch failed — attempting merge anyway',
        );
      }

      const result = await this.branchManager.mergeBranch(clonePath, branch, project.defaultBranch);
      if (!result.success) {
        throw new Error(`Failed to merge ${branch} into ${project.defaultBranch} for project ${project.name}: ${result.message}`);
      }

      try {
        await this.branchManager.pushBranch(clonePath, project.defaultBranch);
      } catch (pushErr) {
        // Roll back the merge so the diff doesn't disappear from the review tab.
        // Without rollback, defaultBranch includes the initiative changes and the
        // three-dot diff (defaultBranch...initiativeBranch) becomes empty.
        if (result.previousRef) {
          log.warn({ project: project.name, previousRef: result.previousRef }, 'push failed — rolling back merge');
          try {
            await this.branchManager.updateRef(clonePath, project.defaultBranch, result.previousRef);
          } catch (rollbackErr) {
            // Keep the push failure as the error the caller sees; a failed
            // rollback is reported separately instead of masking it.
            log.error(
              {
                project: project.name,
                previousRef: result.previousRef,
                err: rollbackErr instanceof Error ? rollbackErr.message : String(rollbackErr),
              },
              'rollback of default branch after failed push also failed',
            );
          }
        }
        throw pushErr;
      }

      log.info({ initiativeId, project: project.name }, 'initiative branch merged into default and pushed');
    } else {
      await this.branchManager.pushBranch(clonePath, branch);
      log.info({ initiativeId, project: project.name }, 'initiative branch pushed to remote');
    }
  }

  await this.initiativeRepository.update(initiativeId, { status: 'completed' as any });

  const event: InitiativeReviewApprovedEvent = {
    type: 'initiative:review_approved',
    timestamp: new Date(),
    payload: { initiativeId, branch, strategy },
  };
  this.eventBus.emit(event);

  log.info({ initiativeId, strategy }, 'initiative review approved');
}
|
|
}
|