Dispatch & Events

apps/server/dispatch/ — Task and phase dispatch queues. apps/server/events/ — Typed event bus.

Event Bus

apps/server/events/ — Typed pub/sub system for inter-module communication.

Architecture

  • Port: EventBus interface with emit(event) and on(type, handler)
  • Adapter: TypedEventBus using Node.js EventEmitter
  • All events implement BaseEvent { type, timestamp, payload }
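
The port/adapter split above could look roughly like the following sketch. It is illustrative only: the `DomainEvent` union name, the `Date` type for `timestamp`, the string-typed ids, and the choice of just two event types are assumptions, not taken from the actual source.

```typescript
import { EventEmitter } from "node:events";

// Assumed shape of BaseEvent; the doc only names the three fields.
interface BaseEvent<T extends string, P> {
  type: T;
  timestamp: Date; // assumption: the real type may be a number or ISO string
  payload: P;
}

// Two of the documented events, with payload fields as listed under "Key Event Payloads".
type AgentStoppedEvent = BaseEvent<
  "agent:stopped",
  { agentId: string; name: string; taskId: string; reason: string }
>;
type TaskQueuedEvent = BaseEvent<
  "task:queued",
  { taskId: string; priority: "high" | "medium" | "low"; dependsOn: string[] }
>;

// Hypothetical union of all events; the real bus covers many more types.
type DomainEvent = AgentStoppedEvent | TaskQueuedEvent;

// Port: what other modules depend on.
interface EventBus {
  emit(event: DomainEvent): void;
  on<T extends DomainEvent["type"]>(
    type: T,
    handler: (event: Extract<DomainEvent, { type: T }>) => void
  ): void;
}

// Adapter: delegates to Node's EventEmitter, keyed by event.type.
class TypedEventBus implements EventBus {
  private readonly emitter = new EventEmitter();

  emit(event: DomainEvent): void {
    this.emitter.emit(event.type, event);
  }

  on<T extends DomainEvent["type"]>(
    type: T,
    handler: (event: Extract<DomainEvent, { type: T }>) => void
  ): void {
    this.emitter.on(type, handler);
  }
}

// Usage: the handler's parameter is narrowed to TaskQueuedEvent by the type string.
const bus = new TypedEventBus();
bus.on("task:queued", (event) => {
  console.log(`queued ${event.payload.taskId} (${event.payload.priority})`);
});
bus.emit({
  type: "task:queued",
  timestamp: new Date(),
  payload: { taskId: "task-1", priority: "high", dependsOn: [] },
});
```

Keying listeners by `event.type` lets the compiler infer the payload shape from the string passed to `on`, so subscribers need no casts.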

Event Types (58)

| Category | Events | Count |
| --- | --- | --- |
| Agent | agent:spawned, agent:stopped, agent:crashed, agent:resumed, agent:account_switched, agent:deleted, agent:waiting, agent:output | 8 |
| Task | task:queued, task:dispatched, task:completed, task:blocked | 4 |
| Phase | phase:queued, phase:started, phase:completed, phase:blocked, phase:changes_requested | 5 |
| Merge | merge:queued, merge:started, merge:completed, merge:conflicted | 4 |
| Page | page:created, page:updated, page:deleted | 3 |
| Process | process:spawned, process:stopped, process:crashed | 3 |
| Server | server:started, server:stopped | 2 |
| Worktree | worktree:created, worktree:removed, worktree:merged, worktree:conflict | 4 |
| Account | account:credentials_refreshed, account:credentials_expired, account:credentials_validated | 3 |
| Preview | preview:building, preview:ready, preview:stopped, preview:failed | 4 |
| Conversation | conversation:created, conversation:answered | 2 |
| Chat | chat:message_created, chat:session_closed | 2 |
| Initiative | initiative:pending_review, initiative:review_approved, initiative:changes_requested | 3 |
| Project | project:synced, project:sync_failed | 2 |
| Log | log:entry | 1 |

Key Event Payloads

AgentSpawnedEvent    { agentId, name, taskId, worktreeId, provider }
AgentStoppedEvent    { agentId, name, taskId, reason }
  // reason: 'user_requested'|'task_complete'|'error'|'waiting_for_input'|
  //         'context_complete'|'plan_complete'|'detail_complete'|'refine_complete'|'chat_complete'
AgentWaitingEvent    { agentId, name, taskId, sessionId, questions[] }
AgentOutputEvent     { agentId, stream, data }
TaskCompletedEvent   { taskId, agentId, success, message }
TaskQueuedEvent      { taskId, priority, dependsOn[] }
PhaseStartedEvent    { phaseId, initiativeId }
MergeConflictedEvent { taskId, agentId, worktreeId, targetBranch, conflictingFiles[] }
PhaseChangesRequestedEvent { phaseId, initiativeId, taskId, commentCount }
AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
InitiativePendingReviewEvent   { initiativeId, branch }
InitiativeReviewApprovedEvent  { initiativeId, branch, strategy: 'push_branch' | 'merge_and_push' }
InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }

Task Dispatch

apps/server/dispatch/ — In-memory queue with dependency-ordered dispatch.

Architecture

  • Port: DispatchManager interface
  • Adapter: DefaultDispatchManager

How Task Dispatch Works

  1. Queue: queue(taskId) fetches the task's dependencies and adds the task to an internal Map
  2. Dispatch: dispatchNext() finds the highest-priority task whose dependencies are all complete
  3. Context gathering — Before spawn, dispatchNext() gathers initiative context (initiative, phase, tasks, pages) and passes as inputContext to the agent. Agents receive .cw/input/task.md, initiative.md, phase.md, context/tasks/, context/phases/, and pages/.
  4. Priority order: high > medium > low, then oldest first (FIFO within priority)
  5. Planning skip — Planning-category tasks (research, discuss, plan, detail, refine) skip auto-dispatch — they use the architect flow
  6. Summary propagation: completeTask() reads the completing agent's result.message and stores it on the task's summary column. Dependent tasks see this summary in context/tasks/<id>.md frontmatter.
  7. Spawn failure — If agentManager.spawn() throws, the task is blocked via blockTask() with the error message. The dispatch cycle continues instead of crashing.
  8. Retry blocked: retryBlockedTask(taskId) resets a blocked task to pending and re-queues it. Exposed via the tRPC retryBlockedTask mutation. The UI shows a Retry button in the task slide-over when status is blocked.
  9. Branch validation — Branch computation and ensureBranch errors are fatal for execution tasks (execute, verify, merge, review) but non-fatal for planning tasks. This prevents agents from spawning without proper branch isolation.
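
The selection rule in steps 2 and 4 (all dependencies complete, then high > medium > low, then oldest first) can be sketched as below. This is a minimal illustration, not the DefaultDispatchManager code: the `QueuedTask` shape, the `queuedAt` field, and passing completed ids as a `Set` are assumptions made for the example.

```typescript
type Priority = "high" | "medium" | "low";

// Hypothetical queue entry; the real internal Map may store different fields.
interface QueuedTask {
  taskId: string;
  priority: Priority;
  queuedAt: number; // assumed timestamp used for FIFO ordering within a priority
  dependsOn: string[];
}

// Lower rank dispatches first.
const PRIORITY_RANK: Record<Priority, number> = { high: 0, medium: 1, low: 2 };

// Returns the next ready task without removing it from the queue.
function getNextDispatchable(
  queue: Map<string, QueuedTask>,
  completedTaskIds: Set<string>
): QueuedTask | undefined {
  const ready = Array.from(queue.values()).filter((task) =>
    task.dependsOn.every((depId) => completedTaskIds.has(depId))
  );
  ready.sort(
    (a, b) =>
      PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] ||
      a.queuedAt - b.queuedAt
  );
  return ready[0];
}
```

A high-priority task with an unfinished dependency is skipped in favour of a lower-priority task that is ready, so one stalled dependency does not hold up the rest of the queue.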

DispatchManager Methods

| Method | Purpose |
| --- | --- |
| queue(taskId) | Add task to dispatch queue |
| dispatchNext() | Find and dispatch next ready task |
| getNextDispatchable() | Get next task without dispatching |
| completeTask(taskId, agentId?) | Complete task |
| blockTask(taskId, reason) | Block task with reason |
| retryBlockedTask(taskId) | Reset blocked task to pending and re-queue |
| getQueueState() | Return queued, ready, blocked tasks |

Phase Dispatch

DefaultPhaseDispatchManager — Same pattern for phases:

  1. Queue: queuePhase(phaseId) validates that the phase is approved and fetches its dependencies
  2. Dispatch: dispatchNextPhase() finds a phase whose dependencies are all complete
  3. Branch creation — Initiative and phase branches are created in all linked project clones. On failure, the phase is blocked via blockPhase() and tasks are NOT queued.
  4. Auto-queue tasks — When phase starts (branches confirmed), pending execution tasks are queued (planning-category tasks excluded)
  5. Events: phase:queued, phase:started, phase:completed, phase:blocked
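
Steps 3 and 4 describe a guard: tasks are queued only after branch creation succeeds, and planning-category tasks are never auto-queued. A rough sketch of that control flow follows. The `PhaseTask` shape, the `startPhase` helper, and the synchronous `ensureBranches` callback are hypothetical simplifications; the real branch operations are presumably asynchronous.

```typescript
// Planning categories as listed in the task dispatch section.
const PLANNING_CATEGORIES = new Set(["research", "discuss", "plan", "detail", "refine"]);

// Hypothetical minimal task view for this sketch.
interface PhaseTask {
  id: string;
  category: string;
  status: "pending" | "in_progress" | "completed" | "blocked";
}

type PhaseStartResult =
  | { started: true; queuedTaskIds: string[] }
  | { started: false; blockedReason: string };

// Pending execution tasks only; planning tasks go through the architect flow.
function tasksToAutoQueue(tasks: PhaseTask[]): string[] {
  return tasks
    .filter((t) => t.status === "pending" && !PLANNING_CATEGORIES.has(t.category))
    .map((t) => t.id);
}

// ensureBranches stands in for creating initiative and phase branches
// in all linked project clones; it throws on failure.
function startPhase(tasks: PhaseTask[], ensureBranches: () => void): PhaseStartResult {
  try {
    ensureBranches();
  } catch (err) {
    // Corresponds to blockPhase(): no tasks are queued for this phase.
    const blockedReason = err instanceof Error ? err.message : String(err);
    return { started: false, blockedReason };
  }
  return { started: true, queuedTaskIds: tasksToAutoQueue(tasks) };
}
```

Returning a discriminated result keeps the "blocked, nothing queued" outcome explicit rather than relying on an exception escaping the dispatch loop.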

PhaseDispatchManager Methods

| Method | Purpose |
| --- | --- |
| queuePhase(phaseId) | Queue approved phase |
| dispatchNextPhase() | Start next ready phase, auto-queue its tasks |
| getNextDispatchablePhase() | Get next phase without dispatching |
| completePhase(phaseId) | Mark phase complete |
| blockPhase(phaseId, reason) | Block phase |
| getPhaseQueueState() | Return queued, ready, blocked phases |

Auto-Dispatch (ExecutionOrchestrator)

apps/server/execution/orchestrator.ts — Connects the queue to agent spawning via event-driven auto-dispatch.

Trigger Events

| Event | Action |
| --- | --- |
| phase:queued | Dispatch ready phases → dispatch their tasks to idle agents |
| agent:stopped | Auto-complete task (unless user_requested), re-dispatch queued tasks (freed agent slot) |
| agent:crashed | Auto-retry crashed task up to MAX_TASK_RETRIES (3). Increments retryCount, resets status to pending, re-queues. Exceeding retries leaves task in_progress for manual intervention. |
| task:completed | Merge task branch (if branch exists), check phase completion, dispatch next queued task |

Conflict Resolution → Dispatch Flow

When a task branch merge produces conflicts:

  1. mergeTaskIntoPhase() detects conflicts from branchManager.mergeBranch()
  2. Calls conflictResolutionService.handleConflict() which creates a "Resolve conflicts" task (with dedup — skips if an identical pending/in_progress resolution task already exists)
  3. The original task is not blocked — it was already completed by handleAgentStopped before the merge attempt. The pending resolution task prevents premature phase completion.
  4. Orchestrator queues the new conflict task via dispatchManager.queue()
  5. scheduleDispatch() picks it up and assigns it to an idle agent

Crash Recovery

When an agent crashes (agent:crashed event), the orchestrator automatically retries the task:

  1. Finds the task associated with the crashed agent
  2. Checks task.retryCount against MAX_TASK_RETRIES (3)
  3. If under limit: increments retryCount, resets task to pending, re-queues for dispatch
  4. If over limit: logs warning, task stays in_progress for manual intervention
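
The retry decision in the steps above might be expressed as in this sketch. The `RetryableTask` shape and the `requeue` callback are hypothetical, and treating `retryCount >= MAX_TASK_RETRIES` as the cutoff is an assumption about the exact boundary.

```typescript
const MAX_TASK_RETRIES = 3;

// Hypothetical minimal task record for this sketch.
interface RetryableTask {
  id: string;
  status: "pending" | "in_progress" | "completed" | "blocked";
  retryCount: number;
}

// Returns true if the task was re-queued, false if it was left for manual intervention.
function handleAgentCrashed(
  task: RetryableTask,
  requeue: (taskId: string) => void
): boolean {
  if (task.retryCount >= MAX_TASK_RETRIES) {
    // Assumed boundary: the task stays in_progress once retries are used up.
    console.warn(`task ${task.id} exceeded ${MAX_TASK_RETRIES} retries`);
    return false;
  }
  task.retryCount += 1;
  task.status = "pending";
  requeue(task.id);
  return true;
}
```

Under this reading a task is re-queued at most three times automatically; a fourth crash leaves it untouched until someone intervenes.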

On server restart, recoverDispatchQueues() also recovers:

  • Stuck in_progress tasks whose agents are dead (status is not running or waiting_for_input) — reset to pending and re-queued
  • Erroneously blocked tasks whose agents completed successfully (status is idle or stopped) — marked completed so the phase can progress. This handles the legacy case where conflict resolution incorrectly blocked already-completed tasks.
  • Fully-completed in_progress phases — after task recovery, if all tasks in an in_progress phase are completed, triggers handlePhaseAllTasksDone to complete/review the phase

Manual retry via retryBlockedTask() resets retryCount to 0, giving the task a fresh set of automatic retries.

Coalesced Scheduling

Multiple rapid events (e.g. several phase:queued from queueAllPhases) are coalesced into a single async dispatch cycle via scheduleDispatch(). The cycle loops dispatchNextPhase() + dispatchNext() until both queues are drained, then re-runs if new events arrived during execution.
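
One way to get this coalescing behaviour is a running flag plus a rerun flag, sketched below. The `DispatchScheduler` class and the `drainQueues` callback (standing in for the dispatchNextPhase() + dispatchNext() loop) are hypothetical names, and deferring the first cycle by one microtask is an assumption about how same-tick events get merged.

```typescript
class DispatchScheduler {
  private running = false;
  private rerun = false;
  private current: Promise<void> = Promise.resolve();
  private readonly drainQueues: () => Promise<void>;

  // drainQueues is assumed to loop phase and task dispatch until both queues are empty.
  constructor(drainQueues: () => Promise<void>) {
    this.drainQueues = drainQueues;
  }

  // Safe to call from any event handler; returns the promise of the active cycle.
  scheduleDispatch(): Promise<void> {
    if (this.running) {
      // A cycle is pending or in flight: remember that something changed.
      this.rerun = true;
      return this.current;
    }
    this.running = true;
    this.current = this.run();
    return this.current;
  }

  private async run(): Promise<void> {
    // Defer one microtask so events emitted in the same tick share one cycle.
    await Promise.resolve();
    try {
      do {
        this.rerun = false;
        await this.drainQueues();
      } while (this.rerun); // events arrived during the drain: go again
    } finally {
      this.running = false;
    }
  }
}
```

With this shape, three scheduleDispatch() calls in the same tick result in one drain, while a call made during a drain triggers exactly one follow-up pass.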

Execution Mode Behavior

  • YOLO: phase completes → auto-merge (if branch exists, skipped otherwise) → auto-dispatch next phase → auto-dispatch tasks
  • review_per_phase: phase completes → set pending_review → STOP. User approves → approveAndMergePhase() → merge → dispatch next phase → dispatch tasks
  • No branch: Phase completion check runs regardless of branch existence. Merge steps are skipped; status transitions still fire. updateTaskStatus tRPC routes completions through dispatchManager.completeTask() to emit task:completed.
  • request-changes (phase): phase pending_review → user requests changes → creates revision task (category='review', dedup guard skips if active review exists) with threaded comments ([comment:ID] tags + reply threads) → phase reset to in_progress → agent reads comments, fixes code, writes .cw/output/comment-responses.json → OutputHandler creates reply comments and optionally resolves threads → phase returns to pending_review
  • request-changes (initiative): initiative pending_review → user requests changes → creates/reuses "Finalization" phase → creates review task → initiative reset to active → agent fixes → initiative returns to pending_review