|
|
|
|
@@ -11,7 +11,7 @@
|
|
|
|
|
* - Review per-phase: pause after each phase for diff review
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent } from '../events/index.js';
|
|
|
|
|
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
|
|
|
|
|
import type { BranchManager } from '../git/branch-manager.js';
|
|
|
|
|
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
|
|
|
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
|
|
|
|
@@ -150,20 +150,20 @@ export class ExecutionOrchestrator {
|
|
|
|
|
const phase = await this.phaseRepository.findById(task.phaseId);
|
|
|
|
|
if (!phase) return;
|
|
|
|
|
|
|
|
|
|
// Skip merge/review tasks — they already work on the phase branch directly
|
|
|
|
|
if (task.category === 'merge' || task.category === 'review') return;
|
|
|
|
|
// Skip merge for review/merge tasks — they already work on the phase branch directly
|
|
|
|
|
if (task.category !== 'merge' && task.category !== 'review') {
|
|
|
|
|
const initBranch = initiative.branch;
|
|
|
|
|
const phBranch = phaseBranchName(initBranch, phase.name);
|
|
|
|
|
const tBranch = taskBranchName(initBranch, task.id);
|
|
|
|
|
|
|
|
|
|
const initBranch = initiative.branch;
|
|
|
|
|
const phBranch = phaseBranchName(initBranch, phase.name);
|
|
|
|
|
const tBranch = taskBranchName(initBranch, task.id);
|
|
|
|
|
|
|
|
|
|
// Serialize merges per phase
|
|
|
|
|
const lock = this.phaseMergeLocks.get(task.phaseId) ?? Promise.resolve();
|
|
|
|
|
const mergeOp = lock.then(async () => {
|
|
|
|
|
await this.mergeTaskIntoPhase(taskId, task.phaseId!, tBranch, phBranch);
|
|
|
|
|
});
|
|
|
|
|
this.phaseMergeLocks.set(task.phaseId, mergeOp.catch(() => {}));
|
|
|
|
|
await mergeOp;
|
|
|
|
|
// Serialize merges per phase
|
|
|
|
|
const lock = this.phaseMergeLocks.get(task.phaseId) ?? Promise.resolve();
|
|
|
|
|
const mergeOp = lock.then(async () => {
|
|
|
|
|
await this.mergeTaskIntoPhase(taskId, task.phaseId!, tBranch, phBranch);
|
|
|
|
|
});
|
|
|
|
|
this.phaseMergeLocks.set(task.phaseId, mergeOp.catch(() => {}));
|
|
|
|
|
await mergeOp;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check if all phase tasks are done
|
|
|
|
|
const phaseTasks = await this.taskRepository.findByPhaseId(task.phaseId);
|
|
|
|
|
@@ -356,6 +356,15 @@ export class ExecutionOrchestrator {
|
|
|
|
|
const initiative = await this.initiativeRepository.findById(phase.initiativeId);
|
|
|
|
|
if (!initiative) throw new Error(`Initiative not found: ${phase.initiativeId}`);
|
|
|
|
|
|
|
|
|
|
// Guard: don't create duplicate review tasks
|
|
|
|
|
const existingTasks = await this.taskRepository.findByPhaseId(phaseId);
|
|
|
|
|
const activeReview = existingTasks.find(
|
|
|
|
|
(t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
|
|
|
|
|
);
|
|
|
|
|
if (activeReview) {
|
|
|
|
|
return { taskId: activeReview.id };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build revision task description from comments + summary
|
|
|
|
|
const lines: string[] = [];
|
|
|
|
|
if (summary) {
|
|
|
|
|
@@ -418,6 +427,81 @@ export class ExecutionOrchestrator {
|
|
|
|
|
return { taskId: task.id };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Request changes on an initiative that's pending review.
|
|
|
|
|
* Creates/reuses a "Finalization" phase and adds a review task to it.
|
|
|
|
|
*/
|
|
|
|
|
async requestChangesOnInitiative(
|
|
|
|
|
initiativeId: string,
|
|
|
|
|
summary: string,
|
|
|
|
|
): Promise<{ taskId: string }> {
|
|
|
|
|
const initiative = await this.initiativeRepository.findById(initiativeId);
|
|
|
|
|
if (!initiative) throw new Error(`Initiative not found: ${initiativeId}`);
|
|
|
|
|
if (initiative.status !== 'pending_review') {
|
|
|
|
|
throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Find or create a "Finalization" phase
|
|
|
|
|
const phases = await this.phaseRepository.findByInitiativeId(initiativeId);
|
|
|
|
|
let finalizationPhase = phases.find((p) => p.name === 'Finalization');
|
|
|
|
|
|
|
|
|
|
if (!finalizationPhase) {
|
|
|
|
|
finalizationPhase = await this.phaseRepository.create({
|
|
|
|
|
initiativeId,
|
|
|
|
|
name: 'Finalization',
|
|
|
|
|
status: 'in_progress',
|
|
|
|
|
});
|
|
|
|
|
} else if (finalizationPhase.status === 'completed' || finalizationPhase.status === 'pending_review') {
|
|
|
|
|
await this.phaseRepository.update(finalizationPhase.id, { status: 'in_progress' as any });
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Guard: don't create duplicate review tasks
|
|
|
|
|
const existingTasks = await this.taskRepository.findByPhaseId(finalizationPhase.id);
|
|
|
|
|
const activeReview = existingTasks.find(
|
|
|
|
|
(t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
|
|
|
|
|
);
|
|
|
|
|
if (activeReview) {
|
|
|
|
|
// Still reset initiative to active
|
|
|
|
|
await this.initiativeRepository.update(initiativeId, { status: 'active' as any });
|
|
|
|
|
this.scheduleDispatch();
|
|
|
|
|
return { taskId: activeReview.id };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create review task
|
|
|
|
|
const task = await this.taskRepository.create({
|
|
|
|
|
phaseId: finalizationPhase.id,
|
|
|
|
|
initiativeId,
|
|
|
|
|
name: `Address initiative review feedback`,
|
|
|
|
|
description: `## Summary\n\n${summary}`,
|
|
|
|
|
category: 'review',
|
|
|
|
|
priority: 'high',
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Reset initiative status to active
|
|
|
|
|
await this.initiativeRepository.update(initiativeId, { status: 'active' as any });
|
|
|
|
|
|
|
|
|
|
// Queue task for dispatch
|
|
|
|
|
await this.dispatchManager.queue(task.id);
|
|
|
|
|
|
|
|
|
|
// Emit event
|
|
|
|
|
const event: InitiativeChangesRequestedEvent = {
|
|
|
|
|
type: 'initiative:changes_requested',
|
|
|
|
|
timestamp: new Date(),
|
|
|
|
|
payload: {
|
|
|
|
|
initiativeId,
|
|
|
|
|
phaseId: finalizationPhase.id,
|
|
|
|
|
taskId: task.id,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
this.eventBus.emit(event);
|
|
|
|
|
|
|
|
|
|
log.info({ initiativeId, phaseId: finalizationPhase.id, taskId: task.id }, 'changes requested on initiative');
|
|
|
|
|
|
|
|
|
|
this.scheduleDispatch();
|
|
|
|
|
|
|
|
|
|
return { taskId: task.id };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Re-queue approved phases for an initiative into the in-memory dispatch queue.
|
|
|
|
|
* Self-healing: ensures phases aren't lost if the server restarted since the
|
|
|
|
|
|