feat: Connect dispatch queue to agent spawning via auto-dispatch
ExecutionOrchestrator now listens for phase:queued and agent:stopped events to drive the dispatch cycle, closing the gap between queueing phases (Execute button) and actually spawning agents. Coalesced scheduling prevents reentrancy with synchronous EventEmitter.
This commit is contained in:
@@ -227,6 +227,7 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
|
||||
repos.initiativeRepository,
|
||||
repos.projectRepository,
|
||||
phaseDispatchManager,
|
||||
dispatchManager,
|
||||
conflictResolutionService,
|
||||
eventBus,
|
||||
workspaceRoot,
|
||||
|
||||
@@ -11,13 +11,13 @@
|
||||
* - Review per-phase: pause after each phase for diff review
|
||||
*/
|
||||
|
||||
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent } from '../events/index.js';
|
||||
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent } from '../events/index.js';
|
||||
import type { BranchManager } from '../git/branch-manager.js';
|
||||
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
||||
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
||||
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
|
||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||
import type { PhaseDispatchManager } from '../dispatch/types.js';
|
||||
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
|
||||
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
|
||||
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
|
||||
import { ensureProjectClone } from '../git/project-clones.js';
|
||||
@@ -29,6 +29,10 @@ export class ExecutionOrchestrator {
|
||||
/** Serialize merges per phase to avoid concurrent merge conflicts */
|
||||
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
|
||||
|
||||
/** Coalesced dispatch scheduling state */
|
||||
private dispatchPending = false;
|
||||
private dispatchRunning = false;
|
||||
|
||||
constructor(
|
||||
private branchManager: BranchManager,
|
||||
private phaseRepository: PhaseRepository,
|
||||
@@ -36,13 +40,14 @@ export class ExecutionOrchestrator {
|
||||
private initiativeRepository: InitiativeRepository,
|
||||
private projectRepository: ProjectRepository,
|
||||
private phaseDispatchManager: PhaseDispatchManager,
|
||||
private dispatchManager: DispatchManager,
|
||||
private conflictResolutionService: ConflictResolutionService,
|
||||
private eventBus: EventBus,
|
||||
private workspaceRoot: string,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Start listening for task completion events.
|
||||
* Start listening for events that drive execution.
|
||||
*/
|
||||
start(): void {
|
||||
this.eventBus.on<TaskCompletedEvent>('task:completed', (event) => {
|
||||
@@ -50,9 +55,57 @@ export class ExecutionOrchestrator {
|
||||
log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling task:completed');
|
||||
});
|
||||
});
|
||||
|
||||
// Kick off dispatch when phases are queued (e.g. user clicks Execute)
|
||||
this.eventBus.on<PhaseQueuedEvent>('phase:queued', () => this.scheduleDispatch());
|
||||
|
||||
// Re-dispatch queued tasks when an agent finishes and frees a slot
|
||||
this.eventBus.on<AgentStoppedEvent>('agent:stopped', () => this.scheduleDispatch());
|
||||
|
||||
log.info('execution orchestrator started');
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Coalesced dispatch scheduler
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Schedule a dispatch cycle. Coalesces multiple rapid calls (e.g. from
|
||||
* synchronous EventEmitter) into a single async run.
|
||||
*/
|
||||
private scheduleDispatch(): void {
|
||||
if (this.dispatchRunning) {
|
||||
this.dispatchPending = true;
|
||||
return;
|
||||
}
|
||||
this.runDispatchCycle().catch((err) =>
|
||||
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch cycle error'),
|
||||
);
|
||||
}
|
||||
|
||||
private async runDispatchCycle(): Promise<void> {
|
||||
this.dispatchRunning = true;
|
||||
try {
|
||||
do {
|
||||
this.dispatchPending = false;
|
||||
// Dispatch all ready phases (each queues its tasks internally)
|
||||
while (true) {
|
||||
const result = await this.phaseDispatchManager.dispatchNextPhase();
|
||||
if (!result.success) break;
|
||||
log.info({ phaseId: result.phaseId }, 'auto-dispatched phase');
|
||||
}
|
||||
// Dispatch all ready tasks to idle agents
|
||||
while (true) {
|
||||
const result = await this.dispatchManager.dispatchNext();
|
||||
if (!result.success) break;
|
||||
log.info({ taskId: result.taskId, agentId: result.agentId }, 'auto-dispatched task');
|
||||
}
|
||||
} while (this.dispatchPending);
|
||||
} finally {
|
||||
this.dispatchRunning = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a task:completed event.
|
||||
* Merges the task branch into the phase branch, then checks if all phase tasks are done.
|
||||
@@ -90,6 +143,9 @@ export class ExecutionOrchestrator {
|
||||
if (allDone) {
|
||||
await this.handlePhaseAllTasksDone(task.phaseId);
|
||||
}
|
||||
|
||||
// Fill freed agent slot with next queued task
|
||||
this.scheduleDispatch();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -155,6 +211,7 @@ export class ExecutionOrchestrator {
|
||||
await this.mergePhaseIntoInitiative(phaseId);
|
||||
await this.phaseDispatchManager.completePhase(phaseId);
|
||||
await this.phaseDispatchManager.dispatchNextPhase();
|
||||
this.scheduleDispatch();
|
||||
} else {
|
||||
// review_per_phase
|
||||
await this.phaseRepository.update(phaseId, { status: 'pending_review' as any });
|
||||
@@ -220,6 +277,7 @@ export class ExecutionOrchestrator {
|
||||
await this.mergePhaseIntoInitiative(phaseId);
|
||||
await this.phaseDispatchManager.completePhase(phaseId);
|
||||
await this.phaseDispatchManager.dispatchNextPhase();
|
||||
this.scheduleDispatch();
|
||||
|
||||
log.info({ phaseId }, 'phase review approved and merged');
|
||||
}
|
||||
|
||||
@@ -96,3 +96,24 @@ AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
|
||||
| `completePhase(phaseId)` | Mark phase complete |
|
||||
| `blockPhase(phaseId, reason)` | Block phase |
|
||||
| `getPhaseQueueState()` | Return queued, ready, blocked phases |
|
||||
|
||||
## Auto-Dispatch (ExecutionOrchestrator)
|
||||
|
||||
`apps/server/execution/orchestrator.ts` — Connects the queue to agent spawning via event-driven auto-dispatch.
|
||||
|
||||
### Trigger Events
|
||||
|
||||
| Event | Action |
|
||||
|-------|--------|
|
||||
| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents |
|
||||
| `agent:stopped` | Re-dispatch queued tasks (freed agent slot) |
|
||||
| `task:completed` | Merge task branch, then dispatch next queued task |
|
||||
|
||||
### Coalesced Scheduling
|
||||
|
||||
Multiple rapid events (e.g. several `phase:queued` from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle loops `dispatchNextPhase()` + `dispatchNext()` until both queues are drained, then re-runs if new events arrived during execution.
|
||||
|
||||
### Execution Mode Behavior
|
||||
|
||||
- **YOLO**: phase completes → auto-merge → auto-dispatch next phase → auto-dispatch tasks
|
||||
- **review_per_phase**: phase completes → set `pending_review` → STOP. User approves → `approveAndMergePhase()` → merge → dispatch next phase → dispatch tasks
|
||||
|
||||
Reference in New Issue
Block a user