Dispatch & Events
apps/server/dispatch/ — Task and phase dispatch queues. apps/server/events/ — Typed event bus.
Event Bus
apps/server/events/ — Typed pub/sub system for inter-module communication.
Architecture
- Port: `EventBus` interface with `emit(event)` and `on(type, handler)`
- Adapter: `TypedEventBus` using Node.js `EventEmitter`
- All events implement `BaseEvent { type, timestamp, payload }`
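The port/adapter split above can be sketched as follows. This is a minimal illustration, not the project's actual code: the class name `SketchEventBus`, the `DomainEvent` union, the `timestamp` being an epoch-millisecond number, and the payload field types are all assumptions. Only the `emit`/`on` shape, the `BaseEvent` fields, and the two payload field lists come from this document.

```typescript
import { EventEmitter } from "node:events";

// Shape taken from the doc: BaseEvent { type, timestamp, payload }.
// Assumption: timestamp is epoch milliseconds.
interface BaseEvent<T extends string, P> {
  type: T;
  timestamp: number;
  payload: P;
}

// Two events from the table; field types are assumptions.
type TaskQueuedEvent = BaseEvent<
  "task:queued",
  { taskId: string; priority: string; dependsOn: string[] }
>;
type PhaseStartedEvent = BaseEvent<
  "phase:started",
  { phaseId: string; initiativeId: string }
>;
type DomainEvent = TaskQueuedEvent | PhaseStartedEvent;

// Handler receives only the event variant matching the subscribed type.
type HandlerFor<T extends DomainEvent["type"]> = (
  event: Extract<DomainEvent, { type: T }>
) => void;

// Port: what other modules depend on.
interface EventBus {
  emit(event: DomainEvent): void;
  on<T extends DomainEvent["type"]>(type: T, handler: HandlerFor<T>): void;
}

// Adapter: delegates to Node's EventEmitter, keyed by event.type.
class SketchEventBus implements EventBus {
  private readonly emitter = new EventEmitter();

  emit(event: DomainEvent): void {
    this.emitter.emit(event.type, event);
  }

  on<T extends DomainEvent["type"]>(type: T, handler: HandlerFor<T>): void {
    this.emitter.on(type, handler);
  }
}
```

With this shape, a subscriber to `task:queued` gets a payload typed with `taskId`, `priority`, and `dependsOn`, and a typo in the event name fails at compile time.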
Event Types (58)
| Category | Events | Count |
|---|---|---|
| Agent | `agent:spawned`, `agent:stopped`, `agent:crashed`, `agent:resumed`, `agent:account_switched`, `agent:deleted`, `agent:waiting`, `agent:output` | 8 |
| Task | `task:queued`, `task:dispatched`, `task:completed`, `task:blocked` | 4 |
| Phase | `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`, `phase:changes_requested` | 5 |
| Merge | `merge:queued`, `merge:started`, `merge:completed`, `merge:conflicted` | 4 |
| Page | `page:created`, `page:updated`, `page:deleted` | 3 |
| Process | `process:spawned`, `process:stopped`, `process:crashed` | 3 |
| Server | `server:started`, `server:stopped` | 2 |
| Worktree | `worktree:created`, `worktree:removed`, `worktree:merged`, `worktree:conflict` | 4 |
| Account | `account:credentials_refreshed`, `account:credentials_expired`, `account:credentials_validated` | 3 |
| Preview | `preview:building`, `preview:ready`, `preview:stopped`, `preview:failed` | 4 |
| Conversation | `conversation:created`, `conversation:answered` | 2 |
| Chat | `chat:message_created`, `chat:session_closed` | 2 |
| Initiative | `initiative:pending_review`, `initiative:review_approved`, `initiative:changes_requested` | 3 |
| Project | `project:synced`, `project:sync_failed` | 2 |
| Log | `log:entry` | 1 |
Key Event Payloads
AgentSpawnedEvent { agentId, name, taskId, worktreeId, provider }
AgentStoppedEvent { agentId, name, taskId, reason }
// reason: 'user_requested'|'task_complete'|'error'|'waiting_for_input'|
// 'context_complete'|'plan_complete'|'detail_complete'|'refine_complete'|'chat_complete'
AgentWaitingEvent { agentId, name, taskId, sessionId, questions[] }
AgentOutputEvent { agentId, stream, data }
TaskCompletedEvent { taskId, agentId, success, message }
TaskQueuedEvent { taskId, priority, dependsOn[] }
PhaseStartedEvent { phaseId, initiativeId }
MergeConflictedEvent { taskId, agentId, worktreeId, targetBranch, conflictingFiles[] }
PhaseChangesRequestedEvent { phaseId, initiativeId, taskId, commentCount }
AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
InitiativePendingReviewEvent { initiativeId, branch }
InitiativeReviewApprovedEvent { initiativeId, branch, strategy: 'push_branch' | 'merge_and_push' }
InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
Task Dispatch
apps/server/dispatch/ — In-memory queue with dependency-ordered dispatch.
Architecture
- Port: `DispatchManager` interface
- Adapter: `DefaultDispatchManager`
How Task Dispatch Works
- Queue: `queue(taskId)` fetches task dependencies and adds the task to an internal Map
- Dispatch: `dispatchNext()` finds the highest-priority task with all deps complete
- Context gathering: Before spawn, `dispatchNext()` gathers initiative context (initiative, phase, tasks, pages) and passes it as `inputContext` to the agent. Agents receive `.cw/input/task.md`, `initiative.md`, `phase.md`, `context/tasks/`, `context/phases/`, and `pages/`.
- Priority order: high > medium > low, then oldest first (FIFO within priority)
- Planning skip: Planning-category tasks (research, discuss, plan, detail, refine) skip auto-dispatch; they use the architect flow
- Summary propagation: `completeTask()` reads the completing agent's `result.message` and stores it on the task's `summary` column. Dependent tasks see this summary in `context/tasks/<id>.md` frontmatter.
- Spawn failure: If `agentManager.spawn()` throws, the task is blocked via `blockTask()` with the error message. The dispatch cycle continues instead of crashing.
- Retry blocked: `retryBlockedTask(taskId)` resets a blocked task to pending and re-queues it. Exposed via the tRPC `retryBlockedTask` mutation. The UI shows a Retry button in the task slide-over when status is `blocked`.
- Branch validation: Branch computation and `ensureBranch` errors are fatal for execution tasks (execute, verify, merge, review) but non-fatal for planning tasks. This prevents agents from spawning without proper branch isolation.
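The selection rule described above (all dependencies complete, then high > medium > low, then oldest first) can be sketched as a pure function. This is an illustration under assumptions: the `QueuedTask` shape, the `queuedAt` field, and passing completed task IDs as a `Set` are hypothetical and not taken from `DefaultDispatchManager`.

```typescript
type Priority = "high" | "medium" | "low";

// Hypothetical queue entry; queuedAt is used for FIFO ordering within a priority.
interface QueuedTask {
  taskId: string;
  priority: Priority;
  queuedAt: number;
  dependsOn: string[];
}

// Lower rank dispatches first.
const PRIORITY_RANK: Record<Priority, number> = { high: 0, medium: 1, low: 2 };

// Returns the next task whose dependencies are all complete,
// or undefined when nothing is ready.
function pickNextDispatchable(
  queue: Map<string, QueuedTask>,
  completed: Set<string>
): QueuedTask | undefined {
  const ready = Array.from(queue.values()).filter((task) =>
    task.dependsOn.every((dep) => completed.has(dep))
  );
  ready.sort(
    (a, b) =>
      PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] ||
      a.queuedAt - b.queuedAt
  );
  return ready[0];
}
```

A high-priority task with an unfinished dependency is passed over in favour of a ready lower-priority task, and becomes eligible as soon as its dependency lands in the completed set.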
DispatchManager Methods
| Method | Purpose |
|---|---|
| `queue(taskId)` | Add task to dispatch queue |
| `dispatchNext()` | Find and dispatch next ready task |
| `getNextDispatchable()` | Get next task without dispatching |
| `completeTask(taskId, agentId?)` | Complete task |
| `blockTask(taskId, reason)` | Block task with reason |
| `retryBlockedTask(taskId)` | Reset blocked task to pending and re-queue |
| `getQueueState()` | Return queued, ready, blocked tasks |
Phase Dispatch
DefaultPhaseDispatchManager — Same pattern for phases:
- Queue: `queuePhase(phaseId)` validates that the phase is approved and gets its dependencies
- Dispatch: `dispatchNextPhase()` finds a phase with all deps complete
- Branch creation: Initiative and phase branches are created in all linked project clones. On failure, the phase is blocked via `blockPhase()` and tasks are NOT queued.
- Auto-queue tasks: When a phase starts (branches confirmed), pending execution tasks are queued (planning-category tasks excluded)
- Events: `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`
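The branch-creation gate and the auto-queue filter above can be sketched like this. It is a simplified illustration: `startPhase`, `PhaseTask`, `StartResult`, and the `ensureBranches` callback are hypothetical names, and the real manager works against repositories and emits events rather than returning a result object.

```typescript
// Planning categories listed in this document; these are never auto-queued.
const PLANNING_CATEGORIES = new Set(["research", "discuss", "plan", "detail", "refine"]);

// Hypothetical task shape for the sketch.
interface PhaseTask {
  id: string;
  category: string;
  status: string;
}

interface StartResult {
  started: boolean;
  queuedTaskIds: string[];
  blockedReason?: string;
}

// ensureBranches stands in for creating initiative and phase branches
// in all linked project clones; it throws on failure.
function startPhase(tasks: PhaseTask[], ensureBranches: () => void): StartResult {
  try {
    ensureBranches();
  } catch (err) {
    // Branch failure: block the phase and queue nothing.
    const reason = err instanceof Error ? err.message : String(err);
    return { started: false, queuedTaskIds: [], blockedReason: reason };
  }
  // Branches confirmed: queue pending execution tasks only.
  const queuedTaskIds = tasks
    .filter((t) => t.status === "pending" && !PLANNING_CATEGORIES.has(t.category))
    .map((t) => t.id);
  return { started: true, queuedTaskIds };
}
```

Returning early on branch failure keeps the "tasks are NOT queued" guarantee in one place, so no task can start without its branch.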
PhaseDispatchManager Methods
| Method | Purpose |
|---|---|
| `queuePhase(phaseId)` | Queue approved phase |
| `dispatchNextPhase()` | Start next ready phase, auto-queue its tasks |
| `getNextDispatchablePhase()` | Get next phase without dispatching |
| `completePhase(phaseId)` | Mark phase complete |
| `blockPhase(phaseId, reason)` | Block phase |
| `getPhaseQueueState()` | Return queued, ready, blocked phases |
Auto-Dispatch (ExecutionOrchestrator)
apps/server/execution/orchestrator.ts — Connects the queue to agent spawning via event-driven auto-dispatch.
Trigger Events
| Event | Action |
|---|---|
| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents |
| `agent:stopped` | Auto-complete task (unless `user_requested`), re-dispatch queued tasks (freed agent slot) |
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
Conflict Resolution → Dispatch Flow
When a task branch merge produces conflicts:
- `mergeTaskIntoPhase()` detects conflicts from `branchManager.mergeBranch()`
- Calls `conflictResolutionService.handleConflict()`, which creates a "Resolve conflicts" task (with dedup: creation is skipped if an identical pending/in_progress resolution task already exists)
- The original task is not blocked: it was already completed by `handleAgentStopped` before the merge attempt. The pending resolution task prevents premature phase completion.
- Orchestrator queues the new conflict task via `dispatchManager.queue()`
- `scheduleDispatch()` picks it up and assigns it to an idle agent
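The dedup guard in this flow can be sketched as below. What counts as an "identical" resolution task is not specified here, so the sketch assumes it means the same task name and the same originating task; `ResolutionTask`, `sourceTaskId`, `createResolutionTask`, and `nextId` are hypothetical names.

```typescript
type TaskStatus = "pending" | "in_progress" | "completed" | "blocked";

// Hypothetical task record; sourceTaskId links a resolution task
// to the task whose merge conflicted.
interface ResolutionTask {
  id: string;
  name: string;
  status: TaskStatus;
  sourceTaskId: string;
}

const RESOLVE_TASK_NAME = "Resolve conflicts";

// Creates a resolution task unless an active one already exists for the
// same source task. Returns the new task, or undefined when skipped.
function createResolutionTask(
  tasks: ResolutionTask[],
  sourceTaskId: string,
  nextId: () => string
): ResolutionTask | undefined {
  const hasActiveDuplicate = tasks.some(
    (t) =>
      t.name === RESOLVE_TASK_NAME &&
      t.sourceTaskId === sourceTaskId &&
      (t.status === "pending" || t.status === "in_progress")
  );
  if (hasActiveDuplicate) {
    return undefined;
  }
  const task: ResolutionTask = {
    id: nextId(),
    name: RESOLVE_TASK_NAME,
    status: "pending",
    sourceTaskId,
  };
  tasks.push(task);
  return task;
}
```

Only pending and in_progress tasks count as duplicates, so a conflict that reappears after an earlier resolution task completed still gets a fresh task.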
Crash Recovery
When an agent crashes (agent:crashed event), the orchestrator automatically retries the task:
- Finds the task associated with the crashed agent
- Checks `task.retryCount` against `MAX_TASK_RETRIES` (3)
- If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
- If over limit: logs warning, task stays `in_progress` for manual intervention
On server restart, recoverDispatchQueues() also recovers:
- Stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`): reset to `pending` and re-queued
- Erroneously `blocked` tasks whose agents completed successfully (status is `idle` or `stopped`): marked `completed` so the phase can progress. This handles the legacy case where conflict resolution incorrectly blocked already-completed tasks.
- Fully-completed `in_progress` phases: after task recovery, if all tasks in an `in_progress` phase are completed, triggers `handlePhaseAllTasksDone` to complete/review the phase
Manual retry via retryBlockedTask() resets retryCount to 0, giving the task a fresh set of automatic retries.
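The retry accounting described in this section can be sketched as two small functions. This is an illustration, not the orchestrator's code: `RetryableTask`, `onAgentCrashed`, `manualRetry`, and the `requeue` callback are hypothetical, and treating `retryCount >= MAX_TASK_RETRIES` as the cutoff is an assumption about the exact boundary.

```typescript
const MAX_TASK_RETRIES = 3;

// Hypothetical task record for the sketch.
interface RetryableTask {
  id: string;
  status: "pending" | "in_progress" | "blocked" | "completed";
  retryCount: number;
}

// Called when the agent working on `task` crashes.
// Returns true if the task was re-queued for another attempt.
function onAgentCrashed(task: RetryableTask, requeue: (taskId: string) => void): boolean {
  if (task.retryCount >= MAX_TASK_RETRIES) {
    // Retries exhausted: the real orchestrator logs a warning here.
    // The task stays in_progress for manual intervention.
    return false;
  }
  task.retryCount += 1;
  task.status = "pending";
  requeue(task.id);
  return true;
}

// Manual retry: resets the counter so automatic retries start fresh.
function manualRetry(task: RetryableTask, requeue: (taskId: string) => void): void {
  task.retryCount = 0;
  task.status = "pending";
  requeue(task.id);
}
```

Because the manual path zeroes `retryCount`, a task that exhausted its automatic retries gets three more attempts after an operator steps in.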
Coalesced Scheduling
Multiple rapid events (e.g. several phase:queued from queueAllPhases) are coalesced into a single async dispatch cycle via scheduleDispatch(). The cycle loops dispatchNextPhase() + dispatchNext() until both queues are drained, then re-runs if new events arrived during execution.
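One common way to get this coalescing behaviour is a running flag plus a rerun flag, sketched below. The class name `CoalescingScheduler` and the injected `runCycle` callback are assumptions; in the orchestrator the cycle would be the loop over `dispatchNextPhase()` and `dispatchNext()`.

```typescript
// Hypothetical sketch: only the method name scheduleDispatch comes from this document.
class CoalescingScheduler {
  private running = false;
  private rerun = false;
  private readonly runCycle: () => Promise<void>;

  constructor(runCycle: () => Promise<void>) {
    this.runCycle = runCycle;
  }

  // Starts a dispatch cycle if none is in flight. Calls made while a cycle
  // is running only set a flag, which causes one more cycle afterwards.
  async scheduleDispatch(): Promise<void> {
    if (this.running) {
      this.rerun = true;
      return;
    }
    this.running = true;
    try {
      do {
        this.rerun = false;
        await this.runCycle();
      } while (this.rerun);
    } finally {
      this.running = false;
    }
  }
}
```

Any number of calls that arrive during a cycle collapse into a single follow-up cycle, so a burst of `phase:queued` events never runs overlapping dispatch loops.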
Execution Mode Behavior
- YOLO: phase completes → auto-merge (if branch exists, skipped otherwise) → auto-dispatch next phase → auto-dispatch tasks
- review_per_phase: phase completes → set `pending_review` → STOP. User approves → `approveAndMergePhase()` → merge → dispatch next phase → dispatch tasks
- No branch: Phase completion check runs regardless of branch existence. Merge steps are skipped; status transitions still fire. The `updateTaskStatus` tRPC procedure routes completions through `dispatchManager.completeTask()` to emit `task:completed`.
- request-changes (phase): phase `pending_review` → user requests changes → creates revision task (category=`'review'`, dedup guard skips if active review exists) with threaded comments (`[comment:ID]` tags + reply threads) → phase reset to `in_progress` → agent reads comments, fixes code, writes `.cw/output/comment-responses.json` → OutputHandler creates reply comments and optionally resolves threads → phase returns to `pending_review`
- request-changes (initiative): initiative `pending_review` → user requests changes → creates/reuses "Finalization" phase → creates review task → initiative reset to `active` → agent fixes → initiative returns to `pending_review`