Files
Codewalkers/src/agent/manager.ts
Lukas May 91e57c66eb feat(11-01): add AgentMode type and database column
- Add AgentMode type: 'execute' | 'discuss' | 'breakdown'
- Add mode column to agents table with 'execute' default
- Update SpawnAgentOptions to accept optional mode
- Update AgentInfo interface to include mode field
- Update ClaudeAgentManager.toAgentInfo to map mode
- Fix MockAgentManager to include mode in spawn
- Fix dispatch manager tests to include mode
2026-01-31 19:03:42 +01:00

474 lines
13 KiB
TypeScript

/**
* Claude Agent Manager Adapter
*
* Implementation of AgentManager port using Claude CLI with JSON output.
* Spawns real Claude agents via `claude -p "prompt" --output-format json`.
*/
import { execa, type ResultPromise } from 'execa';
import { randomUUID } from 'crypto';
import type {
AgentManager,
AgentInfo,
SpawnAgentOptions,
AgentResult,
AgentStatus,
AgentMode,
PendingQuestions,
} from './types.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { WorktreeManager } from '../git/types.js';
import type {
EventBus,
AgentSpawnedEvent,
AgentStoppedEvent,
AgentCrashedEvent,
AgentResumedEvent,
AgentWaitingEvent,
} from '../events/index.js';
import { agentOutputSchema, agentOutputJsonSchema } from './schema.js';
/**
* Result structure from Claude CLI with --output-format json
*
* This file parses the CLI's stdout into this shape with JSON.parse and no
* runtime validation, so the fields describe what the code expects rather
* than a checked contract.
*/
interface ClaudeCliResult {
/** Literal tag of the wrapper object; not inspected in this file. */
type: 'result';
/** Outcome tag reported by the CLI; not inspected in this file. */
subtype: 'success' | 'error';
/** Error flag reported by the CLI; not inspected in this file. */
is_error: boolean;
/** Session identifier; persisted when truthy and later passed to `--resume`. */
session_id: string;
/** String payload that is JSON-parsed and validated against agentOutputSchema. */
result: string;
/** Optional; presumably the run's cost in USD - not read in this file (TODO confirm). */
total_cost_usd?: number;
}
/**
* Tracks an active agent subprocess, its result, and any pending questions
*
* Entries live in ClaudeAgentManager's in-memory map keyed by agent ID; a new
* entry (with only `subprocess` set) is stored on every spawn and resume.
*/
interface ActiveAgent {
/** Handle returned by execa for the CLI process; stop() sends SIGTERM through it. */
subprocess: ResultPromise;
/** Set when the run reports 'done' or 'unrecoverable_error', or when it crashes. */
result?: AgentResult;
/** Set when the agent's structured output has status 'questions'. */
pendingQuestions?: PendingQuestions;
}
/**
 * ClaudeAgentManager - Adapter implementing AgentManager port
 *
 * Uses Claude CLI in JSON mode to spawn agents. Each agent gets:
 * - Isolated worktree (via WorktreeManager)
 * - Persisted state (via AgentRepository)
 * - Lifecycle events (via EventBus)
 *
 * Subprocess outcomes are processed in the background. An outcome is only
 * recorded while the subprocess that produced it is still the one registered
 * for the agent, so a run that was stopped or superseded by a resume cannot
 * overwrite newer state.
 */
export class ClaudeAgentManager implements AgentManager {
  /** In-memory bookkeeping for subprocesses started by this manager instance. */
  private activeAgents: Map<string, ActiveAgent> = new Map();

  /**
   * @param repository - Persistence for agent records
   * @param worktreeManager - Creates and looks up worktrees for agents
   * @param eventBus - Optional; when omitted no lifecycle events are emitted
   */
  constructor(
    private repository: AgentRepository,
    private worktreeManager: WorktreeManager,
    private eventBus?: EventBus
  ) {}

  /**
   * Spawn a new agent to work on a task.
   * Creates isolated worktree, starts Claude CLI, persists state.
   *
   * @param options - Agent name, task, prompt and optional working directory
   * @returns Info for the newly created agent record (status 'running')
   * @throws Error if an agent with the same name already exists
   */
  async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
    // NOTE(review): options.mode is not forwarded to repository.create here,
    // so the stored mode is whatever the repository defaults to - confirm
    // whether that is intended.
    const { name, taskId, prompt, cwd } = options;
    const worktreeId = randomUUID();
    const branchName = `agent/${name}`;

    // Check name uniqueness
    const existing = await this.repository.findByName(name);
    if (existing) {
      throw new Error(`Agent with name '${name}' already exists`);
    }

    // 1. Create isolated worktree
    const worktree = await this.worktreeManager.create(worktreeId, branchName);

    // 2. Create agent record (session ID null until first run completes)
    const agent = await this.repository.create({
      name,
      taskId,
      sessionId: null,
      worktreeId: worktree.id,
      status: 'running',
    });

    // Use agent.id from repository for all tracking
    const agentId = agent.id;

    // 3. Start Claude CLI in background with JSON schema for structured output
    const subprocess = execa(
      'claude',
      [
        '-p',
        prompt,
        '--output-format',
        'json',
        '--json-schema',
        JSON.stringify(agentOutputJsonSchema),
      ],
      {
        cwd: cwd ?? worktree.path,
        detached: true,
        stdio: ['ignore', 'pipe', 'pipe'],
      }
    );
    this.activeAgents.set(agentId, { subprocess });

    // Emit spawned event
    if (this.eventBus) {
      const event: AgentSpawnedEvent = {
        type: 'agent:spawned',
        timestamp: new Date(),
        payload: {
          agentId,
          name,
          taskId,
          worktreeId: worktree.id,
        },
      };
      this.eventBus.emit(event);
    }

    // Handle completion in background
    this.trackCompletion(agentId, subprocess);
    return this.toAgentInfo(agent);
  }

  /**
   * Start background processing of a subprocess outcome.
   *
   * The returned promise is intentionally not awaited by callers; any failure
   * while recording the outcome (for example a repository error inside the
   * crash handler) is logged here instead of becoming an unhandled rejection.
   */
  private trackCompletion(agentId: string, subprocess: ResultPromise): void {
    void this.handleAgentCompletion(agentId, subprocess).catch(
      (error: unknown) => {
        console.error(
          `Failed to record completion for agent '${agentId}':`,
          error
        );
      }
    );
  }

  /**
   * Whether `subprocess` is still the run registered for `agentId`.
   * False after stop() removed the entry or resume() replaced it.
   */
  private isCurrentRun(agentId: string, subprocess: ResultPromise): boolean {
    const active = this.activeAgents.get(agentId);
    return active !== undefined && active.subprocess === subprocess;
  }

  /**
   * Handle agent subprocess completion.
   * Parses structured JSON result with discriminated union, updates session ID, emits events.
   *
   * Outcomes of runs that are no longer current (stopped or superseded) are
   * ignored so they cannot overwrite the status written by stop()/resume().
   */
  private async handleAgentCompletion(
    agentId: string,
    subprocess: ResultPromise
  ): Promise<void> {
    try {
      const { stdout } = await subprocess;
      if (!this.isCurrentRun(agentId, subprocess)) return;

      const agent = await this.repository.findById(agentId);
      if (!agent) return;

      // Parse CLI result wrapper (stdout is string when stdio is 'pipe')
      const cliResult: ClaudeCliResult = JSON.parse(stdout as string);

      // Store session_id for resume capability
      if (cliResult.session_id) {
        await this.repository.updateSessionId(agentId, cliResult.session_id);
      }

      // Parse the agent's structured output from result field
      const agentOutput = agentOutputSchema.parse(JSON.parse(cliResult.result));

      // Re-check after the awaits above: stop()/resume() may have run meanwhile.
      const active = this.activeAgents.get(agentId);
      if (!active || active.subprocess !== subprocess) return;

      switch (agentOutput.status) {
        case 'done': {
          // Success path
          active.result = {
            success: true,
            message: agentOutput.result,
            filesModified: agentOutput.filesModified,
          };
          await this.repository.updateStatus(agentId, 'idle');
          if (this.eventBus) {
            const event: AgentStoppedEvent = {
              type: 'agent:stopped',
              timestamp: new Date(),
              payload: {
                agentId,
                name: agent.name,
                taskId: agent.taskId ?? '',
                reason: 'task_complete',
              },
            };
            this.eventBus.emit(event);
          }
          break;
        }
        case 'questions': {
          // Questions path - agent needs input (one or more questions)
          active.pendingQuestions = {
            questions: agentOutput.questions,
          };
          await this.repository.updateStatus(agentId, 'waiting_for_input');
          if (this.eventBus) {
            const event: AgentWaitingEvent = {
              type: 'agent:waiting',
              timestamp: new Date(),
              payload: {
                agentId,
                name: agent.name,
                taskId: agent.taskId ?? '',
                sessionId: cliResult.session_id ?? agent.sessionId ?? '',
                questions: agentOutput.questions,
              },
            };
            this.eventBus.emit(event);
          }
          break;
        }
        case 'unrecoverable_error': {
          // Error path
          active.result = {
            success: false,
            message: agentOutput.error,
          };
          await this.repository.updateStatus(agentId, 'crashed');
          if (this.eventBus) {
            const event: AgentCrashedEvent = {
              type: 'agent:crashed',
              timestamp: new Date(),
              payload: {
                agentId,
                name: agent.name,
                taskId: agent.taskId ?? '',
                error: agentOutput.error,
              },
            };
            this.eventBus.emit(event);
          }
          break;
        }
      }
    } catch (error) {
      // A subprocess killed by stop() rejects here; that is not a crash.
      if (!this.isCurrentRun(agentId, subprocess)) return;
      await this.handleAgentError(agentId, error);
    }
  }

  /**
   * Handle agent errors - actual crashes (not waiting for input).
   * With structured output via --json-schema, question status is handled in
   * handleAgentCompletion. This method only handles real subprocess errors
   * and failures to parse/validate the CLI output.
   *
   * @param agentId - Agent whose run failed
   * @param error - Thrown value; its message (or string form) is recorded
   */
  private async handleAgentError(
    agentId: string,
    error: unknown
  ): Promise<void> {
    const errorMessage = error instanceof Error ? error.message : String(error);
    const agent = await this.repository.findById(agentId);
    if (!agent) return;

    // Actual crash - structured output failed or subprocess error
    await this.repository.updateStatus(agentId, 'crashed');
    if (this.eventBus) {
      const event: AgentCrashedEvent = {
        type: 'agent:crashed',
        timestamp: new Date(),
        payload: {
          agentId,
          name: agent.name,
          taskId: agent.taskId ?? '',
          error: errorMessage,
        },
      };
      this.eventBus.emit(event);
    }

    const active = this.activeAgents.get(agentId);
    if (active) {
      active.result = {
        success: false,
        message: errorMessage,
      };
    }
  }

  /**
   * Stop a running agent.
   * Sends SIGTERM and updates status.
   *
   * The in-memory entry is removed before the status is written, which also
   * tells the background completion handler to ignore the killed process.
   *
   * @throws Error if no agent with the given ID exists
   */
  async stop(agentId: string): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) {
      throw new Error(`Agent '${agentId}' not found`);
    }

    const active = this.activeAgents.get(agentId);
    if (active) {
      active.subprocess.kill('SIGTERM');
      this.activeAgents.delete(agentId);
    }

    await this.repository.updateStatus(agentId, 'stopped');
    if (this.eventBus) {
      const event: AgentStoppedEvent = {
        type: 'agent:stopped',
        timestamp: new Date(),
        payload: {
          agentId,
          name: agent.name,
          taskId: agent.taskId ?? '',
          reason: 'user_requested',
        },
      };
      this.eventBus.emit(event);
    }
  }

  /**
   * List all agents with their current status.
   */
  async list(): Promise<AgentInfo[]> {
    const agents = await this.repository.findAll();
    return agents.map((a) => this.toAgentInfo(a));
  }

  /**
   * Get a specific agent by ID.
   *
   * @returns The agent's info, or null when no record exists
   */
  async get(agentId: string): Promise<AgentInfo | null> {
    const agent = await this.repository.findById(agentId);
    return agent ? this.toAgentInfo(agent) : null;
  }

  /**
   * Get a specific agent by name.
   *
   * @returns The agent's info, or null when no record exists
   */
  async getByName(name: string): Promise<AgentInfo | null> {
    const agent = await this.repository.findByName(name);
    return agent ? this.toAgentInfo(agent) : null;
  }

  /**
   * Resume an agent that's waiting for input.
   * Uses stored session ID to continue with full context.
   *
   * @param agentId - Agent to resume
   * @param answers - Map of question ID to user's answer
   * @throws Error if the agent is missing, is not waiting for input, has no
   *   stored session ID, or its worktree cannot be found
   */
  async resume(agentId: string, answers: Record<string, string>): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) {
      throw new Error(`Agent '${agentId}' not found`);
    }
    if (agent.status !== 'waiting_for_input') {
      throw new Error(
        `Agent '${agent.name}' is not waiting for input (status: ${agent.status})`
      );
    }
    if (!agent.sessionId) {
      throw new Error(`Agent '${agent.name}' has no session to resume`);
    }

    // Get worktree path
    const worktree = await this.worktreeManager.get(agent.worktreeId);
    if (!worktree) {
      throw new Error(`Worktree '${agent.worktreeId}' not found`);
    }

    // Format answers map as structured prompt for Claude
    const prompt = this.formatAnswersAsPrompt(answers);
    await this.repository.updateStatus(agentId, 'running');

    // Start CLI with --resume flag and same JSON schema
    const subprocess = execa(
      'claude',
      [
        '-p',
        prompt,
        '--resume',
        agent.sessionId,
        '--output-format',
        'json',
        '--json-schema',
        JSON.stringify(agentOutputJsonSchema),
      ],
      {
        cwd: worktree.path,
        detached: true,
        stdio: ['ignore', 'pipe', 'pipe'],
      }
    );

    // Clear any previous pending questions when resuming
    this.activeAgents.set(agentId, { subprocess });

    if (this.eventBus) {
      const event: AgentResumedEvent = {
        type: 'agent:resumed',
        timestamp: new Date(),
        payload: {
          agentId,
          name: agent.name,
          taskId: agent.taskId ?? '',
          sessionId: agent.sessionId,
        },
      };
      this.eventBus.emit(event);
    }

    this.trackCompletion(agentId, subprocess);
  }

  /**
   * Format answers map as structured prompt for Claude.
   * One line per answer in format: "[id]: answer"
   */
  private formatAnswersAsPrompt(answers: Record<string, string>): string {
    const lines = Object.entries(answers).map(
      ([questionId, answer]) => `[${questionId}]: ${answer}`
    );
    return `Here are my answers to your questions:\n${lines.join('\n')}`;
  }

  /**
   * Get the result of an agent's work.
   *
   * @returns The recorded result, or null when this manager instance holds no
   *   entry for the agent or the run has not produced a result yet
   */
  async getResult(agentId: string): Promise<AgentResult | null> {
    const active = this.activeAgents.get(agentId);
    return active?.result ?? null;
  }

  /**
   * Get pending questions for an agent waiting for input.
   *
   * @returns The questions recorded for the current run, or null when none
   */
  async getPendingQuestions(agentId: string): Promise<PendingQuestions | null> {
    const active = this.activeAgents.get(agentId);
    return active?.pendingQuestions ?? null;
  }

  /**
   * Convert database agent record to AgentInfo.
   * A null taskId becomes an empty string; status and mode strings are cast
   * to their union types without runtime validation.
   */
  private toAgentInfo(agent: {
    id: string;
    name: string;
    taskId: string | null;
    sessionId: string | null;
    worktreeId: string;
    status: string;
    mode: string;
    createdAt: Date;
    updatedAt: Date;
  }): AgentInfo {
    return {
      id: agent.id,
      name: agent.name,
      taskId: agent.taskId ?? '',
      sessionId: agent.sessionId,
      worktreeId: agent.worktreeId,
      status: agent.status as AgentStatus,
      mode: agent.mode as AgentMode,
      createdAt: agent.createdAt,
      updatedAt: agent.updatedAt,
    };
  }
}