/** * Default Dispatch Manager - Adapter Implementation * * Implements DispatchManager interface with in-memory queue * and dependency-ordered dispatch. * * This is the ADAPTER for the DispatchManager PORT. */ import type { EventBus, TaskQueuedEvent, TaskCompletedEvent, TaskBlockedEvent, TaskDispatchedEvent, } from '../events/index.js'; import type { AgentManager, AgentResult, AgentInfo } from '../agent/types.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { MessageRepository } from '../db/repositories/message-repository.js'; import type { AgentRepository } from '../db/repositories/agent-repository.js'; import type { InitiativeRepository } from '../db/repositories/initiative-repository.js'; import type { PhaseRepository } from '../db/repositories/phase-repository.js'; import type { PageRepository } from '../db/repositories/page-repository.js'; import type { Task, Phase } from '../db/schema.js'; import type { PageForSerialization } from '../agent/content-serializer.js'; import type { DispatchManager, QueuedTask, DispatchResult } from './types.js'; import { phaseBranchName, taskBranchName, isPlanningCategory, generateInitiativeBranch } from '../git/branch-naming.js'; import { buildExecutePrompt } from '../agent/prompts/index.js'; import { createModuleLogger } from '../logger/index.js'; const log = createModuleLogger('dispatch'); // ============================================================================= // Internal Types // ============================================================================= /** * Internal representation of a blocked task. */ interface BlockedTask { taskId: string; reason: string; } // ============================================================================= // DefaultDispatchManager Implementation // ============================================================================= /** * In-memory implementation of DispatchManager. * * Uses Map for queue management and checks task_dependencies table * for dependency resolution. */ export class DefaultDispatchManager implements DispatchManager { /** Internal queue of tasks pending dispatch */ private taskQueue: Map = new Map(); /** Blocked tasks with their reasons */ private blockedTasks: Map = new Map(); constructor( private taskRepository: TaskRepository, private messageRepository: MessageRepository, private agentManager: AgentManager, private eventBus: EventBus, private initiativeRepository?: InitiativeRepository, private phaseRepository?: PhaseRepository, private agentRepository?: AgentRepository, private pageRepository?: PageRepository, ) {} /** * Queue a task for dispatch. * Fetches task dependencies and adds to internal queue. * Checkpoint tasks are queued but won't auto-dispatch. */ async queue(taskId: string): Promise { // Fetch task to verify it exists and get priority const task = await this.taskRepository.findById(taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); } // Get dependencies for this task from the repository const dependsOn = await this.taskRepository.getDependencies(taskId); const queuedTask: QueuedTask = { taskId, priority: task.priority, queuedAt: new Date(), dependsOn, }; this.taskQueue.set(taskId, queuedTask); log.info({ taskId, priority: task.priority, isCheckpoint: this.isCheckpointTask(task) }, 'task queued'); // Emit TaskQueuedEvent const event: TaskQueuedEvent = { type: 'task:queued', timestamp: new Date(), payload: { taskId, priority: task.priority, dependsOn, }, }; this.eventBus.emit(event); } /** * Get next dispatchable task. * Returns task with all dependencies complete, highest priority first. * Checkpoint tasks are excluded (require human action). */ async getNextDispatchable(): Promise { const queuedTasks = Array.from(this.taskQueue.values()); if (queuedTasks.length === 0) { return null; } // Filter to only tasks with all dependencies complete and not checkpoint tasks const readyTasks: QueuedTask[] = []; log.debug({ queueSize: queuedTasks.length }, 'evaluating dispatchable tasks'); for (const qt of queuedTasks) { // Check dependencies const allDepsComplete = await this.areAllDependenciesComplete(qt.dependsOn); if (!allDepsComplete) { continue; } // Check if this is a checkpoint task (requires human action) const task = await this.taskRepository.findById(qt.taskId); if (task && this.isCheckpointTask(task)) { log.debug({ taskId: qt.taskId, type: task.type }, 'skipping checkpoint task'); continue; } // Skip planning-category tasks (handled by architect flow) if (task && isPlanningCategory(task.category)) { log.debug({ taskId: qt.taskId, category: task.category }, 'skipping planning-category task'); continue; } readyTasks.push(qt); } log.debug({ queueSize: queuedTasks.length, readyCount: readyTasks.length }, 'dispatchable evaluation complete'); if (readyTasks.length === 0) { return null; } // Sort by priority (high > medium > low), then by queuedAt (oldest first) const priorityOrder: Record = { high: 0, medium: 1, low: 2 }; readyTasks.sort((a, b) => { const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority]; if (priorityDiff !== 0) { return priorityDiff; } return a.queuedAt.getTime() - b.queuedAt.getTime(); }); return readyTasks[0]; } /** * Mark a task as complete. * Updates task status and removes from queue. * * @param taskId - ID of the task to complete * @param agentId - Optional ID of the agent that completed the task */ async completeTask(taskId: string, agentId?: string): Promise { const task = await this.taskRepository.findById(taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); } // Store agent result summary on the task for propagation to dependent tasks await this.storeAgentSummary(taskId, agentId); await this.taskRepository.update(taskId, { status: 'completed' }); // Remove from queue this.taskQueue.delete(taskId); log.info({ taskId }, 'task completed'); // Emit TaskCompletedEvent const event: TaskCompletedEvent = { type: 'task:completed', timestamp: new Date(), payload: { taskId, agentId: agentId ?? '', success: true, message: 'Task completed', }, }; this.eventBus.emit(event); // Also remove from blocked if it was there this.blockedTasks.delete(taskId); } /** * Mark a task as blocked. * Updates task status and records block reason. */ async blockTask(taskId: string, reason: string): Promise { // Update task status to 'blocked' await this.taskRepository.update(taskId, { status: 'blocked' }); // Record in blocked map this.blockedTasks.set(taskId, { taskId, reason }); log.warn({ taskId, reason }, 'task blocked'); // Remove from queue (blocked tasks aren't dispatchable) this.taskQueue.delete(taskId); // Emit TaskBlockedEvent const event: TaskBlockedEvent = { type: 'task:blocked', timestamp: new Date(), payload: { taskId, reason, }, }; this.eventBus.emit(event); } /** * Dispatch next available task to an agent. */ async dispatchNext(): Promise { // Get next dispatchable task const nextTask = await this.getNextDispatchable(); if (!nextTask) { log.debug('no dispatchable tasks'); return { success: false, taskId: '', reason: 'No dispatchable tasks', }; } // Find available agent (status='idle') const agents = await this.agentManager.list(); const idleAgent = agents.find((a) => a.status === 'idle'); if (!idleAgent) { log.debug('no available agents'); return { success: false, taskId: nextTask.taskId, reason: 'No available agents', }; } // Get task details const task = await this.taskRepository.findById(nextTask.taskId); if (!task) { return { success: false, taskId: nextTask.taskId, reason: 'Task not found', }; } // Compute branch info for branch-aware spawning let baseBranch: string | undefined; let branchName: string | undefined; if (task.initiativeId && this.initiativeRepository) { try { if (isPlanningCategory(task.category)) { // Planning tasks run on project default branches — no initiative branch needed. // baseBranch and branchName remain undefined; ProcessManager uses per-project defaults. } else if (task.phaseId && this.phaseRepository) { // Execution task — ensure initiative has a branch const initiative = await this.initiativeRepository.findById(task.initiativeId); if (initiative) { let initBranch = initiative.branch; if (!initBranch) { initBranch = generateInitiativeBranch(initiative.name); await this.initiativeRepository.update(initiative.id, { branch: initBranch }); } const phase = await this.phaseRepository.findById(task.phaseId); if (phase) { if (task.category === 'merge' || task.category === 'review') { // Merge/review tasks work directly on the phase branch baseBranch = initBranch; branchName = phaseBranchName(initBranch, phase.name); } else { baseBranch = phaseBranchName(initBranch, phase.name); branchName = taskBranchName(initBranch, task.id); } } } } } catch { // Non-fatal: fall back to default branching } } // Gather initiative context for the agent's input files let inputContext: import('../agent/types.js').AgentInputContext | undefined; if (task.initiativeId) { try { const initiative = await this.initiativeRepository?.findById(task.initiativeId); const phase = task.phaseId ? await this.phaseRepository?.findById(task.phaseId) : undefined; const context = await this.gatherInitiativeContext(task.initiativeId); inputContext = { initiative: initiative ?? undefined, task, phase: phase ?? undefined, phases: context.phases.length > 0 ? context.phases : undefined, tasks: context.tasks.length > 0 ? context.tasks : undefined, pages: context.pages.length > 0 ? context.pages : undefined, }; } catch (err) { log.warn({ taskId: task.id, err }, 'failed to gather initiative context for dispatch'); } } // Spawn agent with task (alias auto-generated by agent manager) let agent: AgentInfo; try { agent = await this.agentManager.spawn({ taskId: nextTask.taskId, initiativeId: task.initiativeId ?? undefined, phaseId: task.phaseId ?? undefined, prompt: buildExecutePrompt(task.description || task.name), baseBranch, branchName, inputContext, }); } catch (err) { const reason = `Spawn failed: ${err instanceof Error ? err.message : String(err)}`; log.error({ taskId: nextTask.taskId, err: reason }, 'agent spawn failed, blocking task'); await this.blockTask(nextTask.taskId, reason); return { success: false, taskId: nextTask.taskId, reason }; } log.info({ taskId: nextTask.taskId, agentId: agent.id }, 'task dispatched'); // Update task status to 'in_progress' await this.taskRepository.update(nextTask.taskId, { status: 'in_progress' }); // Remove from queue (now being worked on) this.taskQueue.delete(nextTask.taskId); // Emit TaskDispatchedEvent const event: TaskDispatchedEvent = { type: 'task:dispatched', timestamp: new Date(), payload: { taskId: nextTask.taskId, agentId: agent.id, agentName: agent.name, }, }; this.eventBus.emit(event); return { success: true, taskId: nextTask.taskId, agentId: agent.id, }; } /** * Get current queue state. */ async getQueueState(): Promise<{ queued: QueuedTask[]; ready: QueuedTask[]; blocked: Array<{ taskId: string; reason: string }>; }> { const allQueued = Array.from(this.taskQueue.values()); // Determine which are ready const ready: QueuedTask[] = []; for (const qt of allQueued) { const allDepsComplete = await this.areAllDependenciesComplete(qt.dependsOn); if (allDepsComplete) { ready.push(qt); } } return { queued: allQueued, ready, blocked: Array.from(this.blockedTasks.values()), }; } // ============================================================================= // Private Helpers // ============================================================================= /** * Check if all dependencies are complete. */ private async areAllDependenciesComplete(dependsOn: string[]): Promise { if (dependsOn.length === 0) { return true; } for (const depTaskId of dependsOn) { const depTask = await this.taskRepository.findById(depTaskId); if (!depTask || depTask.status !== 'completed') { return false; } } return true; } /** * Check if a task is a checkpoint task. * Checkpoint tasks require human action and don't auto-dispatch. */ private isCheckpointTask(task: Task): boolean { return task.type.startsWith('checkpoint:'); } /** * Store the completing agent's result summary on the task record. */ private async storeAgentSummary(taskId: string, agentId?: string): Promise { if (!agentId || !this.agentRepository) return; try { const agentRecord = await this.agentRepository.findById(agentId); if (agentRecord?.result) { const result: AgentResult = JSON.parse(agentRecord.result); if (result.message) { await this.taskRepository.update(taskId, { summary: result.message }); } } } catch (err) { log.warn({ taskId, agentId, err }, 'failed to store agent summary on task'); } } /** * Gather initiative context for passing to execution agents. * Reuses the same pattern as architect.ts gatherInitiativeContext. */ private async gatherInitiativeContext(initiativeId: string): Promise<{ phases: Array; tasks: Task[]; pages: PageForSerialization[]; }> { const [rawPhases, deps, initiativeTasks, pages] = await Promise.all([ this.phaseRepository?.findByInitiativeId(initiativeId) ?? [], this.phaseRepository?.findDependenciesByInitiativeId(initiativeId) ?? [], this.taskRepository.findByInitiativeId(initiativeId), this.pageRepository?.findByInitiativeId(initiativeId) ?? [], ]); // Merge dependencies into each phase as a dependsOn array const depsByPhase = new Map(); for (const dep of deps) { const arr = depsByPhase.get(dep.phaseId) ?? []; arr.push(dep.dependsOnPhaseId); depsByPhase.set(dep.phaseId, arr); } const phases = rawPhases.map((ph) => ({ ...ph, dependsOn: depsByPhase.get(ph.id) ?? [], })); // Collect tasks from all phases (some tasks only have phaseId, not initiativeId) const taskIds = new Set(initiativeTasks.map((t) => t.id)); const allTasks = [...initiativeTasks]; for (const ph of rawPhases) { const phaseTasks = await this.taskRepository.findByPhaseId(ph.id); for (const t of phaseTasks) { if (!taskIds.has(t.id)) { taskIds.add(t.id); allTasks.push(t); } } } // Only include implementation tasks — planning tasks are irrelevant noise const implementationTasks = allTasks.filter(t => !isPlanningCategory(t.category)); return { phases, tasks: implementationTasks, pages }; } }