fix: Prevent lost task completions after server restart
Three bugs causing empty phase diffs when server restarts during agent execution: 1. Startup ordering race: reconcileAfterRestart() emitted agent:stopped before orchestrator registered listeners — events lost. Moved reconciliation to after orchestrator.start(). 2. Stuck in_progress tasks: recoverDispatchQueues() only re-queued pending tasks. Added recovery for in_progress tasks whose agents are dead (not running/waiting_for_input). 3. Branch force-reset destroys work: git branch -f wiped commits when a second agent was dispatched for the same task. Now checks if the branch has commits beyond baseBranch before resetting. Also adds: - agent:crashed handler with auto-retry (MAX_TASK_RETRIES=3) - retryCount column on tasks table + migration - retryCount reset on manual retryBlockedTask()
This commit is contained in:
@@ -11,12 +11,13 @@
|
||||
* - Review per-phase: pause after each phase for diff review
|
||||
*/
|
||||
|
||||
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
|
||||
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, AgentCrashedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
|
||||
import type { BranchManager } from '../git/branch-manager.js';
|
||||
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
||||
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
||||
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
|
||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
|
||||
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
|
||||
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
|
||||
@@ -25,6 +26,9 @@ import { createModuleLogger } from '../logger/index.js';
|
||||
|
||||
const log = createModuleLogger('execution-orchestrator');
|
||||
|
||||
/** Maximum number of automatic retries for crashed tasks before blocking */
|
||||
const MAX_TASK_RETRIES = 3;
|
||||
|
||||
export class ExecutionOrchestrator {
|
||||
/** Serialize merges per phase to avoid concurrent merge conflicts */
|
||||
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
|
||||
@@ -44,6 +48,7 @@ export class ExecutionOrchestrator {
|
||||
private conflictResolutionService: ConflictResolutionService,
|
||||
private eventBus: EventBus,
|
||||
private workspaceRoot: string,
|
||||
private agentRepository?: AgentRepository,
|
||||
) {}
|
||||
|
||||
/**
|
||||
@@ -66,6 +71,13 @@ export class ExecutionOrchestrator {
|
||||
});
|
||||
});
|
||||
|
||||
// Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES)
|
||||
this.eventBus.on<AgentCrashedEvent>('agent:crashed', (event) => {
|
||||
this.handleAgentCrashed(event).catch((err) => {
|
||||
log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling agent:crashed');
|
||||
});
|
||||
});
|
||||
|
||||
// Recover in-memory dispatch queues from DB state (survives server restarts)
|
||||
this.recoverDispatchQueues().catch((err) => {
|
||||
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed');
|
||||
@@ -111,6 +123,27 @@ export class ExecutionOrchestrator {
|
||||
this.scheduleDispatch();
|
||||
}
|
||||
|
||||
private async handleAgentCrashed(event: AgentCrashedEvent): Promise<void> {
|
||||
const { taskId, agentId, error } = event.payload;
|
||||
if (!taskId) return;
|
||||
|
||||
const task = await this.taskRepository.findById(taskId);
|
||||
if (!task || task.status !== 'in_progress') return;
|
||||
|
||||
const retryCount = (task.retryCount ?? 0) + 1;
|
||||
if (retryCount > MAX_TASK_RETRIES) {
|
||||
log.warn({ taskId, agentId, retryCount, error }, 'task exceeded max retries, leaving in_progress');
|
||||
return;
|
||||
}
|
||||
|
||||
// Reset task for re-dispatch with incremented retry count
|
||||
await this.taskRepository.update(taskId, { status: 'pending', retryCount });
|
||||
await this.dispatchManager.queue(taskId);
|
||||
log.info({ taskId, agentId, retryCount, error }, 'crashed task re-queued for retry');
|
||||
|
||||
this.scheduleDispatch();
|
||||
}
|
||||
|
||||
private async runDispatchCycle(): Promise<void> {
|
||||
this.dispatchRunning = true;
|
||||
try {
|
||||
@@ -560,7 +593,7 @@ export class ExecutionOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
// Re-queue pending tasks for in_progress phases into the task dispatch queue
|
||||
// Re-queue pending tasks and recover stuck in_progress tasks for in_progress phases
|
||||
if (phase.status === 'in_progress') {
|
||||
const tasks = await this.taskRepository.findByPhaseId(phase.id);
|
||||
for (const task of tasks) {
|
||||
@@ -571,6 +604,17 @@ export class ExecutionOrchestrator {
|
||||
} catch {
|
||||
// Already queued or task issue
|
||||
}
|
||||
} else if (task.status === 'in_progress' && this.agentRepository) {
|
||||
// Check if the assigned agent is still alive
|
||||
const agent = await this.agentRepository.findByTaskId(task.id);
|
||||
const isAlive = agent && (agent.status === 'running' || agent.status === 'waiting_for_input');
|
||||
if (!isAlive) {
|
||||
// Agent is dead — reset task for re-dispatch
|
||||
await this.taskRepository.update(task.id, { status: 'pending' });
|
||||
await this.dispatchManager.queue(task.id);
|
||||
tasksRecovered++;
|
||||
log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user