Files
Codewalkers/apps/server/dispatch/phase-manager.ts
Lukas May da8c714de2 fix: Handle spawn and branch failures gracefully in dispatch cycle
Wrap agentManager.spawn() in try/catch — on failure, block the task
instead of crashing the entire dispatch cycle. Move phase status update
to after branch creation succeeds — on branch failure, block the phase
and skip task queuing. Fix statement-breakpoint markers in migration
0020 to use separate lines.
2026-03-05 16:36:39 +01:00

350 lines
11 KiB
TypeScript

/**
* Default Phase Dispatch Manager - Adapter Implementation
*
* Implements PhaseDispatchManager interface with in-memory queue
* and dependency-ordered dispatch.
*
* This is the ADAPTER for the PhaseDispatchManager PORT.
*/
import type {
EventBus,
PhaseQueuedEvent,
PhaseStartedEvent,
PhaseCompletedEvent,
PhaseBlockedEvent,
} from '../events/index.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseDispatchManager, DispatchManager, QueuedPhase, PhaseDispatchResult } from './types.js';
import { phaseBranchName, isPlanningCategory } from '../git/branch-naming.js';
import { ensureProjectClone } from '../git/project-clones.js';
import type { ProjectSyncManager } from '../git/remote-sync.js';
import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('phase-dispatch');
// =============================================================================
// Internal Types
// =============================================================================
/**
 * Record kept for a phase that has been marked as blocked.
 *
 * Instances are stored in the manager's in-memory blocked map, keyed by
 * `phaseId`, and returned as-is from the queue-state query.
 */
interface BlockedPhase {
  /** Identifier of the blocked phase. */
  phaseId: string;
  /** Explanation of why the phase was blocked. */
  reason: string;
}
// =============================================================================
// DefaultPhaseDispatchManager Implementation
// =============================================================================
/**
 * In-memory implementation of PhaseDispatchManager.
 *
 * Queued and blocked phases are tracked in Maps keyed by phase id. Dependency
 * resolution is done by looking up each dependency through the phase
 * repository and requiring its status to be 'completed'.
 */
export class DefaultPhaseDispatchManager implements PhaseDispatchManager {
  /** Internal queue of phases pending dispatch, keyed by phase id. */
  private readonly phaseQueue: Map<string, QueuedPhase> = new Map();

  /** Blocked phases with their reasons, keyed by phase id. */
  private readonly blockedPhases: Map<string, BlockedPhase> = new Map();

  constructor(
    private phaseRepository: PhaseRepository,
    private taskRepository: TaskRepository,
    private dispatchManager: DispatchManager,
    private eventBus: EventBus,
    private initiativeRepository?: InitiativeRepository,
    private projectRepository?: ProjectRepository,
    private branchManager?: BranchManager,
    private workspaceRoot?: string,
    private projectSyncManager?: ProjectSyncManager,
  ) {}

  /**
   * Queue a phase for dispatch.
   *
   * Only phases whose status is 'approved' can be queued. The phase's
   * dependencies are fetched once and stored with the queue entry. Queuing an
   * already-queued phase replaces its entry (and its `queuedAt`).
   *
   * @param phaseId - Identifier of the phase to queue.
   * @throws Error if the phase does not exist or is not in 'approved' status.
   */
  async queuePhase(phaseId: string): Promise<void> {
    // Fetch phase to verify it exists and get initiativeId
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) {
      throw new Error(`Phase not found: ${phaseId}`);
    }

    // Approval gate: only approved phases can be queued
    if (phase.status !== 'approved') {
      throw new Error(`Phase '${phaseId}' must be approved before queuing (current status: ${phase.status})`);
    }

    const dependsOn = await this.phaseRepository.getDependencies(phaseId);

    const queuedPhase: QueuedPhase = {
      phaseId,
      initiativeId: phase.initiativeId,
      queuedAt: new Date(),
      dependsOn,
    };
    this.phaseQueue.set(phaseId, queuedPhase);

    // The phase passed the approval gate, so any earlier in-memory block
    // record is stale; without this it would be reported as queued AND blocked.
    this.blockedPhases.delete(phaseId);

    const event: PhaseQueuedEvent = {
      type: 'phase:queued',
      timestamp: new Date(),
      payload: {
        phaseId,
        initiativeId: phase.initiativeId,
        dependsOn,
      },
    };
    this.eventBus.emit(event);
  }

  /**
   * Get the next dispatchable phase.
   *
   * @returns The queued phase with the oldest `queuedAt` among those whose
   *   dependencies are all 'completed', or `null` when there is none. On equal
   *   timestamps the phase queued first wins.
   */
  async getNextDispatchablePhase(): Promise<QueuedPhase | null> {
    const readyPhases = await this.findReadyPhases(Array.from(this.phaseQueue.values()));

    // Linear scan for the oldest entry; strict `<` keeps the earliest-inserted
    // phase on ties, matching a stable sort by queuedAt.
    let oldest: QueuedPhase | null = null;
    for (const candidate of readyPhases) {
      if (oldest === null || candidate.queuedAt.getTime() < oldest.queuedAt.getTime()) {
        oldest = candidate;
      }
    }
    return oldest;
  }

  /**
   * Dispatch the next available phase.
   *
   * Prepares the phase branches (when the git collaborators are configured),
   * then updates the phase status to 'in_progress', removes it from the queue,
   * queues its pending non-planning tasks and emits a PhaseStartedEvent.
   *
   * Failure results (returned, not thrown):
   * - no ready phase: `phaseId` is '' and reason is 'No dispatchable phases';
   * - the queued phase no longer exists: the stale entry is dropped from the
   *   queue so it cannot stall later dispatch cycles;
   * - branch preparation failed: the phase is blocked with the failure reason.
   *
   * @returns The dispatch outcome for the selected phase.
   */
  async dispatchNextPhase(): Promise<PhaseDispatchResult> {
    const nextPhase = await this.getNextDispatchablePhase();
    if (!nextPhase) {
      return {
        success: false,
        phaseId: '',
        reason: 'No dispatchable phases',
      };
    }

    const phase = await this.phaseRepository.findById(nextPhase.phaseId);
    if (!phase) {
      // A queue entry whose phase row is gone would otherwise be selected
      // again on every call (it stays "ready" and oldest), starving the rest
      // of the queue.
      this.phaseQueue.delete(nextPhase.phaseId);
      log.warn({ phaseId: nextPhase.phaseId }, 'queued phase no longer exists, dropping from queue');
      return {
        success: false,
        phaseId: nextPhase.phaseId,
        reason: 'Phase not found',
      };
    }

    // Branches must exist before the phase is marked in_progress.
    try {
      await this.ensurePhaseBranches(nextPhase.phaseId, phase);
    } catch (err) {
      const reason = `Branch creation failed: ${err instanceof Error ? err.message : String(err)}`;
      log.error({ phaseId: nextPhase.phaseId, err: reason }, 'phase branch creation failed, blocking phase');
      await this.blockPhase(nextPhase.phaseId, reason);
      return { success: false, phaseId: nextPhase.phaseId, reason };
    }

    // Update phase status to 'in_progress' (only after branches confirmed)
    await this.phaseRepository.update(nextPhase.phaseId, { status: 'in_progress' });

    // Remove from queue (now being worked on)
    this.phaseQueue.delete(nextPhase.phaseId);

    // Auto-queue pending execution tasks for this phase (skip planning-category tasks)
    const phaseTasks = await this.taskRepository.findByPhaseId(nextPhase.phaseId);
    for (const task of phaseTasks) {
      if (task.status === 'pending' && !isPlanningCategory(task.category)) {
        await this.dispatchManager.queue(task.id);
      }
    }

    const event: PhaseStartedEvent = {
      type: 'phase:started',
      timestamp: new Date(),
      payload: {
        phaseId: nextPhase.phaseId,
        initiativeId: phase.initiativeId,
      },
    };
    this.eventBus.emit(event);

    return {
      success: true,
      phaseId: nextPhase.phaseId,
    };
  }

  /**
   * Mark a phase as complete.
   *
   * Sets the phase status to 'completed', removes it from both the queue and
   * the blocked map, and emits a PhaseCompletedEvent.
   *
   * @param phaseId - Identifier of the phase to complete.
   * @throws Error if the phase does not exist.
   */
  async completePhase(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) {
      throw new Error(`Phase not found: ${phaseId}`);
    }

    await this.phaseRepository.update(phaseId, { status: 'completed' });

    this.phaseQueue.delete(phaseId);
    this.blockedPhases.delete(phaseId);

    const event: PhaseCompletedEvent = {
      type: 'phase:completed',
      timestamp: new Date(),
      payload: {
        phaseId,
        initiativeId: phase.initiativeId,
        success: true,
        message: 'Phase completed',
      },
    };
    this.eventBus.emit(event);
  }

  /**
   * Mark a phase as blocked.
   *
   * Sets the phase status to 'blocked', records the reason in the blocked
   * map, removes the phase from the queue and emits a PhaseBlockedEvent.
   *
   * @param phaseId - Identifier of the phase to block.
   * @param reason - Why the phase is being blocked.
   */
  async blockPhase(phaseId: string, reason: string): Promise<void> {
    await this.phaseRepository.update(phaseId, { status: 'blocked' });

    this.blockedPhases.set(phaseId, { phaseId, reason });

    // Blocked phases aren't dispatchable
    this.phaseQueue.delete(phaseId);

    const event: PhaseBlockedEvent = {
      type: 'phase:blocked',
      timestamp: new Date(),
      payload: {
        phaseId,
        reason,
      },
    };
    this.eventBus.emit(event);
  }

  /**
   * Get current phase queue state.
   *
   * @returns `queued`: every queued phase; `ready`: the subset whose
   *   dependencies are all 'completed'; `blocked`: the recorded blocked phases
   *   with their reasons.
   */
  async getPhaseQueueState(): Promise<{
    queued: QueuedPhase[];
    ready: QueuedPhase[];
    blocked: Array<{ phaseId: string; reason: string }>;
  }> {
    const allQueued = Array.from(this.phaseQueue.values());
    const ready = await this.findReadyPhases(allQueued);

    return {
      queued: allQueued,
      ready,
      blocked: Array.from(this.blockedPhases.values()),
    };
  }

  // =============================================================================
  // Private Helpers
  // =============================================================================

  /**
   * Ensure the initiative and phase branches exist in every linked project clone.
   *
   * Does nothing when any of the initiative repository, project repository,
   * branch manager or workspace root is not configured, or when the
   * initiative has no branch. Sync failures are logged and tolerated; any
   * other failure is thrown to the caller.
   */
  private async ensurePhaseBranches(
    phaseId: string,
    phase: NonNullable<Awaited<ReturnType<PhaseRepository['findById']>>>,
  ): Promise<void> {
    const { initiativeRepository, projectRepository, branchManager, workspaceRoot, projectSyncManager } = this;
    if (!initiativeRepository || !projectRepository || !branchManager || !workspaceRoot) {
      return;
    }

    const initiative = await initiativeRepository.findById(phase.initiativeId);
    if (!initiative?.branch) {
      return;
    }
    const initBranch = initiative.branch;
    const phBranch = phaseBranchName(initBranch, phase.name);
    const projects = await projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    // 1. Sync project clones (git fetch + ff-only defaultBranch) before branching
    if (projectSyncManager) {
      for (const project of projects) {
        try {
          const result = await projectSyncManager.syncProject(project.id);
          if (result.success) {
            log.info({ project: project.name, fetched: result.fetched, ff: result.fastForwarded }, 'synced before phase dispatch');
          } else {
            log.warn({ project: project.name, err: result.error }, 'sync failed before phase dispatch (continuing)');
          }
        } catch (syncErr) {
          log.warn({ project: project.name, err: syncErr instanceof Error ? syncErr.message : String(syncErr) }, 'sync error (continuing)');
        }
      }
    }

    // 2. Create initiative and phase branches from (now up-to-date) defaultBranch
    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, workspaceRoot);
      await branchManager.ensureBranch(clonePath, initBranch, project.defaultBranch);
      await branchManager.ensureBranch(clonePath, phBranch, initBranch);
    }

    log.info({ phaseId, phBranch, initBranch }, 'phase branch created');
  }

  /**
   * Filter the given queue entries down to those whose dependencies are all
   * 'completed', preserving input order. Dependency lookups are cached for
   * the duration of the pass so a shared dependency is fetched only once.
   */
  private async findReadyPhases(candidates: QueuedPhase[]): Promise<QueuedPhase[]> {
    const completionCache = new Map<string, boolean>();
    const ready: QueuedPhase[] = [];
    for (const candidate of candidates) {
      if (await this.areAllPhaseDependenciesComplete(candidate.dependsOn, completionCache)) {
        ready.push(candidate);
      }
    }
    return ready;
  }

  /**
   * Check if all phase dependencies are complete.
   *
   * A dependency counts as complete only if it exists and its status is
   * 'completed'. Stops at the first incomplete dependency.
   *
   * @param dependsOn - Ids of the phases that must be completed.
   * @param completionCache - Optional per-pass cache of dependency id to
   *   completion result; read before hitting the repository and filled after.
   */
  private async areAllPhaseDependenciesComplete(
    dependsOn: string[],
    completionCache?: Map<string, boolean>,
  ): Promise<boolean> {
    for (const depPhaseId of dependsOn) {
      let isComplete = completionCache?.get(depPhaseId);
      if (isComplete === undefined) {
        const depPhase = await this.phaseRepository.findById(depPhaseId);
        isComplete = depPhase?.status === 'completed';
        completionCache?.set(depPhaseId, isComplete);
      }
      if (!isComplete) {
        return false;
      }
    }
    return true;
  }
}