feat: Connect dispatch queue to agent spawning via auto-dispatch

ExecutionOrchestrator now listens for phase:queued and agent:stopped
events to drive the dispatch cycle, closing the gap between queueing
phases (Execute button) and actually spawning agents. Coalesced
scheduling prevents reentrancy with synchronous EventEmitter.
This commit is contained in:
Lukas May
2026-03-04 12:55:20 +01:00
parent d03b204096
commit 1c7d6f20ee
3 changed files with 83 additions and 3 deletions

View File

@@ -227,6 +227,7 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
repos.initiativeRepository,
repos.projectRepository,
phaseDispatchManager,
dispatchManager,
conflictResolutionService,
eventBus,
workspaceRoot,

View File

@@ -11,13 +11,13 @@
* - Review per-phase: pause after each phase for diff review
*/
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent } from '../events/index.js';
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent } from '../events/index.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { PhaseDispatchManager } from '../dispatch/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
import { ensureProjectClone } from '../git/project-clones.js';
@@ -29,6 +29,10 @@ export class ExecutionOrchestrator {
/** Serialize merges per phase to avoid concurrent merge conflicts */
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
/** Coalesced dispatch scheduling state */
private dispatchPending = false;
private dispatchRunning = false;
constructor(
private branchManager: BranchManager,
private phaseRepository: PhaseRepository,
@@ -36,13 +40,14 @@ export class ExecutionOrchestrator {
private initiativeRepository: InitiativeRepository,
private projectRepository: ProjectRepository,
private phaseDispatchManager: PhaseDispatchManager,
private dispatchManager: DispatchManager,
private conflictResolutionService: ConflictResolutionService,
private eventBus: EventBus,
private workspaceRoot: string,
) {}
/**
* Start listening for task completion events.
* Start listening for events that drive execution.
*/
start(): void {
this.eventBus.on<TaskCompletedEvent>('task:completed', (event) => {
@@ -50,9 +55,57 @@ export class ExecutionOrchestrator {
log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling task:completed');
});
});
// Kick off dispatch when phases are queued (e.g. user clicks Execute)
this.eventBus.on<PhaseQueuedEvent>('phase:queued', () => this.scheduleDispatch());
// Re-dispatch queued tasks when an agent finishes and frees a slot
this.eventBus.on<AgentStoppedEvent>('agent:stopped', () => this.scheduleDispatch());
log.info('execution orchestrator started');
}
// ---------------------------------------------------------------------------
// Coalesced dispatch scheduler
// ---------------------------------------------------------------------------
/**
* Schedule a dispatch cycle. Coalesces multiple rapid calls (e.g. from
* synchronous EventEmitter) into a single async run.
*/
private scheduleDispatch(): void {
if (this.dispatchRunning) {
this.dispatchPending = true;
return;
}
this.runDispatchCycle().catch((err) =>
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch cycle error'),
);
}
private async runDispatchCycle(): Promise<void> {
this.dispatchRunning = true;
try {
do {
this.dispatchPending = false;
// Dispatch all ready phases (each queues its tasks internally)
while (true) {
const result = await this.phaseDispatchManager.dispatchNextPhase();
if (!result.success) break;
log.info({ phaseId: result.phaseId }, 'auto-dispatched phase');
}
// Dispatch all ready tasks to idle agents
while (true) {
const result = await this.dispatchManager.dispatchNext();
if (!result.success) break;
log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task');
}
} while (this.dispatchPending);
} finally {
this.dispatchRunning = false;
}
}
/**
* Handle a task:completed event.
* Merges the task branch into the phase branch, then checks if all phase tasks are done.
@@ -90,6 +143,9 @@ export class ExecutionOrchestrator {
if (allDone) {
await this.handlePhaseAllTasksDone(task.phaseId);
}
// Fill freed agent slot with next queued task
this.scheduleDispatch();
}
/**
@@ -155,6 +211,7 @@ export class ExecutionOrchestrator {
await this.mergePhaseIntoInitiative(phaseId);
await this.phaseDispatchManager.completePhase(phaseId);
await this.phaseDispatchManager.dispatchNextPhase();
this.scheduleDispatch();
} else {
// review_per_phase
await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });
@@ -220,6 +277,7 @@ export class ExecutionOrchestrator {
await this.mergePhaseIntoInitiative(phaseId);
await this.phaseDispatchManager.completePhase(phaseId);
await this.phaseDispatchManager.dispatchNextPhase();
this.scheduleDispatch();
log.info({ phaseId }, 'phase review approved and merged');
}