feat(14-04): implement DefaultPhaseDispatchManager adapter

- In-memory queue with Map<string, QueuedPhase>
- Dependency checking via phaseRepository.getDependencies()
- queuePhase: fetch phase, get dependencies, emit PhaseQueuedEvent
- getNextDispatchablePhase: filter queue, sort by queuedAt
- dispatchNextPhase: update status to in_progress, emit PhaseStartedEvent
- completePhase: update status to completed, emit PhaseCompletedEvent
- blockPhase: update status to blocked, emit PhaseBlockedEvent
- getPhaseQueueState: return queued, ready, blocked arrays
This commit is contained in:
Lukas May
2026-02-02 13:40:17 +01:00
parent 28622cbd04
commit 1ba95871f5

View File

@@ -0,0 +1,277 @@
/**
* Default Phase Dispatch Manager - Adapter Implementation
*
* Implements PhaseDispatchManager interface with in-memory queue
* and dependency-ordered dispatch.
*
* This is the ADAPTER for the PhaseDispatchManager PORT.
*/
import type {
EventBus,
PhaseQueuedEvent,
PhaseStartedEvent,
PhaseCompletedEvent,
PhaseBlockedEvent,
} from '../events/index.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { PhaseDispatchManager, QueuedPhase, PhaseDispatchResult } from './types.js';
// =============================================================================
// Internal Types
// =============================================================================
/**
* Internal representation of a blocked phase.
*/
interface BlockedPhase {
phaseId: string;
reason: string;
}
// =============================================================================
// DefaultPhaseDispatchManager Implementation
// =============================================================================
/**
* In-memory implementation of PhaseDispatchManager.
*
* Uses Map for queue management and checks phase_dependencies table
* for dependency resolution.
*/
export class DefaultPhaseDispatchManager implements PhaseDispatchManager {
/** Internal queue of phases pending dispatch */
private phaseQueue: Map<string, QueuedPhase> = new Map();
/** Blocked phases with their reasons */
private blockedPhases: Map<string, BlockedPhase> = new Map();
constructor(
private phaseRepository: PhaseRepository,
private eventBus: EventBus
) {}
/**
* Queue a phase for dispatch.
* Fetches phase dependencies and adds to internal queue.
*/
async queuePhase(phaseId: string): Promise<void> {
// Fetch phase to verify it exists and get initiativeId
const phase = await this.phaseRepository.findById(phaseId);
if (!phase) {
throw new Error(`Phase not found: ${phaseId}`);
}
// Get dependencies for this phase
const dependsOn = await this.phaseRepository.getDependencies(phaseId);
const queuedPhase: QueuedPhase = {
phaseId,
initiativeId: phase.initiativeId,
queuedAt: new Date(),
dependsOn,
};
this.phaseQueue.set(phaseId, queuedPhase);
// Emit PhaseQueuedEvent
const event: PhaseQueuedEvent = {
type: 'phase:queued',
timestamp: new Date(),
payload: {
phaseId,
initiativeId: phase.initiativeId,
dependsOn,
},
};
this.eventBus.emit(event);
}
/**
* Get next dispatchable phase.
* Returns phase with all dependencies complete, sorted by queuedAt (oldest first).
*/
async getNextDispatchablePhase(): Promise<QueuedPhase | null> {
const queuedPhases = Array.from(this.phaseQueue.values());
if (queuedPhases.length === 0) {
return null;
}
// Filter to only phases with all dependencies complete
const readyPhases: QueuedPhase[] = [];
for (const qp of queuedPhases) {
const allDepsComplete = await this.areAllPhaseDependenciesComplete(qp.dependsOn);
if (allDepsComplete) {
readyPhases.push(qp);
}
}
if (readyPhases.length === 0) {
return null;
}
// Sort by queuedAt (oldest first)
readyPhases.sort((a, b) => a.queuedAt.getTime() - b.queuedAt.getTime());
return readyPhases[0];
}
/**
* Dispatch next available phase.
* Updates phase status to 'in_progress' and emits PhaseStartedEvent.
*/
async dispatchNextPhase(): Promise<PhaseDispatchResult> {
// Get next dispatchable phase
const nextPhase = await this.getNextDispatchablePhase();
if (!nextPhase) {
return {
success: false,
phaseId: '',
reason: 'No dispatchable phases',
};
}
// Get phase details for event
const phase = await this.phaseRepository.findById(nextPhase.phaseId);
if (!phase) {
return {
success: false,
phaseId: nextPhase.phaseId,
reason: 'Phase not found',
};
}
// Update phase status to 'in_progress'
await this.phaseRepository.update(nextPhase.phaseId, { status: 'in_progress' });
// Remove from queue (now being worked on)
this.phaseQueue.delete(nextPhase.phaseId);
// Emit PhaseStartedEvent
const event: PhaseStartedEvent = {
type: 'phase:started',
timestamp: new Date(),
payload: {
phaseId: nextPhase.phaseId,
initiativeId: phase.initiativeId,
},
};
this.eventBus.emit(event);
return {
success: true,
phaseId: nextPhase.phaseId,
};
}
/**
* Mark a phase as complete.
* Updates phase status and removes from queue.
*/
async completePhase(phaseId: string): Promise<void> {
// Get phase for event
const phase = await this.phaseRepository.findById(phaseId);
if (!phase) {
throw new Error(`Phase not found: ${phaseId}`);
}
// Update phase status to 'completed'
await this.phaseRepository.update(phaseId, { status: 'completed' });
// Remove from queue
this.phaseQueue.delete(phaseId);
// Also remove from blocked if it was there
this.blockedPhases.delete(phaseId);
// Emit PhaseCompletedEvent
const event: PhaseCompletedEvent = {
type: 'phase:completed',
timestamp: new Date(),
payload: {
phaseId,
initiativeId: phase.initiativeId,
success: true,
message: 'Phase completed',
},
};
this.eventBus.emit(event);
}
/**
* Mark a phase as blocked.
* Updates phase status and records block reason.
*/
async blockPhase(phaseId: string, reason: string): Promise<void> {
// Update phase status to 'blocked'
await this.phaseRepository.update(phaseId, { status: 'blocked' });
// Record in blocked map
this.blockedPhases.set(phaseId, { phaseId, reason });
// Remove from queue (blocked phases aren't dispatchable)
this.phaseQueue.delete(phaseId);
// Emit PhaseBlockedEvent
const event: PhaseBlockedEvent = {
type: 'phase:blocked',
timestamp: new Date(),
payload: {
phaseId,
reason,
},
};
this.eventBus.emit(event);
}
/**
* Get current phase queue state.
*/
async getPhaseQueueState(): Promise<{
queued: QueuedPhase[];
ready: QueuedPhase[];
blocked: Array<{ phaseId: string; reason: string }>;
}> {
const allQueued = Array.from(this.phaseQueue.values());
// Determine which are ready
const ready: QueuedPhase[] = [];
for (const qp of allQueued) {
const allDepsComplete = await this.areAllPhaseDependenciesComplete(qp.dependsOn);
if (allDepsComplete) {
ready.push(qp);
}
}
return {
queued: allQueued,
ready,
blocked: Array.from(this.blockedPhases.values()),
};
}
// =============================================================================
// Private Helpers
// =============================================================================
/**
* Check if all phase dependencies are complete.
*/
private async areAllPhaseDependenciesComplete(dependsOn: string[]): Promise<boolean> {
if (dependsOn.length === 0) {
return true;
}
for (const depPhaseId of dependsOn) {
const depPhase = await this.phaseRepository.findById(depPhaseId);
if (!depPhase || depPhase.status !== 'completed') {
return false;
}
}
return true;
}
}