From 1c7d6f20ee41081dd7677273d597ad62d4e228a6 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Wed, 4 Mar 2026 12:55:20 +0100 Subject: [PATCH] feat: Connect dispatch queue to agent spawning via auto-dispatch ExecutionOrchestrator now listens for phase:queued and agent:stopped events to drive the dispatch cycle, closing the gap between queueing phases (Execute button) and actually spawning agents. Coalesced scheduling prevents reentrancy with synchronous EventEmitter. --- apps/server/container.ts | 1 + apps/server/execution/orchestrator.ts | 64 +++++++++++++++++++++++++-- docs/dispatch-events.md | 21 +++++++++ 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/apps/server/container.ts b/apps/server/container.ts index d1ed485..3722579 100644 --- a/apps/server/container.ts +++ b/apps/server/container.ts @@ -227,6 +227,7 @@ export async function createContainer(options?: ContainerOptions): Promise> = new Map(); + /** Coalesced dispatch scheduling state */ + private dispatchPending = false; + private dispatchRunning = false; + constructor( private branchManager: BranchManager, private phaseRepository: PhaseRepository, @@ -36,13 +40,14 @@ export class ExecutionOrchestrator { private initiativeRepository: InitiativeRepository, private projectRepository: ProjectRepository, private phaseDispatchManager: PhaseDispatchManager, + private dispatchManager: DispatchManager, private conflictResolutionService: ConflictResolutionService, private eventBus: EventBus, private workspaceRoot: string, ) {} /** - * Start listening for task completion events. + * Start listening for events that drive execution. */ start(): void { this.eventBus.on('task:completed', (event) => { @@ -50,9 +55,57 @@ export class ExecutionOrchestrator { log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling task:completed'); }); }); + + // Kick off dispatch when phases are queued (e.g. user clicks Execute) + this.eventBus.on('phase:queued', () => this.scheduleDispatch()); + + // Re-dispatch queued tasks when an agent finishes and frees a slot + this.eventBus.on('agent:stopped', () => this.scheduleDispatch()); + log.info('execution orchestrator started'); } + // --------------------------------------------------------------------------- + // Coalesced dispatch scheduler + // --------------------------------------------------------------------------- + + /** + * Schedule a dispatch cycle. Coalesces multiple rapid calls (e.g. from + * synchronous EventEmitter) into a single async run. + */ + private scheduleDispatch(): void { + if (this.dispatchRunning) { + this.dispatchPending = true; + return; + } + this.runDispatchCycle().catch((err) => + log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch cycle error'), + ); + } + + private async runDispatchCycle(): Promise { + this.dispatchRunning = true; + try { + do { + this.dispatchPending = false; + // Dispatch all ready phases (each queues its tasks internally) + while (true) { + const result = await this.phaseDispatchManager.dispatchNextPhase(); + if (!result.success) break; + log.info({ phaseId: result.phaseId }, 'auto-dispatched phase'); + } + // Dispatch all ready tasks to idle agents + while (true) { + const result = await this.dispatchManager.dispatchNext(); + if (!result.success) break; + log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task'); + } + } while (this.dispatchPending); + } finally { + this.dispatchRunning = false; + } + } + /** * Handle a task:completed event. * Merges the task branch into the phase branch, then checks if all phase tasks are done. @@ -90,6 +143,9 @@ export class ExecutionOrchestrator { if (allDone) { await this.handlePhaseAllTasksDone(task.phaseId); } + + // Fill freed agent slot with next queued task + this.scheduleDispatch(); } /** @@ -155,6 +211,7 @@ export class ExecutionOrchestrator { await this.mergePhaseIntoInitiative(phaseId); await this.phaseDispatchManager.completePhase(phaseId); await this.phaseDispatchManager.dispatchNextPhase(); + this.scheduleDispatch(); } else { // review_per_phase await this.phaseRepository.update(phaseId, { status: 'pending_review' as any }); @@ -220,6 +277,7 @@ export class ExecutionOrchestrator { await this.mergePhaseIntoInitiative(phaseId); await this.phaseDispatchManager.completePhase(phaseId); await this.phaseDispatchManager.dispatchNextPhase(); + this.scheduleDispatch(); log.info({ phaseId }, 'phase review approved and merged'); } diff --git a/docs/dispatch-events.md b/docs/dispatch-events.md index 997cebd..779e016 100644 --- a/docs/dispatch-events.md +++ b/docs/dispatch-events.md @@ -96,3 +96,24 @@ AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? } | `completePhase(phaseId)` | Mark phase complete | | `blockPhase(phaseId, reason)` | Block phase | | `getPhaseQueueState()` | Return queued, ready, blocked phases | + +## Auto-Dispatch (ExecutionOrchestrator) + +`apps/server/execution/orchestrator.ts` — Connects the queue to agent spawning via event-driven auto-dispatch. + +### Trigger Events + +| Event | Action | +|-------|--------| +| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents | +| `agent:stopped` | Re-dispatch queued tasks (freed agent slot) | +| `task:completed` | Merge task branch, then dispatch next queued task | + +### Coalesced Scheduling + +Multiple rapid events (e.g. several `phase:queued` from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle loops `dispatchNextPhase()` + `dispatchNext()` until both queues are drained, then re-runs if new events arrived during execution. + +### Execution Mode Behavior + +- **YOLO**: phase completes → auto-merge → auto-dispatch next phase → auto-dispatch tasks +- **review_per_phase**: phase completes → set `pending_review` → STOP. User approves → `approveAndMergePhase()` → merge → dispatch next phase → dispatch tasks