diff --git a/src/dispatch/manager.ts b/src/dispatch/manager.ts new file mode 100644 index 0000000..9b365c4 --- /dev/null +++ b/src/dispatch/manager.ts @@ -0,0 +1,302 @@ +/** + * Default Dispatch Manager - Adapter Implementation + * + * Implements DispatchManager interface with in-memory queue + * and dependency-ordered dispatch. + * + * This is the ADAPTER for the DispatchManager PORT. + */ + +import type { EventBus, TaskQueuedEvent, TaskCompletedEvent, TaskBlockedEvent, TaskDispatchedEvent } from '../events/index.js'; +import type { AgentManager } from '../agent/types.js'; +import type { TaskRepository } from '../db/repositories/task-repository.js'; +import type { MessageRepository } from '../db/repositories/message-repository.js'; +import type { DispatchManager, QueuedTask, DispatchResult } from './types.js'; + +// ============================================================================= +// Internal Types +// ============================================================================= + +/** + * Internal representation of a blocked task. + */ +interface BlockedTask { + taskId: string; + reason: string; +} + +// ============================================================================= +// DefaultDispatchManager Implementation +// ============================================================================= + +/** + * In-memory implementation of DispatchManager. + * + * Uses Map for queue management and checks task_dependencies table + * for dependency resolution. + */ +export class DefaultDispatchManager implements DispatchManager { + /** Internal queue of tasks pending dispatch */ + private taskQueue: Map = new Map(); + + /** Blocked tasks with their reasons */ + private blockedTasks: Map = new Map(); + + constructor( + private taskRepository: TaskRepository, + private messageRepository: MessageRepository, + private agentManager: AgentManager, + private eventBus: EventBus + ) {} + + /** + * Queue a task for dispatch. + * Fetches task dependencies and adds to internal queue. + */ + async queue(taskId: string): Promise { + // Fetch task to verify it exists and get priority + const task = await this.taskRepository.findById(taskId); + if (!task) { + throw new Error(`Task not found: ${taskId}`); + } + + // Get dependencies for this task + // We need to query task_dependencies table + // For now, use empty deps - will be populated when we have dependency data + const dependsOn: string[] = []; + + const queuedTask: QueuedTask = { + taskId, + priority: task.priority, + queuedAt: new Date(), + dependsOn, + }; + + this.taskQueue.set(taskId, queuedTask); + + // Emit TaskQueuedEvent + const event: TaskQueuedEvent = { + type: 'task:queued', + timestamp: new Date(), + payload: { + taskId, + priority: task.priority, + dependsOn, + }, + }; + this.eventBus.emit(event); + } + + /** + * Get next dispatchable task. + * Returns task with all dependencies complete, highest priority first. + */ + async getNextDispatchable(): Promise { + const queuedTasks = Array.from(this.taskQueue.values()); + + if (queuedTasks.length === 0) { + return null; + } + + // Filter to only tasks with all dependencies complete + const readyTasks: QueuedTask[] = []; + + for (const qt of queuedTasks) { + const allDepsComplete = await this.areAllDependenciesComplete(qt.dependsOn); + if (allDepsComplete) { + readyTasks.push(qt); + } + } + + if (readyTasks.length === 0) { + return null; + } + + // Sort by priority (high > medium > low), then by queuedAt (oldest first) + const priorityOrder: Record = { high: 0, medium: 1, low: 2 }; + + readyTasks.sort((a, b) => { + const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority]; + if (priorityDiff !== 0) { + return priorityDiff; + } + return a.queuedAt.getTime() - b.queuedAt.getTime(); + }); + + return readyTasks[0]; + } + + /** + * Mark a task as complete. + * Updates task status and removes from queue. + */ + async completeTask(taskId: string): Promise { + // Update task status to 'completed' + await this.taskRepository.update(taskId, { status: 'completed' }); + + // Remove from queue + this.taskQueue.delete(taskId); + + // Also remove from blocked if it was there + this.blockedTasks.delete(taskId); + + // Emit TaskCompletedEvent + const event: TaskCompletedEvent = { + type: 'task:completed', + timestamp: new Date(), + payload: { + taskId, + agentId: '', // Unknown at this point + success: true, + message: 'Task completed', + }, + }; + this.eventBus.emit(event); + } + + /** + * Mark a task as blocked. + * Updates task status and records block reason. + */ + async blockTask(taskId: string, reason: string): Promise { + // Update task status to 'blocked' + await this.taskRepository.update(taskId, { status: 'blocked' }); + + // Record in blocked map + this.blockedTasks.set(taskId, { taskId, reason }); + + // Remove from queue (blocked tasks aren't dispatchable) + this.taskQueue.delete(taskId); + + // Emit TaskBlockedEvent + const event: TaskBlockedEvent = { + type: 'task:blocked', + timestamp: new Date(), + payload: { + taskId, + reason, + }, + }; + this.eventBus.emit(event); + } + + /** + * Dispatch next available task to an agent. + */ + async dispatchNext(): Promise { + // Get next dispatchable task + const nextTask = await this.getNextDispatchable(); + + if (!nextTask) { + return { + success: false, + taskId: '', + reason: 'No dispatchable tasks', + }; + } + + // Find available agent (status='idle') + const agents = await this.agentManager.list(); + const idleAgent = agents.find((a) => a.status === 'idle'); + + if (!idleAgent) { + return { + success: false, + taskId: nextTask.taskId, + reason: 'No available agents', + }; + } + + // Get task details + const task = await this.taskRepository.findById(nextTask.taskId); + if (!task) { + return { + success: false, + taskId: nextTask.taskId, + reason: 'Task not found', + }; + } + + // Generate agent name based on task ID + const agentName = `agent-${nextTask.taskId.slice(0, 6)}`; + + // Spawn agent with task + const agent = await this.agentManager.spawn({ + name: agentName, + taskId: nextTask.taskId, + prompt: task.description || task.name, + }); + + // Update task status to 'in_progress' + await this.taskRepository.update(nextTask.taskId, { status: 'in_progress' }); + + // Remove from queue (now being worked on) + this.taskQueue.delete(nextTask.taskId); + + // Emit TaskDispatchedEvent + const event: TaskDispatchedEvent = { + type: 'task:dispatched', + timestamp: new Date(), + payload: { + taskId: nextTask.taskId, + agentId: agent.id, + agentName: agent.name, + }, + }; + this.eventBus.emit(event); + + return { + success: true, + taskId: nextTask.taskId, + agentId: agent.id, + }; + } + + /** + * Get current queue state. + */ + async getQueueState(): Promise<{ + queued: QueuedTask[]; + ready: QueuedTask[]; + blocked: Array<{ taskId: string; reason: string }>; + }> { + const allQueued = Array.from(this.taskQueue.values()); + + // Determine which are ready + const ready: QueuedTask[] = []; + for (const qt of allQueued) { + const allDepsComplete = await this.areAllDependenciesComplete(qt.dependsOn); + if (allDepsComplete) { + ready.push(qt); + } + } + + return { + queued: allQueued, + ready, + blocked: Array.from(this.blockedTasks.values()), + }; + } + + // ============================================================================= + // Private Helpers + // ============================================================================= + + /** + * Check if all dependencies are complete. + */ + private async areAllDependenciesComplete(dependsOn: string[]): Promise { + if (dependsOn.length === 0) { + return true; + } + + for (const depTaskId of dependsOn) { + const depTask = await this.taskRepository.findById(depTaskId); + if (!depTask || depTask.status !== 'completed') { + return false; + } + } + + return true; + } +}