feat(06-02): implement DefaultCoordinationManager adapter
Implements CoordinationManager interface with: - In-memory merge queue with dependency tracking - Dependency-ordered merging (getNextMergeable) - Merge queue state tracking (queued, in_progress, merged, conflicted) - Event emission for all merge lifecycle events - Basic conflict handling (creates resolution task)
This commit is contained in:
343
src/coordination/manager.ts
Normal file
343
src/coordination/manager.ts
Normal file
@@ -0,0 +1,343 @@
|
||||
/**
|
||||
* Default Coordination Manager - Adapter Implementation
|
||||
*
|
||||
* Implements CoordinationManager interface with in-memory merge queue
|
||||
* and dependency-ordered merging.
|
||||
*
|
||||
* This is the ADAPTER for the CoordinationManager PORT.
|
||||
*/
|
||||
|
||||
import type {
|
||||
EventBus,
|
||||
MergeQueuedEvent,
|
||||
MergeStartedEvent,
|
||||
MergeCompletedEvent,
|
||||
MergeConflictedEvent,
|
||||
TaskQueuedEvent,
|
||||
} from '../events/index.js';
|
||||
import type { WorktreeManager } from '../git/types.js';
|
||||
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { CoordinationManager, MergeQueueItem, MergeResult } from './types.js';
|
||||
|
||||
// =============================================================================
|
||||
// Internal Types
|
||||
// =============================================================================
|
||||
|
||||
/**
|
||||
* Internal representation of a merge queue item with status.
|
||||
*/
|
||||
interface InternalMergeQueueItem extends MergeQueueItem {
|
||||
status: 'queued' | 'in_progress';
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// DefaultCoordinationManager Implementation
|
||||
// =============================================================================
|
||||
|
||||
/**
|
||||
* In-memory implementation of CoordinationManager.
|
||||
*
|
||||
* Uses Map for queue management and processes merges in dependency order.
|
||||
* Handles conflicts by creating resolution tasks.
|
||||
*/
|
||||
export class DefaultCoordinationManager implements CoordinationManager {
|
||||
/** Internal merge queue */
|
||||
private mergeQueue: Map<string, InternalMergeQueueItem> = new Map();
|
||||
|
||||
/** Task IDs that have been successfully merged */
|
||||
private mergedTasks: Set<string> = new Set();
|
||||
|
||||
/** Tasks with conflicts awaiting resolution */
|
||||
private conflictedTasks: Map<string, string[]> = new Map();
|
||||
|
||||
constructor(
|
||||
private worktreeManager?: WorktreeManager,
|
||||
private taskRepository?: TaskRepository,
|
||||
private agentRepository?: AgentRepository,
|
||||
private eventBus?: EventBus
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Queue a completed task for merge.
|
||||
* Extracts agent/worktree information from the task.
|
||||
*/
|
||||
async queueMerge(taskId: string): Promise<void> {
|
||||
// Look up task to get dependencies
|
||||
if (!this.taskRepository) {
|
||||
throw new Error('TaskRepository not configured');
|
||||
}
|
||||
|
||||
const task = await this.taskRepository.findById(taskId);
|
||||
if (!task) {
|
||||
throw new Error(`Task not found: ${taskId}`);
|
||||
}
|
||||
|
||||
// Look up agent assigned to task to get worktreeId
|
||||
if (!this.agentRepository) {
|
||||
throw new Error('AgentRepository not configured');
|
||||
}
|
||||
|
||||
const agent = await this.agentRepository.findByTaskId(taskId);
|
||||
if (!agent) {
|
||||
throw new Error(`No agent found for task: ${taskId}`);
|
||||
}
|
||||
|
||||
// For now, dependsOn is empty - would need to query task_dependencies table
|
||||
// to get actual dependencies
|
||||
const dependsOn: string[] = [];
|
||||
|
||||
const queueItem: InternalMergeQueueItem = {
|
||||
taskId,
|
||||
agentId: agent.id,
|
||||
worktreeId: agent.worktreeId,
|
||||
priority: task.priority,
|
||||
queuedAt: new Date(),
|
||||
dependsOn,
|
||||
status: 'queued',
|
||||
};
|
||||
|
||||
this.mergeQueue.set(taskId, queueItem);
|
||||
|
||||
// Emit MergeQueuedEvent
|
||||
const event: MergeQueuedEvent = {
|
||||
type: 'merge:queued',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
taskId,
|
||||
agentId: agent.id,
|
||||
worktreeId: agent.worktreeId,
|
||||
priority: task.priority,
|
||||
},
|
||||
};
|
||||
this.eventBus?.emit(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next task ready to merge.
|
||||
* Returns task with all dependency tasks already merged.
|
||||
*/
|
||||
async getNextMergeable(): Promise<MergeQueueItem | null> {
|
||||
const queuedItems = Array.from(this.mergeQueue.values()).filter(
|
||||
(item) => item.status === 'queued'
|
||||
);
|
||||
|
||||
if (queuedItems.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Filter to only items where ALL dependsOn tasks are in mergedTasks
|
||||
const readyItems = queuedItems.filter((item) =>
|
||||
item.dependsOn.every((depTaskId) => this.mergedTasks.has(depTaskId))
|
||||
);
|
||||
|
||||
if (readyItems.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Sort by priority (high > medium > low), then by queuedAt (oldest first)
|
||||
const priorityOrder: Record<string, number> = { high: 0, medium: 1, low: 2 };
|
||||
|
||||
readyItems.sort((a, b) => {
|
||||
const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority];
|
||||
if (priorityDiff !== 0) {
|
||||
return priorityDiff;
|
||||
}
|
||||
return a.queuedAt.getTime() - b.queuedAt.getTime();
|
||||
});
|
||||
|
||||
// Return as MergeQueueItem (without internal status)
|
||||
const item = readyItems[0];
|
||||
return {
|
||||
taskId: item.taskId,
|
||||
agentId: item.agentId,
|
||||
worktreeId: item.worktreeId,
|
||||
priority: item.priority,
|
||||
queuedAt: item.queuedAt,
|
||||
dependsOn: item.dependsOn,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Process all ready merges in dependency order.
|
||||
* Merges each ready task into the target branch.
|
||||
*/
|
||||
async processMerges(targetBranch: string): Promise<MergeResult[]> {
|
||||
if (!this.worktreeManager) {
|
||||
throw new Error('WorktreeManager not configured');
|
||||
}
|
||||
|
||||
const results: MergeResult[] = [];
|
||||
|
||||
// Loop while there are mergeable items
|
||||
let nextItem = await this.getNextMergeable();
|
||||
|
||||
while (nextItem) {
|
||||
const queueItem = this.mergeQueue.get(nextItem.taskId)!;
|
||||
|
||||
// Mark as in_progress
|
||||
queueItem.status = 'in_progress';
|
||||
|
||||
// Emit MergeStartedEvent
|
||||
const startEvent: MergeStartedEvent = {
|
||||
type: 'merge:started',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
taskId: nextItem.taskId,
|
||||
agentId: nextItem.agentId,
|
||||
worktreeId: nextItem.worktreeId,
|
||||
targetBranch,
|
||||
},
|
||||
};
|
||||
this.eventBus?.emit(startEvent);
|
||||
|
||||
// Attempt merge via worktreeManager
|
||||
const mergeResult = await this.worktreeManager.merge(nextItem.worktreeId, targetBranch);
|
||||
|
||||
if (mergeResult.success) {
|
||||
// Success - add to mergedTasks and remove from queue
|
||||
this.mergedTasks.add(nextItem.taskId);
|
||||
this.mergeQueue.delete(nextItem.taskId);
|
||||
|
||||
// Emit MergeCompletedEvent
|
||||
const completedEvent: MergeCompletedEvent = {
|
||||
type: 'merge:completed',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
taskId: nextItem.taskId,
|
||||
agentId: nextItem.agentId,
|
||||
worktreeId: nextItem.worktreeId,
|
||||
targetBranch,
|
||||
},
|
||||
};
|
||||
this.eventBus?.emit(completedEvent);
|
||||
|
||||
results.push({
|
||||
taskId: nextItem.taskId,
|
||||
success: true,
|
||||
message: mergeResult.message,
|
||||
});
|
||||
} else {
|
||||
// Conflict - add to conflictedTasks and remove from queue
|
||||
const conflicts = mergeResult.conflicts || [];
|
||||
this.conflictedTasks.set(nextItem.taskId, conflicts);
|
||||
this.mergeQueue.delete(nextItem.taskId);
|
||||
|
||||
// Emit MergeConflictedEvent
|
||||
const conflictEvent: MergeConflictedEvent = {
|
||||
type: 'merge:conflicted',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
taskId: nextItem.taskId,
|
||||
agentId: nextItem.agentId,
|
||||
worktreeId: nextItem.worktreeId,
|
||||
targetBranch,
|
||||
conflictingFiles: conflicts,
|
||||
},
|
||||
};
|
||||
this.eventBus?.emit(conflictEvent);
|
||||
|
||||
// Handle conflict - create resolution task
|
||||
await this.handleConflict(nextItem.taskId, conflicts);
|
||||
|
||||
results.push({
|
||||
taskId: nextItem.taskId,
|
||||
success: false,
|
||||
conflicts,
|
||||
message: mergeResult.message,
|
||||
});
|
||||
}
|
||||
|
||||
// Get next item
|
||||
nextItem = await this.getNextMergeable();
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a merge conflict.
|
||||
* Creates a conflict-resolution task and assigns back to the agent.
|
||||
*/
|
||||
async handleConflict(taskId: string, conflicts: string[]): Promise<void> {
|
||||
if (!this.taskRepository) {
|
||||
throw new Error('TaskRepository not configured');
|
||||
}
|
||||
|
||||
// Get original task for context
|
||||
const originalTask = await this.taskRepository.findById(taskId);
|
||||
if (!originalTask) {
|
||||
throw new Error(`Original task not found: ${taskId}`);
|
||||
}
|
||||
|
||||
// Create new conflict-resolution task
|
||||
const conflictTask = await this.taskRepository.create({
|
||||
planId: originalTask.planId,
|
||||
name: `Resolve conflicts: ${originalTask.name}`,
|
||||
description: `Merge conflicts detected. Resolve conflicts in the following files:\n\n${conflicts.map((f) => `- ${f}`).join('\n')}\n\nOriginal task: ${originalTask.name}\n\nInstructions: Resolve merge conflicts in the listed files, then mark task complete.`,
|
||||
type: 'auto',
|
||||
priority: 'high', // Conflicts should be resolved quickly
|
||||
status: 'pending',
|
||||
order: originalTask.order + 1,
|
||||
});
|
||||
|
||||
// Update original task status to blocked
|
||||
await this.taskRepository.update(taskId, { status: 'blocked' });
|
||||
|
||||
// Emit TaskQueuedEvent for the new conflict-resolution task
|
||||
const event: TaskQueuedEvent = {
|
||||
type: 'task:queued',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
taskId: conflictTask.id,
|
||||
priority: 'high',
|
||||
dependsOn: [],
|
||||
},
|
||||
};
|
||||
this.eventBus?.emit(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current state of the merge queue.
|
||||
*/
|
||||
async getQueueState(): Promise<{
|
||||
queued: MergeQueueItem[];
|
||||
inProgress: MergeQueueItem[];
|
||||
merged: string[];
|
||||
conflicted: Array<{ taskId: string; conflicts: string[] }>;
|
||||
}> {
|
||||
const allItems = Array.from(this.mergeQueue.values());
|
||||
|
||||
// Filter by status
|
||||
const queued = allItems
|
||||
.filter((item) => item.status === 'queued')
|
||||
.map((item) => ({
|
||||
taskId: item.taskId,
|
||||
agentId: item.agentId,
|
||||
worktreeId: item.worktreeId,
|
||||
priority: item.priority,
|
||||
queuedAt: item.queuedAt,
|
||||
dependsOn: item.dependsOn,
|
||||
}));
|
||||
|
||||
const inProgress = allItems
|
||||
.filter((item) => item.status === 'in_progress')
|
||||
.map((item) => ({
|
||||
taskId: item.taskId,
|
||||
agentId: item.agentId,
|
||||
worktreeId: item.worktreeId,
|
||||
priority: item.priority,
|
||||
queuedAt: item.queuedAt,
|
||||
dependsOn: item.dependsOn,
|
||||
}));
|
||||
|
||||
const merged = Array.from(this.mergedTasks);
|
||||
|
||||
const conflicted = Array.from(this.conflictedTasks.entries()).map(([taskId, conflicts]) => ({
|
||||
taskId,
|
||||
conflicts,
|
||||
}));
|
||||
|
||||
return { queued, inProgress, merged, conflicted };
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user