fix: Prevent lost task completions after server restart

Fixes three bugs that caused empty phase diffs when the server restarted
during agent execution:

1. Startup ordering race: reconcileAfterRestart() emitted agent:stopped
   before orchestrator registered listeners — events lost. Moved
   reconciliation to after orchestrator.start().

2. Stuck in_progress tasks: recoverDispatchQueues() only re-queued
   pending tasks. Added recovery for in_progress tasks whose agents
   are dead (not running/waiting_for_input).

3. Branch force-reset destroys work: git branch -f wiped commits when
   a second agent was dispatched for the same task. Now checks if the
   branch has commits beyond baseBranch before resetting.

Also adds:
- agent:crashed handler with auto-retry (MAX_TASK_RETRIES=3)
- retryCount column on tasks table + migration
- retryCount reset on manual retryBlockedTask()
Lukas May
2026-03-06 12:19:59 +01:00
parent a69527b7d6
commit eac03862e3
9 changed files with 94 additions and 13 deletions

View File

@@ -187,10 +187,6 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
);
log.info('agent manager created');
// Reconcile agent state from any previous server session
await agentManager.reconcileAfterRestart();
log.info('agent reconciliation complete');
// Branch manager
const branchManager = new SimpleGitBranchManager();
log.info('branch manager created');
@@ -250,10 +246,17 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
conflictResolutionService,
eventBus,
workspaceRoot,
repos.agentRepository,
);
executionOrchestrator.start();
log.info('execution orchestrator started');
// Reconcile agent state from any previous server session.
// Must run AFTER orchestrator.start() so event listeners are registered
// and agent:stopped / agent:crashed events are not lost.
await agentManager.reconcileAfterRestart();
log.info('agent reconciliation complete');
// Preview manager
const previewManager = new PreviewManager(
repos.projectRepository,

View File

@@ -157,6 +157,7 @@ export const tasks = sqliteTable('tasks', {
.default('pending'),
order: integer('order').notNull().default(0),
summary: text('summary'), // Agent result summary — propagated to dependent tasks as context
retryCount: integer('retry_count').notNull().default(0),
createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
});

View File

@@ -247,8 +247,8 @@ export class DefaultDispatchManager implements DispatchManager {
// Clear blocked state
this.blockedTasks.delete(taskId);
// Reset DB status to pending
await this.taskRepository.update(taskId, { status: 'pending' });
// Reset DB status to pending and clear retry count (manual retry = fresh start)
await this.taskRepository.update(taskId, { status: 'pending', retryCount: 0 });
log.info({ taskId }, 'retrying blocked task');

View File

@@ -0,0 +1 @@
ALTER TABLE tasks ADD COLUMN retry_count integer NOT NULL DEFAULT 0;

View File

@@ -239,6 +239,13 @@
"when": 1772409600000,
"tag": "0033_drop_approval_columns",
"breakpoints": true
},
{
"idx": 34,
"version": "6",
"when": 1772496000000,
"tag": "0034_add_task_retry_count",
"breakpoints": true
}
]
}

View File

@@ -11,12 +11,13 @@
* - Review per-phase: pause after each phase for diff review
*/
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, AgentCrashedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
@@ -25,6 +26,9 @@ import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('execution-orchestrator');
/** Maximum number of automatic retries for crashed tasks before blocking */
const MAX_TASK_RETRIES = 3;
export class ExecutionOrchestrator {
/** Serialize merges per phase to avoid concurrent merge conflicts */
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
@@ -44,6 +48,7 @@ export class ExecutionOrchestrator {
private conflictResolutionService: ConflictResolutionService,
private eventBus: EventBus,
private workspaceRoot: string,
private agentRepository?: AgentRepository,
) {}
/**
@@ -66,6 +71,13 @@ export class ExecutionOrchestrator {
});
});
// Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES)
this.eventBus.on<AgentCrashedEvent>('agent:crashed', (event) => {
this.handleAgentCrashed(event).catch((err) => {
log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling agent:crashed');
});
});
// Recover in-memory dispatch queues from DB state (survives server restarts)
this.recoverDispatchQueues().catch((err) => {
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed');
@@ -111,6 +123,27 @@ export class ExecutionOrchestrator {
this.scheduleDispatch();
}
private async handleAgentCrashed(event: AgentCrashedEvent): Promise<void> {
const { taskId, agentId, error } = event.payload;
if (!taskId) return;
const task = await this.taskRepository.findById(taskId);
if (!task || task.status !== 'in_progress') return;
const retryCount = (task.retryCount ?? 0) + 1;
if (retryCount > MAX_TASK_RETRIES) {
log.warn({ taskId, agentId, retryCount, error }, 'task exceeded max retries, leaving in_progress');
return;
}
// Reset task for re-dispatch with incremented retry count
await this.taskRepository.update(taskId, { status: 'pending', retryCount });
await this.dispatchManager.queue(taskId);
log.info({ taskId, agentId, retryCount, error }, 'crashed task re-queued for retry');
this.scheduleDispatch();
}
private async runDispatchCycle(): Promise<void> {
this.dispatchRunning = true;
try {
@@ -560,7 +593,7 @@ export class ExecutionOrchestrator {
}
}
// Re-queue pending tasks for in_progress phases into the task dispatch queue
// Re-queue pending tasks and recover stuck in_progress tasks for in_progress phases
if (phase.status === 'in_progress') {
const tasks = await this.taskRepository.findByPhaseId(phase.id);
for (const task of tasks) {
@@ -571,6 +604,17 @@ export class ExecutionOrchestrator {
} catch {
// Already queued or task issue
}
} else if (task.status === 'in_progress' && this.agentRepository) {
// Check if the assigned agent is still alive
const agent = await this.agentRepository.findByTaskId(task.id);
const isAlive = agent && (agent.status === 'running' || agent.status === 'waiting_for_input');
if (!isAlive) {
// Agent is dead — reset task for re-dispatch
await this.taskRepository.update(task.id, { status: 'pending' });
await this.dispatchManager.queue(task.id);
tasksRecovered++;
log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
}
}
}
}

View File

@@ -70,10 +70,21 @@ export class SimpleGitWorktreeManager implements WorktreeManager {
// Create worktree — reuse existing branch or create new one
const branchExists = await this.branchExists(branch);
if (branchExists) {
// Branch exists from a previous run — reset it to baseBranch and check it out.
// Only safe because branch !== baseBranch (checked above), so we're resetting
// an agent-scoped branch, not a shared branch like main or the initiative branch.
await this.git.raw(['branch', '-f', branch, baseBranch]);
// Branch exists from a previous run. Check if it has commits beyond baseBranch
// before resetting — a previous agent may have done real work on this branch.
try {
const aheadCount = await this.git.raw(['rev-list', '--count', `${baseBranch}..${branch}`]);
if (parseInt(aheadCount.trim(), 10) > 0) {
log.warn({ branch, baseBranch, aheadBy: aheadCount.trim() }, 'branch has commits beyond base, preserving');
} else {
await this.git.raw(['branch', '-f', branch, baseBranch]);
}
} catch {
// If rev-list fails (e.g. baseBranch doesn't exist yet), fall back to reset
await this.git.raw(['branch', '-f', branch, baseBranch]);
}
// Prune stale worktree references before adding new one
await this.git.raw(['worktree', 'prune']);
await this.git.raw(['worktree', 'add', worktreePath, branch]);
} else {
// git worktree add -b <branch> <path> <base-branch>

View File

@@ -51,6 +51,7 @@ All adapters use nanoid() for IDs, auto-manage timestamps, and use Drizzle's `.r
| status | text enum | 'pending' \| 'in_progress' \| 'completed' \| 'blocked' |
| order | integer | default 0 |
| summary | text nullable | Agent result summary — propagated to dependent tasks as context |
| retryCount | integer NOT NULL | default 0, incremented on agent crash auto-retry, reset on manual retry |
| createdAt, updatedAt | integer/timestamp | |
### task_dependencies

View File

@@ -112,9 +112,22 @@ InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
| Event | Action |
|-------|--------|
| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents |
| `agent:stopped` | Re-dispatch queued tasks (freed agent slot) |
| `agent:stopped` | Auto-complete task (unless user_requested), re-dispatch queued tasks (freed agent slot) |
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
### Crash Recovery
When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task:
1. Finds the task associated with the crashed agent
2. Checks `task.retryCount` against `MAX_TASK_RETRIES` (3)
3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
4. If the limit has already been reached (`retryCount` equals `MAX_TASK_RETRIES`): logs a warning, and the task stays `in_progress` for manual intervention
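The decision in the steps above can be sketched as a pure function. This is a simplified illustration, not the orchestrator's code: `CrashedTask`, `CrashDecision`, and `decideOnCrash` are hypothetical names, and the real handler also updates the task repository and re-queues the task.

```typescript
// Hypothetical, reduced task shape; the real task record has more fields.
interface CrashedTask {
  status: 'pending' | 'in_progress' | 'completed' | 'blocked';
  retryCount: number;
}

// Hypothetical result type describing what the handler should do next.
type CrashDecision =
  | { action: 'ignore' }
  | { action: 'give_up' }
  | { action: 'retry'; retryCount: number };

const MAX_TASK_RETRIES = 3;

function decideOnCrash(task: CrashedTask | undefined): CrashDecision {
  // Only tasks that are still in_progress are eligible for auto-retry.
  if (!task || task.status !== 'in_progress') return { action: 'ignore' };

  // Increment first, then compare: stored counts 0, 1, and 2 allow a retry,
  // a stored count of 3 does not.
  const retryCount = task.retryCount + 1;
  if (retryCount > MAX_TASK_RETRIES) return { action: 'give_up' };

  return { action: 'retry', retryCount };
}
```

Because the count is incremented before the comparison, a task gets at most three automatic retries before it is left for manual intervention.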
On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`). These are reset to `pending` and re-queued.
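As a rough sketch of the liveness check described above: the only agent statuses taken from the source are `running` and `waiting_for_input`. The names `RecoverableTask`, `AgentRecord`, `isAgentAlive`, and `findStuckTaskIds` are hypothetical, and the lookup map stands in for the repository call.

```typescript
// Hypothetical, reduced shapes for illustration only.
interface AgentRecord {
  id: string;
  status: string;
}

interface RecoverableTask {
  id: string;
  status: string;
}

// An agent counts as alive only while running or waiting for input.
// A missing agent record is treated as dead.
function isAgentAlive(agent: AgentRecord | null | undefined): boolean {
  return agent != null && (agent.status === 'running' || agent.status === 'waiting_for_input');
}

// Returns the ids of in_progress tasks whose agent is dead or missing.
// These are the tasks that would be reset to pending and re-queued.
function findStuckTaskIds(
  tasks: RecoverableTask[],
  agentByTaskId: Map<string, AgentRecord>,
): string[] {
  return tasks
    .filter((task) => task.status === 'in_progress' && !isAgentAlive(agentByTaskId.get(task.id)))
    .map((task) => task.id);
}
```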
Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.
### Coalesced Scheduling
Multiple rapid events (e.g. several `phase:queued` from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle loops `dispatchNextPhase()` + `dispatchNext()` until both queues are drained, then re-runs if new events arrived during execution.
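One way to picture the coalescing is a pair of flags: one marks that a cycle is running, the other records that more events arrived in the meantime. The sketch below is an assumption about the general technique, not the orchestrator's actual `scheduleDispatch()`; `CoalescingScheduler`, `whenIdle`, and `lastError` are hypothetical names.

```typescript
class CoalescingScheduler {
  private running = false;
  private pending = false;
  private current: Promise<void> = Promise.resolve();
  private runCycle: () => Promise<void>;

  /** Number of cycles actually executed (for illustration). */
  cycles = 0;
  /** Last error thrown by a cycle, if any. */
  lastError: unknown = undefined;

  constructor(runCycle: () => Promise<void>) {
    this.runCycle = runCycle;
  }

  /** Request a dispatch cycle; calls made while one is running are merged. */
  schedule(): void {
    if (this.running) {
      this.pending = true; // remember to re-run once the current cycle ends
      return;
    }
    this.running = true;
    this.current = this.loop();
  }

  /** Resolves when the most recently started run of cycles has finished. */
  whenIdle(): Promise<void> {
    return this.current;
  }

  private async loop(): Promise<void> {
    try {
      do {
        this.pending = false;
        this.cycles++;
        await this.runCycle();
      } while (this.pending); // new requests arrived during the cycle
    } catch (err) {
      this.lastError = err;
    } finally {
      this.running = false;
    }
  }
}
```

With this shape, any number of `schedule()` calls made while a cycle is in flight results in exactly one follow-up cycle.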