fix: conflict resolution tasks now get dispatched instead of permanently blocking initiative

- Remove original task blocking in handleConflict (task is already completed by handleAgentStopped)
- Return created conflict task from handleConflict so orchestrator can queue it for dispatch
- Add dedup check to prevent duplicate resolution tasks on crash retries
- Queue conflict resolution task via dispatchManager in mergeTaskIntoPhase
- Add recovery for erroneously blocked tasks in recoverDispatchQueues
- Update tests and docs
This commit is contained in:
Lukas May
2026-03-06 20:37:29 +01:00
parent a41caa633b
commit d4a28713f6
7 changed files with 103 additions and 28 deletions

View File

@@ -113,7 +113,11 @@ describe('DefaultConflictResolutionService', () => {
const conflicts = ['src/file1.ts', 'src/file2.ts'];
await service.handleConflict(originalTask.id, conflicts);
const result = await service.handleConflict(originalTask.id, conflicts);
// Should return the created task
expect(result).toBeDefined();
expect(result!.name).toBe('Resolve conflicts: Original Task');
// Check resolution task was created
const tasks = await taskRepository.findByPhaseId(testPhaseId);
@@ -135,12 +139,12 @@ describe('DefaultConflictResolutionService', () => {
expect(resolutionTask!.description).toContain('Original Task');
});
it('should update original task status to blocked', async () => {
it('should NOT block original task (it stays at its current status)', async () => {
const originalTask = await taskRepository.create({
phaseId: testPhaseId,
initiativeId: testInitiativeId,
name: 'Task To Block',
status: 'in_progress',
name: 'Task To Not Block',
status: 'completed',
order: 1,
});
@@ -152,9 +156,37 @@ describe('DefaultConflictResolutionService', () => {
await service.handleConflict(originalTask.id, ['conflict.ts']);
// Check original task is blocked
// Original task should remain completed (not blocked)
const updatedTask = await taskRepository.findById(originalTask.id);
expect(updatedTask!.status).toBe('blocked');
expect(updatedTask!.status).toBe('completed');
});
it('should return null and skip creation if duplicate resolution task exists', async () => {
const originalTask = await taskRepository.create({
phaseId: testPhaseId,
initiativeId: testInitiativeId,
name: 'Dedup Task',
order: 1,
});
await agentRepository.create({
name: 'agent-dedup',
taskId: originalTask.id,
worktreeId: 'wt-dedup',
});
// First call creates the resolution task
const first = await service.handleConflict(originalTask.id, ['conflict.ts']);
expect(first).toBeDefined();
// Second call should return null (dedup)
const second = await service.handleConflict(originalTask.id, ['conflict.ts']);
expect(second).toBeNull();
// Only one resolution task should exist
const tasks = await taskRepository.findByPhaseId(testPhaseId);
const resolutionTasks = tasks.filter(t => t.name.startsWith('Resolve conflicts:'));
expect(resolutionTasks.length).toBe(1);
});
it('should create message to agent about conflict', async () => {
@@ -243,9 +275,9 @@ describe('DefaultConflictResolutionService', () => {
worktreeId: 'wt-no-msg',
});
// Should not throw and should still create task
await expect(serviceNoMsg.handleConflict(originalTask.id, ['test.ts']))
.resolves.not.toThrow();
// Should not throw and should return the created task
const result = await serviceNoMsg.handleConflict(originalTask.id, ['test.ts']);
expect(result).toBeDefined();
// Check resolution task was still created
const tasks = await taskRepository.findByPhaseId(testPhaseId);
@@ -275,9 +307,9 @@ describe('DefaultConflictResolutionService', () => {
worktreeId: 'wt-no-events',
});
// Should not throw and should still create task
await expect(serviceNoEvents.handleConflict(originalTask.id, ['test.ts']))
.resolves.not.toThrow();
// Should not throw and should return the created task
const result = await serviceNoEvents.handleConflict(originalTask.id, ['test.ts']);
expect(result).toBeDefined();
// Check resolution task was still created
const tasks = await taskRepository.findByPhaseId(testPhaseId);

View File

@@ -14,6 +14,7 @@ import type { EventBus, TaskQueuedEvent } from '../events/index.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { Task } from '../db/schema.js';
// =============================================================================
// ConflictResolutionService Interface (Port)
@@ -38,8 +39,9 @@ export interface ConflictResolutionService {
* @param taskId - ID of the task that conflicted
* @param conflicts - List of conflicting file paths
* @param mergeContext - Optional branch context for branch hierarchy merges
* @returns The created conflict-resolution task, or null if a duplicate already exists
*/
handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<void>;
handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<Task | null>;
}
// =============================================================================
@@ -63,8 +65,13 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
/**
* Handle a merge conflict.
* Creates a conflict-resolution task and notifies the agent via message.
* Returns the created task, or null if a duplicate already exists.
*
* NOTE: The original task is NOT blocked. It was already completed by
* handleAgentStopped before this method is called. The pending resolution
* task prevents premature phase completion on its own.
*/
async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<void> {
async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<Task | null> {
// Get original task for context
const originalTask = await this.taskRepository.findById(taskId);
if (!originalTask) {
@@ -77,6 +84,19 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
throw new Error(`No agent found for task: ${taskId}`);
}
// Dedup: skip if a pending/in_progress resolution task already exists for this original task
if (originalTask.phaseId) {
const phaseTasks = await this.taskRepository.findByPhaseId(originalTask.phaseId);
const existingResolution = phaseTasks.find(
(t) =>
t.name === `Resolve conflicts: ${originalTask.name}` &&
(t.status === 'pending' || t.status === 'in_progress'),
);
if (existingResolution) {
return null;
}
}
// Build conflict description
const descriptionLines = [
'Merge conflicts detected. Resolve conflicts in the following files:',
@@ -115,9 +135,6 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
order: originalTask.order + 1,
});
// Update original task status to blocked
await this.taskRepository.update(taskId, { status: 'blocked' });
// Create message to agent if messageRepository is configured
if (this.messageRepository) {
const messageContent = [
@@ -155,5 +172,7 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
};
this.eventBus.emit(event);
}
return conflictTask;
}
}

View File

@@ -477,9 +477,9 @@ describe('DefaultCoordinationManager', () => {
expect(conflictTask!.priority).toBe('high');
expect(conflictTask!.description).toContain('src/index.ts');
// Check original task blocked
// Original task should NOT be blocked (stays at its current status)
const updatedOriginal = await taskRepository.findById(task.id);
expect(updatedOriginal!.status).toBe('blocked');
expect(updatedOriginal!.status).toBe('pending');
// Check TaskQueuedEvent emitted for conflict task
const queuedEvent = eventBus.emittedEvents.find(

View File

@@ -239,10 +239,14 @@ export class ExecutionOrchestrator {
if (!result.success && result.conflicts) {
log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
const conflictTask = await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
sourceBranch: taskBranch,
targetBranch: phaseBranch,
});
if (conflictTask) {
await this.dispatchManager.queue(conflictTask.id);
log.info({ taskId: conflictTask.id, originalTaskId: taskId }, 'conflict resolution task queued for dispatch');
}
return;
}
@@ -615,6 +619,16 @@ export class ExecutionOrchestrator {
tasksRecovered++;
log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
}
} else if (task.status === 'blocked' && this.agentRepository) {
// Task was blocked by merge conflict after agent had already completed.
// If the agent finished successfully, mark the task completed so the
// phase can progress.
const agent = await this.agentRepository.findByTaskId(task.id);
if (agent && (agent.status === 'idle' || agent.status === 'stopped')) {
await this.taskRepository.update(task.id, { status: 'completed' });
tasksRecovered++;
log.info({ taskId: task.id, agentId: agent.id }, 'recovered blocked task (agent completed, merge conflict)');
}
}
}
}

View File

@@ -357,9 +357,10 @@ describe('E2E Edge Cases', () => {
await harness.coordinationManager.queueMerge(taskAId);
await harness.coordinationManager.processMerges('main');
// Verify: original task is now blocked
// Verify: original task is NOT blocked (stays completed — the pending
// resolution task prevents premature phase completion)
const originalTask = await harness.taskRepository.findById(taskAId);
expect(originalTask?.status).toBe('blocked');
expect(originalTask?.status).toBe('completed');
// Verify: task:queued event emitted for conflict resolution task
const queuedEvents = harness.getEventsByType('task:queued');

View File

@@ -85,12 +85,10 @@ describe('E2E Extended Scenarios', () => {
expect(conflictPayload.taskId).toBe(taskAId);
expect(conflictPayload.conflictingFiles).toEqual(['src/shared.ts', 'src/types.ts']);
// Verify: original task marked blocked
// Verify: original task is NOT blocked (stays completed — the pending
// resolution task prevents premature phase completion)
const originalTask = await harness.taskRepository.findById(taskAId);
expect(originalTask?.status).toBe('blocked');
// Note: CoordinationManager.handleConflict updates task status to blocked
// but does not emit task:blocked event (that's emitted by DispatchManager.blockTask)
expect(originalTask?.status).toBe('completed');
// Verify: task:queued event emitted for resolution task
const queuedEvents = harness.getEventsByType('task:queued');

View File

@@ -117,6 +117,15 @@ InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
### Conflict Resolution → Dispatch Flow
When a task branch merge produces conflicts:
1. `mergeTaskIntoPhase()` detects conflicts from `branchManager.mergeBranch()`
2. Calls `conflictResolutionService.handleConflict()` which creates a "Resolve conflicts" task (with dedup — skips if an identical pending/in_progress resolution task already exists)
3. The original task is **not blocked** — it was already completed by `handleAgentStopped` before the merge attempt. The pending resolution task prevents premature phase completion.
4. Orchestrator queues the new conflict task via `dispatchManager.queue()`
5. `scheduleDispatch()` picks it up and assigns it to an idle agent
### Crash Recovery
When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task:
@@ -125,7 +134,9 @@ When an agent crashes (`agent:crashed` event), the orchestrator automatically re
3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
4. If over limit: logs warning, task stays `in_progress` for manual intervention
On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`). These are reset to `pending` and re-queued.
On server restart, `recoverDispatchQueues()` also recovers:
- Stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`) — reset to `pending` and re-queued
- Erroneously `blocked` tasks whose agents completed successfully (status is `idle` or `completed`) — marked `completed` so the phase can progress. This handles the legacy case where conflict resolution incorrectly blocked already-completed tasks.
Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.