diff --git a/src/coordination/manager.ts b/src/coordination/manager.ts new file mode 100644 index 0000000..cb8dc88 --- /dev/null +++ b/src/coordination/manager.ts @@ -0,0 +1,343 @@ +/** + * Default Coordination Manager - Adapter Implementation + * + * Implements CoordinationManager interface with in-memory merge queue + * and dependency-ordered merging. + * + * This is the ADAPTER for the CoordinationManager PORT. + */ + +import type { + EventBus, + MergeQueuedEvent, + MergeStartedEvent, + MergeCompletedEvent, + MergeConflictedEvent, + TaskQueuedEvent, +} from '../events/index.js'; +import type { WorktreeManager } from '../git/types.js'; +import type { TaskRepository } from '../db/repositories/task-repository.js'; +import type { AgentRepository } from '../db/repositories/agent-repository.js'; +import type { CoordinationManager, MergeQueueItem, MergeResult } from './types.js'; + +// ============================================================================= +// Internal Types +// ============================================================================= + +/** + * Internal representation of a merge queue item with status. + */ +interface InternalMergeQueueItem extends MergeQueueItem { + status: 'queued' | 'in_progress'; +} + +// ============================================================================= +// DefaultCoordinationManager Implementation +// ============================================================================= + +/** + * In-memory implementation of CoordinationManager. + * + * Uses Map for queue management and processes merges in dependency order. + * Handles conflicts by creating resolution tasks. + */ +export class DefaultCoordinationManager implements CoordinationManager { + /** Internal merge queue */ + private mergeQueue: Map = new Map(); + + /** Task IDs that have been successfully merged */ + private mergedTasks: Set = new Set(); + + /** Tasks with conflicts awaiting resolution */ + private conflictedTasks: Map = new Map(); + + constructor( + private worktreeManager?: WorktreeManager, + private taskRepository?: TaskRepository, + private agentRepository?: AgentRepository, + private eventBus?: EventBus + ) {} + + /** + * Queue a completed task for merge. + * Extracts agent/worktree information from the task. + */ + async queueMerge(taskId: string): Promise { + // Look up task to get dependencies + if (!this.taskRepository) { + throw new Error('TaskRepository not configured'); + } + + const task = await this.taskRepository.findById(taskId); + if (!task) { + throw new Error(`Task not found: ${taskId}`); + } + + // Look up agent assigned to task to get worktreeId + if (!this.agentRepository) { + throw new Error('AgentRepository not configured'); + } + + const agent = await this.agentRepository.findByTaskId(taskId); + if (!agent) { + throw new Error(`No agent found for task: ${taskId}`); + } + + // For now, dependsOn is empty - would need to query task_dependencies table + // to get actual dependencies + const dependsOn: string[] = []; + + const queueItem: InternalMergeQueueItem = { + taskId, + agentId: agent.id, + worktreeId: agent.worktreeId, + priority: task.priority, + queuedAt: new Date(), + dependsOn, + status: 'queued', + }; + + this.mergeQueue.set(taskId, queueItem); + + // Emit MergeQueuedEvent + const event: MergeQueuedEvent = { + type: 'merge:queued', + timestamp: new Date(), + payload: { + taskId, + agentId: agent.id, + worktreeId: agent.worktreeId, + priority: task.priority, + }, + }; + this.eventBus?.emit(event); + } + + /** + * Get next task ready to merge. + * Returns task with all dependency tasks already merged. + */ + async getNextMergeable(): Promise { + const queuedItems = Array.from(this.mergeQueue.values()).filter( + (item) => item.status === 'queued' + ); + + if (queuedItems.length === 0) { + return null; + } + + // Filter to only items where ALL dependsOn tasks are in mergedTasks + const readyItems = queuedItems.filter((item) => + item.dependsOn.every((depTaskId) => this.mergedTasks.has(depTaskId)) + ); + + if (readyItems.length === 0) { + return null; + } + + // Sort by priority (high > medium > low), then by queuedAt (oldest first) + const priorityOrder: Record = { high: 0, medium: 1, low: 2 }; + + readyItems.sort((a, b) => { + const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority]; + if (priorityDiff !== 0) { + return priorityDiff; + } + return a.queuedAt.getTime() - b.queuedAt.getTime(); + }); + + // Return as MergeQueueItem (without internal status) + const item = readyItems[0]; + return { + taskId: item.taskId, + agentId: item.agentId, + worktreeId: item.worktreeId, + priority: item.priority, + queuedAt: item.queuedAt, + dependsOn: item.dependsOn, + }; + } + + /** + * Process all ready merges in dependency order. + * Merges each ready task into the target branch. + */ + async processMerges(targetBranch: string): Promise { + if (!this.worktreeManager) { + throw new Error('WorktreeManager not configured'); + } + + const results: MergeResult[] = []; + + // Loop while there are mergeable items + let nextItem = await this.getNextMergeable(); + + while (nextItem) { + const queueItem = this.mergeQueue.get(nextItem.taskId)!; + + // Mark as in_progress + queueItem.status = 'in_progress'; + + // Emit MergeStartedEvent + const startEvent: MergeStartedEvent = { + type: 'merge:started', + timestamp: new Date(), + payload: { + taskId: nextItem.taskId, + agentId: nextItem.agentId, + worktreeId: nextItem.worktreeId, + targetBranch, + }, + }; + this.eventBus?.emit(startEvent); + + // Attempt merge via worktreeManager + const mergeResult = await this.worktreeManager.merge(nextItem.worktreeId, targetBranch); + + if (mergeResult.success) { + // Success - add to mergedTasks and remove from queue + this.mergedTasks.add(nextItem.taskId); + this.mergeQueue.delete(nextItem.taskId); + + // Emit MergeCompletedEvent + const completedEvent: MergeCompletedEvent = { + type: 'merge:completed', + timestamp: new Date(), + payload: { + taskId: nextItem.taskId, + agentId: nextItem.agentId, + worktreeId: nextItem.worktreeId, + targetBranch, + }, + }; + this.eventBus?.emit(completedEvent); + + results.push({ + taskId: nextItem.taskId, + success: true, + message: mergeResult.message, + }); + } else { + // Conflict - add to conflictedTasks and remove from queue + const conflicts = mergeResult.conflicts || []; + this.conflictedTasks.set(nextItem.taskId, conflicts); + this.mergeQueue.delete(nextItem.taskId); + + // Emit MergeConflictedEvent + const conflictEvent: MergeConflictedEvent = { + type: 'merge:conflicted', + timestamp: new Date(), + payload: { + taskId: nextItem.taskId, + agentId: nextItem.agentId, + worktreeId: nextItem.worktreeId, + targetBranch, + conflictingFiles: conflicts, + }, + }; + this.eventBus?.emit(conflictEvent); + + // Handle conflict - create resolution task + await this.handleConflict(nextItem.taskId, conflicts); + + results.push({ + taskId: nextItem.taskId, + success: false, + conflicts, + message: mergeResult.message, + }); + } + + // Get next item + nextItem = await this.getNextMergeable(); + } + + return results; + } + + /** + * Handle a merge conflict. + * Creates a conflict-resolution task and assigns back to the agent. + */ + async handleConflict(taskId: string, conflicts: string[]): Promise { + if (!this.taskRepository) { + throw new Error('TaskRepository not configured'); + } + + // Get original task for context + const originalTask = await this.taskRepository.findById(taskId); + if (!originalTask) { + throw new Error(`Original task not found: ${taskId}`); + } + + // Create new conflict-resolution task + const conflictTask = await this.taskRepository.create({ + planId: originalTask.planId, + name: `Resolve conflicts: ${originalTask.name}`, + description: `Merge conflicts detected. Resolve conflicts in the following files:\n\n${conflicts.map((f) => `- ${f}`).join('\n')}\n\nOriginal task: ${originalTask.name}\n\nInstructions: Resolve merge conflicts in the listed files, then mark task complete.`, + type: 'auto', + priority: 'high', // Conflicts should be resolved quickly + status: 'pending', + order: originalTask.order + 1, + }); + + // Update original task status to blocked + await this.taskRepository.update(taskId, { status: 'blocked' }); + + // Emit TaskQueuedEvent for the new conflict-resolution task + const event: TaskQueuedEvent = { + type: 'task:queued', + timestamp: new Date(), + payload: { + taskId: conflictTask.id, + priority: 'high', + dependsOn: [], + }, + }; + this.eventBus?.emit(event); + } + + /** + * Get current state of the merge queue. + */ + async getQueueState(): Promise<{ + queued: MergeQueueItem[]; + inProgress: MergeQueueItem[]; + merged: string[]; + conflicted: Array<{ taskId: string; conflicts: string[] }>; + }> { + const allItems = Array.from(this.mergeQueue.values()); + + // Filter by status + const queued = allItems + .filter((item) => item.status === 'queued') + .map((item) => ({ + taskId: item.taskId, + agentId: item.agentId, + worktreeId: item.worktreeId, + priority: item.priority, + queuedAt: item.queuedAt, + dependsOn: item.dependsOn, + })); + + const inProgress = allItems + .filter((item) => item.status === 'in_progress') + .map((item) => ({ + taskId: item.taskId, + agentId: item.agentId, + worktreeId: item.worktreeId, + priority: item.priority, + queuedAt: item.queuedAt, + dependsOn: item.dependsOn, + })); + + const merged = Array.from(this.mergedTasks); + + const conflicted = Array.from(this.conflictedTasks.entries()).map(([taskId, conflicts]) => ({ + taskId, + conflicts, + })); + + return { queued, inProgress, merged, conflicted }; + } +}