fix: conflict resolution tasks now get dispatched instead of permanently blocking initiative
- Remove original task blocking in handleConflict (task is already completed by handleAgentStopped) - Return created conflict task from handleConflict so orchestrator can queue it for dispatch - Add dedup check to prevent duplicate resolution tasks on crash retries - Queue conflict resolution task via dispatchManager in mergeTaskIntoPhase - Add recovery for erroneously blocked tasks in recoverDispatchQueues - Update tests and docs
This commit is contained in:
@@ -113,7 +113,11 @@ describe('DefaultConflictResolutionService', () => {
|
||||
|
||||
const conflicts = ['src/file1.ts', 'src/file2.ts'];
|
||||
|
||||
await service.handleConflict(originalTask.id, conflicts);
|
||||
const result = await service.handleConflict(originalTask.id, conflicts);
|
||||
|
||||
// Should return the created task
|
||||
expect(result).toBeDefined();
|
||||
expect(result!.name).toBe('Resolve conflicts: Original Task');
|
||||
|
||||
// Check resolution task was created
|
||||
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||
@@ -135,12 +139,12 @@ describe('DefaultConflictResolutionService', () => {
|
||||
expect(resolutionTask!.description).toContain('Original Task');
|
||||
});
|
||||
|
||||
it('should update original task status to blocked', async () => {
|
||||
it('should NOT block original task (it stays at its current status)', async () => {
|
||||
const originalTask = await taskRepository.create({
|
||||
phaseId: testPhaseId,
|
||||
initiativeId: testInitiativeId,
|
||||
name: 'Task To Block',
|
||||
status: 'in_progress',
|
||||
name: 'Task To Not Block',
|
||||
status: 'completed',
|
||||
order: 1,
|
||||
});
|
||||
|
||||
@@ -152,9 +156,37 @@ describe('DefaultConflictResolutionService', () => {
|
||||
|
||||
await service.handleConflict(originalTask.id, ['conflict.ts']);
|
||||
|
||||
// Check original task is blocked
|
||||
// Original task should remain completed (not blocked)
|
||||
const updatedTask = await taskRepository.findById(originalTask.id);
|
||||
expect(updatedTask!.status).toBe('blocked');
|
||||
expect(updatedTask!.status).toBe('completed');
|
||||
});
|
||||
|
||||
it('should return null and skip creation if duplicate resolution task exists', async () => {
|
||||
const originalTask = await taskRepository.create({
|
||||
phaseId: testPhaseId,
|
||||
initiativeId: testInitiativeId,
|
||||
name: 'Dedup Task',
|
||||
order: 1,
|
||||
});
|
||||
|
||||
await agentRepository.create({
|
||||
name: 'agent-dedup',
|
||||
taskId: originalTask.id,
|
||||
worktreeId: 'wt-dedup',
|
||||
});
|
||||
|
||||
// First call creates the resolution task
|
||||
const first = await service.handleConflict(originalTask.id, ['conflict.ts']);
|
||||
expect(first).toBeDefined();
|
||||
|
||||
// Second call should return null (dedup)
|
||||
const second = await service.handleConflict(originalTask.id, ['conflict.ts']);
|
||||
expect(second).toBeNull();
|
||||
|
||||
// Only one resolution task should exist
|
||||
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||
const resolutionTasks = tasks.filter(t => t.name.startsWith('Resolve conflicts:'));
|
||||
expect(resolutionTasks.length).toBe(1);
|
||||
});
|
||||
|
||||
it('should create message to agent about conflict', async () => {
|
||||
@@ -243,9 +275,9 @@ describe('DefaultConflictResolutionService', () => {
|
||||
worktreeId: 'wt-no-msg',
|
||||
});
|
||||
|
||||
// Should not throw and should still create task
|
||||
await expect(serviceNoMsg.handleConflict(originalTask.id, ['test.ts']))
|
||||
.resolves.not.toThrow();
|
||||
// Should not throw and should return the created task
|
||||
const result = await serviceNoMsg.handleConflict(originalTask.id, ['test.ts']);
|
||||
expect(result).toBeDefined();
|
||||
|
||||
// Check resolution task was still created
|
||||
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||
@@ -275,9 +307,9 @@ describe('DefaultConflictResolutionService', () => {
|
||||
worktreeId: 'wt-no-events',
|
||||
});
|
||||
|
||||
// Should not throw and should still create task
|
||||
await expect(serviceNoEvents.handleConflict(originalTask.id, ['test.ts']))
|
||||
.resolves.not.toThrow();
|
||||
// Should not throw and should return the created task
|
||||
const result = await serviceNoEvents.handleConflict(originalTask.id, ['test.ts']);
|
||||
expect(result).toBeDefined();
|
||||
|
||||
// Check resolution task was still created
|
||||
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||
|
||||
@@ -14,6 +14,7 @@ import type { EventBus, TaskQueuedEvent } from '../events/index.js';
|
||||
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { MessageRepository } from '../db/repositories/message-repository.js';
|
||||
import type { Task } from '../db/schema.js';
|
||||
|
||||
// =============================================================================
|
||||
// ConflictResolutionService Interface (Port)
|
||||
@@ -38,8 +39,9 @@ export interface ConflictResolutionService {
|
||||
* @param taskId - ID of the task that conflicted
|
||||
* @param conflicts - List of conflicting file paths
|
||||
* @param mergeContext - Optional branch context for branch hierarchy merges
|
||||
* @returns The created conflict-resolution task, or null if a duplicate already exists
|
||||
*/
|
||||
handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<void>;
|
||||
handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<Task | null>;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
@@ -63,8 +65,13 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
||||
/**
|
||||
* Handle a merge conflict.
|
||||
* Creates a conflict-resolution task and notifies the agent via message.
|
||||
* Returns the created task, or null if a duplicate already exists.
|
||||
*
|
||||
* NOTE: The original task is NOT blocked. It was already completed by
|
||||
* handleAgentStopped before this method is called. The pending resolution
|
||||
* task prevents premature phase completion on its own.
|
||||
*/
|
||||
async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<void> {
|
||||
async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<Task | null> {
|
||||
// Get original task for context
|
||||
const originalTask = await this.taskRepository.findById(taskId);
|
||||
if (!originalTask) {
|
||||
@@ -77,6 +84,19 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
||||
throw new Error(`No agent found for task: ${taskId}`);
|
||||
}
|
||||
|
||||
// Dedup: skip if a pending/in_progress resolution task already exists for this original task
|
||||
if (originalTask.phaseId) {
|
||||
const phaseTasks = await this.taskRepository.findByPhaseId(originalTask.phaseId);
|
||||
const existingResolution = phaseTasks.find(
|
||||
(t) =>
|
||||
t.name === `Resolve conflicts: ${originalTask.name}` &&
|
||||
(t.status === 'pending' || t.status === 'in_progress'),
|
||||
);
|
||||
if (existingResolution) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Build conflict description
|
||||
const descriptionLines = [
|
||||
'Merge conflicts detected. Resolve conflicts in the following files:',
|
||||
@@ -115,9 +135,6 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
||||
order: originalTask.order + 1,
|
||||
});
|
||||
|
||||
// Update original task status to blocked
|
||||
await this.taskRepository.update(taskId, { status: 'blocked' });
|
||||
|
||||
// Create message to agent if messageRepository is configured
|
||||
if (this.messageRepository) {
|
||||
const messageContent = [
|
||||
@@ -155,5 +172,7 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
return conflictTask;
|
||||
}
|
||||
}
|
||||
@@ -477,9 +477,9 @@ describe('DefaultCoordinationManager', () => {
|
||||
expect(conflictTask!.priority).toBe('high');
|
||||
expect(conflictTask!.description).toContain('src/index.ts');
|
||||
|
||||
// Check original task blocked
|
||||
// Original task should NOT be blocked (stays at its current status)
|
||||
const updatedOriginal = await taskRepository.findById(task.id);
|
||||
expect(updatedOriginal!.status).toBe('blocked');
|
||||
expect(updatedOriginal!.status).toBe('pending');
|
||||
|
||||
// Check TaskQueuedEvent emitted for conflict task
|
||||
const queuedEvent = eventBus.emittedEvents.find(
|
||||
|
||||
@@ -239,10 +239,14 @@ export class ExecutionOrchestrator {
|
||||
|
||||
if (!result.success && result.conflicts) {
|
||||
log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
|
||||
await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
|
||||
const conflictTask = await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
|
||||
sourceBranch: taskBranch,
|
||||
targetBranch: phaseBranch,
|
||||
});
|
||||
if (conflictTask) {
|
||||
await this.dispatchManager.queue(conflictTask.id);
|
||||
log.info({ taskId: conflictTask.id, originalTaskId: taskId }, 'conflict resolution task queued for dispatch');
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -615,6 +619,16 @@ export class ExecutionOrchestrator {
|
||||
tasksRecovered++;
|
||||
log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
|
||||
}
|
||||
} else if (task.status === 'blocked' && this.agentRepository) {
|
||||
// Task was blocked by merge conflict after agent had already completed.
|
||||
// If the agent finished successfully, mark the task completed so the
|
||||
// phase can progress.
|
||||
const agent = await this.agentRepository.findByTaskId(task.id);
|
||||
if (agent && (agent.status === 'idle' || agent.status === 'stopped')) {
|
||||
await this.taskRepository.update(task.id, { status: 'completed' });
|
||||
tasksRecovered++;
|
||||
log.info({ taskId: task.id, agentId: agent.id }, 'recovered blocked task (agent completed, merge conflict)');
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -357,9 +357,10 @@ describe('E2E Edge Cases', () => {
|
||||
await harness.coordinationManager.queueMerge(taskAId);
|
||||
await harness.coordinationManager.processMerges('main');
|
||||
|
||||
// Verify: original task is now blocked
|
||||
// Verify: original task is NOT blocked (stays completed — the pending
|
||||
// resolution task prevents premature phase completion)
|
||||
const originalTask = await harness.taskRepository.findById(taskAId);
|
||||
expect(originalTask?.status).toBe('blocked');
|
||||
expect(originalTask?.status).toBe('completed');
|
||||
|
||||
// Verify: task:queued event emitted for conflict resolution task
|
||||
const queuedEvents = harness.getEventsByType('task:queued');
|
||||
|
||||
@@ -85,12 +85,10 @@ describe('E2E Extended Scenarios', () => {
|
||||
expect(conflictPayload.taskId).toBe(taskAId);
|
||||
expect(conflictPayload.conflictingFiles).toEqual(['src/shared.ts', 'src/types.ts']);
|
||||
|
||||
// Verify: original task marked blocked
|
||||
// Verify: original task is NOT blocked (stays completed — the pending
|
||||
// resolution task prevents premature phase completion)
|
||||
const originalTask = await harness.taskRepository.findById(taskAId);
|
||||
expect(originalTask?.status).toBe('blocked');
|
||||
|
||||
// Note: CoordinationManager.handleConflict updates task status to blocked
|
||||
// but does not emit task:blocked event (that's emitted by DispatchManager.blockTask)
|
||||
expect(originalTask?.status).toBe('completed');
|
||||
|
||||
// Verify: task:queued event emitted for resolution task
|
||||
const queuedEvents = harness.getEventsByType('task:queued');
|
||||
|
||||
@@ -117,6 +117,15 @@ InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
|
||||
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
|
||||
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
|
||||
|
||||
### Conflict Resolution → Dispatch Flow
|
||||
|
||||
When a task branch merge produces conflicts:
|
||||
1. `mergeTaskIntoPhase()` detects conflicts from `branchManager.mergeBranch()`
|
||||
2. Calls `conflictResolutionService.handleConflict()` which creates a "Resolve conflicts" task (with dedup — skips if an identical pending/in_progress resolution task already exists)
|
||||
3. The original task is **not blocked** — it was already completed by `handleAgentStopped` before the merge attempt. The pending resolution task prevents premature phase completion.
|
||||
4. Orchestrator queues the new conflict task via `dispatchManager.queue()`
|
||||
5. `scheduleDispatch()` picks it up and assigns it to an idle agent
|
||||
|
||||
### Crash Recovery
|
||||
|
||||
When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task:
|
||||
@@ -125,7 +134,9 @@ When an agent crashes (`agent:crashed` event), the orchestrator automatically re
|
||||
3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
|
||||
4. If over limit: logs warning, task stays `in_progress` for manual intervention
|
||||
|
||||
On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`). These are reset to `pending` and re-queued.
|
||||
On server restart, `recoverDispatchQueues()` also recovers:
|
||||
- Stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`) — reset to `pending` and re-queued
|
||||
- Erroneously `blocked` tasks whose agents completed successfully (status is `idle` or `completed`) — marked `completed` so the phase can progress. This handles the legacy case where conflict resolution incorrectly blocked already-completed tasks.
|
||||
|
||||
Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user