/** * Default Coordination Manager - Adapter Implementation * * Implements CoordinationManager interface with in-memory merge queue * and dependency-ordered merging. * * This is the ADAPTER for the CoordinationManager PORT. */ import type { EventBus, MergeQueuedEvent, MergeStartedEvent, MergeCompletedEvent, MergeConflictedEvent, } from '../events/index.js'; import type { WorktreeManager } from '../git/types.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { AgentRepository } from '../db/repositories/agent-repository.js'; import type { MessageRepository } from '../db/repositories/message-repository.js'; import type { CoordinationManager, MergeQueueItem, MergeResult } from './types.js'; import type { ConflictResolutionService } from './conflict-resolution-service.js'; import { DefaultConflictResolutionService } from './conflict-resolution-service.js'; // ============================================================================= // Internal Types // ============================================================================= /** * Internal representation of a merge queue item with status. */ interface InternalMergeQueueItem extends MergeQueueItem { status: 'queued' | 'in_progress'; } // ============================================================================= // DefaultCoordinationManager Implementation // ============================================================================= /** * In-memory implementation of CoordinationManager. * * Uses Map for queue management and processes merges in dependency order. * Handles conflicts by creating resolution tasks. */ export class DefaultCoordinationManager implements CoordinationManager { /** Internal merge queue */ private mergeQueue: Map = new Map(); /** Task IDs that have been successfully merged */ private mergedTasks: Set = new Set(); /** Tasks with conflicts awaiting resolution */ private conflictedTasks: Map = new Map(); /** Service for handling merge conflicts */ private conflictResolutionService?: ConflictResolutionService; constructor( private worktreeManager?: WorktreeManager, private taskRepository?: TaskRepository, private agentRepository?: AgentRepository, private messageRepository?: MessageRepository, private eventBus?: EventBus, conflictResolutionService?: ConflictResolutionService ) { // Create default conflict resolution service if none provided if (conflictResolutionService) { this.conflictResolutionService = conflictResolutionService; } else if (taskRepository && agentRepository) { this.conflictResolutionService = new DefaultConflictResolutionService( taskRepository, agentRepository, messageRepository, eventBus ); } } /** * Queue a completed task for merge. * Extracts agent/worktree information from the task. */ async queueMerge(taskId: string): Promise { // Look up task to get dependencies if (!this.taskRepository) { throw new Error('TaskRepository not configured'); } const task = await this.taskRepository.findById(taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); } // Look up agent assigned to task to get worktreeId if (!this.agentRepository) { throw new Error('AgentRepository not configured'); } const agent = await this.agentRepository.findByTaskId(taskId); if (!agent) { throw new Error(`No agent found for task: ${taskId}`); } // For now, dependsOn is empty - would need to query task_dependencies table // to get actual dependencies const dependsOn: string[] = []; const queueItem: InternalMergeQueueItem = { taskId, agentId: agent.id, worktreeId: agent.worktreeId, priority: task.priority, queuedAt: new Date(), dependsOn, status: 'queued', }; this.mergeQueue.set(taskId, queueItem); // Emit MergeQueuedEvent const event: MergeQueuedEvent = { type: 'merge:queued', timestamp: new Date(), payload: { taskId, agentId: agent.id, worktreeId: agent.worktreeId, priority: task.priority, }, }; this.eventBus?.emit(event); } /** * Get next task ready to merge. * Returns task with all dependency tasks already merged. */ async getNextMergeable(): Promise { const queuedItems = Array.from(this.mergeQueue.values()).filter( (item) => item.status === 'queued' ); if (queuedItems.length === 0) { return null; } // Filter to only items where ALL dependsOn tasks are in mergedTasks const readyItems = queuedItems.filter((item) => item.dependsOn.every((depTaskId) => this.mergedTasks.has(depTaskId)) ); if (readyItems.length === 0) { return null; } // Sort by priority (high > medium > low), then by queuedAt (oldest first) const priorityOrder: Record = { high: 0, medium: 1, low: 2 }; readyItems.sort((a, b) => { const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority]; if (priorityDiff !== 0) { return priorityDiff; } return a.queuedAt.getTime() - b.queuedAt.getTime(); }); // Return as MergeQueueItem (without internal status) const item = readyItems[0]; return { taskId: item.taskId, agentId: item.agentId, worktreeId: item.worktreeId, priority: item.priority, queuedAt: item.queuedAt, dependsOn: item.dependsOn, }; } /** * Process all ready merges in dependency order. * Merges each ready task into the target branch. */ async processMerges(targetBranch: string): Promise { if (!this.worktreeManager) { throw new Error('WorktreeManager not configured'); } const results: MergeResult[] = []; // Loop while there are mergeable items let nextItem = await this.getNextMergeable(); while (nextItem) { const queueItem = this.mergeQueue.get(nextItem.taskId)!; // Mark as in_progress queueItem.status = 'in_progress'; // Emit MergeStartedEvent const startEvent: MergeStartedEvent = { type: 'merge:started', timestamp: new Date(), payload: { taskId: nextItem.taskId, agentId: nextItem.agentId, worktreeId: nextItem.worktreeId, targetBranch, }, }; this.eventBus?.emit(startEvent); // Attempt merge via worktreeManager const mergeResult = await this.worktreeManager.merge(nextItem.worktreeId, targetBranch); if (mergeResult.success) { // Success - add to mergedTasks and remove from queue this.mergedTasks.add(nextItem.taskId); this.mergeQueue.delete(nextItem.taskId); // Emit MergeCompletedEvent const completedEvent: MergeCompletedEvent = { type: 'merge:completed', timestamp: new Date(), payload: { taskId: nextItem.taskId, agentId: nextItem.agentId, worktreeId: nextItem.worktreeId, targetBranch, }, }; this.eventBus?.emit(completedEvent); results.push({ taskId: nextItem.taskId, success: true, message: mergeResult.message, }); } else { // Conflict - add to conflictedTasks and remove from queue const conflicts = mergeResult.conflicts || []; this.conflictedTasks.set(nextItem.taskId, conflicts); this.mergeQueue.delete(nextItem.taskId); // Emit MergeConflictedEvent const conflictEvent: MergeConflictedEvent = { type: 'merge:conflicted', timestamp: new Date(), payload: { taskId: nextItem.taskId, agentId: nextItem.agentId, worktreeId: nextItem.worktreeId, targetBranch, conflictingFiles: conflicts, }, }; this.eventBus?.emit(conflictEvent); // Handle conflict - create resolution task await this.handleConflict(nextItem.taskId, conflicts); results.push({ taskId: nextItem.taskId, success: false, conflicts, message: mergeResult.message, }); } // Get next item nextItem = await this.getNextMergeable(); } return results; } /** * Handle a merge conflict. * Delegates to the ConflictResolutionService. */ async handleConflict(taskId: string, conflicts: string[]): Promise { if (!this.conflictResolutionService) { throw new Error('ConflictResolutionService not configured'); } await this.conflictResolutionService.handleConflict(taskId, conflicts); } /** * Get current state of the merge queue. */ async getQueueState(): Promise<{ queued: MergeQueueItem[]; inProgress: MergeQueueItem[]; merged: string[]; conflicted: Array<{ taskId: string; conflicts: string[] }>; }> { const allItems = Array.from(this.mergeQueue.values()); // Filter by status const queued = allItems .filter((item) => item.status === 'queued') .map((item) => ({ taskId: item.taskId, agentId: item.agentId, worktreeId: item.worktreeId, priority: item.priority, queuedAt: item.queuedAt, dependsOn: item.dependsOn, })); const inProgress = allItems .filter((item) => item.status === 'in_progress') .map((item) => ({ taskId: item.taskId, agentId: item.agentId, worktreeId: item.worktreeId, priority: item.priority, queuedAt: item.queuedAt, dependsOn: item.dependsOn, })); const merged = Array.from(this.mergedTasks); const conflicted = Array.from(this.conflictedTasks.entries()).map(([taskId, conflicts]) => ({ taskId, conflicts, })); return { queued, inProgress, merged, conflicted }; } }