Codewalkers/docs/dispatch-events.md
Lukas May 0f1c578269 fix: Fail fast when agent worktree creation or branch setup fails
Previously, branch computation errors and ensureBranch failures were
silently swallowed for all tasks, allowing execution agents to spawn
without proper git isolation. This caused alert-pony to commit directly
to main instead of its task branch.

- manager.ts: Verify each project worktree subdirectory exists after
  createProjectWorktrees; throw if any are missing. Convert passive
  cwdVerified log to a hard guard.
- dispatch/manager.ts: Make branch computation and ensureBranch errors
  fatal for execution tasks (execute, verify, merge, review) while
  keeping them non-fatal for planning tasks.
2026-03-06 14:08:59 +01:00

# Dispatch & Events
`apps/server/dispatch/` — Task and phase dispatch queues. `apps/server/events/` — Typed event bus.
## Event Bus
`apps/server/events/` — Typed pub/sub system for inter-module communication.
### Architecture
- **Port**: `EventBus` interface with `emit(event)` and `on(type, handler)`
- **Adapter**: `TypedEventBus` using Node.js `EventEmitter`
- All events implement `BaseEvent { type, timestamp, payload }`
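
The port/adapter split above can be sketched as follows. This is a minimal illustration, not the project's actual implementation: the class name `SketchEventBus`, the `Date` type for `timestamp`, and the trimmed payload fields are assumptions made for the example.

```typescript
import { EventEmitter } from "node:events";

// Shape shared by every event, per the list above.
// Assumption: timestamp is a Date; the source does not state its type.
interface BaseEvent<T extends string, P> {
  type: T;
  timestamp: Date;
  payload: P;
}

// Two illustrative events with trimmed payloads.
type TaskQueued = BaseEvent<"task:queued", { taskId: string; priority: string }>;
type PhaseStarted = BaseEvent<"phase:started", { phaseId: string; initiativeId: string }>;
type AppEvent = TaskQueued | PhaseStarted;

// Port: the interface other modules depend on.
interface EventBus {
  emit(event: AppEvent): void;
  on<K extends AppEvent["type"]>(
    type: K,
    handler: (event: Extract<AppEvent, { type: K }>) => void,
  ): void;
}

// Adapter: delegates to Node's EventEmitter, keyed by event type.
class SketchEventBus implements EventBus {
  private readonly emitter = new EventEmitter();

  emit(event: AppEvent): void {
    this.emitter.emit(event.type, event);
  }

  on<K extends AppEvent["type"]>(
    type: K,
    handler: (event: Extract<AppEvent, { type: K }>) => void,
  ): void {
    this.emitter.on(type, handler);
  }
}

const bus = new SketchEventBus();
const seen: string[] = [];
// The handler's parameter is narrowed to TaskQueued by the type key.
bus.on("task:queued", (e) => seen.push(e.payload.taskId));
bus.emit({
  type: "task:queued",
  timestamp: new Date(),
  payload: { taskId: "t1", priority: "high" },
});
```

Keying the handler type on the event's `type` string lets subscribers read `payload` fields without casts, which is the main benefit of a typed bus over a bare `EventEmitter`.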
### Event Types (58)
| Category | Events | Count | Notes |
|----------|--------|-------|-------|
| **Agent** | `agent:spawned`, `agent:stopped`, `agent:crashed`, `agent:resumed`, `agent:account_switched`, `agent:deleted`, `agent:waiting`, `agent:output` | 8 | |
| **Task** | `task:queued`, `task:dispatched`, `task:completed`, `task:blocked` | 4 | |
| **Phase** | `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`, `phase:changes_requested` | 5 | |
| **Merge** | `merge:queued`, `merge:started`, `merge:completed`, `merge:conflicted` | 4 | |
| **Page** | `page:created`, `page:updated`, `page:deleted` | 3 | |
| **Process** | `process:spawned`, `process:stopped`, `process:crashed` | 3 | |
| **Server** | `server:started`, `server:stopped` | 2 | |
| **Worktree** | `worktree:created`, `worktree:removed`, `worktree:merged`, `worktree:conflict` | 4 | |
| **Account** | `account:credentials_refreshed`, `account:credentials_expired`, `account:credentials_validated` | 3 | |
| **Preview** | `preview:building`, `preview:ready`, `preview:stopped`, `preview:failed` | 4 | |
| **Conversation** | `conversation:created`, `conversation:answered` | 2 | `conversation:created` triggers auto-resume of idle target agents via `resumeForConversation()` |
| **Chat** | `chat:message_created`, `chat:session_closed` | 2 | Chat session lifecycle events |
| **Initiative** | `initiative:pending_review`, `initiative:review_approved`, `initiative:changes_requested` | 3 | Initiative-level review gate events |
| **Project** | `project:synced`, `project:sync_failed` | 2 | Remote sync results from `ProjectSyncManager` |
| **Log** | `log:entry` | 1 | |
### Key Event Payloads
```typescript
AgentSpawnedEvent { agentId, name, taskId, worktreeId, provider }
AgentStoppedEvent { agentId, name, taskId, reason }
// reason: 'user_requested'|'task_complete'|'error'|'waiting_for_input'|
// 'context_complete'|'plan_complete'|'detail_complete'|'refine_complete'|'chat_complete'
AgentWaitingEvent { agentId, name, taskId, sessionId, questions[] }
AgentOutputEvent { agentId, stream, data }
TaskCompletedEvent { taskId, agentId, success, message }
TaskQueuedEvent { taskId, priority, dependsOn[] }
PhaseStartedEvent { phaseId, initiativeId }
MergeConflictedEvent { taskId, agentId, worktreeId, targetBranch, conflictingFiles[] }
PhaseChangesRequestedEvent { phaseId, initiativeId, taskId, commentCount }
AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
InitiativePendingReviewEvent { initiativeId, branch }
InitiativeReviewApprovedEvent { initiativeId, branch, strategy: 'push_branch' | 'merge_and_push' }
InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
```
## Task Dispatch
`apps/server/dispatch/` — In-memory queue with dependency-ordered dispatch.
### Architecture
- **Port**: `DispatchManager` interface
- **Adapter**: `DefaultDispatchManager`
### How Task Dispatch Works
1. **Queue**: `queue(taskId)` fetches the task's dependencies and adds the task to an internal Map
2. **Dispatch**: `dispatchNext()` finds the highest-priority task with all deps complete
3. **Context gathering**: Before spawn, `dispatchNext()` gathers initiative context (initiative, phase, tasks, pages) and passes it as `inputContext` to the agent. Agents receive `.cw/input/task.md`, `initiative.md`, `phase.md`, `context/tasks/`, `context/phases/`, and `pages/`.
4. **Priority order**: high > medium > low, then oldest first (FIFO within priority)
5. **Planning skip**: Planning-category tasks (research, discuss, plan, detail, refine) skip auto-dispatch; they use the architect flow
6. **Summary propagation**: `completeTask()` reads the completing agent's `result.message` and stores it on the task's `summary` column. Dependent tasks see this summary in `context/tasks/<id>.md` frontmatter.
7. **Spawn failure**: If `agentManager.spawn()` throws, the task is blocked via `blockTask()` with the error message. The dispatch cycle continues instead of crashing.
8. **Retry blocked**: `retryBlockedTask(taskId)` resets a blocked task to pending and re-queues it. Exposed via tRPC `retryBlockedTask` mutation. The UI shows a Retry button in the task slide-over when status is `blocked`.
9. **Branch validation**: Branch computation and `ensureBranch` errors are fatal for execution tasks (execute, verify, merge, review) but non-fatal for planning tasks. This prevents agents from spawning without proper branch isolation.
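
The selection rule in the steps above (all dependencies complete, then priority, then FIFO) can be sketched as a pure function. The names `QueuedTask`, `queuedAt`, and `pickNextDispatchable` are hypothetical and chosen for illustration; the real queue entries may carry different fields.

```typescript
type Priority = "high" | "medium" | "low";

// Hypothetical queue entry; `queuedAt` is an assumed field used for FIFO ordering.
interface QueuedTask {
  taskId: string;
  priority: Priority;
  queuedAt: number; // ms timestamp of when the task entered the queue
  dependsOn: string[];
}

const PRIORITY_RANK: Record<Priority, number> = { high: 0, medium: 1, low: 2 };

// Returns the task that would be dispatched next, or undefined if none is ready.
function pickNextDispatchable(
  queue: Map<string, QueuedTask>,
  completed: Set<string>,
): QueuedTask | undefined {
  // A task is ready only when every dependency has completed.
  const ready = Array.from(queue.values()).filter((task) =>
    task.dependsOn.every((dep) => completed.has(dep)),
  );
  // Higher priority first; within a priority, oldest first.
  ready.sort(
    (a, b) =>
      PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] ||
      a.queuedAt - b.queuedAt,
  );
  return ready[0];
}

const queue = new Map<string, QueuedTask>();
queue.set("a", { taskId: "a", priority: "low", queuedAt: 1, dependsOn: [] });
queue.set("b", { taskId: "b", priority: "high", queuedAt: 3, dependsOn: ["x"] });
queue.set("c", { taskId: "c", priority: "medium", queuedAt: 2, dependsOn: [] });

// "b" has the highest priority but waits on "x", so "c" is picked.
const next = pickNextDispatchable(queue, new Set<string>());
```

Once `x` is in the completed set, the same call returns `b`, since its priority outranks the others.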
### DispatchManager Methods
| Method | Purpose |
|--------|---------|
| `queue(taskId)` | Add task to dispatch queue |
| `dispatchNext()` | Find and dispatch next ready task |
| `getNextDispatchable()` | Get next task without dispatching |
| `completeTask(taskId, agentId?)` | Complete task |
| `blockTask(taskId, reason)` | Block task with reason |
| `retryBlockedTask(taskId)` | Reset blocked task to pending and re-queue |
| `getQueueState()` | Return queued, ready, blocked tasks |
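
To show how `blockTask` and `retryBlockedTask` might interact with branch and spawn failures, here is a hedged, synchronous sketch. `SketchDispatcher`, its constructor parameters, and the `blockedReason` field are assumptions; the real calls are presumably asynchronous, and treating a fatal branch error as a blocked task is also an assumption, since the source only says the error is fatal.

```typescript
type TaskStatus = "pending" | "in_progress" | "blocked";

interface Task {
  id: string;
  category: string;
  status: TaskStatus;
  retryCount: number;
  blockedReason?: string; // hypothetical field holding the block reason
}

const EXECUTION_CATEGORIES = new Set(["execute", "verify", "merge", "review"]);

const errorMessage = (err: unknown): string =>
  err instanceof Error ? err.message : String(err);

class SketchDispatcher {
  readonly queue: string[] = [];
  private readonly tasks: Map<string, Task>;
  private readonly ensureBranch: (task: Task) => void;
  private readonly spawn: (task: Task) => void;

  constructor(
    tasks: Map<string, Task>,
    ensureBranch: (task: Task) => void,
    spawn: (task: Task) => void,
  ) {
    this.tasks = tasks;
    this.ensureBranch = ensureBranch;
    this.spawn = spawn;
  }

  blockTask(taskId: string, reason: string): void {
    const task = this.tasks.get(taskId);
    if (!task) return;
    task.status = "blocked";
    task.blockedReason = reason;
  }

  // Resets a blocked task to pending, clears retryCount, and re-queues it.
  retryBlockedTask(taskId: string): void {
    const task = this.tasks.get(taskId);
    if (!task || task.status !== "blocked") return;
    task.status = "pending";
    task.retryCount = 0;
    delete task.blockedReason;
    this.queue.push(taskId);
  }

  // Returns true if an agent was spawned for the task.
  dispatch(taskId: string): boolean {
    const task = this.tasks.get(taskId);
    if (!task) return false;
    try {
      this.ensureBranch(task);
    } catch (err) {
      // Fatal for execution tasks; planning tasks carry on without a branch.
      if (EXECUTION_CATEGORIES.has(task.category)) {
        this.blockTask(taskId, errorMessage(err));
        return false;
      }
    }
    try {
      this.spawn(task);
    } catch (err) {
      // A failed spawn blocks the task instead of crashing the cycle.
      this.blockTask(taskId, errorMessage(err));
      return false;
    }
    task.status = "in_progress";
    return true;
  }
}

const tasks = new Map<string, Task>();
tasks.set("t1", { id: "t1", category: "execute", status: "pending", retryCount: 2 });
let branchOk = false;
const dispatcher = new SketchDispatcher(
  tasks,
  () => {
    if (!branchOk) throw new Error("ensureBranch failed");
  },
  () => {},
);

const firstAttempt = dispatcher.dispatch("t1"); // branch error blocks the task
branchOk = true;
dispatcher.retryBlockedTask("t1"); // back to pending, retryCount reset
```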
## Phase Dispatch
`DefaultPhaseDispatchManager` — Same pattern for phases:
1. **Queue**: `queuePhase(phaseId)` validates that the phase is approved and gets its dependencies
2. **Dispatch**: `dispatchNextPhase()` finds a phase with all deps complete
3. **Branch creation**: Initiative and phase branches are created in all linked project clones. On failure, the phase is blocked via `blockPhase()` and tasks are NOT queued.
4. **Auto-queue tasks**: When the phase starts (branches confirmed), pending execution tasks are queued (planning-category tasks excluded)
5. **Events**: `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`
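
Steps 3 and 4 above can be sketched as a single function: create branches first, and queue tasks only if that succeeds. `startPhase`, `StartResult`, and the synchronous `createBranches` callback are hypothetical names used for illustration.

```typescript
const PLANNING_CATEGORIES = new Set(["research", "discuss", "plan", "detail", "refine"]);

interface PhaseTask {
  id: string;
  category: string;
  status: string;
}

// Hypothetical result shape for this sketch.
interface StartResult {
  started: boolean;
  queuedTaskIds: string[];
  blockedReason?: string;
}

function startPhase(tasks: PhaseTask[], createBranches: () => void): StartResult {
  try {
    createBranches();
  } catch (err) {
    // Branch setup failed: the phase is blocked and no tasks are queued.
    const reason = err instanceof Error ? err.message : String(err);
    return { started: false, queuedTaskIds: [], blockedReason: reason };
  }
  // Branches confirmed: queue pending execution tasks, skipping planning ones.
  const queuedTaskIds = tasks
    .filter((t) => t.status === "pending" && !PLANNING_CATEGORIES.has(t.category))
    .map((t) => t.id);
  return { started: true, queuedTaskIds };
}

const phaseTasks: PhaseTask[] = [
  { id: "t1", category: "execute", status: "pending" },
  { id: "t2", category: "plan", status: "pending" },
  { id: "t3", category: "verify", status: "completed" },
];

const ok = startPhase(phaseTasks, () => {});
const failed = startPhase(phaseTasks, () => {
  throw new Error("branch creation failed");
});
```

Here `ok` queues only `t1`, while `failed` queues nothing and carries the error message as its block reason.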
### PhaseDispatchManager Methods
| Method | Purpose |
|--------|---------|
| `queuePhase(phaseId)` | Queue approved phase |
| `dispatchNextPhase()` | Start next ready phase, auto-queue its tasks |
| `getNextDispatchablePhase()` | Get next phase without dispatching |
| `completePhase(phaseId)` | Mark phase complete |
| `blockPhase(phaseId, reason)` | Block phase |
| `getPhaseQueueState()` | Return queued, ready, blocked phases |
## Auto-Dispatch (ExecutionOrchestrator)
`apps/server/execution/orchestrator.ts` — Connects the queue to agent spawning via event-driven auto-dispatch.
### Trigger Events
| Event | Action |
|-------|--------|
| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents |
| `agent:stopped` | Auto-complete task (unless user_requested), re-dispatch queued tasks (freed agent slot) |
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
### Crash Recovery
When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task:
1. Finds the task associated with the crashed agent
2. Checks `task.retryCount` against `MAX_TASK_RETRIES` (3)
3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
4. If the limit has been reached: logs a warning; the task stays `in_progress` for manual intervention
On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`). These are reset to `pending` and re-queued.
Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.
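
The retry decision described above can be sketched as follows. The function name `handleAgentCrashed` and the `requeue` callback are assumptions; only `MAX_TASK_RETRIES`, `retryCount`, and the status values come from this document.

```typescript
const MAX_TASK_RETRIES = 3;

interface RetryableTask {
  id: string;
  status: "pending" | "in_progress" | "blocked";
  retryCount: number;
}

// Returns true if the task was re-queued, false if retries are exhausted.
function handleAgentCrashed(
  task: RetryableTask,
  requeue: (taskId: string) => void,
): boolean {
  if (task.retryCount >= MAX_TASK_RETRIES) {
    // Limit reached: leave the task in_progress for manual intervention.
    console.warn(`task ${task.id} exhausted ${MAX_TASK_RETRIES} retries`);
    return false;
  }
  task.retryCount += 1;
  task.status = "pending";
  requeue(task.id);
  return true;
}

const crashing: RetryableTask = { id: "t1", status: "in_progress", retryCount: 0 };
const requeued: string[] = [];

// Simulate five consecutive crashes of the same task.
for (let crash = 0; crash < 5; crash++) {
  const retried = handleAgentCrashed(crashing, (id) => requeued.push(id));
  if (retried) crashing.status = "in_progress"; // pretend it was dispatched again
}
```

With the limit at 3, the first three crashes re-queue the task and the remaining two leave it untouched.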
### Coalesced Scheduling
Multiple rapid events (e.g. several `phase:queued` from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle loops `dispatchNextPhase()` + `dispatchNext()` until both queues are drained, then re-runs if new events arrived during execution.
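
One way to implement this kind of coalescing is a running flag plus a re-run flag. The sketch below illustrates the pattern, not the orchestrator's actual code: `CoalescingScheduler`, `schedule`, and `whenIdle` are hypothetical names, and the cycle body (the loop over `dispatchNextPhase()` and `dispatchNext()`) is passed in as a callback.

```typescript
class CoalescingScheduler {
  private running = false;
  private rerunRequested = false;
  private current: Promise<void> = Promise.resolve();
  private readonly cycle: () => Promise<void>;

  constructor(cycle: () => Promise<void>) {
    this.cycle = cycle;
  }

  // Called from event handlers; never starts a second concurrent cycle.
  schedule(): void {
    if (this.running) {
      // A cycle is in flight: remember that new work arrived.
      this.rerunRequested = true;
      return;
    }
    this.running = true;
    this.current = this.runLoop();
  }

  // Resolves when the in-flight cycle (and any re-run) has finished.
  whenIdle(): Promise<void> {
    return this.current;
  }

  private async runLoop(): Promise<void> {
    try {
      do {
        this.rerunRequested = false;
        await this.cycle();
      } while (this.rerunRequested);
    } finally {
      this.running = false;
    }
  }
}

let cycles = 0;
const scheduler = new CoalescingScheduler(async () => {
  cycles += 1; // stand-in for draining the phase and task queues
});

// Three rapid events, e.g. several phase:queued emissions in a row.
scheduler.schedule();
scheduler.schedule();
scheduler.schedule();
```

In this sketch the three calls produce two cycles: the first starts immediately, and the two calls that arrive while it is in flight collapse into a single re-run.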
### Execution Mode Behavior
- **YOLO**: phase completes → auto-merge (if branch exists, skipped otherwise) → auto-dispatch next phase → auto-dispatch tasks
- **review_per_phase**: phase completes → set `pending_review` → STOP. User approves → `approveAndMergePhase()` → merge → dispatch next phase → dispatch tasks
- **No branch**: Phase completion check runs regardless of branch existence. Merge steps are skipped; status transitions still fire. `updateTaskStatus` tRPC routes completions through `dispatchManager.completeTask()` to emit `task:completed`.
- **request-changes (phase)**: phase `pending_review` → user requests changes → creates revision task (category=`'review'`, dedup guard skips if active review exists) with threaded comments (`[comment:ID]` tags + reply threads) → phase reset to `in_progress` → agent reads comments, fixes code, writes `.cw/output/comment-responses.json` → OutputHandler creates reply comments and optionally resolves threads → phase returns to `pending_review`
- **request-changes (initiative)**: initiative `pending_review` → user requests changes → creates/reuses "Finalization" phase → creates review task → initiative reset to `active` → agent fixes → initiative returns to `pending_review`
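
The first three bullets can be summarised as a small decision function. This is a sketch under assumptions: the mode identifier `"yolo"`, the `Step` labels, and both function names are invented for illustration, and the sketch assumes `pending_review` is set whether or not a branch exists.

```typescript
type ExecutionMode = "yolo" | "review_per_phase";

type Step =
  | "merge_phase_branch"
  | "set_pending_review"
  | "dispatch_next_phase"
  | "dispatch_tasks";

// Steps taken right after a phase completes.
function stepsAfterPhaseCompletes(mode: ExecutionMode, hasBranch: boolean): Step[] {
  if (mode === "review_per_phase") {
    // Stop and wait for the user; nothing is merged or dispatched yet.
    return ["set_pending_review"];
  }
  const steps: Step[] = [];
  if (hasBranch) steps.push("merge_phase_branch"); // skipped when no branch exists
  steps.push("dispatch_next_phase", "dispatch_tasks");
  return steps;
}

// Steps taken once the user approves a phase in review_per_phase mode.
function stepsAfterApproval(hasBranch: boolean): Step[] {
  const steps: Step[] = [];
  if (hasBranch) steps.push("merge_phase_branch");
  steps.push("dispatch_next_phase", "dispatch_tasks");
  return steps;
}

const yoloNoBranch = stepsAfterPhaseCompletes("yolo", false);
const reviewWithBranch = stepsAfterPhaseCompletes("review_per_phase", true);
```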