/**
 * Default Phase Dispatch Manager - Adapter Implementation
 *
 * Implements PhaseDispatchManager interface with in-memory queue
 * and dependency-ordered dispatch.
 *
 * This is the ADAPTER for the PhaseDispatchManager PORT.
 */

import type {
  EventBus,
  PhaseQueuedEvent,
  PhaseStartedEvent,
  PhaseCompletedEvent,
  PhaseBlockedEvent,
} from '../events/index.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseDispatchManager, DispatchManager, QueuedPhase, PhaseDispatchResult } from './types.js';
import { phaseBranchName, isPlanningCategory } from '../git/branch-naming.js';
import { ensureProjectClone } from '../git/project-clones.js';
import type { ProjectSyncManager } from '../git/remote-sync.js';
import { createModuleLogger } from '../logger/index.js';

const log = createModuleLogger('phase-dispatch');

// =============================================================================
// Internal Types
// =============================================================================

/**
 * Internal representation of a blocked phase.
 */
interface BlockedPhase {
  phaseId: string;
  reason: string;
}

/**
 * A phase record as returned by a successful `PhaseRepository.findById` lookup.
 * Derived from the repository signature so it cannot drift from the real type.
 */
type PhaseRecord = NonNullable<Awaited<ReturnType<PhaseRepository['findById']>>>;

// =============================================================================
// DefaultPhaseDispatchManager Implementation
// =============================================================================

/**
 * In-memory implementation of PhaseDispatchManager.
 *
 * Keeps queued and blocked phases in `Map`s keyed by phase id, and resolves
 * dependencies by looking up each dependency phase through the phase
 * repository and checking that its status is `'completed'`.
 */
export class DefaultPhaseDispatchManager implements PhaseDispatchManager {
  /** Internal queue of phases pending dispatch, keyed by phase id. */
  private readonly phaseQueue: Map<string, QueuedPhase> = new Map();

  /** Blocked phases with their reasons, keyed by phase id. */
  private readonly blockedPhases: Map<string, BlockedPhase> = new Map();

  constructor(
    private readonly phaseRepository: PhaseRepository,
    private readonly taskRepository: TaskRepository,
    private readonly dispatchManager: DispatchManager,
    private readonly eventBus: EventBus,
    private readonly initiativeRepository?: InitiativeRepository,
    private readonly projectRepository?: ProjectRepository,
    private readonly branchManager?: BranchManager,
    private readonly workspaceRoot?: string,
    private readonly projectSyncManager?: ProjectSyncManager,
  ) {}

  /**
   * Queue a phase for dispatch.
   *
   * Only phases whose status is `'approved'` can be queued. The phase's
   * dependencies are fetched once, at queue time, and stored on the queue
   * entry. Queuing a phase that is already queued replaces its entry
   * (including its `queuedAt` timestamp).
   *
   * @param phaseId - Id of the phase to queue.
   * @throws Error if the phase does not exist or is not approved.
   */
  async queuePhase(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) {
      throw new Error(`Phase not found: ${phaseId}`);
    }

    // Approval gate: only approved phases can be queued.
    if (phase.status !== 'approved') {
      throw new Error(`Phase '${phaseId}' must be approved before queuing (current status: ${phase.status})`);
    }

    const dependsOn = await this.phaseRepository.getDependencies(phaseId);

    const queuedPhase: QueuedPhase = {
      phaseId,
      initiativeId: phase.initiativeId,
      queuedAt: new Date(),
      dependsOn,
    };
    this.phaseQueue.set(phaseId, queuedPhase);

    // The phase passed the approval gate, so any block recorded earlier is
    // stale; clear it so the phase is not reported as both queued and blocked.
    this.blockedPhases.delete(phaseId);

    const event: PhaseQueuedEvent = {
      type: 'phase:queued',
      timestamp: new Date(),
      payload: {
        phaseId,
        initiativeId: phase.initiativeId,
        dependsOn,
      },
    };
    this.eventBus.emit(event);
  }

  /**
   * Get the next dispatchable phase.
   *
   * @returns The queued phase with all dependencies completed that has been
   *          waiting longest (smallest `queuedAt`), or `null` when nothing is
   *          queued or nothing is ready.
   */
  async getNextDispatchablePhase(): Promise<QueuedPhase | null> {
    const ready = await this.findReadyPhases(Array.from(this.phaseQueue.values()));

    // Oldest first; `ready` is a fresh array, so sorting in place is safe.
    ready.sort((a, b) => a.queuedAt.getTime() - b.queuedAt.getTime());

    return ready[0] ?? null;
  }

  /**
   * Dispatch the next available phase.
   *
   * Steps, in order:
   * 1. Pick the next dispatchable phase and load its record.
   * 2. If the git collaborators are configured, sync project clones and ensure
   *    the initiative and phase branches exist. On failure the phase is
   *    blocked and a failure result is returned.
   * 3. Set the phase status to `'in_progress'` and remove it from the queue.
   * 4. Queue every pending, non-planning task of the phase on the task
   *    dispatch manager.
   * 5. Emit a `phase:started` event.
   *
   * @returns A result with `success: true` and the dispatched phase id, or
   *          `success: false` with a `reason` (and an empty `phaseId` when
   *          there was nothing to dispatch).
   */
  async dispatchNextPhase(): Promise<PhaseDispatchResult> {
    const nextPhase = await this.getNextDispatchablePhase();
    if (!nextPhase) {
      return {
        success: false,
        phaseId: '',
        reason: 'No dispatchable phases',
      };
    }

    const phase = await this.phaseRepository.findById(nextPhase.phaseId);
    if (!phase) {
      // Drop the stale entry: its record is gone, so it can never be dispatched.
      // Left in place it would be picked again on every call (oldest first)
      // and starve every phase queued after it.
      this.phaseQueue.delete(nextPhase.phaseId);
      return {
        success: false,
        phaseId: nextPhase.phaseId,
        reason: 'Phase not found',
      };
    }

    // Branches must exist before the phase is marked in_progress.
    try {
      await this.preparePhaseBranches(phase, nextPhase.phaseId);
    } catch (err) {
      const reason = `Branch creation failed: ${err instanceof Error ? err.message : String(err)}`;
      log.error({ phaseId: nextPhase.phaseId, err: reason }, 'phase branch creation failed, blocking phase');
      await this.blockPhase(nextPhase.phaseId, reason);
      return { success: false, phaseId: nextPhase.phaseId, reason };
    }

    // Update phase status to 'in_progress' (only after branches confirmed).
    await this.phaseRepository.update(nextPhase.phaseId, { status: 'in_progress' });

    // Remove from queue (now being worked on).
    this.phaseQueue.delete(nextPhase.phaseId);

    // Auto-queue pending execution tasks for this phase (skip planning-category tasks).
    const phaseTasks = await this.taskRepository.findByPhaseId(nextPhase.phaseId);
    for (const task of phaseTasks) {
      if (task.status === 'pending' && !isPlanningCategory(task.category)) {
        await this.dispatchManager.queue(task.id);
      }
    }

    const event: PhaseStartedEvent = {
      type: 'phase:started',
      timestamp: new Date(),
      payload: {
        phaseId: nextPhase.phaseId,
        initiativeId: phase.initiativeId,
      },
    };
    this.eventBus.emit(event);

    return {
      success: true,
      phaseId: nextPhase.phaseId,
    };
  }

  /**
   * Mark a phase as complete.
   *
   * Sets the phase status to `'completed'`, removes it from both the queue and
   * the blocked map, and emits a `phase:completed` event.
   *
   * @param phaseId - Id of the phase to complete.
   * @throws Error if the phase does not exist.
   */
  async completePhase(phaseId: string): Promise<void> {
    const phase = await this.phaseRepository.findById(phaseId);
    if (!phase) {
      throw new Error(`Phase not found: ${phaseId}`);
    }

    await this.phaseRepository.update(phaseId, { status: 'completed' });

    this.phaseQueue.delete(phaseId);
    this.blockedPhases.delete(phaseId);

    const event: PhaseCompletedEvent = {
      type: 'phase:completed',
      timestamp: new Date(),
      payload: {
        phaseId,
        initiativeId: phase.initiativeId,
        success: true,
        message: 'Phase completed',
      },
    };
    this.eventBus.emit(event);
  }

  /**
   * Mark a phase as blocked.
   *
   * Sets the phase status to `'blocked'`, records the reason in the blocked
   * map, removes the phase from the queue, and emits a `phase:blocked` event.
   *
   * @param phaseId - Id of the phase to block.
   * @param reason - Human-readable explanation stored with the block and
   *                 carried in the event payload.
   */
  async blockPhase(phaseId: string, reason: string): Promise<void> {
    await this.phaseRepository.update(phaseId, { status: 'blocked' });

    this.blockedPhases.set(phaseId, { phaseId, reason });

    // Blocked phases aren't dispatchable.
    this.phaseQueue.delete(phaseId);

    const event: PhaseBlockedEvent = {
      type: 'phase:blocked',
      timestamp: new Date(),
      payload: {
        phaseId,
        reason,
      },
    };
    this.eventBus.emit(event);
  }

  /**
   * Get current phase queue state.
   *
   * @returns `queued`: every entry currently in the queue; `ready`: the subset
   *          whose dependencies are all completed; `blocked`: every recorded
   *          blocked phase with its reason.
   */
  async getPhaseQueueState(): Promise<{
    queued: QueuedPhase[];
    ready: QueuedPhase[];
    blocked: Array<{ phaseId: string; reason: string }>;
  }> {
    const allQueued = Array.from(this.phaseQueue.values());
    const ready = await this.findReadyPhases(allQueued);

    return {
      queued: allQueued,
      ready,
      blocked: Array.from(this.blockedPhases.values()),
    };
  }

  // =============================================================================
  // Private Helpers
  // =============================================================================

  /**
   * Return the entries of `candidates` whose dependencies are all completed,
   * in the same relative order. Always returns a new array.
   */
  private async findReadyPhases(candidates: readonly QueuedPhase[]): Promise<QueuedPhase[]> {
    const ready: QueuedPhase[] = [];
    for (const candidate of candidates) {
      if (await this.areAllPhaseDependenciesComplete(candidate.dependsOn)) {
        ready.push(candidate);
      }
    }
    return ready;
  }

  /**
   * Sync project clones and ensure the initiative and phase branches exist in
   * every project linked to the phase's initiative.
   *
   * Does nothing when any of the initiative repository, project repository,
   * branch manager or workspace root is not configured, or when the initiative
   * has no branch. Sync failures are logged and ignored; clone or branch
   * failures propagate to the caller.
   */
  private async preparePhaseBranches(phase: PhaseRecord, phaseId: string): Promise<void> {
    const { initiativeRepository, projectRepository, branchManager, workspaceRoot } = this;
    if (!initiativeRepository || !projectRepository || !branchManager || !workspaceRoot) {
      return;
    }

    const initiative = await initiativeRepository.findById(phase.initiativeId);
    if (!initiative?.branch) {
      return;
    }

    const initBranch = initiative.branch;
    const phBranch = phaseBranchName(initBranch, phase.name);
    const projects = await projectRepository.findProjectsByInitiativeId(phase.initiativeId);

    // 1. Sync project clones (git fetch + ff-only defaultBranch) before branching.
    //    Best effort: a failed sync is logged and dispatch continues.
    if (this.projectSyncManager) {
      for (const project of projects) {
        try {
          const result = await this.projectSyncManager.syncProject(project.id);
          if (result.success) {
            log.info({ project: project.name, fetched: result.fetched, ff: result.fastForwarded }, 'synced before phase dispatch');
          } else {
            log.warn({ project: project.name, err: result.error }, 'sync failed before phase dispatch (continuing)');
          }
        } catch (syncErr) {
          log.warn({ project: project.name, err: syncErr instanceof Error ? syncErr.message : String(syncErr) }, 'sync error (continuing)');
        }
      }
    }

    // 2. Create initiative and phase branches from (now up-to-date) defaultBranch.
    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, workspaceRoot);
      await branchManager.ensureBranch(clonePath, initBranch, project.defaultBranch);
      await branchManager.ensureBranch(clonePath, phBranch, initBranch);
    }

    log.info({ phaseId, phBranch, initBranch }, 'phase branch created');
  }

  /**
   * Check if all phase dependencies are complete.
   *
   * @param dependsOn - Ids of the phases that must be completed.
   * @returns `true` when the list is empty or every listed phase exists with
   *          status `'completed'`; `false` as soon as one is missing or has
   *          any other status.
   */
  private async areAllPhaseDependenciesComplete(dependsOn: string[]): Promise<boolean> {
    for (const depPhaseId of dependsOn) {
      const depPhase = await this.phaseRepository.findById(depPhaseId);
      if (!depPhase || depPhase.status !== 'completed') {
        return false;
      }
    }
    return true;
  }
}