ExecutionOrchestrator now listens for phase:queued and agent:stopped events to drive the dispatch cycle, closing the gap between queueing phases (Execute button) and actually spawning agents. Coalesced scheduling prevents reentrancy with synchronous EventEmitter.
# Dispatch & Events

`apps/server/dispatch/`: task and phase dispatch queues. `apps/server/events/`: typed event bus.

## Event Bus

`apps/server/events/`: typed pub/sub system for inter-module communication.

### Architecture

- **Port**: `EventBus` interface with `emit(event)` and `on(type, handler)`
- **Adapter**: `TypedEventBus` using Node.js `EventEmitter`
- All events implement `BaseEvent { type, timestamp, payload }`

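The port/adapter split above could look roughly like the following. This is a minimal sketch, not the actual contents of `apps/server/events/`: the `DomainEvent` union, the `phase:queued` payload, the `Date` type for `timestamp`, and the generic parameters on `BaseEvent` are all assumptions made for illustration.

```typescript
import { EventEmitter } from "node:events";

// Shape from the list above; the generic parameters are an assumption.
interface BaseEvent<T extends string = string, P = unknown> {
  type: T;
  timestamp: Date; // assumed type
  payload: P;
}

// Two illustrative members; the real union covers every event type.
type PhaseQueuedEvent = BaseEvent<"phase:queued", { phaseId: string }>; // payload is hypothetical
type AgentStoppedEvent = BaseEvent<
  "agent:stopped",
  { agentId: string; name: string; taskId: string; reason: string }
>;
type DomainEvent = PhaseQueuedEvent | AgentStoppedEvent;

// Port: what other modules depend on.
interface EventBus {
  emit(event: DomainEvent): void;
  on<T extends DomainEvent["type"]>(
    type: T,
    handler: (event: Extract<DomainEvent, { type: T }>) => void,
  ): void;
}

// Adapter: delegates to a Node.js EventEmitter, keyed by event type.
class TypedEventBus implements EventBus {
  private readonly emitter = new EventEmitter();

  emit(event: DomainEvent): void {
    // EventEmitter calls listeners synchronously, in registration order.
    this.emitter.emit(event.type, event);
  }

  on<T extends DomainEvent["type"]>(
    type: T,
    handler: (event: Extract<DomainEvent, { type: T }>) => void,
  ): void {
    this.emitter.on(type, handler);
  }
}
```

Because `on` narrows the handler argument by `type`, a subscriber to `agent:stopped` can read `event.payload.reason` without a cast.
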
### Event Types (45)

| Category | Events | Count | Notes |
|----------|--------|-------|-------|
| **Agent** | `agent:spawned`, `agent:stopped`, `agent:crashed`, `agent:resumed`, `agent:account_switched`, `agent:deleted`, `agent:waiting`, `agent:output` | 8 | |
| **Task** | `task:queued`, `task:dispatched`, `task:completed`, `task:blocked`, `task:pending_approval` | 5 | |
| **Phase** | `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked` | 4 | |
| **Merge** | `merge:queued`, `merge:started`, `merge:completed`, `merge:conflicted` | 4 | |
| **Page** | `page:created`, `page:updated`, `page:deleted` | 3 | |
| **Process** | `process:spawned`, `process:stopped`, `process:crashed` | 3 | |
| **Server** | `server:started`, `server:stopped` | 2 | |
| **Worktree** | `worktree:created`, `worktree:removed`, `worktree:merged`, `worktree:conflict` | 4 | |
| **Account** | `account:credentials_refreshed`, `account:credentials_expired`, `account:credentials_validated` | 3 | |
| **Preview** | `preview:building`, `preview:ready`, `preview:stopped`, `preview:failed` | 4 | |
| **Conversation** | `conversation:created`, `conversation:answered` | 2 | `conversation:created` triggers auto-resume of idle target agents via `resumeForConversation()` |
| **Chat** | `chat:message_created`, `chat:session_closed` | 2 | Chat session lifecycle events |
| **Log** | `log:entry` | 1 | |

### Key Event Payloads

```typescript
AgentSpawnedEvent { agentId, name, taskId, worktreeId, provider }
AgentStoppedEvent { agentId, name, taskId, reason }
// reason: 'user_requested'|'task_complete'|'error'|'waiting_for_input'|
//         'context_complete'|'plan_complete'|'detail_complete'|'refine_complete'|'chat_complete'
AgentWaitingEvent { agentId, name, taskId, sessionId, questions[] }
AgentOutputEvent { agentId, stream, data }
TaskCompletedEvent { taskId, agentId, success, message }
TaskQueuedEvent { taskId, priority, dependsOn[] }
PhaseStartedEvent { phaseId, initiativeId }
MergeConflictedEvent { taskId, agentId, worktreeId, targetBranch, conflictingFiles[] }
AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
```

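The `reason` values listed for `AgentStoppedEvent` lend themselves to a string-literal union. The sketch below shows one way to express that, with a runtime guard for values that arrive untyped. The names `AGENT_STOP_REASONS`, `AgentStopReason`, `AgentStoppedPayload`, and `isAgentStopReason` are hypothetical, and the `string` field types are assumed; only the nine reason strings come from the listing above.

```typescript
// The nine reason strings are taken from the payload listing;
// every identifier in this block is hypothetical.
const AGENT_STOP_REASONS = [
  "user_requested",
  "task_complete",
  "error",
  "waiting_for_input",
  "context_complete",
  "plan_complete",
  "detail_complete",
  "refine_complete",
  "chat_complete",
] as const;

type AgentStopReason = (typeof AGENT_STOP_REASONS)[number];

interface AgentStoppedPayload {
  agentId: string; // field types are assumed
  name: string;
  taskId: string;
  reason: AgentStopReason;
}

// Runtime guard for values from untyped sources such as parsed JSON.
function isAgentStopReason(value: string): value is AgentStopReason {
  return (AGENT_STOP_REASONS as readonly string[]).includes(value);
}
```
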
## Task Dispatch

`apps/server/dispatch/`: in-memory queue with dependency-ordered dispatch.

### Architecture

- **Port**: `DispatchManager` interface
- **Adapter**: `DefaultDispatchManager`

### How Task Dispatch Works

1. **Queue**: `queue(taskId)` fetches the task's dependencies and adds the task to an internal `Map`.
2. **Dispatch**: `dispatchNext()` finds the highest-priority task whose dependencies are all complete.
3. **Context gathering**: Before spawning, `dispatchNext()` gathers initiative context (initiative, phase, tasks, pages) and passes it to the agent as `inputContext`. Agents receive `.cw/input/task.md`, `initiative.md`, `phase.md`, `context/tasks/`, `context/phases/`, and `pages/`.
4. **Priority order**: high > medium > low, then oldest first (FIFO within a priority).
5. **Checkpoint skip**: Tasks whose type starts with `checkpoint:` skip auto-dispatch.
6. **Planning skip**: Planning-category tasks (research, discuss, plan, detail, refine) skip auto-dispatch; they use the architect flow instead.
7. **Summary propagation**: `completeTask()` reads the completing agent's `result.message` and stores it in the task's `summary` column. Dependent tasks see this summary in the frontmatter of `context/tasks/<id>.md`.
8. **Approval check**: `completeTask()` checks `requiresApproval` (task-level, then initiative-level).
9. **Approval flow**: If approval is required, the status becomes `pending_approval` and `task:pending_approval` is emitted.

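Steps 2, 4, 5, and 6 amount to a filter followed by a two-key sort. The sketch below illustrates that selection under stated assumptions: the `QueuedTask` shape, the `queuedAt` field used for FIFO ordering, and the standalone function signature are hypothetical and not taken from `DefaultDispatchManager`.

```typescript
type Priority = "high" | "medium" | "low";

// Hypothetical in-memory queue entry.
interface QueuedTask {
  taskId: string;
  priority: Priority;
  queuedAt: number; // assumed: enqueue order, used for FIFO tie-breaking
  dependsOn: string[];
  type: string; // types starting with "checkpoint:" are skipped
  category: string; // planning categories are skipped
}

const PRIORITY_RANK: Record<Priority, number> = { high: 0, medium: 1, low: 2 };
const PLANNING_CATEGORIES = new Set(["research", "discuss", "plan", "detail", "refine"]);

// Returns the task that would be dispatched next, without dispatching it.
function getNextDispatchable(
  queue: Map<string, QueuedTask>,
  completed: Set<string>,
): QueuedTask | undefined {
  const ready = Array.from(queue.values()).filter(
    (t) =>
      !t.type.startsWith("checkpoint:") &&
      !PLANNING_CATEGORIES.has(t.category) &&
      t.dependsOn.every((dep) => completed.has(dep)),
  );
  // Highest priority first, then oldest first within the same priority.
  ready.sort(
    (a, b) =>
      PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] || a.queuedAt - b.queuedAt,
  );
  return ready[0];
}
```

A high-priority task with an incomplete dependency is passed over in favour of a ready medium-priority task, and becomes the next candidate once its dependency is in `completed`.
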
### DispatchManager Methods

| Method | Purpose |
|--------|---------|
| `queue(taskId)` | Add task to dispatch queue |
| `dispatchNext()` | Find and dispatch next ready task |
| `getNextDispatchable()` | Get next task without dispatching |
| `completeTask(taskId, agentId?)` | Complete with approval check |
| `approveTask(taskId)` | Approve pending task |
| `blockTask(taskId, reason)` | Block task with reason |
| `getQueueState()` | Return queued, ready, blocked tasks |

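The approval branch inside `completeTask()` can be pictured as below. This is a sketch under assumptions rather than the real method: the record shapes and the `Emit` callback are hypothetical, the `"completed"` status string is assumed, and "task-level, then initiative-level" is read here as "a task-level setting overrides the initiative-level one", which the source does not spell out.

```typescript
// Hypothetical record shapes; only `requiresApproval`, `summary`,
// `pending_approval` and the event names come from this document.
interface TaskRecord {
  id: string;
  status: string;
  summary?: string;
  requiresApproval?: boolean | null; // null/undefined = inherit (assumed)
}

interface InitiativeRecord {
  requiresApproval: boolean;
}

type Emit = (type: "task:completed" | "task:pending_approval", taskId: string) => void;

function completeTask(
  task: TaskRecord,
  initiative: InitiativeRecord,
  resultMessage: string | undefined,
  emit: Emit,
): TaskRecord {
  // Assumption: the task-level flag wins; otherwise fall back to the initiative.
  const needsApproval = task.requiresApproval ?? initiative.requiresApproval;
  const status = needsApproval ? "pending_approval" : "completed"; // "completed" is assumed
  emit(needsApproval ? "task:pending_approval" : "task:completed", task.id);
  // Summary propagation: keep the agent's result message on the task.
  return { ...task, status, summary: resultMessage ?? task.summary };
}
```
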
## Phase Dispatch

`DefaultPhaseDispatchManager` follows the same pattern for phases:

1. **Queue**: `queuePhase(phaseId)` validates that the phase is approved and gets its dependencies.
2. **Dispatch**: `dispatchNextPhase()` finds a phase whose dependencies are all complete.
3. **Auto-queue tasks**: When a phase starts, its pending execution tasks are queued (planning-category tasks are excluded).
4. **Events**: `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`

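The phase-level steps above can be sketched as a small queue plus a task filter. Everything here is illustrative: the `Phase` and `PhaseTask` shapes, the `PhaseQueue` class, the `tasksToAutoQueue` helper, and the status strings `"approved"` and `"pending"` are assumptions, not the API of `DefaultPhaseDispatchManager`.

```typescript
// Hypothetical shapes; status strings are assumed.
interface Phase {
  id: string;
  status: string;
  dependsOn: string[];
}

interface PhaseTask {
  id: string;
  status: string;
  category: string;
}

const PLANNING = new Set(["research", "discuss", "plan", "detail", "refine"]);

class PhaseQueue {
  private readonly queued = new Map<string, Phase>();

  // Step 1: only approved phases may enter the queue.
  queuePhase(phase: Phase): void {
    if (phase.status !== "approved") {
      throw new Error(`Phase ${phase.id} must be approved before queueing`);
    }
    this.queued.set(phase.id, phase);
  }

  // Step 2: first queued phase (insertion order) whose dependencies are complete.
  getNextDispatchablePhase(completed: Set<string>): Phase | undefined {
    return Array.from(this.queued.values()).find((p) =>
      p.dependsOn.every((dep) => completed.has(dep)),
    );
  }
}

// Step 3: pending execution tasks are queued; planning-category tasks are not.
function tasksToAutoQueue(tasks: PhaseTask[]): PhaseTask[] {
  return tasks.filter((t) => t.status === "pending" && !PLANNING.has(t.category));
}
```
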
### PhaseDispatchManager Methods

| Method | Purpose |
|--------|---------|
| `queuePhase(phaseId)` | Queue approved phase |
| `dispatchNextPhase()` | Start next ready phase, auto-queue its tasks |
| `getNextDispatchablePhase()` | Get next phase without dispatching |
| `completePhase(phaseId)` | Mark phase complete |
| `blockPhase(phaseId, reason)` | Block phase |
| `getPhaseQueueState()` | Return queued, ready, blocked phases |

## Auto-Dispatch (ExecutionOrchestrator)

`apps/server/execution/orchestrator.ts` connects the queue to agent spawning via event-driven auto-dispatch.

### Trigger Events

| Event | Action |
|-------|--------|
| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents |
| `agent:stopped` | Re-dispatch queued tasks (freed agent slot) |
| `task:completed` | Merge task branch, then dispatch next queued task |

### Coalesced Scheduling

Multiple rapid events (e.g. several `phase:queued` events from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle calls `dispatchNextPhase()` and `dispatchNext()` in a loop until both queues are drained, then runs again if new events arrived while it was executing. Because `EventEmitter` invokes handlers synchronously, this keeps a handler from re-entering a cycle that is already in progress.

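One common way to implement this kind of coalescing is a pair of flags: one marking a cycle in flight, one recording that something arrived meanwhile. The sketch below illustrates that pattern and is not the code in `orchestrator.ts`; the `CoalescedDispatcher` class, the `DispatchStep` type, the boolean "made progress" return value, and the `cycles` counter are all hypothetical.

```typescript
// Hypothetical: each step resolves to true if it dispatched something.
type DispatchStep = () => Promise<boolean>;

class CoalescedDispatcher {
  private running = false; // a dispatch cycle is in flight
  private rerun = false; // an event arrived while the cycle was running
  cycles = 0; // number of drain passes, exposed for illustration

  constructor(
    private readonly dispatchNextPhase: DispatchStep,
    private readonly dispatchNext: DispatchStep,
  ) {}

  // Safe to call from a synchronous event handler: it never re-enters the cycle.
  scheduleDispatch(): Promise<void> | void {
    if (this.running) {
      this.rerun = true; // remember the request, let the active cycle pick it up
      return;
    }
    this.running = true;
    return this.run();
  }

  private async run(): Promise<void> {
    try {
      do {
        this.rerun = false;
        this.cycles++;
        // Drain phases, then tasks, until neither queue makes progress.
        let progressed = true;
        while (progressed) {
          const phase = await this.dispatchNextPhase();
          const task = await this.dispatchNext();
          progressed = phase || task;
        }
      } while (this.rerun); // re-run if events arrived during the pass
    } finally {
      this.running = false;
    }
  }
}
```

Three back-to-back calls to `scheduleDispatch()` start one cycle; the second and third calls only set the flag, so the work is drained once and followed by a single confirming pass.
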
### Execution Mode Behavior

- **YOLO**: phase completes → auto-merge → auto-dispatch next phase → auto-dispatch tasks
- **review_per_phase**: phase completes → set `pending_review` → STOP. User approves → `approveAndMergePhase()` → merge → dispatch next phase → dispatch tasks
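
The two modes differ only in what happens at the moment a phase completes. As a rough illustration, the flows above can be written as data. The `ExecutionMode` and `Step` types, the lowercase `"yolo"` spelling, and both function names are hypothetical.

```typescript
// Hypothetical names; the step sequences mirror the two bullets above.
type ExecutionMode = "yolo" | "review_per_phase";
type Step = "merge" | "set_pending_review" | "dispatch_next_phase" | "dispatch_tasks";

// What the orchestrator does when a phase completes.
function stepsOnPhaseCompleted(mode: ExecutionMode): Step[] {
  switch (mode) {
    case "yolo":
      return ["merge", "dispatch_next_phase", "dispatch_tasks"];
    case "review_per_phase":
      // Stop here and wait for the user to approve.
      return ["set_pending_review"];
  }
}

// What follows a user approval in review_per_phase mode.
function stepsOnPhaseApproved(): Step[] {
  return ["merge", "dispatch_next_phase", "dispatch_tasks"];
}
```
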