fix: Prevent lost task completions after server restart
Fixes three bugs that caused empty phase diffs when the server restarted during agent execution:
1. Startup ordering race: reconcileAfterRestart() emitted agent:stopped before the orchestrator registered its listeners, so the events were lost. Reconciliation now runs after orchestrator.start().
2. Stuck in_progress tasks: recoverDispatchQueues() only re-queued pending tasks. It now also recovers in_progress tasks whose agents are dead (not running/waiting_for_input).
3. Branch force-reset destroyed work: git branch -f wiped commits when a second agent was dispatched for the same task. The worktree manager now checks whether the branch has commits beyond baseBranch before resetting.
Also adds:
- agent:crashed handler with auto-retry (MAX_TASK_RETRIES=3)
- retryCount column on tasks table + migration
- retryCount reset on manual retryBlockedTask()
This commit is contained in:
@@ -187,10 +187,6 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
|
||||
);
|
||||
log.info('agent manager created');
|
||||
|
||||
// Reconcile agent state from any previous server session
|
||||
await agentManager.reconcileAfterRestart();
|
||||
log.info('agent reconciliation complete');
|
||||
|
||||
// Branch manager
|
||||
const branchManager = new SimpleGitBranchManager();
|
||||
log.info('branch manager created');
|
||||
@@ -250,10 +246,17 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
|
||||
conflictResolutionService,
|
||||
eventBus,
|
||||
workspaceRoot,
|
||||
repos.agentRepository,
|
||||
);
|
||||
executionOrchestrator.start();
|
||||
log.info('execution orchestrator started');
|
||||
|
||||
// Reconcile agent state from any previous server session.
|
||||
// Must run AFTER orchestrator.start() so event listeners are registered
|
||||
// and agent:stopped / agent:crashed events are not lost.
|
||||
await agentManager.reconcileAfterRestart();
|
||||
log.info('agent reconciliation complete');
|
||||
|
||||
// Preview manager
|
||||
const previewManager = new PreviewManager(
|
||||
repos.projectRepository,
|
||||
|
||||
@@ -157,6 +157,7 @@ export const tasks = sqliteTable('tasks', {
|
||||
.default('pending'),
|
||||
order: integer('order').notNull().default(0),
|
||||
summary: text('summary'), // Agent result summary — propagated to dependent tasks as context
|
||||
retryCount: integer('retry_count').notNull().default(0),
|
||||
createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
|
||||
updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
|
||||
});
|
||||
|
||||
@@ -247,8 +247,8 @@ export class DefaultDispatchManager implements DispatchManager {
|
||||
// Clear blocked state
|
||||
this.blockedTasks.delete(taskId);
|
||||
|
||||
// Reset DB status to pending
|
||||
await this.taskRepository.update(taskId, { status: 'pending' });
|
||||
// Reset DB status to pending and clear retry count (manual retry = fresh start)
|
||||
await this.taskRepository.update(taskId, { status: 'pending', retryCount: 0 });
|
||||
|
||||
log.info({ taskId }, 'retrying blocked task');
|
||||
|
||||
|
||||
1
apps/server/drizzle/0034_add_task_retry_count.sql
Normal file
1
apps/server/drizzle/0034_add_task_retry_count.sql
Normal file
@@ -0,0 +1 @@
|
||||
ALTER TABLE tasks ADD COLUMN retry_count integer NOT NULL DEFAULT 0;
|
||||
@@ -239,6 +239,13 @@
|
||||
"when": 1772409600000,
|
||||
"tag": "0033_drop_approval_columns",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 34,
|
||||
"version": "6",
|
||||
"when": 1772496000000,
|
||||
"tag": "0034_add_task_retry_count",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -11,12 +11,13 @@
|
||||
* - Review per-phase: pause after each phase for diff review
|
||||
*/
|
||||
|
||||
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
|
||||
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, AgentCrashedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
|
||||
import type { BranchManager } from '../git/branch-manager.js';
|
||||
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
||||
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
||||
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
|
||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
|
||||
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
|
||||
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
|
||||
@@ -25,6 +26,9 @@ import { createModuleLogger } from '../logger/index.js';
|
||||
|
||||
const log = createModuleLogger('execution-orchestrator');
|
||||
|
||||
/** Maximum number of automatic retries for crashed tasks before blocking */
|
||||
const MAX_TASK_RETRIES = 3;
|
||||
|
||||
export class ExecutionOrchestrator {
|
||||
/** Serialize merges per phase to avoid concurrent merge conflicts */
|
||||
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
|
||||
@@ -44,6 +48,7 @@ export class ExecutionOrchestrator {
|
||||
private conflictResolutionService: ConflictResolutionService,
|
||||
private eventBus: EventBus,
|
||||
private workspaceRoot: string,
|
||||
private agentRepository?: AgentRepository,
|
||||
) {}
|
||||
|
||||
/**
|
||||
@@ -66,6 +71,13 @@ export class ExecutionOrchestrator {
|
||||
});
|
||||
});
|
||||
|
||||
// Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES)
|
||||
this.eventBus.on<AgentCrashedEvent>('agent:crashed', (event) => {
|
||||
this.handleAgentCrashed(event).catch((err) => {
|
||||
log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling agent:crashed');
|
||||
});
|
||||
});
|
||||
|
||||
// Recover in-memory dispatch queues from DB state (survives server restarts)
|
||||
this.recoverDispatchQueues().catch((err) => {
|
||||
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed');
|
||||
@@ -111,6 +123,27 @@ export class ExecutionOrchestrator {
|
||||
this.scheduleDispatch();
|
||||
}
|
||||
|
||||
/**
 * Handles an `agent:crashed` event by re-queuing the crashed agent's task,
 * bounded by MAX_TASK_RETRIES.
 *
 * The event is ignored when its payload has no `taskId`, or when the task
 * cannot be found or its status is not `in_progress`. Otherwise the stored
 * `retryCount` is incremented; if the new value exceeds MAX_TASK_RETRIES a
 * warning is logged and nothing is written. If it does not, the task is
 * persisted as `pending` with the new `retryCount`, queued on the dispatch
 * manager, and a dispatch cycle is scheduled.
 *
 * NOTE(review): a task that exhausts its retries is left `in_progress` with no
 * live agent. The restart recovery in recoverDispatchQueues() resets such tasks
 * to `pending` without looking at `retryCount` — confirm that is intended.
 *
 * @param event - The crash event; `taskId`, `agentId` and `error` are read from its payload.
 */
private async handleAgentCrashed(event: AgentCrashedEvent): Promise<void> {
|
||||
const { taskId, agentId, error } = event.payload;
|
||||
// Crash events without a task have nothing to retry
if (!taskId) return;
|
||||
|
||||
const task = await this.taskRepository.findById(taskId);
|
||||
// Only tasks still marked in_progress are retried
if (!task || task.status !== 'in_progress') return;
|
||||
|
||||
// Count this crash; a missing retryCount is treated as 0
const retryCount = (task.retryCount ?? 0) + 1;
|
||||
if (retryCount > MAX_TASK_RETRIES) {
|
||||
log.warn({ taskId, agentId, retryCount, error }, 'task exceeded max retries, leaving in_progress');
|
||||
return;
|
||||
}
|
||||
|
||||
// Reset task for re-dispatch with incremented retry count
|
||||
await this.taskRepository.update(taskId, { status: 'pending', retryCount });
|
||||
await this.dispatchManager.queue(taskId);
|
||||
log.info({ taskId, agentId, retryCount, error }, 'crashed task re-queued for retry');
|
||||
|
||||
this.scheduleDispatch();
|
||||
}
|
||||
|
||||
private async runDispatchCycle(): Promise<void> {
|
||||
this.dispatchRunning = true;
|
||||
try {
|
||||
@@ -560,7 +593,7 @@ export class ExecutionOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
// Re-queue pending tasks for in_progress phases into the task dispatch queue
|
||||
// Re-queue pending tasks and recover stuck in_progress tasks for in_progress phases
|
||||
if (phase.status === 'in_progress') {
|
||||
const tasks = await this.taskRepository.findByPhaseId(phase.id);
|
||||
for (const task of tasks) {
|
||||
@@ -571,6 +604,17 @@ export class ExecutionOrchestrator {
|
||||
} catch {
|
||||
// Already queued or task issue
|
||||
}
|
||||
} else if (task.status === 'in_progress' && this.agentRepository) {
|
||||
// Check if the assigned agent is still alive
|
||||
const agent = await this.agentRepository.findByTaskId(task.id);
|
||||
const isAlive = agent && (agent.status === 'running' || agent.status === 'waiting_for_input');
|
||||
if (!isAlive) {
|
||||
// Agent is dead — reset task for re-dispatch
|
||||
await this.taskRepository.update(task.id, { status: 'pending' });
|
||||
await this.dispatchManager.queue(task.id);
|
||||
tasksRecovered++;
|
||||
log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,10 +70,21 @@ export class SimpleGitWorktreeManager implements WorktreeManager {
|
||||
// Create worktree — reuse existing branch or create new one
|
||||
const branchExists = await this.branchExists(branch);
|
||||
if (branchExists) {
|
||||
// Branch exists from a previous run — reset it to baseBranch and check it out.
|
||||
// Only safe because branch !== baseBranch (checked above), so we're resetting
|
||||
// an agent-scoped branch, not a shared branch like main or the initiative branch.
|
||||
await this.git.raw(['branch', '-f', branch, baseBranch]);
|
||||
// Branch exists from a previous run. Check if it has commits beyond baseBranch
|
||||
// before resetting — a previous agent may have done real work on this branch.
|
||||
try {
|
||||
const aheadCount = await this.git.raw(['rev-list', '--count', `${baseBranch}..${branch}`]);
|
||||
if (parseInt(aheadCount.trim(), 10) > 0) {
|
||||
log.warn({ branch, baseBranch, aheadBy: aheadCount.trim() }, 'branch has commits beyond base, preserving');
|
||||
} else {
|
||||
await this.git.raw(['branch', '-f', branch, baseBranch]);
|
||||
}
|
||||
} catch {
|
||||
// If rev-list fails (e.g. baseBranch doesn't exist yet), fall back to reset
|
||||
await this.git.raw(['branch', '-f', branch, baseBranch]);
|
||||
}
|
||||
// Prune stale worktree references before adding new one
|
||||
await this.git.raw(['worktree', 'prune']);
|
||||
await this.git.raw(['worktree', 'add', worktreePath, branch]);
|
||||
} else {
|
||||
// git worktree add -b <branch> <path> <base-branch>
|
||||
|
||||
@@ -51,6 +51,7 @@ All adapters use nanoid() for IDs, auto-manage timestamps, and use Drizzle's `.r
|
||||
| status | text enum | 'pending' \| 'in_progress' \| 'completed' \| 'blocked' |
|
||||
| order | integer | default 0 |
|
||||
| summary | text nullable | Agent result summary — propagated to dependent tasks as context |
|
||||
| retryCount | integer NOT NULL | default 0, incremented on agent crash auto-retry, reset on manual retry |
|
||||
| createdAt, updatedAt | integer/timestamp | |
|
||||
|
||||
### task_dependencies
|
||||
|
||||
@@ -112,9 +112,22 @@ InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
|
||||
| Event | Action |
|
||||
|-------|--------|
|
||||
| `phase:queued` | Dispatch ready phases → dispatch their tasks to idle agents |
|
||||
| `agent:stopped` | Re-dispatch queued tasks (freed agent slot) |
|
||||
| `agent:stopped` | Auto-complete task (unless user_requested), re-dispatch queued tasks (freed agent slot) |
|
||||
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
|
||||
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
|
||||
|
||||
### Crash Recovery
|
||||
|
||||
When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task:
|
||||
1. Finds the task associated with the crashed agent (the event is ignored if it carries no `taskId`, or if the task is missing or not `in_progress`)
|
||||
2. Checks `task.retryCount` against `MAX_TASK_RETRIES` (3)
|
||||
3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
|
||||
4. If the limit has already been reached: logs a warning and leaves the task `in_progress` for manual intervention
|
||||
|
||||
On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (no agent record, or an agent status other than `running` or `waiting_for_input`). These are reset to `pending` and re-queued; their `retryCount` is left unchanged.
|
||||
|
||||
Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.
|
||||
|
||||
### Coalesced Scheduling
|
||||
|
||||
Multiple rapid events (e.g. several `phase:queued` from `queueAllPhases`) are coalesced into a single async dispatch cycle via `scheduleDispatch()`. The cycle loops `dispatchNextPhase()` + `dispatchNext()` until both queues are drained, then re-runs if new events arrived during execution.
|
||||
|
||||
Reference in New Issue
Block a user