From 1ba95871f5f214e7f87bc2cf251db97385581a31 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Mon, 2 Feb 2026 13:40:17 +0100 Subject: [PATCH] feat(14-04): implement DefaultPhaseDispatchManager adapter - In-memory queue with Map - Dependency checking via phaseRepository.getDependencies() - queuePhase: fetch phase, get dependencies, emit PhaseQueuedEvent - getNextDispatchablePhase: filter queue, sort by queuedAt - dispatchNextPhase: update status to in_progress, emit PhaseStartedEvent - completePhase: update status to completed, emit PhaseCompletedEvent - blockPhase: update status to blocked, emit PhaseBlockedEvent - getPhaseQueueState: return queued, ready, blocked arrays --- src/dispatch/phase-manager.ts | 277 ++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100644 src/dispatch/phase-manager.ts diff --git a/src/dispatch/phase-manager.ts b/src/dispatch/phase-manager.ts new file mode 100644 index 0000000..5e98560 --- /dev/null +++ b/src/dispatch/phase-manager.ts @@ -0,0 +1,277 @@ +/** + * Default Phase Dispatch Manager - Adapter Implementation + * + * Implements PhaseDispatchManager interface with in-memory queue + * and dependency-ordered dispatch. + * + * This is the ADAPTER for the PhaseDispatchManager PORT. + */ + +import type { + EventBus, + PhaseQueuedEvent, + PhaseStartedEvent, + PhaseCompletedEvent, + PhaseBlockedEvent, +} from '../events/index.js'; +import type { PhaseRepository } from '../db/repositories/phase-repository.js'; +import type { PhaseDispatchManager, QueuedPhase, PhaseDispatchResult } from './types.js'; + +// ============================================================================= +// Internal Types +// ============================================================================= + +/** + * Internal representation of a blocked phase. + */ +interface BlockedPhase { + phaseId: string; + reason: string; +} + +// ============================================================================= +// DefaultPhaseDispatchManager Implementation +// ============================================================================= + +/** + * In-memory implementation of PhaseDispatchManager. + * + * Uses Map for queue management and checks phase_dependencies table + * for dependency resolution. + */ +export class DefaultPhaseDispatchManager implements PhaseDispatchManager { + /** Internal queue of phases pending dispatch */ + private phaseQueue: Map = new Map(); + + /** Blocked phases with their reasons */ + private blockedPhases: Map = new Map(); + + constructor( + private phaseRepository: PhaseRepository, + private eventBus: EventBus + ) {} + + /** + * Queue a phase for dispatch. + * Fetches phase dependencies and adds to internal queue. + */ + async queuePhase(phaseId: string): Promise { + // Fetch phase to verify it exists and get initiativeId + const phase = await this.phaseRepository.findById(phaseId); + if (!phase) { + throw new Error(`Phase not found: ${phaseId}`); + } + + // Get dependencies for this phase + const dependsOn = await this.phaseRepository.getDependencies(phaseId); + + const queuedPhase: QueuedPhase = { + phaseId, + initiativeId: phase.initiativeId, + queuedAt: new Date(), + dependsOn, + }; + + this.phaseQueue.set(phaseId, queuedPhase); + + // Emit PhaseQueuedEvent + const event: PhaseQueuedEvent = { + type: 'phase:queued', + timestamp: new Date(), + payload: { + phaseId, + initiativeId: phase.initiativeId, + dependsOn, + }, + }; + this.eventBus.emit(event); + } + + /** + * Get next dispatchable phase. + * Returns phase with all dependencies complete, sorted by queuedAt (oldest first). + */ + async getNextDispatchablePhase(): Promise { + const queuedPhases = Array.from(this.phaseQueue.values()); + + if (queuedPhases.length === 0) { + return null; + } + + // Filter to only phases with all dependencies complete + const readyPhases: QueuedPhase[] = []; + + for (const qp of queuedPhases) { + const allDepsComplete = await this.areAllPhaseDependenciesComplete(qp.dependsOn); + if (allDepsComplete) { + readyPhases.push(qp); + } + } + + if (readyPhases.length === 0) { + return null; + } + + // Sort by queuedAt (oldest first) + readyPhases.sort((a, b) => a.queuedAt.getTime() - b.queuedAt.getTime()); + + return readyPhases[0]; + } + + /** + * Dispatch next available phase. + * Updates phase status to 'in_progress' and emits PhaseStartedEvent. + */ + async dispatchNextPhase(): Promise { + // Get next dispatchable phase + const nextPhase = await this.getNextDispatchablePhase(); + + if (!nextPhase) { + return { + success: false, + phaseId: '', + reason: 'No dispatchable phases', + }; + } + + // Get phase details for event + const phase = await this.phaseRepository.findById(nextPhase.phaseId); + if (!phase) { + return { + success: false, + phaseId: nextPhase.phaseId, + reason: 'Phase not found', + }; + } + + // Update phase status to 'in_progress' + await this.phaseRepository.update(nextPhase.phaseId, { status: 'in_progress' }); + + // Remove from queue (now being worked on) + this.phaseQueue.delete(nextPhase.phaseId); + + // Emit PhaseStartedEvent + const event: PhaseStartedEvent = { + type: 'phase:started', + timestamp: new Date(), + payload: { + phaseId: nextPhase.phaseId, + initiativeId: phase.initiativeId, + }, + }; + this.eventBus.emit(event); + + return { + success: true, + phaseId: nextPhase.phaseId, + }; + } + + /** + * Mark a phase as complete. + * Updates phase status and removes from queue. + */ + async completePhase(phaseId: string): Promise { + // Get phase for event + const phase = await this.phaseRepository.findById(phaseId); + if (!phase) { + throw new Error(`Phase not found: ${phaseId}`); + } + + // Update phase status to 'completed' + await this.phaseRepository.update(phaseId, { status: 'completed' }); + + // Remove from queue + this.phaseQueue.delete(phaseId); + + // Also remove from blocked if it was there + this.blockedPhases.delete(phaseId); + + // Emit PhaseCompletedEvent + const event: PhaseCompletedEvent = { + type: 'phase:completed', + timestamp: new Date(), + payload: { + phaseId, + initiativeId: phase.initiativeId, + success: true, + message: 'Phase completed', + }, + }; + this.eventBus.emit(event); + } + + /** + * Mark a phase as blocked. + * Updates phase status and records block reason. + */ + async blockPhase(phaseId: string, reason: string): Promise { + // Update phase status to 'blocked' + await this.phaseRepository.update(phaseId, { status: 'blocked' }); + + // Record in blocked map + this.blockedPhases.set(phaseId, { phaseId, reason }); + + // Remove from queue (blocked phases aren't dispatchable) + this.phaseQueue.delete(phaseId); + + // Emit PhaseBlockedEvent + const event: PhaseBlockedEvent = { + type: 'phase:blocked', + timestamp: new Date(), + payload: { + phaseId, + reason, + }, + }; + this.eventBus.emit(event); + } + + /** + * Get current phase queue state. + */ + async getPhaseQueueState(): Promise<{ + queued: QueuedPhase[]; + ready: QueuedPhase[]; + blocked: Array<{ phaseId: string; reason: string }>; + }> { + const allQueued = Array.from(this.phaseQueue.values()); + + // Determine which are ready + const ready: QueuedPhase[] = []; + for (const qp of allQueued) { + const allDepsComplete = await this.areAllPhaseDependenciesComplete(qp.dependsOn); + if (allDepsComplete) { + ready.push(qp); + } + } + + return { + queued: allQueued, + ready, + blocked: Array.from(this.blockedPhases.values()), + }; + } + + // ============================================================================= + // Private Helpers + // ============================================================================= + + /** + * Check if all phase dependencies are complete. + */ + private async areAllPhaseDependenciesComplete(dependsOn: string[]): Promise { + if (dependsOn.length === 0) { + return true; + } + + for (const depPhaseId of dependsOn) { + const depPhase = await this.phaseRepository.findById(depPhaseId); + if (!depPhase || depPhase.status !== 'completed') { + return false; + } + } + + return true; + } +}