From d4a28713f67bf68409a8e3e56ebbc80bfe90c2c4 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Fri, 6 Mar 2026 20:37:29 +0100 Subject: [PATCH] fix: conflict resolution tasks now get dispatched instead of permanently blocking initiative - Remove original task blocking in handleConflict (task is already completed by handleAgentStopped) - Return created conflict task from handleConflict so orchestrator can queue it for dispatch - Add dedup check to prevent duplicate resolution tasks on crash retries - Queue conflict resolution task via dispatchManager in mergeTaskIntoPhase - Add recovery for erroneously blocked tasks in recoverDispatchQueues - Update tests and docs --- .../conflict-resolution-service.test.ts | 56 +++++++++++++++---- .../conflict-resolution-service.ts | 29 ++++++++-- apps/server/coordination/manager.test.ts | 4 +- apps/server/execution/orchestrator.ts | 16 +++++- apps/server/test/e2e/edge-cases.test.ts | 5 +- .../test/e2e/extended-scenarios.test.ts | 8 +-- docs/dispatch-events.md | 13 ++++- 7 files changed, 103 insertions(+), 28 deletions(-) diff --git a/apps/server/coordination/conflict-resolution-service.test.ts b/apps/server/coordination/conflict-resolution-service.test.ts index dc80947..311f9cb 100644 --- a/apps/server/coordination/conflict-resolution-service.test.ts +++ b/apps/server/coordination/conflict-resolution-service.test.ts @@ -113,7 +113,11 @@ describe('DefaultConflictResolutionService', () => { const conflicts = ['src/file1.ts', 'src/file2.ts']; - await service.handleConflict(originalTask.id, conflicts); + const result = await service.handleConflict(originalTask.id, conflicts); + + // Should return the created task + expect(result).toBeDefined(); + expect(result!.name).toBe('Resolve conflicts: Original Task'); // Check resolution task was created const tasks = await taskRepository.findByPhaseId(testPhaseId); @@ -135,12 +139,12 @@ describe('DefaultConflictResolutionService', () => { expect(resolutionTask!.description).toContain('Original Task'); }); - it('should update original task status to blocked', async () => { + it('should NOT block original task (it stays at its current status)', async () => { const originalTask = await taskRepository.create({ phaseId: testPhaseId, initiativeId: testInitiativeId, - name: 'Task To Block', - status: 'in_progress', + name: 'Task To Not Block', + status: 'completed', order: 1, }); @@ -152,9 +156,37 @@ describe('DefaultConflictResolutionService', () => { await service.handleConflict(originalTask.id, ['conflict.ts']); - // Check original task is blocked + // Original task should remain completed (not blocked) const updatedTask = await taskRepository.findById(originalTask.id); - expect(updatedTask!.status).toBe('blocked'); + expect(updatedTask!.status).toBe('completed'); + }); + + it('should return null and skip creation if duplicate resolution task exists', async () => { + const originalTask = await taskRepository.create({ + phaseId: testPhaseId, + initiativeId: testInitiativeId, + name: 'Dedup Task', + order: 1, + }); + + await agentRepository.create({ + name: 'agent-dedup', + taskId: originalTask.id, + worktreeId: 'wt-dedup', + }); + + // First call creates the resolution task + const first = await service.handleConflict(originalTask.id, ['conflict.ts']); + expect(first).toBeDefined(); + + // Second call should return null (dedup) + const second = await service.handleConflict(originalTask.id, ['conflict.ts']); + expect(second).toBeNull(); + + // Only one resolution task should exist + const tasks = await taskRepository.findByPhaseId(testPhaseId); + const resolutionTasks = tasks.filter(t => t.name.startsWith('Resolve conflicts:')); + expect(resolutionTasks.length).toBe(1); }); it('should create message to agent about conflict', async () => { @@ -243,9 +275,9 @@ describe('DefaultConflictResolutionService', () => { worktreeId: 'wt-no-msg', }); - // Should not throw and should still create task - await expect(serviceNoMsg.handleConflict(originalTask.id, ['test.ts'])) - .resolves.not.toThrow(); + // Should not throw and should return the created task + const result = await serviceNoMsg.handleConflict(originalTask.id, ['test.ts']); + expect(result).toBeDefined(); // Check resolution task was still created const tasks = await taskRepository.findByPhaseId(testPhaseId); @@ -275,9 +307,9 @@ describe('DefaultConflictResolutionService', () => { worktreeId: 'wt-no-events', }); - // Should not throw and should still create task - await expect(serviceNoEvents.handleConflict(originalTask.id, ['test.ts'])) - .resolves.not.toThrow(); + // Should not throw and should return the created task + const result = await serviceNoEvents.handleConflict(originalTask.id, ['test.ts']); + expect(result).toBeDefined(); // Check resolution task was still created const tasks = await taskRepository.findByPhaseId(testPhaseId); diff --git a/apps/server/coordination/conflict-resolution-service.ts b/apps/server/coordination/conflict-resolution-service.ts index 47e3026..68110ad 100644 --- a/apps/server/coordination/conflict-resolution-service.ts +++ b/apps/server/coordination/conflict-resolution-service.ts @@ -14,6 +14,7 @@ import type { EventBus, TaskQueuedEvent } from '../events/index.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { AgentRepository } from '../db/repositories/agent-repository.js'; import type { MessageRepository } from '../db/repositories/message-repository.js'; +import type { Task } from '../db/schema.js'; // ============================================================================= // ConflictResolutionService Interface (Port) @@ -38,8 +39,9 @@ export interface ConflictResolutionService { * @param taskId - ID of the task that conflicted * @param conflicts - List of conflicting file paths * @param mergeContext - Optional branch context for branch hierarchy merges + * @returns The created conflict-resolution task, or null if a duplicate already exists */ - handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise; + handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise; } // ============================================================================= @@ -63,8 +65,13 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi /** * Handle a merge conflict. * Creates a conflict-resolution task and notifies the agent via message. + * Returns the created task, or null if a duplicate already exists. + * + * NOTE: The original task is NOT blocked. It was already completed by + * handleAgentStopped before this method is called. The pending resolution + * task prevents premature phase completion on its own. */ - async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise { + async handleConflict(taskId: string, conflicts: string[], mergeContext?: MergeContext): Promise { // Get original task for context const originalTask = await this.taskRepository.findById(taskId); if (!originalTask) { @@ -77,6 +84,19 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi throw new Error(`No agent found for task: ${taskId}`); } + // Dedup: skip if a pending/in_progress resolution task already exists for this original task + if (originalTask.phaseId) { + const phaseTasks = await this.taskRepository.findByPhaseId(originalTask.phaseId); + const existingResolution = phaseTasks.find( + (t) => + t.name === `Resolve conflicts: ${originalTask.name}` && + (t.status === 'pending' || t.status === 'in_progress'), + ); + if (existingResolution) { + return null; + } + } + // Build conflict description const descriptionLines = [ 'Merge conflicts detected. Resolve conflicts in the following files:', @@ -115,9 +135,6 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi order: originalTask.order + 1, }); - // Update original task status to blocked - await this.taskRepository.update(taskId, { status: 'blocked' }); - // Create message to agent if messageRepository is configured if (this.messageRepository) { const messageContent = [ @@ -155,5 +172,7 @@ export class DefaultConflictResolutionService implements ConflictResolutionServi }; this.eventBus.emit(event); } + + return conflictTask; } } \ No newline at end of file diff --git a/apps/server/coordination/manager.test.ts b/apps/server/coordination/manager.test.ts index 0904d36..ec62b7d 100644 --- a/apps/server/coordination/manager.test.ts +++ b/apps/server/coordination/manager.test.ts @@ -477,9 +477,9 @@ describe('DefaultCoordinationManager', () => { expect(conflictTask!.priority).toBe('high'); expect(conflictTask!.description).toContain('src/index.ts'); - // Check original task blocked + // Original task should NOT be blocked (stays at its current status) const updatedOriginal = await taskRepository.findById(task.id); - expect(updatedOriginal!.status).toBe('blocked'); + expect(updatedOriginal!.status).toBe('pending'); // Check TaskQueuedEvent emitted for conflict task const queuedEvent = eventBus.emittedEvents.find( diff --git a/apps/server/execution/orchestrator.ts b/apps/server/execution/orchestrator.ts index 5b8c521..83f9194 100644 --- a/apps/server/execution/orchestrator.ts +++ b/apps/server/execution/orchestrator.ts @@ -239,10 +239,14 @@ export class ExecutionOrchestrator { if (!result.success && result.conflicts) { log.warn({ taskId, taskBranch, phaseBranch, conflicts: result.conflicts }, 'task merge conflict'); - await this.conflictResolutionService.handleConflict(taskId, result.conflicts, { + const conflictTask = await this.conflictResolutionService.handleConflict(taskId, result.conflicts, { sourceBranch: taskBranch, targetBranch: phaseBranch, }); + if (conflictTask) { + await this.dispatchManager.queue(conflictTask.id); + log.info({ taskId: conflictTask.id, originalTaskId: taskId }, 'conflict resolution task queued for dispatch'); + } return; } @@ -615,6 +619,16 @@ export class ExecutionOrchestrator { tasksRecovered++; log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)'); } + } else if (task.status === 'blocked' && this.agentRepository) { + // Task was blocked by merge conflict after agent had already completed. + // If the agent finished successfully, mark the task completed so the + // phase can progress. + const agent = await this.agentRepository.findByTaskId(task.id); + if (agent && (agent.status === 'idle' || agent.status === 'stopped')) { + await this.taskRepository.update(task.id, { status: 'completed' }); + tasksRecovered++; + log.info({ taskId: task.id, agentId: agent.id }, 'recovered blocked task (agent completed, merge conflict)'); + } } } } diff --git a/apps/server/test/e2e/edge-cases.test.ts b/apps/server/test/e2e/edge-cases.test.ts index 2ddfcee..8439dfd 100644 --- a/apps/server/test/e2e/edge-cases.test.ts +++ b/apps/server/test/e2e/edge-cases.test.ts @@ -357,9 +357,10 @@ describe('E2E Edge Cases', () => { await harness.coordinationManager.queueMerge(taskAId); await harness.coordinationManager.processMerges('main'); - // Verify: original task is now blocked + // Verify: original task is NOT blocked (stays completed — the pending + // resolution task prevents premature phase completion) const originalTask = await harness.taskRepository.findById(taskAId); - expect(originalTask?.status).toBe('blocked'); + expect(originalTask?.status).toBe('completed'); // Verify: task:queued event emitted for conflict resolution task const queuedEvents = harness.getEventsByType('task:queued'); diff --git a/apps/server/test/e2e/extended-scenarios.test.ts b/apps/server/test/e2e/extended-scenarios.test.ts index e2c538c..b95ef0b 100644 --- a/apps/server/test/e2e/extended-scenarios.test.ts +++ b/apps/server/test/e2e/extended-scenarios.test.ts @@ -85,12 +85,10 @@ describe('E2E Extended Scenarios', () => { expect(conflictPayload.taskId).toBe(taskAId); expect(conflictPayload.conflictingFiles).toEqual(['src/shared.ts', 'src/types.ts']); - // Verify: original task marked blocked + // Verify: original task is NOT blocked (stays completed — the pending + // resolution task prevents premature phase completion) const originalTask = await harness.taskRepository.findById(taskAId); - expect(originalTask?.status).toBe('blocked'); - - // Note: CoordinationManager.handleConflict updates task status to blocked - // but does not emit task:blocked event (that's emitted by DispatchManager.blockTask) + expect(originalTask?.status).toBe('completed'); // Verify: task:queued event emitted for resolution task const queuedEvents = harness.getEventsByType('task:queued'); diff --git a/docs/dispatch-events.md b/docs/dispatch-events.md index 7f0ffd6..0b3be50 100644 --- a/docs/dispatch-events.md +++ b/docs/dispatch-events.md @@ -117,6 +117,15 @@ InitiativeChangesRequestedEvent { initiativeId, phaseId, taskId } | `agent:crashed` | Auto-retry crashed task up to `MAX_TASK_RETRIES` (3). Increments `retryCount`, resets status to `pending`, re-queues. Exceeding retries leaves task `in_progress` for manual intervention. | | `task:completed` | Merge task branch (if branch exists), check phase completion, dispatch next queued task | +### Conflict Resolution → Dispatch Flow + +When a task branch merge produces conflicts: +1. `mergeTaskIntoPhase()` detects conflicts from `branchManager.mergeBranch()` +2. Calls `conflictResolutionService.handleConflict()` which creates a "Resolve conflicts" task (with dedup — skips if an identical pending/in_progress resolution task already exists) +3. The original task is **not blocked** — it was already completed by `handleAgentStopped` before the merge attempt. The pending resolution task prevents premature phase completion. +4. Orchestrator queues the new conflict task via `dispatchManager.queue()` +5. `scheduleDispatch()` picks it up and assigns it to an idle agent + ### Crash Recovery When an agent crashes (`agent:crashed` event), the orchestrator automatically retries the task: @@ -125,7 +134,9 @@ When an agent crashes (`agent:crashed` event), the orchestrator automatically re 3. If under limit: increments `retryCount`, resets task to `pending`, re-queues for dispatch 4. If over limit: logs warning, task stays `in_progress` for manual intervention -On server restart, `recoverDispatchQueues()` also recovers stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`). These are reset to `pending` and re-queued. +On server restart, `recoverDispatchQueues()` also recovers: +- Stuck `in_progress` tasks whose agents are dead (status is not `running` or `waiting_for_input`) — reset to `pending` and re-queued +- Erroneously `blocked` tasks whose agents completed successfully (status is `idle` or `completed`) — marked `completed` so the phase can progress. This handles the legacy case where conflict resolution incorrectly blocked already-completed tasks. Manual retry via `retryBlockedTask()` resets `retryCount` to 0, giving the task a fresh set of automatic retries.