fix: conflict resolution tasks now get dispatched instead of permanently blocking initiative
- Remove original task blocking in handleConflict (task is already completed by handleAgentStopped) - Return created conflict task from handleConflict so orchestrator can queue it for dispatch - Add dedup check to prevent duplicate resolution tasks on crash retries - Queue conflict resolution task via dispatchManager in mergeTaskIntoPhase - Add recovery for erroneously blocked tasks in recoverDispatchQueues - Update tests and docs
This commit is contained in:
@@ -113,7 +113,11 @@ describe('DefaultConflictResolutionService', () => {
|
|||||||
|
|
||||||
const conflicts = ['src/file1.ts', 'src/file2.ts'];
|
const conflicts = ['src/file1.ts', 'src/file2.ts'];
|
||||||
|
|
||||||
await service.handleConflict(originalTask.id, conflicts);
|
const result = await service.handleConflict(originalTask.id, conflicts);
|
||||||
|
|
||||||
|
// Should return the created task
|
||||||
|
expect(result).toBeDefined();
|
||||||
|
expect(result!.name).toBe('Resolve conflicts: Original Task');
|
||||||
|
|
||||||
// Check resolution task was created
|
// Check resolution task was created
|
||||||
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||||
@@ -135,12 +139,12 @@ describe('DefaultConflictResolutionService', () => {
|
|||||||
expect(resolutionTask!.description).toContain('Original Task');
|
expect(resolutionTask!.description).toContain('Original Task');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should update original task status to blocked', async () => {
|
it('should NOT block original task (it stays at its current status)', async () => {
|
||||||
const originalTask = await taskRepository.create({
|
const originalTask = await taskRepository.create({
|
||||||
phaseId: testPhaseId,
|
phaseId: testPhaseId,
|
||||||
initiativeId: testInitiativeId,
|
initiativeId: testInitiativeId,
|
||||||
name: 'Task To Block',
|
name: 'Task To Not Block',
|
||||||
status: 'in_progress',
|
status: 'completed',
|
||||||
order: 1,
|
order: 1,
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -152,9 +156,37 @@ describe('DefaultConflictResolutionService', () => {
|
|||||||
|
|
||||||
await service.handleConflict(originalTask.id, ['conflict.ts']);
|
await service.handleConflict(originalTask.id, ['conflict.ts']);
|
||||||
|
|
||||||
// Check original task is blocked
|
// Original task should remain completed (not blocked)
|
||||||
const updatedTask = await taskRepository.findById(originalTask.id);
|
const updatedTask = await taskRepository.findById(originalTask.id);
|
||||||
expect(updatedTask!.status).toBe('blocked');
|
expect(updatedTask!.status).toBe('completed');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return null and skip creation if duplicate resolution task exists', async () => {
|
||||||
|
const originalTask = await taskRepository.create({
|
||||||
|
phaseId: testPhaseId,
|
||||||
|
initiativeId: testInitiativeId,
|
||||||
|
name: 'Dedup Task',
|
||||||
|
order: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
await agentRepository.create({
|
||||||
|
name: 'agent-dedup',
|
||||||
|
taskId: originalTask.id,
|
||||||
|
worktreeId: 'wt-dedup',
|
||||||
|
});
|
||||||
|
|
||||||
|
// First call creates the resolution task
|
||||||
|
const first = await service.handleConflict(originalTask.id, ['conflict.ts']);
|
||||||
|
expect(first).toBeDefined();
|
||||||
|
|
||||||
|
// Second call should return null (dedup)
|
||||||
|
const second = await service.handleConflict(originalTask.id, ['conflict.ts']);
|
||||||
|
expect(second).toBeNull();
|
||||||
|
|
||||||
|
// Only one resolution task should exist
|
||||||
|
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||||
|
const resolutionTasks = tasks.filter(t => t.name.startsWith('Resolve conflicts:'));
|
||||||
|
expect(resolutionTasks.length).toBe(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should create message to agent about conflict', async () => {
|
it('should create message to agent about conflict', async () => {
|
||||||
@@ -243,9 +275,9 @@ describe('DefaultConflictResolutionService', () => {
|
|||||||
worktreeId: 'wt-no-msg',
|
worktreeId: 'wt-no-msg',
|
||||||
});
|
});
|
||||||
|
|
||||||
// Should not throw and should still create task
|
// Should not throw and should return the created task
|
||||||
await expect(serviceNoMsg.handleConflict(originalTask.id, ['test.ts']))
|
const result = await serviceNoMsg.handleConflict(originalTask.id, ['test.ts']);
|
||||||
.resolves.not.toThrow();
|
expect(result).toBeDefined();
|
||||||
|
|
||||||
// Check resolution task was still created
|
// Check resolution task was still created
|
||||||
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||||
@@ -275,9 +307,9 @@ describe('DefaultConflictResolutionService', () => {
|
|||||||
worktreeId: 'wt-no-events',
|
worktreeId: 'wt-no-events',
|
||||||
});
|
});
|
||||||
|
|
||||||
// Should not throw and should still create task
|
// Should not throw and should return the created task
|
||||||
await expect(serviceNoEvents.handleConflict(originalTask.id, ['test.ts']))
|
const result = await serviceNoEvents.handleConflict(originalTask.id, ['test.ts']);
|
||||||
.resolves.not.toThrow();
|
expect(result).toBeDefined();
|
||||||
|
|
||||||
// Check resolution task was still created
|
// Check resolution task was still created
|
||||||
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
const tasks = await taskRepository.findByPhaseId(testPhaseId);
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import type { EventBus, TaskQueuedEvent } from '../events/index.js';
|
|||||||
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
||||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||||
import type { MessageRepository } from '../db/repositories/message-repository.js';
|
import type { MessageRepository } from '../db/repositories/message-repository.js';
|
||||||
|
import type { Task } from '../db/schema.js';
|
||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
// ConflictResolutionService Interface (Port)
|
// ConflictResolutionService Interface (Port)
|
||||||
@@ -38,8 +39,9 @@ export interface ConflictResolutionService {
|
|||||||
* @param taskId - ID of the task that conflicted
|
* @param taskId - ID of the task that conflicted
|
||||||
* @param conflicts - List of conflicting file paths
|
* @param conflicts - List of conflicting file paths
|
||||||
* @param mergeContext - Optional branch context for branch hierarchy merges
|
* @param mergeContext - Optional branch context for branch hierarchy merges
|
||||||
|
* @returns The created conflict-resolution task, or null if a duplicate already exists
|
||||||
*/
|
*/
|
||||||
handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<void>;
|
handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<Task | null>;
|
||||||
}
|
}
|
||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
@@ -63,8 +65,13 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
|||||||
/**
|
/**
|
||||||
* Handle a merge conflict.
|
* Handle a merge conflict.
|
||||||
* Creates a conflict-resolution task and notifies the agent via message.
|
* Creates a conflict-resolution task and notifies the agent via message.
|
||||||
|
* Returns the created task, or null if a duplicate already exists.
|
||||||
|
*
|
||||||
|
* NOTE: The original task is NOT blocked. It was already completed by
|
||||||
|
* handleAgentStopped before this method is called. The pending resolution
|
||||||
|
* task prevents premature phase completion on its own.
|
||||||
*/
|
*/
|
||||||
async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<void> {
|
async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise<Task | null> {
|
||||||
// Get original task for context
|
// Get original task for context
|
||||||
const originalTask = await this.taskRepository.findById(taskId);
|
const originalTask = await this.taskRepository.findById(taskId);
|
||||||
if (!originalTask) {
|
if (!originalTask) {
|
||||||
@@ -77,6 +84,19 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
|||||||
throw new Error(`No agent found for task: ${taskId}`);
|
throw new Error(`No agent found for task: ${taskId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dedup: skip if a pending/in_progress resolution task already exists for this original task
|
||||||
|
if (originalTask.phaseId) {
|
||||||
|
const phaseTasks = await this.taskRepository.findByPhaseId(originalTask.phaseId);
|
||||||
|
const existingResolution = phaseTasks.find(
|
||||||
|
(t) =>
|
||||||
|
t.name === `Resolve conflicts: ${originalTask.name}` &&
|
||||||
|
(t.status === 'pending' || t.status === 'in_progress'),
|
||||||
|
);
|
||||||
|
if (existingResolution) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Build conflict description
|
// Build conflict description
|
||||||
const descriptionLines = [
|
const descriptionLines = [
|
||||||
'Merge conflicts detected. Resolve conflicts in the following files:',
|
'Merge conflicts detected. Resolve conflicts in the following files:',
|
||||||
@@ -115,9 +135,6 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
|||||||
order: originalTask.order + 1,
|
order: originalTask.order + 1,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Update original task status to blocked
|
|
||||||
await this.taskRepository.update(taskId, { status: 'blocked' });
|
|
||||||
|
|
||||||
// Create message to agent if messageRepository is configured
|
// Create message to agent if messageRepository is configured
|
||||||
if (this.messageRepository) {
|
if (this.messageRepository) {
|
||||||
const messageContent = [
|
const messageContent = [
|
||||||
@@ -155,5 +172,7 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi
|
|||||||
};
|
};
|
||||||
this.eventBus.emit(event);
|
this.eventBus.emit(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return conflictTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -477,9 +477,9 @@ describe('DefaultCoordinationManager', () => {
|
|||||||
expect(conflictTask!.priority).toBe('high');
|
expect(conflictTask!.priority).toBe('high');
|
||||||
expect(conflictTask!.description).toContain('src/index.ts');
|
expect(conflictTask!.description).toContain('src/index.ts');
|
||||||
|
|
||||||
// Check original task blocked
|
// Original task should NOT be blocked (stays at its current status)
|
||||||
const updatedOriginal = await taskRepository.findById(task.id);
|
const updatedOriginal = await taskRepository.findById(task.id);
|
||||||
expect(updatedOriginal!.status).toBe('blocked');
|
expect(updatedOriginal!.status).toBe('pending');
|
||||||
|
|
||||||
// Check TaskQueuedEvent emitted for conflict task
|
// Check TaskQueuedEvent emitted for conflict task
|
||||||
const queuedEvent = eventBus.emittedEvents.find(
|
const queuedEvent = eventBus.emittedEvents.find(
|
||||||
|
|||||||
@@ -239,10 +239,14 @@ export class ExecutionOrchestrator {
|
|||||||
|
|
||||||
if (!result.success && result.conflicts) {
|
if (!result.success && result.conflicts) {
|
||||||
log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
|
log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict');
|
||||||
await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
|
const conflictTask = await this.conflictResolutionService.handleConflict(taskId, result.conflicts, {
|
||||||
sourceBranch: taskBranch,
|
sourceBranch: taskBranch,
|
||||||
targetBranch: phaseBranch,
|
targetBranch: phaseBranch,
|
||||||
});
|
});
|
||||||
|
if (conflictTask) {
|
||||||
|
await this.dispatchManager.queue(conflictTask.id);
|
||||||
|
log.info({ taskId: conflictTask.id, originalTaskId: taskId }, 'conflict resolution task queued for dispatch');
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -615,6 +619,16 @@ export class ExecutionOrchestrator {
|
|||||||
tasksRecovered++;
|
tasksRecovered++;
|
||||||
log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
|
log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
|
||||||
}
|
}
|
||||||
|
} else if (task.status === 'blocked' && this.agentRepository) {
|
||||||
|
// Task was blocked by merge conflict after agent had already completed.
|
||||||
|
// If the agent finished successfully, mark the task completed so the
|
||||||
|
// phase can progress.
|
||||||
|
const agent = await this.agentRepository.findByTaskId(task.id);
|
||||||
|
if (agent && (agent.status === 'idle' || agent.status === 'stopped')) {
|
||||||
|
await this.taskRepository.update(task.id, { status: 'completed' });
|
||||||
|
tasksRecovered++;
|
||||||
|
log.info({ taskId: task.id, agentId: agent.id }, 'recovered blocked task (agent completed, merge conflict)');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -357,9 +357,10 @@ describe('E2E Edge Cases', () => {
|
|||||||
await harness.coordinationManager.queueMerge(taskAId);
|
await harness.coordinationManager.queueMerge(taskAId);
|
||||||
await harness.coordinationManager.processMerges('main');
|
await harness.coordinationManager.processMerges('main');
|
||||||
|
|
||||||
// Verify: original task is now blocked
|
// Verify: original task is NOT blocked (stays completed — the pending
|
||||||
|
// resolution task prevents premature phase completion)
|
||||||
const originalTask = await harness.taskRepository.findById(taskAId);
|
const originalTask = await harness.taskRepository.findById(taskAId);
|
||||||
expect(originalTask?.status).toBe('blocked');
|
expect(originalTask?.status).toBe('completed');
|
||||||
|
|
||||||
// Verify: task:queued event emitted for conflict resolution task
|
// Verify: task:queued event emitted for conflict resolution task
|
||||||
const queuedEvents = harness.getEventsByType('task:queued');
|
const queuedEvents = harness.getEventsByType('task:queued');
|
||||||
|
|||||||
@@ -85,12 +85,10 @@ describe('E2E Extended Scenarios', () => {
|
|||||||
expect(conflictPayload.taskId).toBe(taskAId);
|
expect(conflictPayload.taskId).toBe(taskAId);
|
||||||
expect(conflictPayload.conflictingFiles).toEqual(['src/shared.ts', 'src/types.ts']);
|
expect(conflictPayload.conflictingFiles).toEqual(['src/shared.ts', 'src/types.ts']);
|
||||||
|
|
||||||
// Verify: original task marked blocked
|
// Verify: original task is NOT blocked (stays completed — the pending
|
||||||
|
// resolution task prevents premature phase completion)
|
||||||
const originalTask = await harness.taskRepository.findById(taskAId);
|
const originalTask = await harness.taskRepository.findById(taskAId);
|
||||||
expect(originalTask?.status).toBe('blocked');
|
expect(originalTask?.status).toBe('completed');
|
||||||
|
|
||||||
// Note: CoordinationManager.handleConflict updates task status to blocked
|
|
||||||
// but does not emit task:blocked event (that's emitted by DispatchManager.blockTask)
|
|
||||||
|
|
||||||
// Verify: task:queued event emitted for resolution task
|
// Verify: task:queued event emitted for resolution task
|
||||||
const queuedEvents = harness.getEventsByType('task:queued');
|
const queuedEvents = harness.getEventsByType('task:queued');
|
||||||
|
|||||||
@@ -117,6 +117,15 @@ InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId }
|
|||||||
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
|
| `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. |
|
||||||
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
|
| `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task |
|
||||||
|
|
||||||
|
### Conflict Resolution → Dispatch Flow
|
||||||
|
|
||||||
|
When a task branch merge produces conflicts:
|
||||||
|
1. `mergeTaskIntoPhase()` detects conflicts from `branchManager.mergeBranch()`
|
||||||
|
2. Calls `conflictResolutionService.handleConflict()` which creates a "Resolve conflicts" task (with dedup — skips if an identical pending/in_progress resolution task already exists)
|
||||||
|
3. The original task is **not blocked** — it was already completed by `handleAgentStopped` before the merge attempt. The pending resolution task prevents premature phase completion.
|
||||||
|
4. Orchestrator queues the new conflict task via `dispatchManager.queue()`
|
||||||
|
5. `scheduleDispatch()` picks it up and assigns it to an idle agent
|
||||||
|
|
||||||
### Crash Recovery
|
### Crash Recovery
|
||||||
|
|
||||||
When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task:
|
When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task:
|
||||||
@@ -125,7 +134,9 @@ When an agent crashes (`agent:crashed` event), the orchestrator automatically re
|
|||||||
3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
|
3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch
|
||||||
4. If over limit: logs warning, task stays `in_progress` for manual intervention
|
4. If over limit: logs warning, task stays `in_progress` for manual intervention
|
||||||
|
|
||||||
On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`). These are reset to `pending` and re-queued.
|
On server restart, `recoverDispatchQueues()` also recovers:
|
||||||
|
- Stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`) — reset to `pending` and re-queued
|
||||||
|
- Erroneously `blocked` tasks whose agents completed successfully (status is `idle` or `completed`) — marked `completed` so the phase can progress. This handles the legacy case where conflict resolution incorrectly blocked already-completed tasks.
|
||||||
|
|
||||||
Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.
|
Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user