From eac03862e392afe1b109fdb80d96ddca0a72d24c Mon Sep 17 00:00:00 2001 From: Lukas May Date: Fri, 6 Mar 2026 12:19:59 +0100 Subject: [PATCH] fix: Prevent lost task completions after server restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three bugs causing empty phase diffs when server restarts during agent execution: 1. Startup ordering race: reconcileAfterRestart() emitted agent:stopped before orchestrator registered listeners — events lost. Moved reconciliation to after orchestrator.start(). 2. Stuck in_progress tasks: recoverDispatchQueues() only re-queued pending tasks. Added recovery for in_progress tasks whose agents are dead (not running/waiting_for_input). 3. Branch force-reset destroys work: git branch -f wiped commits when a second agent was dispatched for the same task. Now checks if the branch has commits beyond baseBranch before resetting. Also adds: - agent:crashed handler with auto-retry (MAX_TASK_RETRIES=3) - retryCount column on tasks table + migration - retryCount reset on manual retryBlockedTask() --- apps/server/container.ts | 11 +++-- apps/server/db/schema.ts | 1 + apps/server/dispatch/manager.ts | 4 +- .../drizzle/0034_add_task_retry_count.sql | 1 + apps/server/drizzle/meta/_journal.json | 7 +++ apps/server/execution/orchestrator.ts | 48 ++++++++++++++++++- apps/server/git/manager.ts | 19 ++++++-- docs/database.md | 1 + docs/dispatch-events.md | 15 +++++- 9 files changed, 94 insertions(+), 13 deletions(-) create mode 100644 apps/server/drizzle/0034_add_task_retry_count.sql diff --git a/apps/server/container.ts b/apps/server/container.ts index 5e6aefd..4468184 100644 --- a/apps/server/container.ts +++ b/apps/server/container.ts @@ -187,10 +187,6 @@ export async function createContainer(options?: ContainerOptions): Promise> = new Map(); @@ -44,6 +48,7 @@ export class ExecutionOrchestrator { private conflictResolutionService: ConflictResolutionService, private eventBus: EventBus, private workspaceRoot: string, + private agentRepository?: AgentRepository, ) {} /** @@ -66,6 +71,13 @@ export class ExecutionOrchestrator { }); }); + // Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES) + this.eventBus.on('agent:crashed', (event) => { + this.handleAgentCrashed(event).catch((err) => { + log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling agent:crashed'); + }); + }); + // Recover in-memory dispatch queues from DB state (survives server restarts) this.recoverDispatchQueues().catch((err) => { log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed'); @@ -111,6 +123,27 @@ export class ExecutionOrchestrator { this.scheduleDispatch(); } + private async handleAgentCrashed(event: AgentCrashedEvent): Promise { + const { taskId, agentId, error } = event.payload; + if (!taskId) return; + + const task = await this.taskRepository.findById(taskId); + if (!task || task.status !== 'in_progress') return; + + const retryCount = (task.retryCount ?? 0) + 1; + if (retryCount > MAX_TASK_RETRIES) { + log.warn({ taskId, agentId, retryCount, error }, 'task exceeded max retries, leaving in_progress'); + return; + } + + // Reset task for re-dispatch with incremented retry count + await this.taskRepository.update(taskId, { status: 'pending', retryCount }); + await this.dispatchManager.queue(taskId); + log.info({ taskId, agentId, retryCount, error }, 'crashed task re-queued for retry'); + + this.scheduleDispatch(); + } + private async runDispatchCycle(): Promise { this.dispatchRunning = true; try { @@ -560,7 +593,7 @@ export class ExecutionOrchestrator { } } - // Re-queue pending tasks for in_progress phases into the task dispatch queue + // Re-queue pending tasks and recover stuck in_progress tasks for in_progress phases if (phase.status === 'in_progress') { const tasks = await this.taskRepository.findByPhaseId(phase.id); for (const task of tasks) { @@ -571,6 +604,17 @@ export class ExecutionOrchestrator { } catch { // Already queued or task issue } + } else if (task.status === 'in_progress' && this.agentRepository) { + // Check if the assigned agent is still alive + const agent = await this.agentRepository.findByTaskId(task.id); + const isAlive = agent && (agent.status === 'running' || agent.status === 'waiting_for_input'); + if (!isAlive) { + // Agent is dead — reset task for re-dispatch + await this.taskRepository.update(task.id, { status: 'pending' }); + await this.dispatchManager.queue(task.id); + tasksRecovered++; + log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)'); + } } } } diff --git a/apps/server/git/manager.ts b/apps/server/git/manager.ts index a28dd2a..95539af 100644 --- a/apps/server/git/manager.ts +++ b/apps/server/git/manager.ts @@ -70,10 +70,21 @@ export class SimpleGitWorktreeManager implements WorktreeManager { // Create worktree — reuse existing branch or create new one const branchExists = await this.branchExists(branch); if (branchExists) { - // Branch exists from a previous run — reset it to baseBranch and check it out. - // Only safe because branch !== baseBranch (checked above), so we're resetting - // an agent-scoped branch, not a shared branch like main or the initiative branch. - await this.git.raw(['branch', '-f', branch, baseBranch]); + // Branch exists from a previous run. Check if it has commits beyond baseBranch + // before resetting — a previous agent may have done real work on this branch. + try { + const aheadCount = await this.git.raw(['rev-list', '--count', `${baseBranch}..${branch}`]); + if (parseInt(aheadCount.trim(), 10) > 0) { + log.warn({ branch, baseBranch, aheadBy: aheadCount.trim() }, 'branch has commits beyond base, preserving'); + } else { + await this.git.raw(['branch', '-f', branch, baseBranch]); + } + } catch { + // If rev-list fails (e.g. baseBranch doesn't exist yet), fall back to reset + await this.git.raw(['branch', '-f', branch, baseBranch]); + } + // Prune stale worktree references before adding new one + await this.git.raw(['worktree', 'prune']); await this.git.raw(['worktree', 'add', worktreePath, branch]); } else { // git worktree add -b diff --git a/docs/database.md b/docs/database.md index 6afb841..dd12a0b 100644 --- a/docs/database.md +++ b/docs/database.md @@ -51,6 +51,7 @@ All adapters use nanoid() for IDs, auto-manage timestamps, and use Drizzle's `.r | status | text enum | 'pending' \| 'in_progress' \| 'completed' \| 'blocked' | | order | integer | default 0 | | summary | text nullable | Agent result summary — propagated to dependent tasks as context | +| retryCount | integer NOT NULL | default 0, incremented on agent crash auto-retry, reset on manual retry | | createdAt, updatedAt | integer/timestamp | | ### task_dependencies diff --git a/docs/dispatch-events.md b/docs/dispatch-events.md index 5356b0e..d9e336d 100644 --- a/docs/dispatch-events.md +++ b/docs/dispatch-events.md @@ -112,9 +112,22 @@ InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId } | Event | Action | |-------|--------| | `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents | -| `agent:stopped` | Re-dispatch queued tasks (freed agent slot) | +| `agent:stopped` | Auto-complete task (unless user_requested), re-dispatch queued tasks (freed agent slot) | +| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. | | `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task | +### Crash Recovery + +When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task: +1. Finds the task associated with the crashed agent +2. Checks `task.retryCount` against `MAX_TASK_RETRIES` (3) +3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch +4. If over limit: logs warning, task stays `in_progress` for manual intervention + +On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`). These are reset to `pending` and re-queued. + +Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries. + ### Coalesced Scheduling Multiple rapid events (e.g. several `phase:queued` from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle loops `dispatchNextPhase()` + `dispatchNext()` until both queues are drained, then re-runs if new events arrived during execution.