feat(06-02): implement DefaultCoordinationManager adapter

Implements CoordinationManager interface with:
- In-memory merge queue with dependency tracking
- Dependency-ordered merging (getNextMergeable)
- Merge queue state tracking (queued, in_progress, merged, conflicted)
- Event emission for all merge lifecycle events
- Basic conflict handling (creates resolution task)
This commit is contained in:
Lukas May
2026-01-30 21:06:54 +01:00
parent efcffef3ec
commit 6d7894bde4

343
src/coordination/manager.ts Normal file
View File

@@ -0,0 +1,343 @@
/**
* Default Coordination Manager - Adapter Implementation
*
* Implements CoordinationManager interface with in-memory merge queue
* and dependency-ordered merging.
*
* This is the ADAPTER for the CoordinationManager PORT.
*/
import type {
EventBus,
MergeQueuedEvent,
MergeStartedEvent,
MergeCompletedEvent,
MergeConflictedEvent,
TaskQueuedEvent,
} from '../events/index.js';
import type { WorktreeManager } from '../git/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { CoordinationManager, MergeQueueItem, MergeResult } from './types.js';
// =============================================================================
// Internal Types
// =============================================================================
/**
* Internal representation of a merge queue item with status.
*/
interface InternalMergeQueueItem extends MergeQueueItem {
status: 'queued' | 'in_progress';
}
// =============================================================================
// DefaultCoordinationManager Implementation
// =============================================================================
/**
* In-memory implementation of CoordinationManager.
*
* Uses Map for queue management and processes merges in dependency order.
* Handles conflicts by creating resolution tasks.
*/
export class DefaultCoordinationManager implements CoordinationManager {
/** Internal merge queue */
private mergeQueue: Map<string, InternalMergeQueueItem> = new Map();
/** Task IDs that have been successfully merged */
private mergedTasks: Set<string> = new Set();
/** Tasks with conflicts awaiting resolution */
private conflictedTasks: Map<string, string[]> = new Map();
constructor(
private worktreeManager?: WorktreeManager,
private taskRepository?: TaskRepository,
private agentRepository?: AgentRepository,
private eventBus?: EventBus
) {}
/**
* Queue a completed task for merge.
* Extracts agent/worktree information from the task.
*/
async queueMerge(taskId: string): Promise<void> {
// Look up task to get dependencies
if (!this.taskRepository) {
throw new Error('TaskRepository not configured');
}
const task = await this.taskRepository.findById(taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
// Look up agent assigned to task to get worktreeId
if (!this.agentRepository) {
throw new Error('AgentRepository not configured');
}
const agent = await this.agentRepository.findByTaskId(taskId);
if (!agent) {
throw new Error(`No agent found for task: ${taskId}`);
}
// For now, dependsOn is empty - would need to query task_dependencies table
// to get actual dependencies
const dependsOn: string[] = [];
const queueItem: InternalMergeQueueItem = {
taskId,
agentId: agent.id,
worktreeId: agent.worktreeId,
priority: task.priority,
queuedAt: new Date(),
dependsOn,
status: 'queued',
};
this.mergeQueue.set(taskId, queueItem);
// Emit MergeQueuedEvent
const event: MergeQueuedEvent = {
type: 'merge:queued',
timestamp: new Date(),
payload: {
taskId,
agentId: agent.id,
worktreeId: agent.worktreeId,
priority: task.priority,
},
};
this.eventBus?.emit(event);
}
/**
* Get next task ready to merge.
* Returns task with all dependency tasks already merged.
*/
async getNextMergeable(): Promise<MergeQueueItem | null> {
const queuedItems = Array.from(this.mergeQueue.values()).filter(
(item) => item.status === 'queued'
);
if (queuedItems.length === 0) {
return null;
}
// Filter to only items where ALL dependsOn tasks are in mergedTasks
const readyItems = queuedItems.filter((item) =>
item.dependsOn.every((depTaskId) => this.mergedTasks.has(depTaskId))
);
if (readyItems.length === 0) {
return null;
}
// Sort by priority (high > medium > low), then by queuedAt (oldest first)
const priorityOrder: Record<string, number> = { high: 0, medium: 1, low: 2 };
readyItems.sort((a, b) => {
const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority];
if (priorityDiff !== 0) {
return priorityDiff;
}
return a.queuedAt.getTime() - b.queuedAt.getTime();
});
// Return as MergeQueueItem (without internal status)
const item = readyItems[0];
return {
taskId: item.taskId,
agentId: item.agentId,
worktreeId: item.worktreeId,
priority: item.priority,
queuedAt: item.queuedAt,
dependsOn: item.dependsOn,
};
}
/**
* Process all ready merges in dependency order.
* Merges each ready task into the target branch.
*/
async processMerges(targetBranch: string): Promise<MergeResult[]> {
if (!this.worktreeManager) {
throw new Error('WorktreeManager not configured');
}
const results: MergeResult[] = [];
// Loop while there are mergeable items
let nextItem = await this.getNextMergeable();
while (nextItem) {
const queueItem = this.mergeQueue.get(nextItem.taskId)!;
// Mark as in_progress
queueItem.status = 'in_progress';
// Emit MergeStartedEvent
const startEvent: MergeStartedEvent = {
type: 'merge:started',
timestamp: new Date(),
payload: {
taskId: nextItem.taskId,
agentId: nextItem.agentId,
worktreeId: nextItem.worktreeId,
targetBranch,
},
};
this.eventBus?.emit(startEvent);
// Attempt merge via worktreeManager
const mergeResult = await this.worktreeManager.merge(nextItem.worktreeId, targetBranch);
if (mergeResult.success) {
// Success - add to mergedTasks and remove from queue
this.mergedTasks.add(nextItem.taskId);
this.mergeQueue.delete(nextItem.taskId);
// Emit MergeCompletedEvent
const completedEvent: MergeCompletedEvent = {
type: 'merge:completed',
timestamp: new Date(),
payload: {
taskId: nextItem.taskId,
agentId: nextItem.agentId,
worktreeId: nextItem.worktreeId,
targetBranch,
},
};
this.eventBus?.emit(completedEvent);
results.push({
taskId: nextItem.taskId,
success: true,
message: mergeResult.message,
});
} else {
// Conflict - add to conflictedTasks and remove from queue
const conflicts = mergeResult.conflicts || [];
this.conflictedTasks.set(nextItem.taskId, conflicts);
this.mergeQueue.delete(nextItem.taskId);
// Emit MergeConflictedEvent
const conflictEvent: MergeConflictedEvent = {
type: 'merge:conflicted',
timestamp: new Date(),
payload: {
taskId: nextItem.taskId,
agentId: nextItem.agentId,
worktreeId: nextItem.worktreeId,
targetBranch,
conflictingFiles: conflicts,
},
};
this.eventBus?.emit(conflictEvent);
// Handle conflict - create resolution task
await this.handleConflict(nextItem.taskId, conflicts);
results.push({
taskId: nextItem.taskId,
success: false,
conflicts,
message: mergeResult.message,
});
}
// Get next item
nextItem = await this.getNextMergeable();
}
return results;
}
/**
* Handle a merge conflict.
* Creates a conflict-resolution task and assigns back to the agent.
*/
async handleConflict(taskId: string, conflicts: string[]): Promise<void> {
if (!this.taskRepository) {
throw new Error('TaskRepository not configured');
}
// Get original task for context
const originalTask = await this.taskRepository.findById(taskId);
if (!originalTask) {
throw new Error(`Original task not found: ${taskId}`);
}
// Create new conflict-resolution task
const conflictTask = await this.taskRepository.create({
planId: originalTask.planId,
name: `Resolve conflicts: ${originalTask.name}`,
description: `Merge conflicts detected. Resolve conflicts in the following files:\n\n${conflicts.map((f) => `- ${f}`).join('\n')}\n\nOriginal task: ${originalTask.name}\n\nInstructions: Resolve merge conflicts in the listed files, then mark task complete.`,
type: 'auto',
priority: 'high', // Conflicts should be resolved quickly
status: 'pending',
order: originalTask.order + 1,
});
// Update original task status to blocked
await this.taskRepository.update(taskId, { status: 'blocked' });
// Emit TaskQueuedEvent for the new conflict-resolution task
const event: TaskQueuedEvent = {
type: 'task:queued',
timestamp: new Date(),
payload: {
taskId: conflictTask.id,
priority: 'high',
dependsOn: [],
},
};
this.eventBus?.emit(event);
}
/**
* Get current state of the merge queue.
*/
async getQueueState(): Promise<{
queued: MergeQueueItem[];
inProgress: MergeQueueItem[];
merged: string[];
conflicted: Array<{ taskId: string; conflicts: string[] }>;
}> {
const allItems = Array.from(this.mergeQueue.values());
// Filter by status
const queued = allItems
.filter((item) => item.status === 'queued')
.map((item) => ({
taskId: item.taskId,
agentId: item.agentId,
worktreeId: item.worktreeId,
priority: item.priority,
queuedAt: item.queuedAt,
dependsOn: item.dependsOn,
}));
const inProgress = allItems
.filter((item) => item.status === 'in_progress')
.map((item) => ({
taskId: item.taskId,
agentId: item.agentId,
worktreeId: item.worktreeId,
priority: item.priority,
queuedAt: item.queuedAt,
dependsOn: item.dependsOn,
}));
const merged = Array.from(this.mergedTasks);
const conflicted = Array.from(this.conflictedTasks.entries()).map(([taskId, conflicts]) => ({
taskId,
conflicts,
}));
return { queued, inProgress, merged, conflicted };
}
}