fix: Handle spawn and branch failures gracefully in dispatch cycle

Wrap agentManager.spawn() in try/catch — on failure, block the task
instead of crashing the entire dispatch cycle. Move phase status update
to after branch creation succeeds — on branch failure, block the phase
and skip task queuing. Fix statement-breakpoint markers in migration
0020 to use separate lines.
This commit is contained in:
Lukas May
2026-03-05 16:36:39 +01:00
parent 42f5bcb80a
commit da8c714de2
4 changed files with 36 additions and 20 deletions

View File

@@ -15,7 +15,7 @@ import type {
TaskDispatchedEvent, TaskDispatchedEvent,
TaskPendingApprovalEvent, TaskPendingApprovalEvent,
} from '../events/index.js'; } from '../events/index.js';
import type { AgentManager, AgentResult } from '../agent/types.js'; import type { AgentManager, AgentResult, AgentInfo } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js'; import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js'; import type { AgentRepository } from '../db/repositories/agent-repository.js';
@@ -397,15 +397,23 @@ export class DefaultDispatchManager implements DispatchManager {
} }
// Spawn agent with task (alias auto-generated by agent manager) // Spawn agent with task (alias auto-generated by agent manager)
const agent = await this.agentManager.spawn({ let agent: AgentInfo;
taskId: nextTask.taskId, try {
initiativeId: task.initiativeId ?? undefined, agent = await this.agentManager.spawn({
phaseId: task.phaseId ?? undefined, taskId: nextTask.taskId,
prompt: buildExecutePrompt(task.description || task.name), initiativeId: task.initiativeId ?? undefined,
baseBranch, phaseId: task.phaseId ?? undefined,
branchName, prompt: buildExecutePrompt(task.description || task.name),
inputContext, baseBranch,
}); branchName,
inputContext,
});
} catch (err) {
const reason = `Spawn failed: ${err instanceof Error ? err.message : String(err)}`;
log.error({ taskId: nextTask.taskId, err: reason }, 'agent spawn failed, blocking task');
await this.blockTask(nextTask.taskId, reason);
return { success: false, taskId: nextTask.taskId, reason };
}
log.info({ taskId: nextTask.taskId, agentId: agent.id }, 'task dispatched'); log.info({ taskId: nextTask.taskId, agentId: agent.id }, 'task dispatched');

View File

@@ -167,10 +167,7 @@ export class DefaultPhaseDispatchManager implements PhaseDispatchManager {
}; };
} }
// Update phase status to 'in_progress' // Create phase branch in all linked project clones (must succeed before marking in_progress)
await this.phaseRepository.update(nextPhase.phaseId, { status: 'in_progress' });
// Create phase branch in all linked project clones
if (this.initiativeRepository && this.projectRepository && this.branchManager && this.workspaceRoot) { if (this.initiativeRepository && this.projectRepository && this.branchManager && this.workspaceRoot) {
try { try {
const initiative = await this.initiativeRepository.findById(phase.initiativeId); const initiative = await this.initiativeRepository.findById(phase.initiativeId);
@@ -204,10 +201,16 @@ export class DefaultPhaseDispatchManager implements PhaseDispatchManager {
log.info({ phaseId: nextPhase.phaseId, phBranch, initBranch }, 'phase branch created'); log.info({ phaseId: nextPhase.phaseId, phBranch, initBranch }, 'phase branch created');
} }
} catch (err) { } catch (err) {
log.error({ phaseId: nextPhase.phaseId, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase branch'); const reason = `Branch creation failed: ${err instanceof Error ? err.message : String(err)}`;
log.error({ phaseId: nextPhase.phaseId, err: reason }, 'phase branch creation failed, blocking phase');
await this.blockPhase(nextPhase.phaseId, reason);
return { success: false, phaseId: nextPhase.phaseId, reason };
} }
} }
// Update phase status to 'in_progress' (only after branches confirmed)
await this.phaseRepository.update(nextPhase.phaseId, { status: 'in_progress' });
// Remove from queue (now being worked on) // Remove from queue (now being worked on)
this.phaseQueue.delete(nextPhase.phaseId); this.phaseQueue.delete(nextPhase.phaseId);

View File

@@ -9,7 +9,8 @@ CREATE TABLE `change_sets` (
`status` text NOT NULL DEFAULT 'applied', `status` text NOT NULL DEFAULT 'applied',
`reverted_at` integer, `reverted_at` integer,
`created_at` integer NOT NULL `created_at` integer NOT NULL
);--> statement-breakpoint );
--> statement-breakpoint
CREATE TABLE `change_set_entries` ( CREATE TABLE `change_set_entries` (
`id` text PRIMARY KEY NOT NULL, `id` text PRIMARY KEY NOT NULL,
`change_set_id` text NOT NULL REFERENCES `change_sets`(`id`) ON DELETE CASCADE, `change_set_id` text NOT NULL REFERENCES `change_sets`(`id`) ON DELETE CASCADE,
@@ -20,6 +21,8 @@ CREATE TABLE `change_set_entries` (
`new_state` text, `new_state` text,
`sort_order` integer NOT NULL DEFAULT 0, `sort_order` integer NOT NULL DEFAULT 0,
`created_at` integer NOT NULL `created_at` integer NOT NULL
);--> statement-breakpoint );
CREATE INDEX `change_sets_initiative_id_idx` ON `change_sets`(`initiative_id`);--> statement-breakpoint --> statement-breakpoint
CREATE INDEX `change_sets_initiative_id_idx` ON `change_sets`(`initiative_id`);
--> statement-breakpoint
CREATE INDEX `change_set_entries_change_set_id_idx` ON `change_set_entries`(`change_set_id`); CREATE INDEX `change_set_entries_change_set_id_idx` ON `change_set_entries`(`change_set_id`);

View File

@@ -66,6 +66,7 @@ AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
7. **Summary propagation**`completeTask()` reads the completing agent's `result.message` and stores it on the task's `summary` column. Dependent tasks see this summary in `context/tasks/<id>.md` frontmatter. 7. **Summary propagation**`completeTask()` reads the completing agent's `result.message` and stores it on the task's `summary` column. Dependent tasks see this summary in `context/tasks/<id>.md` frontmatter.
8. **Approval check**`completeTask()` checks `requiresApproval` (task-level, then initiative-level) 8. **Approval check**`completeTask()` checks `requiresApproval` (task-level, then initiative-level)
9. **Approval flow** — If approval required: status → `pending_approval`, emit `task:pending_approval` 9. **Approval flow** — If approval required: status → `pending_approval`, emit `task:pending_approval`
10. **Spawn failure** — If `agentManager.spawn()` throws, the task is blocked via `blockTask()` with the error message. The dispatch cycle continues instead of crashing.
### DispatchManager Methods ### DispatchManager Methods
@@ -85,8 +86,9 @@ AccountCredentialsRefreshedEvent { accountId, expiresAt, previousExpiresAt? }
1. **Queue**`queuePhase(phaseId)` validates phase is approved, gets dependencies 1. **Queue**`queuePhase(phaseId)` validates phase is approved, gets dependencies
2. **Dispatch**`dispatchNextPhase()` finds phase with all deps complete 2. **Dispatch**`dispatchNextPhase()` finds phase with all deps complete
3. **Auto-queue tasks** — When phase starts, pending execution tasks are queued (planning-category tasks excluded) 3. **Branch creation** — Initiative and phase branches are created in all linked project clones. On failure, the phase is blocked via `blockPhase()` and tasks are NOT queued.
4. **Events**`phase:queued`, `phase:started`, `phase:completed`, `phase:blocked` 4. **Auto-queue tasks** — When phase starts (branches confirmed), pending execution tasks are queued (planning-category tasks excluded)
5. **Events**`phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`
### PhaseDispatchManager Methods ### PhaseDispatchManager Methods