feat(06-02): implement DefaultCoordinationManager adapter
Implements CoordinationManager interface with: - In-memory merge queue with dependency tracking - Dependency-ordered merging (getNextMergeable) - Merge queue state tracking (queued, in_progress, merged, conflicted) - Event emission for all merge lifecycle events - Basic conflict handling (creates resolution task)
This commit is contained in:
343
src/coordination/manager.ts
Normal file
343
src/coordination/manager.ts
Normal file
@@ -0,0 +1,343 @@
|
|||||||
|
/**
|
||||||
|
* Default Coordination Manager - Adapter Implementation
|
||||||
|
*
|
||||||
|
* Implements CoordinationManager interface with in-memory merge queue
|
||||||
|
* and dependency-ordered merging.
|
||||||
|
*
|
||||||
|
* This is the ADAPTER for the CoordinationManager PORT.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type {
|
||||||
|
EventBus,
|
||||||
|
MergeQueuedEvent,
|
||||||
|
MergeStartedEvent,
|
||||||
|
MergeCompletedEvent,
|
||||||
|
MergeConflictedEvent,
|
||||||
|
TaskQueuedEvent,
|
||||||
|
} from '../events/index.js';
|
||||||
|
import type { WorktreeManager } from '../git/types.js';
|
||||||
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
||||||
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||||
|
import type { CoordinationManager, MergeQueueItem, MergeResult } from './types.js';
|
||||||
|
|
||||||
|
// =============================================================================
|
||||||
|
// Internal Types
|
||||||
|
// =============================================================================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal representation of a merge queue item with status.
|
||||||
|
*/
|
||||||
|
interface InternalMergeQueueItem extends MergeQueueItem {
|
||||||
|
status: 'queued' | 'in_progress';
|
||||||
|
}
|
||||||
|
|
||||||
|
// =============================================================================
|
||||||
|
// DefaultCoordinationManager Implementation
|
||||||
|
// =============================================================================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In-memory implementation of CoordinationManager.
|
||||||
|
*
|
||||||
|
* Uses Map for queue management and processes merges in dependency order.
|
||||||
|
* Handles conflicts by creating resolution tasks.
|
||||||
|
*/
|
||||||
|
export class DefaultCoordinationManager implements CoordinationManager {
|
||||||
|
/** Internal merge queue */
|
||||||
|
private mergeQueue: Map<string, InternalMergeQueueItem> = new Map();
|
||||||
|
|
||||||
|
/** Task IDs that have been successfully merged */
|
||||||
|
private mergedTasks: Set<string> = new Set();
|
||||||
|
|
||||||
|
/** Tasks with conflicts awaiting resolution */
|
||||||
|
private conflictedTasks: Map<string, string[]> = new Map();
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private worktreeManager?: WorktreeManager,
|
||||||
|
private taskRepository?: TaskRepository,
|
||||||
|
private agentRepository?: AgentRepository,
|
||||||
|
private eventBus?: EventBus
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue a completed task for merge.
|
||||||
|
* Extracts agent/worktree information from the task.
|
||||||
|
*/
|
||||||
|
async queueMerge(taskId: string): Promise<void> {
|
||||||
|
// Look up task to get dependencies
|
||||||
|
if (!this.taskRepository) {
|
||||||
|
throw new Error('TaskRepository not configured');
|
||||||
|
}
|
||||||
|
|
||||||
|
const task = await this.taskRepository.findById(taskId);
|
||||||
|
if (!task) {
|
||||||
|
throw new Error(`Task not found: ${taskId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look up agent assigned to task to get worktreeId
|
||||||
|
if (!this.agentRepository) {
|
||||||
|
throw new Error('AgentRepository not configured');
|
||||||
|
}
|
||||||
|
|
||||||
|
const agent = await this.agentRepository.findByTaskId(taskId);
|
||||||
|
if (!agent) {
|
||||||
|
throw new Error(`No agent found for task: ${taskId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// For now, dependsOn is empty - would need to query task_dependencies table
|
||||||
|
// to get actual dependencies
|
||||||
|
const dependsOn: string[] = [];
|
||||||
|
|
||||||
|
const queueItem: InternalMergeQueueItem = {
|
||||||
|
taskId,
|
||||||
|
agentId: agent.id,
|
||||||
|
worktreeId: agent.worktreeId,
|
||||||
|
priority: task.priority,
|
||||||
|
queuedAt: new Date(),
|
||||||
|
dependsOn,
|
||||||
|
status: 'queued',
|
||||||
|
};
|
||||||
|
|
||||||
|
this.mergeQueue.set(taskId, queueItem);
|
||||||
|
|
||||||
|
// Emit MergeQueuedEvent
|
||||||
|
const event: MergeQueuedEvent = {
|
||||||
|
type: 'merge:queued',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
taskId,
|
||||||
|
agentId: agent.id,
|
||||||
|
worktreeId: agent.worktreeId,
|
||||||
|
priority: task.priority,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus?.emit(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get next task ready to merge.
|
||||||
|
* Returns task with all dependency tasks already merged.
|
||||||
|
*/
|
||||||
|
async getNextMergeable(): Promise<MergeQueueItem | null> {
|
||||||
|
const queuedItems = Array.from(this.mergeQueue.values()).filter(
|
||||||
|
(item) => item.status === 'queued'
|
||||||
|
);
|
||||||
|
|
||||||
|
if (queuedItems.length === 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter to only items where ALL dependsOn tasks are in mergedTasks
|
||||||
|
const readyItems = queuedItems.filter((item) =>
|
||||||
|
item.dependsOn.every((depTaskId) => this.mergedTasks.has(depTaskId))
|
||||||
|
);
|
||||||
|
|
||||||
|
if (readyItems.length === 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by priority (high > medium > low), then by queuedAt (oldest first)
|
||||||
|
const priorityOrder: Record<string, number> = { high: 0, medium: 1, low: 2 };
|
||||||
|
|
||||||
|
readyItems.sort((a, b) => {
|
||||||
|
const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority];
|
||||||
|
if (priorityDiff !== 0) {
|
||||||
|
return priorityDiff;
|
||||||
|
}
|
||||||
|
return a.queuedAt.getTime() - b.queuedAt.getTime();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Return as MergeQueueItem (without internal status)
|
||||||
|
const item = readyItems[0];
|
||||||
|
return {
|
||||||
|
taskId: item.taskId,
|
||||||
|
agentId: item.agentId,
|
||||||
|
worktreeId: item.worktreeId,
|
||||||
|
priority: item.priority,
|
||||||
|
queuedAt: item.queuedAt,
|
||||||
|
dependsOn: item.dependsOn,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process all ready merges in dependency order.
|
||||||
|
* Merges each ready task into the target branch.
|
||||||
|
*/
|
||||||
|
async processMerges(targetBranch: string): Promise<MergeResult[]> {
|
||||||
|
if (!this.worktreeManager) {
|
||||||
|
throw new Error('WorktreeManager not configured');
|
||||||
|
}
|
||||||
|
|
||||||
|
const results: MergeResult[] = [];
|
||||||
|
|
||||||
|
// Loop while there are mergeable items
|
||||||
|
let nextItem = await this.getNextMergeable();
|
||||||
|
|
||||||
|
while (nextItem) {
|
||||||
|
const queueItem = this.mergeQueue.get(nextItem.taskId)!;
|
||||||
|
|
||||||
|
// Mark as in_progress
|
||||||
|
queueItem.status = 'in_progress';
|
||||||
|
|
||||||
|
// Emit MergeStartedEvent
|
||||||
|
const startEvent: MergeStartedEvent = {
|
||||||
|
type: 'merge:started',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
taskId: nextItem.taskId,
|
||||||
|
agentId: nextItem.agentId,
|
||||||
|
worktreeId: nextItem.worktreeId,
|
||||||
|
targetBranch,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus?.emit(startEvent);
|
||||||
|
|
||||||
|
// Attempt merge via worktreeManager
|
||||||
|
const mergeResult = await this.worktreeManager.merge(nextItem.worktreeId, targetBranch);
|
||||||
|
|
||||||
|
if (mergeResult.success) {
|
||||||
|
// Success - add to mergedTasks and remove from queue
|
||||||
|
this.mergedTasks.add(nextItem.taskId);
|
||||||
|
this.mergeQueue.delete(nextItem.taskId);
|
||||||
|
|
||||||
|
// Emit MergeCompletedEvent
|
||||||
|
const completedEvent: MergeCompletedEvent = {
|
||||||
|
type: 'merge:completed',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
taskId: nextItem.taskId,
|
||||||
|
agentId: nextItem.agentId,
|
||||||
|
worktreeId: nextItem.worktreeId,
|
||||||
|
targetBranch,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus?.emit(completedEvent);
|
||||||
|
|
||||||
|
results.push({
|
||||||
|
taskId: nextItem.taskId,
|
||||||
|
success: true,
|
||||||
|
message: mergeResult.message,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// Conflict - add to conflictedTasks and remove from queue
|
||||||
|
const conflicts = mergeResult.conflicts || [];
|
||||||
|
this.conflictedTasks.set(nextItem.taskId, conflicts);
|
||||||
|
this.mergeQueue.delete(nextItem.taskId);
|
||||||
|
|
||||||
|
// Emit MergeConflictedEvent
|
||||||
|
const conflictEvent: MergeConflictedEvent = {
|
||||||
|
type: 'merge:conflicted',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
taskId: nextItem.taskId,
|
||||||
|
agentId: nextItem.agentId,
|
||||||
|
worktreeId: nextItem.worktreeId,
|
||||||
|
targetBranch,
|
||||||
|
conflictingFiles: conflicts,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus?.emit(conflictEvent);
|
||||||
|
|
||||||
|
// Handle conflict - create resolution task
|
||||||
|
await this.handleConflict(nextItem.taskId, conflicts);
|
||||||
|
|
||||||
|
results.push({
|
||||||
|
taskId: nextItem.taskId,
|
||||||
|
success: false,
|
||||||
|
conflicts,
|
||||||
|
message: mergeResult.message,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get next item
|
||||||
|
nextItem = await this.getNextMergeable();
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle a merge conflict.
|
||||||
|
* Creates a conflict-resolution task and assigns back to the agent.
|
||||||
|
*/
|
||||||
|
async handleConflict(taskId: string, conflicts: string[]): Promise<void> {
|
||||||
|
if (!this.taskRepository) {
|
||||||
|
throw new Error('TaskRepository not configured');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get original task for context
|
||||||
|
const originalTask = await this.taskRepository.findById(taskId);
|
||||||
|
if (!originalTask) {
|
||||||
|
throw new Error(`Original task not found: ${taskId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create new conflict-resolution task
|
||||||
|
const conflictTask = await this.taskRepository.create({
|
||||||
|
planId: originalTask.planId,
|
||||||
|
name: `Resolve conflicts: ${originalTask.name}`,
|
||||||
|
description: `Merge conflicts detected. Resolve conflicts in the following files:\n\n${conflicts.map((f) => `- ${f}`).join('\n')}\n\nOriginal task: ${originalTask.name}\n\nInstructions: Resolve merge conflicts in the listed files, then mark task complete.`,
|
||||||
|
type: 'auto',
|
||||||
|
priority: 'high', // Conflicts should be resolved quickly
|
||||||
|
status: 'pending',
|
||||||
|
order: originalTask.order + 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Update original task status to blocked
|
||||||
|
await this.taskRepository.update(taskId, { status: 'blocked' });
|
||||||
|
|
||||||
|
// Emit TaskQueuedEvent for the new conflict-resolution task
|
||||||
|
const event: TaskQueuedEvent = {
|
||||||
|
type: 'task:queued',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
taskId: conflictTask.id,
|
||||||
|
priority: 'high',
|
||||||
|
dependsOn: [],
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus?.emit(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current state of the merge queue.
|
||||||
|
*/
|
||||||
|
async getQueueState(): Promise<{
|
||||||
|
queued: MergeQueueItem[];
|
||||||
|
inProgress: MergeQueueItem[];
|
||||||
|
merged: string[];
|
||||||
|
conflicted: Array<{ taskId: string; conflicts: string[] }>;
|
||||||
|
}> {
|
||||||
|
const allItems = Array.from(this.mergeQueue.values());
|
||||||
|
|
||||||
|
// Filter by status
|
||||||
|
const queued = allItems
|
||||||
|
.filter((item) => item.status === 'queued')
|
||||||
|
.map((item) => ({
|
||||||
|
taskId: item.taskId,
|
||||||
|
agentId: item.agentId,
|
||||||
|
worktreeId: item.worktreeId,
|
||||||
|
priority: item.priority,
|
||||||
|
queuedAt: item.queuedAt,
|
||||||
|
dependsOn: item.dependsOn,
|
||||||
|
}));
|
||||||
|
|
||||||
|
const inProgress = allItems
|
||||||
|
.filter((item) => item.status === 'in_progress')
|
||||||
|
.map((item) => ({
|
||||||
|
taskId: item.taskId,
|
||||||
|
agentId: item.agentId,
|
||||||
|
worktreeId: item.worktreeId,
|
||||||
|
priority: item.priority,
|
||||||
|
queuedAt: item.queuedAt,
|
||||||
|
dependsOn: item.dependsOn,
|
||||||
|
}));
|
||||||
|
|
||||||
|
const merged = Array.from(this.mergedTasks);
|
||||||
|
|
||||||
|
const conflicted = Array.from(this.conflictedTasks.entries()).map(([taskId, conflicts]) => ({
|
||||||
|
taskId,
|
||||||
|
conflicts,
|
||||||
|
}));
|
||||||
|
|
||||||
|
return { queued, inProgress, merged, conflicted };
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user