Files
Codewalkers/apps/server/coordination/manager.ts
Lukas May 34578d39c6 refactor: Restructure monorepo to apps/server/ and apps/web/ layout
Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt
standard monorepo conventions (apps/ for runnable apps, packages/
for reusable libraries). Update all config files, shared package
imports, test fixtures, and documentation to reflect new paths.

Key fixes:
- Update workspace config to ["apps/*", "packages/*"]
- Update tsconfig.json rootDir/include for apps/server/
- Add apps/web/** to vitest exclude list
- Update drizzle.config.ts schema path
- Fix ensure-schema.ts migration path detection (3 levels up in dev,
  2 levels up in dist)
- Fix tests/integration/cli-server.test.ts import paths
- Update packages/shared imports to apps/server/ paths
- Update all docs/ files with new paths
2026-03-03 11:22:53 +01:00

333 lines
9.9 KiB
TypeScript

/**
* Default Coordination Manager - Adapter Implementation
*
* Implements CoordinationManager interface with in-memory merge queue
* and dependency-ordered merging.
*
* This is the ADAPTER for the CoordinationManager PORT.
*/
import type {
EventBus,
MergeQueuedEvent,
MergeStartedEvent,
MergeCompletedEvent,
MergeConflictedEvent,
} from '../events/index.js';
import type { WorktreeManager } from '../git/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { CoordinationManager, MergeQueueItem, MergeResult } from './types.js';
import type { ConflictResolutionService } from './conflict-resolution-service.js';
import { DefaultConflictResolutionService } from './conflict-resolution-service.js';
// =============================================================================
// Internal Types
// =============================================================================
/**
* Internal representation of a merge queue item with status.
*/
interface InternalMergeQueueItem extends MergeQueueItem {
status: 'queued' | 'in_progress';
}
// =============================================================================
// DefaultCoordinationManager Implementation
// =============================================================================
/**
* In-memory implementation of CoordinationManager.
*
* Uses Map for queue management and processes merges in dependency order.
* Handles conflicts by creating resolution tasks.
*/
export class DefaultCoordinationManager implements CoordinationManager {
/** Internal merge queue */
private mergeQueue: Map<string, InternalMergeQueueItem> = new Map();
/** Task IDs that have been successfully merged */
private mergedTasks: Set<string> = new Set();
/** Tasks with conflicts awaiting resolution */
private conflictedTasks: Map<string, string[]> = new Map();
/** Service for handling merge conflicts */
private conflictResolutionService?: ConflictResolutionService;
constructor(
private worktreeManager?: WorktreeManager,
private taskRepository?: TaskRepository,
private agentRepository?: AgentRepository,
private messageRepository?: MessageRepository,
private eventBus?: EventBus,
conflictResolutionService?: ConflictResolutionService
) {
// Create default conflict resolution service if none provided
if (conflictResolutionService) {
this.conflictResolutionService = conflictResolutionService;
} else if (taskRepository && agentRepository) {
this.conflictResolutionService = new DefaultConflictResolutionService(
taskRepository,
agentRepository,
messageRepository,
eventBus
);
}
}
/**
* Queue a completed task for merge.
* Extracts agent/worktree information from the task.
*/
async queueMerge(taskId: string): Promise<void> {
// Look up task to get dependencies
if (!this.taskRepository) {
throw new Error('TaskRepository not configured');
}
const task = await this.taskRepository.findById(taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
// Look up agent assigned to task to get worktreeId
if (!this.agentRepository) {
throw new Error('AgentRepository not configured');
}
const agent = await this.agentRepository.findByTaskId(taskId);
if (!agent) {
throw new Error(`No agent found for task: ${taskId}`);
}
// For now, dependsOn is empty - would need to query task_dependencies table
// to get actual dependencies
const dependsOn: string[] = [];
const queueItem: InternalMergeQueueItem = {
taskId,
agentId: agent.id,
worktreeId: agent.worktreeId,
priority: task.priority,
queuedAt: new Date(),
dependsOn,
status: 'queued',
};
this.mergeQueue.set(taskId, queueItem);
// Emit MergeQueuedEvent
const event: MergeQueuedEvent = {
type: 'merge:queued',
timestamp: new Date(),
payload: {
taskId,
agentId: agent.id,
worktreeId: agent.worktreeId,
priority: task.priority,
},
};
this.eventBus?.emit(event);
}
/**
* Get next task ready to merge.
* Returns task with all dependency tasks already merged.
*/
async getNextMergeable(): Promise<MergeQueueItem | null> {
const queuedItems = Array.from(this.mergeQueue.values()).filter(
(item) => item.status === 'queued'
);
if (queuedItems.length === 0) {
return null;
}
// Filter to only items where ALL dependsOn tasks are in mergedTasks
const readyItems = queuedItems.filter((item) =>
item.dependsOn.every((depTaskId) => this.mergedTasks.has(depTaskId))
);
if (readyItems.length === 0) {
return null;
}
// Sort by priority (high > medium > low), then by queuedAt (oldest first)
const priorityOrder: Record<string, number> = { high: 0, medium: 1, low: 2 };
readyItems.sort((a, b) => {
const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority];
if (priorityDiff !== 0) {
return priorityDiff;
}
return a.queuedAt.getTime() - b.queuedAt.getTime();
});
// Return as MergeQueueItem (without internal status)
const item = readyItems[0];
return {
taskId: item.taskId,
agentId: item.agentId,
worktreeId: item.worktreeId,
priority: item.priority,
queuedAt: item.queuedAt,
dependsOn: item.dependsOn,
};
}
/**
* Process all ready merges in dependency order.
* Merges each ready task into the target branch.
*/
async processMerges(targetBranch: string): Promise<MergeResult[]> {
if (!this.worktreeManager) {
throw new Error('WorktreeManager not configured');
}
const results: MergeResult[] = [];
// Loop while there are mergeable items
let nextItem = await this.getNextMergeable();
while (nextItem) {
const queueItem = this.mergeQueue.get(nextItem.taskId)!;
// Mark as in_progress
queueItem.status = 'in_progress';
// Emit MergeStartedEvent
const startEvent: MergeStartedEvent = {
type: 'merge:started',
timestamp: new Date(),
payload: {
taskId: nextItem.taskId,
agentId: nextItem.agentId,
worktreeId: nextItem.worktreeId,
targetBranch,
},
};
this.eventBus?.emit(startEvent);
// Attempt merge via worktreeManager
const mergeResult = await this.worktreeManager.merge(nextItem.worktreeId, targetBranch);
if (mergeResult.success) {
// Success - add to mergedTasks and remove from queue
this.mergedTasks.add(nextItem.taskId);
this.mergeQueue.delete(nextItem.taskId);
// Emit MergeCompletedEvent
const completedEvent: MergeCompletedEvent = {
type: 'merge:completed',
timestamp: new Date(),
payload: {
taskId: nextItem.taskId,
agentId: nextItem.agentId,
worktreeId: nextItem.worktreeId,
targetBranch,
},
};
this.eventBus?.emit(completedEvent);
results.push({
taskId: nextItem.taskId,
success: true,
message: mergeResult.message,
});
} else {
// Conflict - add to conflictedTasks and remove from queue
const conflicts = mergeResult.conflicts || [];
this.conflictedTasks.set(nextItem.taskId, conflicts);
this.mergeQueue.delete(nextItem.taskId);
// Emit MergeConflictedEvent
const conflictEvent: MergeConflictedEvent = {
type: 'merge:conflicted',
timestamp: new Date(),
payload: {
taskId: nextItem.taskId,
agentId: nextItem.agentId,
worktreeId: nextItem.worktreeId,
targetBranch,
conflictingFiles: conflicts,
},
};
this.eventBus?.emit(conflictEvent);
// Handle conflict - create resolution task
await this.handleConflict(nextItem.taskId, conflicts);
results.push({
taskId: nextItem.taskId,
success: false,
conflicts,
message: mergeResult.message,
});
}
// Get next item
nextItem = await this.getNextMergeable();
}
return results;
}
/**
* Handle a merge conflict.
* Delegates to the ConflictResolutionService.
*/
async handleConflict(taskId: string, conflicts: string[]): Promise<void> {
if (!this.conflictResolutionService) {
throw new Error('ConflictResolutionService not configured');
}
await this.conflictResolutionService.handleConflict(taskId, conflicts);
}
/**
* Get current state of the merge queue.
*/
async getQueueState(): Promise<{
queued: MergeQueueItem[];
inProgress: MergeQueueItem[];
merged: string[];
conflicted: Array<{ taskId: string; conflicts: string[] }>;
}> {
const allItems = Array.from(this.mergeQueue.values());
// Filter by status
const queued = allItems
.filter((item) => item.status === 'queued')
.map((item) => ({
taskId: item.taskId,
agentId: item.agentId,
worktreeId: item.worktreeId,
priority: item.priority,
queuedAt: item.queuedAt,
dependsOn: item.dependsOn,
}));
const inProgress = allItems
.filter((item) => item.status === 'in_progress')
.map((item) => ({
taskId: item.taskId,
agentId: item.agentId,
worktreeId: item.worktreeId,
priority: item.priority,
queuedAt: item.queuedAt,
dependsOn: item.dependsOn,
}));
const merged = Array.from(this.mergedTasks);
const conflicted = Array.from(this.conflictedTasks.entries()).map(([taskId, conflicts]) => ({
taskId,
conflicts,
}));
return { queued, inProgress, merged, conflicted };
}
}