Files
Codewalkers/apps/server/dispatch/manager.ts
Lukas May 34578d39c6 refactor: Restructure monorepo to apps/server/ and apps/web/ layout
Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt
standard monorepo conventions (apps/ for runnable apps, packages/
for reusable libraries). Update all config files, shared package
imports, test fixtures, and documentation to reflect new paths.

Key fixes:
- Update workspace config to ["apps/*", "packages/*"]
- Update tsconfig.json rootDir/include for apps/server/
- Add apps/web/** to vitest exclude list
- Update drizzle.config.ts schema path
- Fix ensure-schema.ts migration path detection (3 levels up in dev,
  2 levels up in dist)
- Fix tests/integration/cli-server.test.ts import paths
- Update packages/shared imports to apps/server/ paths
- Update all docs/ files with new paths
2026-03-03 11:22:53 +01:00

486 lines
14 KiB
TypeScript

/**
* Default Dispatch Manager - Adapter Implementation
*
* Implements DispatchManager interface with in-memory queue
* and dependency-ordered dispatch.
*
* This is the ADAPTER for the DispatchManager PORT.
*/
import type {
EventBus,
TaskQueuedEvent,
TaskCompletedEvent,
TaskBlockedEvent,
TaskDispatchedEvent,
TaskPendingApprovalEvent,
} from '../events/index.js';
import type { AgentManager } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { Task } from '../db/schema.js';
import type { DispatchManager, QueuedTask, DispatchResult } from './types.js';
import { phaseBranchName, taskBranchName, isPlanningCategory, generateInitiativeBranch } from '../git/branch-naming.js';
import { buildExecutePrompt } from '../agent/prompts/index.js';
import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('dispatch');
// =============================================================================
// Internal Types
// =============================================================================
/**
* Internal representation of a blocked task.
*/
interface BlockedTask {
taskId: string;
reason: string;
}
// =============================================================================
// DefaultDispatchManager Implementation
// =============================================================================
/**
* In-memory implementation of DispatchManager.
*
* Uses Map for queue management and checks task_dependencies table
* for dependency resolution.
*/
export class DefaultDispatchManager implements DispatchManager {
/** Internal queue of tasks pending dispatch */
private taskQueue: Map<string, QueuedTask> = new Map();
/** Blocked tasks with their reasons */
private blockedTasks: Map<string, BlockedTask> = new Map();
constructor(
private taskRepository: TaskRepository,
private messageRepository: MessageRepository,
private agentManager: AgentManager,
private eventBus: EventBus,
private initiativeRepository?: InitiativeRepository,
private phaseRepository?: PhaseRepository,
) {}
/**
* Queue a task for dispatch.
* Fetches task dependencies and adds to internal queue.
* Checkpoint tasks are queued but won't auto-dispatch.
*/
async queue(taskId: string): Promise<void> {
// Fetch task to verify it exists and get priority
const task = await this.taskRepository.findById(taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
// Get dependencies for this task from the repository
const dependsOn = await this.taskRepository.getDependencies(taskId);
const queuedTask: QueuedTask = {
taskId,
priority: task.priority,
queuedAt: new Date(),
dependsOn,
};
this.taskQueue.set(taskId, queuedTask);
log.info({ taskId, priority: task.priority, isCheckpoint: this.isCheckpointTask(task) }, 'task queued');
// Emit TaskQueuedEvent
const event: TaskQueuedEvent = {
type: 'task:queued',
timestamp: new Date(),
payload: {
taskId,
priority: task.priority,
dependsOn,
},
};
this.eventBus.emit(event);
}
/**
* Get next dispatchable task.
* Returns task with all dependencies complete, highest priority first.
* Checkpoint tasks are excluded (require human action).
*/
async getNextDispatchable(): Promise<QueuedTask | null> {
const queuedTasks = Array.from(this.taskQueue.values());
if (queuedTasks.length === 0) {
return null;
}
// Filter to only tasks with all dependencies complete and not checkpoint tasks
const readyTasks: QueuedTask[] = [];
log.debug({ queueSize: queuedTasks.length }, 'evaluating dispatchable tasks');
for (const qt of queuedTasks) {
// Check dependencies
const allDepsComplete = await this.areAllDependenciesComplete(qt.dependsOn);
if (!allDepsComplete) {
continue;
}
// Check if this is a checkpoint task (requires human action)
const task = await this.taskRepository.findById(qt.taskId);
if (task && this.isCheckpointTask(task)) {
log.debug({ taskId: qt.taskId, type: task.type }, 'skipping checkpoint task');
continue;
}
// Skip planning-category tasks (handled by architect flow)
if (task && isPlanningCategory(task.category)) {
log.debug({ taskId: qt.taskId, category: task.category }, 'skipping planning-category task');
continue;
}
readyTasks.push(qt);
}
log.debug({ queueSize: queuedTasks.length, readyCount: readyTasks.length }, 'dispatchable evaluation complete');
if (readyTasks.length === 0) {
return null;
}
// Sort by priority (high > medium > low), then by queuedAt (oldest first)
const priorityOrder: Record<string, number> = { high: 0, medium: 1, low: 2 };
readyTasks.sort((a, b) => {
const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority];
if (priorityDiff !== 0) {
return priorityDiff;
}
return a.queuedAt.getTime() - b.queuedAt.getTime();
});
return readyTasks[0];
}
/**
* Mark a task as complete.
* If the task requires approval, sets status to 'pending_approval' instead.
* Updates task status and removes from queue.
*
* @param taskId - ID of the task to complete
* @param agentId - Optional ID of the agent that completed the task
*/
async completeTask(taskId: string, agentId?: string): Promise<void> {
const task = await this.taskRepository.findById(taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
// Determine if approval is required
const requiresApproval = await this.taskRequiresApproval(task);
if (requiresApproval) {
// Set to pending_approval instead of completed
await this.taskRepository.update(taskId, { status: 'pending_approval' });
// Remove from queue
this.taskQueue.delete(taskId);
log.info({ taskId, category: task.category }, 'task pending approval');
// Emit TaskPendingApprovalEvent
const event: TaskPendingApprovalEvent = {
type: 'task:pending_approval',
timestamp: new Date(),
payload: {
taskId,
agentId: agentId ?? '',
category: task.category,
name: task.name,
},
};
this.eventBus.emit(event);
} else {
// Complete directly
await this.taskRepository.update(taskId, { status: 'completed' });
// Remove from queue
this.taskQueue.delete(taskId);
log.info({ taskId }, 'task completed');
// Emit TaskCompletedEvent
const event: TaskCompletedEvent = {
type: 'task:completed',
timestamp: new Date(),
payload: {
taskId,
agentId: agentId ?? '',
success: true,
message: 'Task completed',
},
};
this.eventBus.emit(event);
}
// Also remove from blocked if it was there
this.blockedTasks.delete(taskId);
}
/**
* Approve a task that is pending approval.
* Sets status to 'completed' and emits completion event.
*/
async approveTask(taskId: string): Promise<void> {
const task = await this.taskRepository.findById(taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
if (task.status !== 'pending_approval') {
throw new Error(`Task ${taskId} is not pending approval (status: ${task.status})`);
}
// Complete the task
await this.taskRepository.update(taskId, { status: 'completed' });
log.info({ taskId }, 'task approved and completed');
// Emit TaskCompletedEvent
const event: TaskCompletedEvent = {
type: 'task:completed',
timestamp: new Date(),
payload: {
taskId,
agentId: '',
success: true,
message: 'Task approved',
},
};
this.eventBus.emit(event);
}
/**
* Mark a task as blocked.
* Updates task status and records block reason.
*/
async blockTask(taskId: string, reason: string): Promise<void> {
// Update task status to 'blocked'
await this.taskRepository.update(taskId, { status: 'blocked' });
// Record in blocked map
this.blockedTasks.set(taskId, { taskId, reason });
log.warn({ taskId, reason }, 'task blocked');
// Remove from queue (blocked tasks aren't dispatchable)
this.taskQueue.delete(taskId);
// Emit TaskBlockedEvent
const event: TaskBlockedEvent = {
type: 'task:blocked',
timestamp: new Date(),
payload: {
taskId,
reason,
},
};
this.eventBus.emit(event);
}
/**
* Dispatch next available task to an agent.
*/
async dispatchNext(): Promise<DispatchResult> {
// Get next dispatchable task
const nextTask = await this.getNextDispatchable();
if (!nextTask) {
log.debug('no dispatchable tasks');
return {
success: false,
taskId: '',
reason: 'No dispatchable tasks',
};
}
// Find available agent (status='idle')
const agents = await this.agentManager.list();
const idleAgent = agents.find((a) => a.status === 'idle');
if (!idleAgent) {
log.debug('no available agents');
return {
success: false,
taskId: nextTask.taskId,
reason: 'No available agents',
};
}
// Get task details
const task = await this.taskRepository.findById(nextTask.taskId);
if (!task) {
return {
success: false,
taskId: nextTask.taskId,
reason: 'Task not found',
};
}
// Compute branch info for branch-aware spawning
let baseBranch: string | undefined;
let branchName: string | undefined;
if (task.initiativeId && this.initiativeRepository) {
try {
if (isPlanningCategory(task.category)) {
// Planning tasks run on project default branches — no initiative branch needed.
// baseBranch and branchName remain undefined; ProcessManager uses per-project defaults.
} else if (task.phaseId && this.phaseRepository) {
// Execution task — ensure initiative has a branch
const initiative = await this.initiativeRepository.findById(task.initiativeId);
if (initiative) {
let initBranch = initiative.branch;
if (!initBranch) {
initBranch = generateInitiativeBranch(initiative.name);
await this.initiativeRepository.update(initiative.id, { branch: initBranch });
}
const phase = await this.phaseRepository.findById(task.phaseId);
if (phase) {
if (task.category === 'merge') {
// Merge tasks work directly on the phase branch
baseBranch = initBranch;
branchName = phaseBranchName(initBranch, phase.name);
} else {
baseBranch = phaseBranchName(initBranch, phase.name);
branchName = taskBranchName(initBranch, task.id);
}
}
}
}
} catch {
// Non-fatal: fall back to default branching
}
}
// Spawn agent with task (alias auto-generated by agent manager)
const agent = await this.agentManager.spawn({
taskId: nextTask.taskId,
initiativeId: task.initiativeId ?? undefined,
phaseId: task.phaseId ?? undefined,
prompt: buildExecutePrompt(task.description || task.name),
baseBranch,
branchName,
});
log.info({ taskId: nextTask.taskId, agentId: agent.id }, 'task dispatched');
// Update task status to 'in_progress'
await this.taskRepository.update(nextTask.taskId, { status: 'in_progress' });
// Remove from queue (now being worked on)
this.taskQueue.delete(nextTask.taskId);
// Emit TaskDispatchedEvent
const event: TaskDispatchedEvent = {
type: 'task:dispatched',
timestamp: new Date(),
payload: {
taskId: nextTask.taskId,
agentId: agent.id,
agentName: agent.name,
},
};
this.eventBus.emit(event);
return {
success: true,
taskId: nextTask.taskId,
agentId: agent.id,
};
}
/**
* Get current queue state.
*/
async getQueueState(): Promise<{
queued: QueuedTask[];
ready: QueuedTask[];
blocked: Array<{ taskId: string; reason: string }>;
}> {
const allQueued = Array.from(this.taskQueue.values());
// Determine which are ready
const ready: QueuedTask[] = [];
for (const qt of allQueued) {
const allDepsComplete = await this.areAllDependenciesComplete(qt.dependsOn);
if (allDepsComplete) {
ready.push(qt);
}
}
return {
queued: allQueued,
ready,
blocked: Array.from(this.blockedTasks.values()),
};
}
// =============================================================================
// Private Helpers
// =============================================================================
/**
* Check if all dependencies are complete.
*/
private async areAllDependenciesComplete(dependsOn: string[]): Promise<boolean> {
if (dependsOn.length === 0) {
return true;
}
for (const depTaskId of dependsOn) {
const depTask = await this.taskRepository.findById(depTaskId);
if (!depTask || depTask.status !== 'completed') {
return false;
}
}
return true;
}
/**
* Check if a task is a checkpoint task.
* Checkpoint tasks require human action and don't auto-dispatch.
*/
private isCheckpointTask(task: Task): boolean {
return task.type.startsWith('checkpoint:');
}
/**
* Determine if a task requires approval before being marked complete.
* Checks task-level override first, then falls back to initiative setting.
*/
private async taskRequiresApproval(task: Task): Promise<boolean> {
// Task-level override takes precedence
if (task.requiresApproval !== null) {
return task.requiresApproval;
}
// Fall back to initiative setting if we have initiative access
if (this.initiativeRepository && task.initiativeId) {
const initiative = await this.initiativeRepository.findById(task.initiativeId);
if (initiative) {
return initiative.mergeRequiresApproval;
}
}
// If task has a phaseId but no initiativeId, we could traverse up but for now default to false
// Default: no approval required
return false;
}
}