# Dispatch & Events

`apps/server/dispatch/`: task and phase dispatch queues. `apps/server/events/`: typed event bus.

## Event Bus

`apps/server/events/`: typed pub/sub system for inter-module communication.

### Architecture

- **Port**: `EventBus` interface with `emit(event)` and `on(type, handler)`
- **Adapter**: `TypedEventBus` using Node.js `EventEmitter`
- All events implement `BaseEvent { type, timestamp, payload }`

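A minimal sketch of the port/adapter split described above. The generic `BaseEvent<T, P>` parameters, the `Extract`-based handler narrowing, and the two sample event types are assumptions for illustration; the real `TypedEventBus` typings may differ.

```typescript
import { EventEmitter } from "node:events";

// Assumed generic form of BaseEvent { type, timestamp, payload }.
interface BaseEvent<T extends string = string, P = unknown> {
  type: T;
  timestamp: Date;
  payload: P;
}

// Port: callers depend on this interface only.
interface EventBus<E extends BaseEvent> {
  emit(event: E): void;
  on<K extends E["type"]>(type: K, handler: (event: Extract<E, { type: K }>) => void): void;
}

// Adapter: uses the event's `type` string as the EventEmitter event name.
class TypedEventBus<E extends BaseEvent> implements EventBus<E> {
  private readonly emitter = new EventEmitter();

  emit(event: E): void {
    this.emitter.emit(event.type, event);
  }

  on<K extends E["type"]>(type: K, handler: (event: Extract<E, { type: K }>) => void): void {
    this.emitter.on(type, handler);
  }
}

// Two illustrative events: field names come from the doc, field types are assumed.
type TaskCompletedEvent = BaseEvent<"task:completed", { taskId: string; agentId: string; success: boolean }>;
type PhaseStartedEvent = BaseEvent<"phase:started", { phaseId: string; initiativeId: string }>;
type AppEvent = TaskCompletedEvent | PhaseStartedEvent;

const bus = new TypedEventBus<AppEvent>();
const completedTaskIds: string[] = [];
bus.on("task:completed", (event) => {
  completedTaskIds.push(event.payload.taskId); // `event` is narrowed to TaskCompletedEvent
});
bus.emit({ type: "task:completed", timestamp: new Date(), payload: { taskId: "t1", agentId: "a1", success: true } });
```

Because the emitter is keyed by `type`, each handler only receives events of the type it subscribed to. Node's `EventEmitter` calls listeners synchronously, so handlers run inside `emit()`.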
### Event Types (58)

| Category | Events | Count | Notes |
|----------|--------|-------|-------|
| **Agent** | `agent:spawned`, `agent:stopped`, `agent:crashed`, `agent:resumed`, `agent:account_switched`, `agent:deleted`, `agent:waiting`, `agent:output` | 8 | |
| **Task** | `task:queued`, `task:dispatched`, `task:completed`, `task:blocked` | 4 | |
| **Phase** | `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`, `phase:changes_requested` | 5 | |
| **Merge** | `merge:queued`, `merge:started`, `merge:completed`, `merge:conflicted` | 4 | |
| **Page** | `page:created`, `page:updated`, `page:deleted` | 3 | |
| **Process** | `process:spawned`, `process:stopped`, `process:crashed` | 3 | |
| **Server** | `server:started`, `server:stopped` | 2 | |
| **Worktree** | `worktree:created`, `worktree:removed`, `worktree:merged`, `worktree:conflict` | 4 | |
| **Account** | `account:credentials_refreshed`, `account:credentials_expired`, `account:credentials_validated` | 3 | |
| **Preview** | `preview:building`, `preview:ready`, `preview:stopped`, `preview:failed` | 4 | |
| **Conversation** | `conversation:created`, `conversation:answered` | 2 | `conversation:created` triggers auto-resume of idle target agents via `resumeForConversation()` |
| **Chat** | `chat:message_created`, `chat:session_closed` | 2 | Chat session lifecycle events |
| **Initiative** | `initiative:pending_review`, `initiative:review_approved`, `initiative:changes_requested` | 3 | Initiative-level review gate events |
| **Project** | `project:synced`, `project:sync_failed` | 2 | Remote sync results from `ProjectSyncManager` |
| **Log** | `log:entry` | 1 | |

### Key Event Payloads

```typescript
AgentSpawnedEvent { agentId, name, taskId, worktreeId, provider }
AgentStoppedEvent { agentId, name, taskId, reason }
// reason: 'user_requested' | 'task_complete' | 'error' | 'waiting_for_input' |
//         'context_complete' | 'plan_complete' | 'detail_complete' | 'refine_complete' | 'chat_complete'
AgentWaitingEvent { agentId, name, taskId, sessionId, questions[] }
AgentOutputEvent { agentId, stream, data }
TaskCompletedEvent { taskId, agentId, success, message }
TaskQueuedEvent { taskId, priority, dependsOn[] }
PhaseStartedEvent { phaseId, initiativeId }
MergeConflictedEvent { taskId, agentId, worktreeId, targetBranch, conflictingFiles[] }
PhaseChangesRequestedEvent { phaseId, initiativeId, taskId, commentCount }
AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
InitiativePendingReviewEvent { initiativeId, branch }
InitiativeReviewApprovedEvent { initiativeId, branch, strategy: 'push_branch' | 'merge_and_push' }
InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
```

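The shorthand payloads above map naturally onto a TypeScript discriminated union keyed on `type`. A sketch covering three of them, assuming string/boolean field types (the listing gives field names only); `summarizeEvent` is a hypothetical helper, not part of the codebase.

```typescript
// Assumed generic envelope matching BaseEvent { type, timestamp, payload }.
interface BaseEvent<T extends string, P> {
  type: T;
  timestamp: Date;
  payload: P;
}

type StopReason =
  | "user_requested" | "task_complete" | "error" | "waiting_for_input"
  | "context_complete" | "plan_complete" | "detail_complete" | "refine_complete" | "chat_complete";

type AgentStoppedEvent = BaseEvent<"agent:stopped", { agentId: string; name: string; taskId: string; reason: StopReason }>;
type TaskCompletedEvent = BaseEvent<"task:completed", { taskId: string; agentId: string; success: boolean; message: string }>;
type MergeConflictedEvent = BaseEvent<
  "merge:conflicted",
  { taskId: string; agentId: string; worktreeId: string; targetBranch: string; conflictingFiles: string[] }
>;
type DomainEvent = AgentStoppedEvent | TaskCompletedEvent | MergeConflictedEvent;

// Hypothetical helper: switching on `type` narrows `payload` to the matching shape.
function summarizeEvent(event: DomainEvent): string {
  switch (event.type) {
    case "agent:stopped":
      return `agent ${event.payload.agentId} stopped (${event.payload.reason})`;
    case "task:completed":
      return `task ${event.payload.taskId} ${event.payload.success ? "succeeded" : "failed"}`;
    case "merge:conflicted":
      return `merge into ${event.payload.targetBranch} conflicted on ${event.payload.conflictingFiles.length} file(s)`;
    default: {
      // Exhaustiveness check: a new DomainEvent member without a case fails to compile here.
      const unhandled: never = event;
      return unhandled;
    }
  }
}
```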
## Task Dispatch

`apps/server/dispatch/`: in-memory queue with dependency-ordered dispatch.

### Architecture

- **Port**: `DispatchManager` interface
- **Adapter**: `DefaultDispatchManager`

### How Task Dispatch Works

1. **Queue**: `queue(taskId)` fetches the task's dependencies and adds the task to an internal `Map`.
2. **Dispatch**: `dispatchNext()` finds the highest-priority task whose dependencies are all complete.
3. **Context gathering**: before spawning, `dispatchNext()` gathers initiative context (initiative, phase, tasks, pages) and passes it to the agent as `inputContext`. Agents receive `.cw/input/task.md`, `initiative.md`, `phase.md`, `context/tasks/`, `context/phases/`, and `pages/`.
4. **Priority order**: high > medium > low, then oldest first (FIFO within a priority level).
5. **Planning skip**: planning-category tasks (research, discuss, plan, detail, refine) are not auto-dispatched; they go through the architect flow instead.
6. **Summary propagation**: `completeTask()` reads the completing agent's `result.message` and stores it in the task's `summary` column. Dependent tasks see this summary in the frontmatter of `context/tasks/<id>.md`.
7. **Spawn failure**: if `agentManager.spawn()` throws, the task is blocked via `blockTask()` with the error message, and the dispatch cycle continues instead of crashing.
8. **Retry blocked**: `retryBlockedTask(taskId)` resets a blocked task to pending and re-queues it. It is exposed as the tRPC `retryBlockedTask` mutation, and the UI shows a Retry button in the task slide-over when the status is `blocked`.
9. **Branch validation**: errors from branch computation or `ensureBranch` are fatal for execution tasks (execute, verify, merge, review) but non-fatal for planning tasks. This prevents execution agents from spawning without proper branch isolation.

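The selection rule (all dependencies complete, then priority, then FIFO, with planning tasks skipped) can be sketched as a pure function. The `QueuedTask` shape, the `queuedAt` timestamp used for FIFO ordering, and filtering planning categories at selection time are assumptions; this is not `DefaultDispatchManager`'s actual code.

```typescript
type Priority = "high" | "medium" | "low";
type Category = "execute" | "verify" | "merge" | "review" | "research" | "discuss" | "plan" | "detail" | "refine";

// Assumed shape of a queue entry.
interface QueuedTask {
  taskId: string;
  priority: Priority;
  category: Category;
  queuedAt: number; // assumed enqueue timestamp (ms), used for FIFO within a priority
  dependsOn: string[];
}

const PRIORITY_RANK: Record<Priority, number> = { high: 0, medium: 1, low: 2 };
const PLANNING_CATEGORIES = new Set<Category>(["research", "discuss", "plan", "detail", "refine"]);

// Returns the next dispatchable task without removing it, or null if nothing is ready.
function nextDispatchable(queue: Map<string, QueuedTask>, completedIds: Set<string>): QueuedTask | null {
  const ready = Array.from(queue.values()).filter(
    (task) => !PLANNING_CATEGORIES.has(task.category) && task.dependsOn.every((dep) => completedIds.has(dep)),
  );
  // Highest priority first; within a priority, the oldest entry wins.
  ready.sort((a, b) => PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] || a.queuedAt - b.queuedAt);
  return ready[0] ?? null;
}
```

Sorting the ready list costs O(n log n) per call, which is reasonable for a small in-memory queue; a priority heap is the usual alternative if the queue grows large.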
### DispatchManager Methods

| Method | Purpose |
|--------|---------|
| `queue(taskId)` | Add a task to the dispatch queue |
| `dispatchNext()` | Find and dispatch the next ready task |
| `getNextDispatchable()` | Get the next ready task without dispatching it |
| `completeTask(taskId, agentId?)` | Mark a task complete, store the agent's result summary, and emit `task:completed` |
| `blockTask(taskId, reason)` | Block a task with a reason |
| `retryBlockedTask(taskId)` | Reset a blocked task to pending and re-queue it |
| `getQueueState()` | Return queued, ready, and blocked tasks |

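The block/retry half of the table as a hypothetical in-memory class. It leaves out the database, spawning, and dependency tracking (so `getQueueState()` reports no `ready` list); resetting `retryCount` to 0 on manual retry follows the Crash Recovery section.

```typescript
type TaskStatus = "pending" | "in_progress" | "blocked" | "completed";

interface TaskRecord {
  id: string;
  status: TaskStatus;
  retryCount: number;
  blockedReason?: string;
}

// Hypothetical stand-in for DispatchManager's queue/block/retry methods.
class InMemoryDispatchQueue {
  private readonly tasks = new Map<string, TaskRecord>();
  private readonly queued = new Set<string>();

  queue(taskId: string): void {
    const task: TaskRecord = this.tasks.get(taskId) ?? { id: taskId, status: "pending", retryCount: 0 };
    this.tasks.set(taskId, task);
    this.queued.add(taskId);
  }

  blockTask(taskId: string, reason: string): void {
    const task = this.getTask(taskId);
    task.status = "blocked";
    task.blockedReason = reason;
    this.queued.delete(taskId); // blocked tasks leave the queue until retried
  }

  retryBlockedTask(taskId: string): void {
    const task = this.getTask(taskId);
    if (task.status !== "blocked") throw new Error(`task ${taskId} is not blocked`);
    task.status = "pending";
    task.blockedReason = undefined;
    task.retryCount = 0; // fresh set of automatic crash retries
    this.queue(taskId);
  }

  getQueueState(): { queued: string[]; blocked: string[] } {
    const blocked = Array.from(this.tasks.values())
      .filter((task) => task.status === "blocked")
      .map((task) => task.id);
    return { queued: Array.from(this.queued), blocked };
  }

  getTask(taskId: string): TaskRecord {
    const task = this.tasks.get(taskId);
    if (!task) throw new Error(`unknown task ${taskId}`);
    return task;
  }
}
```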
## Phase Dispatch

`DefaultPhaseDispatchManager` follows the same pattern for phases:

1. **Queue**: `queuePhase(phaseId)` validates that the phase is approved and fetches its dependencies.
2. **Dispatch**: `dispatchNextPhase()` finds a phase whose dependencies are all complete.
3. **Branch creation**: initiative and phase branches are created in every linked project clone. On failure, the phase is blocked via `blockPhase()` and its tasks are NOT queued.
4. **Auto-queue tasks**: once the phase starts (branches confirmed), its pending execution tasks are queued; planning-category tasks are excluded.
5. **Events**: `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`.

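The branch-then-queue ordering in steps 3 and 4 can be sketched as follows. `PhaseStartDeps` and its callbacks are hypothetical stand-ins for the branch manager, `blockPhase()`, and the task queue, and event emission is left out.

```typescript
type PhaseTaskStatus = "pending" | "in_progress" | "completed" | "blocked";

interface PhaseTask {
  id: string;
  category: string;
  status: PhaseTaskStatus;
}

// Hypothetical collaborators; the real manager talks to the branch manager, repositories, and dispatch queue.
interface PhaseStartDeps {
  createBranches(phaseId: string): Promise<void>; // initiative + phase branches in every linked clone
  blockPhase(phaseId: string, reason: string): Promise<void>;
  queueTask(taskId: string): Promise<void>;
}

const PLANNING_CATEGORIES = new Set(["research", "discuss", "plan", "detail", "refine"]);

// Returns true if the phase started, false if it was blocked.
async function startPhase(phaseId: string, tasks: PhaseTask[], deps: PhaseStartDeps): Promise<boolean> {
  try {
    await deps.createBranches(phaseId);
  } catch (err) {
    // Branch failure blocks the phase and none of its tasks are queued.
    await deps.blockPhase(phaseId, err instanceof Error ? err.message : String(err));
    return false;
  }
  // Branches confirmed: queue pending execution tasks, skipping planning categories.
  for (const task of tasks) {
    if (task.status === "pending" && !PLANNING_CATEGORIES.has(task.category)) {
      await deps.queueTask(task.id);
    }
  }
  return true;
}
```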
### Auto-Integration Phase

When `queueAllPhases` is called (i.e. the user clicks "Execute"), it auto-creates an **Integration** phase if the initiative has multiple end phases (leaf nodes with no dependents). This catches cross-phase incompatibilities before the initiative reaches review.

- **Trigger**: `queueAllPhases` in `apps/server/trpc/routers/phase-dispatch.ts`
- **Guard**: Only created when `endPhaseIds.length > 1` and no "Integration" phase already exists
- **Status**: Created as `approved` (same pattern as Finalization in `orchestrator.ts`)
- **Dependencies**: Integration depends on all end phases, so it is dispatched last
- **Task**: A single `verify`-category task instructs the agent to build, run tests, check types, and review cross-phase imports
- **Idempotency**: A name-based check prevents duplicates on re-execution
- **Coexistence**: Independent of the Finalization phase (different purpose, different trigger)

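Finding end phases is a leaf search over the phase dependency graph. A sketch of the guard, assuming phases carry a `dependsOn` list of phase ids; `IntegrationPlan` is a hypothetical description of what gets created, since the real code writes phase and task rows.

```typescript
interface PhaseNode {
  id: string;
  name: string;
  dependsOn: string[]; // ids of phases this phase waits for
}

// Hypothetical description of the phase to create.
interface IntegrationPlan {
  name: "Integration";
  status: "approved";
  dependsOn: string[];
  taskCategory: "verify";
}

// End phases are leaves: no other phase lists them in dependsOn.
function findEndPhaseIds(phases: PhaseNode[]): string[] {
  const hasDependents = new Set<string>();
  for (const phase of phases) {
    for (const dep of phase.dependsOn) hasDependents.add(dep);
  }
  return phases.filter((phase) => !hasDependents.has(phase.id)).map((phase) => phase.id);
}

// Guard: more than one end phase and no existing "Integration" phase (name-based idempotency).
function planIntegrationPhase(phases: PhaseNode[]): IntegrationPlan | null {
  if (phases.some((phase) => phase.name === "Integration")) return null;
  const endPhaseIds = findEndPhaseIds(phases);
  if (endPhaseIds.length <= 1) return null;
  return { name: "Integration", status: "approved", dependsOn: endPhaseIds, taskCategory: "verify" };
}
```

Checking the name before walking the graph keeps re-execution idempotent: once an Integration phase exists, nothing new is planned regardless of how the end phases have changed.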
### PhaseDispatchManager Methods

| Method | Purpose |
|--------|---------|
| `queuePhase(phaseId)` | Queue an approved phase |
| `dispatchNextPhase()` | Start the next ready phase and auto-queue its tasks |
| `getNextDispatchablePhase()` | Get the next ready phase without dispatching it |
| `completePhase(phaseId)` | Mark a phase complete |
| `blockPhase(phaseId, reason)` | Block a phase with a reason |
| `getPhaseQueueState()` | Return queued, ready, and blocked phases |

## Auto-Dispatch (ExecutionOrchestrator)

`apps/server/execution/orchestrator.ts` connects the queue to agent spawning via event-driven auto-dispatch.

### Trigger Events

| Event | Action |
|-------|--------|
| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents |
| `agent:stopped` | When the reason is `task_complete`: check `shouldRunQualityReview()`; if its conditions are met, spawn a quality-review agent and set the task to `quality_review`, otherwise auto-complete the task. Manual stops (`user_requested`) are skipped. Queued tasks are re-dispatched after either path. |
| `agent:crashed` | Auto-retry the crashed task up to `MAX_TASK_RETRIES` (3): increment `retryCount`, reset status to `pending`, re-queue. Once retries are exhausted, the task stays `in_progress` for manual intervention. |
| `task:completed` | Merge the task branch (if it exists), check phase completion, dispatch the next queued task |

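The `agent:stopped` row as a sketch. The collaborator interface is hypothetical and synchronous for brevity, and every reason other than `task_complete` is treated as a no-op, which goes slightly beyond the table (it only says `user_requested` stops are skipped).

```typescript
// Hypothetical collaborators; names mirror the prose, signatures are assumed.
interface StopHandlerDeps {
  shouldRunQualityReview(taskId: string): boolean;
  spawnQualityReview(taskId: string): void; // also moves the task to `quality_review`
  completeTask(taskId: string): void; // emits task:completed
  scheduleDispatch(): void;
}

interface AgentStopped {
  taskId: string | null;
  reason: string;
}

function handleAgentStopped(event: AgentStopped, deps: StopHandlerDeps): void {
  // Only task_complete stops advance the task; manual user_requested stops are ignored.
  if (event.reason !== "task_complete" || event.taskId === null) return;
  if (deps.shouldRunQualityReview(event.taskId)) {
    deps.spawnQualityReview(event.taskId);
  } else {
    deps.completeTask(event.taskId);
  }
  // Re-dispatch queued tasks after either path.
  deps.scheduleDispatch();
}
```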
### Conflict Resolution → Dispatch Flow

When a task branch merge produces conflicts:

1. `mergeTaskIntoPhase()` detects conflicts from `branchManager.mergeBranch()`.
2. It calls `conflictResolutionService.handleConflict()`, which creates a "Resolve conflicts" task (deduplicated: creation is skipped if an identical pending/in_progress resolution task already exists).
3. The original task is **not blocked**; `handleAgentStopped` already completed it before the merge attempt. The pending resolution task prevents premature phase completion.
4. The orchestrator queues the new conflict task via `dispatchManager.queue()`.
5. `scheduleDispatch()` picks it up and assigns it to an idle agent.

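A sketch of the dedup check and the orchestrator hand-off. What makes two resolution tasks "identical" is not spelled out here, so matching on the same phase and source task is an assumption, as are the `ResolutionTask` shape and the `ConflictDeps` callbacks.

```typescript
interface ResolutionTask {
  id: string;
  phaseId: string;
  sourceTaskId: string; // assumed: the task whose merge conflicted
  status: "pending" | "in_progress" | "completed" | "blocked";
}

interface ConflictDeps {
  newTaskId(): string;
  queue(taskId: string): void;
  scheduleDispatch(): void;
}

// Creates a resolution task unless an active (pending/in_progress) one already covers this conflict.
function handleConflict(existing: ResolutionTask[], phaseId: string, sourceTaskId: string, makeId: () => string): ResolutionTask | null {
  const duplicate = existing.some(
    (t) => t.phaseId === phaseId && t.sourceTaskId === sourceTaskId && (t.status === "pending" || t.status === "in_progress"),
  );
  if (duplicate) return null;
  const task: ResolutionTask = { id: makeId(), phaseId, sourceTaskId, status: "pending" };
  existing.push(task);
  return task;
}

// Orchestrator side: queue only a newly created task, then let the next dispatch cycle assign it.
function onMergeConflict(existing: ResolutionTask[], phaseId: string, sourceTaskId: string, deps: ConflictDeps): void {
  const created = handleConflict(existing, phaseId, sourceTaskId, () => deps.newTaskId());
  if (!created) return;
  deps.queue(created.id);
  deps.scheduleDispatch();
}
```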
### Crash Recovery

When an agent crashes (`agent:crashed` event), the orchestrator automatically retries its task:

1. Finds the task associated with the crashed agent.
2. Checks `task.retryCount` against `MAX_TASK_RETRIES` (3).
3. If under the limit: increments `retryCount`, resets the task to `pending`, and re-queues it for dispatch.
4. If the limit has been reached: logs a warning and leaves the task `in_progress` for manual intervention.

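The retry decision as a sketch, assuming `retryCount` counts automatic retries already used (so a task gets at most three) and that the caller has already found the crashed agent's task. `requeue` and `warn` are hypothetical callbacks.

```typescript
const MAX_TASK_RETRIES = 3;

interface RetryableTask {
  id: string;
  status: "pending" | "in_progress" | "completed" | "blocked";
  retryCount: number;
}

// Returns true if the task was re-queued, false if it was left for manual intervention.
function handleAgentCrashed(task: RetryableTask, requeue: (taskId: string) => void, warn: (message: string) => void): boolean {
  if (task.retryCount >= MAX_TASK_RETRIES) {
    warn(`task ${task.id} crashed after ${task.retryCount} retries; leaving it in_progress`);
    return false; // status is deliberately left untouched
  }
  task.retryCount += 1;
  task.status = "pending";
  requeue(task.id);
  return true;
}
```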
On server restart, `recoverDispatchQueues()` also recovers:

- Stuck `in_progress` tasks whose agents are dead (status is neither `running` nor `waiting_for_input`): reset to `pending` and re-queued.
- Erroneously `blocked` tasks whose agents completed successfully (status `idle` or `stopped`): marked `completed` so the phase can progress. This handles the legacy case where conflict resolution incorrectly blocked already-completed tasks.
- Stale duplicate planning tasks: if a phase has both a completed and a pending task of the same planning category (e.g. two `detail` tasks from a crash-and-retry), the pending one is marked `completed` with summary "Superseded by retry".
- Fully completed `in_progress` phases: after task recovery, if every task in an `in_progress` phase is completed, `handlePhaseAllTasksDone` is triggered to complete or review the phase.

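The first three recovery rules reduce to small pure functions. A sketch, assuming each task row carries its last agent's status (`null` when no agent was spawned, which is treated as dead); `RecoveryTask`, `classifyForRecovery`, and `findSupersededPlanningTasks` are hypothetical names.

```typescript
interface RecoveryTask {
  id: string;
  phaseId: string;
  category: string;
  status: "pending" | "in_progress" | "completed" | "blocked";
  agentStatus: string | null; // status of the task's last agent; null if none was spawned
}

type RecoveryAction = "requeue" | "mark_completed" | "none";

const PLANNING_CATEGORIES = new Set(["research", "discuss", "plan", "detail", "refine"]);

// Per-task rule for stuck in_progress tasks and erroneously blocked tasks.
function classifyForRecovery(task: RecoveryTask): RecoveryAction {
  const agentAlive = task.agentStatus === "running" || task.agentStatus === "waiting_for_input";
  if (task.status === "in_progress" && !agentAlive) return "requeue";
  if (task.status === "blocked" && (task.agentStatus === "idle" || task.agentStatus === "stopped")) return "mark_completed";
  return "none";
}

// Pending planning tasks superseded by a completed task of the same category in the same phase.
function findSupersededPlanningTasks(tasks: RecoveryTask[]): string[] {
  const completedKeys = new Set(
    tasks.filter((t) => t.status === "completed").map((t) => `${t.phaseId}:${t.category}`),
  );
  return tasks
    .filter((t) => t.status === "pending" && PLANNING_CATEGORIES.has(t.category) && completedKeys.has(`${t.phaseId}:${t.category}`))
    .map((t) => t.id);
}
```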
The `detailPhase` mutation in `architect.ts` also cleans up orphaned pending/in_progress detail tasks before creating new ones, preventing duplicates at the source.

Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.

### Coalesced Scheduling

Multiple rapid events (e.g. several `phase:queued` events from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle loops `dispatchNextPhase()` + `dispatchNext()` until both queues are drained, then re-runs if new events arrived during execution.

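Coalescing can be done with a `running` flag plus a `pending` flag. A sketch, assuming each dispatcher resolves to `true` when it dispatched something; `DispatchScheduler` is a hypothetical name, and deferring the first cycle to a microtask is one way to merge same-tick calls, not necessarily what the orchestrator does.

```typescript
// Hypothetical sketch of coalesced scheduling.
class DispatchScheduler {
  private running = false;
  private pending = false;
  private readonly dispatchNextPhase: () => Promise<boolean>;
  private readonly dispatchNext: () => Promise<boolean>;

  constructor(dispatchNextPhase: () => Promise<boolean>, dispatchNext: () => Promise<boolean>) {
    this.dispatchNextPhase = dispatchNextPhase;
    this.dispatchNext = dispatchNext;
  }

  scheduleDispatch(): void {
    this.pending = true; // remember that work was requested
    if (this.running) return; // the active cycle will see `pending` and loop again
    this.running = true;
    // Defer to a microtask so calls made in the same tick share one cycle.
    void Promise.resolve().then(() => this.runCycle());
  }

  private async runCycle(): Promise<void> {
    try {
      while (this.pending) {
        this.pending = false;
        // A started phase queues its tasks, so keep looping until neither dispatcher makes progress.
        let progressed = true;
        while (progressed) {
          const phaseDispatched = await this.dispatchNextPhase();
          const taskDispatched = await this.dispatchNext();
          progressed = phaseDispatched || taskDispatched;
        }
      }
    } catch (err) {
      console.error("dispatch cycle failed", err);
    } finally {
      this.running = false;
    }
  }
}
```

Calls that arrive while a cycle is running only set `pending`, so a burst of events during a cycle costs one extra drain pass rather than one cycle per event.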
### Execution Mode Behavior

- **YOLO**: phase completes → auto-merge (if a branch exists; skipped otherwise) → auto-dispatch next phase → auto-dispatch tasks
- **review_per_phase**: phase completes → set `pending_review` → STOP. User approves → `approveAndMergePhase()` → merge → dispatch next phase → dispatch tasks
- **No branch**: the phase completion check runs regardless of whether a branch exists. Merge steps are skipped; status transitions still fire. The `updateTaskStatus` tRPC route sends completions through `dispatchManager.completeTask()` so that `task:completed` is emitted.
- **request-changes (phase)**: phase `pending_review` → user requests changes → creates a revision task (category `'review'`; a dedup guard skips creation if an active review task exists) with threaded comments (`[comment:ID]` tags + reply threads) → phase reset to `in_progress` → agent reads comments, fixes code, writes `.cw/output/comment-responses.json` → OutputHandler creates reply comments and optionally resolves threads → phase returns to `pending_review`
- **request-changes (initiative)**: initiative `pending_review` → user requests changes → creates/reuses "Finalization" phase → creates review task → initiative reset to `active` → agent fixes → initiative returns to `pending_review`

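Both modes share the same tail and differ only in when it runs. A sketch that returns step names instead of performing side effects; the mode identifiers, step names, and function names are invented for illustration.

```typescript
type ExecutionMode = "yolo" | "review_per_phase"; // the identifier for YOLO mode is assumed
type Step = "merge_phase" | "set_pending_review" | "dispatch_next_phase" | "dispatch_tasks";

// Shared tail: YOLO runs it immediately; review_per_phase runs it on approval (approveAndMergePhase).
function advancePhase(hasBranch: boolean): Step[] {
  const steps: Step[] = [];
  if (hasBranch) steps.push("merge_phase"); // merge is skipped when there is no branch
  steps.push("dispatch_next_phase", "dispatch_tasks");
  return steps;
}

// What happens once every task in a phase is done.
function afterPhaseTasksDone(mode: ExecutionMode, hasBranch: boolean): Step[] {
  if (mode === "review_per_phase") return ["set_pending_review"]; // stop until the user approves
  return advancePhase(hasBranch);
}
```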