Files
Codewalkers/src/agent/manager.ts
Lukas May 5605547aea fix(13-01): parse structured_output from Claude CLI response
- Add structured_output field to ClaudeCliResult interface
- Read from structured_output when present (--json-schema response)
- Fall back to parsing result for backwards compatibility
2026-02-02 10:38:10 +01:00

803 lines
22 KiB
TypeScript

/**
* Claude Agent Manager Adapter
*
* Implementation of AgentManager port using Claude CLI with JSON output.
* Spawns real Claude agents via
* `claude -p "prompt" --output-format json --json-schema <schema>` and resumes
* them with the same command plus `--resume <sessionId>`.
*/
import { execa, type ResultPromise } from 'execa';
import { randomUUID } from 'crypto';
import type {
AgentManager,
AgentInfo,
SpawnAgentOptions,
AgentResult,
AgentStatus,
AgentMode,
PendingQuestions,
} from './types.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { WorktreeManager } from '../git/types.js';
import type {
EventBus,
AgentSpawnedEvent,
AgentStoppedEvent,
AgentCrashedEvent,
AgentResumedEvent,
AgentWaitingEvent,
} from '../events/index.js';
import {
agentOutputSchema,
agentOutputJsonSchema,
discussOutputSchema,
discussOutputJsonSchema,
breakdownOutputSchema,
breakdownOutputJsonSchema,
decomposeOutputSchema,
decomposeOutputJsonSchema,
} from './schema.js';
/**
 * Result structure printed by the Claude CLI with `--output-format json`.
 *
 * When `--json-schema` is used, the schema-conforming payload is in
 * `structured_output`; `result` may then be empty or contain the raw text.
 */
interface ClaudeCliResult {
  type: 'result';
  /**
   * Outcome category reported by the CLI: 'success' for a normal run. Error
   * runs use more specific values (e.g. 'error_max_turns',
   * 'error_during_execution'), so this is deliberately not a closed union.
   */
  subtype: string;
  /** True when the CLI run itself failed. */
  is_error: boolean;
  /** Session identifier that can be passed to `--resume` for follow-ups. */
  session_id: string;
  /** Final text response; parsed as JSON when `structured_output` is absent. */
  result: string;
  /** Schema-conforming payload; present when `--json-schema` is used. */
  structured_output?: unknown;
  /** Run cost reported by the CLI, in USD (per the field name). */
  total_cost_usd?: number;
}
/**
 * In-memory bookkeeping for an agent's current (or most recent) CLI run:
 * the subprocess handle plus whatever outcome that run has produced so far.
 */
interface ActiveAgent {
  /** Questions the agent asked, recorded when the run ends awaiting input. */
  pendingQuestions?: PendingQuestions;
  /** Final outcome of the run, recorded once it finishes or crashes. */
  result?: AgentResult;
  /** Handle to the `claude` CLI subprocess for this run. */
  subprocess: ResultPromise;
}
/**
 * Minimal view of a persisted agent record needed to update its status and
 * emit lifecycle events.
 */
type AgentRef = {
  id: string;
  name: string;
  taskId: string | null;
  sessionId?: string | null;
};

/**
 * Question list accepted both by `PendingQuestions` and by the
 * `agent:waiting` event payload. Typed as the intersection so one value can be
 * stored and emitted without loosening either type.
 */
type WaitingQuestions = PendingQuestions['questions'] &
  AgentWaitingEvent['payload']['questions'];

/**
 * ClaudeAgentManager - Adapter implementing AgentManager port
 *
 * Uses Claude CLI in JSON mode to spawn agents. Each agent gets:
 * - Isolated worktree (via WorktreeManager)
 * - Persisted state (via AgentRepository)
 * - Lifecycle events (via EventBus)
 *
 * Run results and pending questions are held in memory on this instance and
 * are discarded when an agent is stopped.
 */
export class ClaudeAgentManager implements AgentManager {
  /** Current (or most recently finished) CLI run per agent ID. */
  private activeAgents: Map<string, ActiveAgent> = new Map();

  constructor(
    private repository: AgentRepository,
    private worktreeManager: WorktreeManager,
    private eventBus?: EventBus
  ) {}

  /**
   * Get the appropriate JSON schema for a given agent mode.
   * Each mode has its own output schema for the Claude CLI --json-schema flag;
   * any unrecognised mode falls back to the execute schema.
   */
  private getJsonSchemaForMode(mode: AgentMode): object {
    switch (mode) {
      case 'discuss':
        return discussOutputJsonSchema;
      case 'breakdown':
        return breakdownOutputJsonSchema;
      case 'decompose':
        return decomposeOutputJsonSchema;
      case 'execute':
      default:
        return agentOutputJsonSchema;
    }
  }

  /**
   * Start a Claude CLI run constrained by the mode-specific JSON schema.
   *
   * @param prompt - Prompt passed via `-p`
   * @param mode - Agent mode that selects the `--json-schema`
   * @param cwd - Working directory for the subprocess
   * @param resumeSessionId - When set, continues that session via `--resume`
   * @returns The execa subprocess (stdin ignored, stdout/stderr piped)
   */
  private startCli(
    prompt: string,
    mode: AgentMode,
    cwd: string,
    resumeSessionId?: string
  ): ResultPromise {
    const args = ['-p', prompt];
    if (resumeSessionId) {
      args.push('--resume', resumeSessionId);
    }
    args.push(
      '--output-format',
      'json',
      '--json-schema',
      JSON.stringify(this.getJsonSchemaForMode(mode))
    );
    return execa('claude', args, {
      cwd,
      detached: true,
      stdio: ['ignore', 'pipe', 'pipe'],
    });
  }

  /**
   * Spawn a new agent to work on a task.
   * Creates isolated worktree, starts Claude CLI, persists state.
   *
   * @throws Error if an agent with the same name already exists
   */
  async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
    const { name, taskId, prompt, cwd, mode = 'execute' } = options;

    // Check name uniqueness
    const existing = await this.repository.findByName(name);
    if (existing) {
      throw new Error(`Agent with name '${name}' already exists`);
    }

    // 1. Create isolated worktree on a per-agent branch
    const worktree = await this.worktreeManager.create(
      randomUUID(),
      `agent/${name}`
    );

    // 2. Create agent record (session ID null until first run completes)
    const agent = await this.repository.create({
      name,
      taskId,
      sessionId: null,
      worktreeId: worktree.id,
      status: 'running',
      mode,
    });
    // Use agent.id from repository for all tracking
    const agentId = agent.id;

    // 3. Start Claude CLI in background with mode-specific JSON schema
    const subprocess = this.startCli(prompt, mode, cwd ?? worktree.path);
    this.activeAgents.set(agentId, { subprocess });

    if (this.eventBus) {
      const event: AgentSpawnedEvent = {
        type: 'agent:spawned',
        timestamp: new Date(),
        payload: {
          agentId,
          name,
          taskId,
          worktreeId: worktree.id,
        },
      };
      this.eventBus.emit(event);
    }

    this.watchCompletion(agentId, subprocess);
    return this.toAgentInfo(agent);
  }

  /**
   * Process a subprocess's completion in the background.
   *
   * handleAgentCompletion already converts run failures into a crashed status.
   * This catch covers failures of that bookkeeping itself (e.g. the repository
   * throwing), which would otherwise become an unhandled promise rejection.
   */
  private watchCompletion(agentId: string, subprocess: ResultPromise): void {
    void this.handleAgentCompletion(agentId, subprocess).catch(
      (error: unknown) => {
        console.error(
          `Failed to record completion of agent '${agentId}':`,
          error
        );
      }
    );
  }

  /**
   * True when `subprocess` is still the tracked run for `agentId`, i.e. it has
   * not been stopped (entry deleted) or replaced by a later resume.
   */
  private isCurrentSubprocess(
    agentId: string,
    subprocess: ResultPromise
  ): boolean {
    return this.activeAgents.get(agentId)?.subprocess === subprocess;
  }

  /**
   * Handle agent subprocess completion.
   * Parses the structured JSON result with the mode-specific schema, stores
   * the session ID, updates status and emits events. Any failure marks the
   * agent crashed, unless the run was deliberately stopped or superseded.
   */
  private async handleAgentCompletion(
    agentId: string,
    subprocess: ResultPromise
  ): Promise<void> {
    try {
      const { stdout } = await subprocess;
      // Output of a stopped/superseded run must not overwrite newer state.
      if (!this.isCurrentSubprocess(agentId, subprocess)) return;

      const agent = await this.repository.findById(agentId);
      if (!agent) return;

      // Parse CLI result wrapper (stdout is string when stdio is 'pipe')
      const cliResult: ClaudeCliResult = JSON.parse(stdout as string);
      const sessionId = cliResult.session_id;

      // Store session_id for resume capability
      if (sessionId) {
        await this.repository.updateSessionId(agentId, sessionId);
      }

      const rawOutput = this.extractStructuredOutput(cliResult);

      // Parse output based on agent mode
      switch (agent.mode) {
        case 'discuss':
          await this.handleDiscussOutput(agent, rawOutput, sessionId);
          break;
        case 'breakdown':
          await this.handleBreakdownOutput(agent, rawOutput, sessionId);
          break;
        case 'decompose':
          await this.handleDecomposeOutput(agent, rawOutput, sessionId);
          break;
        case 'execute':
        default:
          await this.handleExecuteOutput(agent, rawOutput, sessionId);
          break;
      }
    } catch (error) {
      // stop() kills the subprocess on purpose, which rejects the promise;
      // that is not a crash, and stop() has already recorded 'stopped'.
      if (!this.isCurrentSubprocess(agentId, subprocess)) return;
      await this.handleAgentError(agentId, error);
    }
  }

  /**
   * Pull the agent's structured payload out of the CLI result wrapper.
   *
   * Prefers `structured_output` (present with --json-schema) and falls back to
   * parsing `result` as JSON for backwards compatibility.
   *
   * @throws Error with the CLI's message when it reported an error and gave no
   *   structured output; SyntaxError when `result` is not valid JSON.
   */
  private extractStructuredOutput(cliResult: ClaudeCliResult): unknown {
    if (cliResult.structured_output != null) {
      return cliResult.structured_output;
    }
    if (cliResult.is_error) {
      // Without this, JSON.parse of the error text yields a cryptic SyntaxError.
      throw new Error(
        cliResult.result || `Claude CLI reported an error (${cliResult.subtype})`
      );
    }
    return JSON.parse(cliResult.result);
  }

  /**
   * Handle output for execute mode (default mode).
   */
  private async handleExecuteOutput(
    agent: AgentRef,
    rawOutput: unknown,
    sessionId: string | undefined
  ): Promise<void> {
    const output = agentOutputSchema.parse(rawOutput);
    switch (output.status) {
      case 'done':
        await this.markCompleted(
          agent,
          {
            success: true,
            message: output.result,
            filesModified: output.filesModified,
          },
          'task_complete'
        );
        break;
      case 'questions':
        await this.markWaiting(agent, output.questions, sessionId);
        break;
      case 'unrecoverable_error':
        await this.markCrashed(agent, output.error);
        break;
    }
  }

  /**
   * Handle output for discuss mode.
   * Completes with the summary once context gathering is done.
   */
  private async handleDiscussOutput(
    agent: AgentRef,
    rawOutput: unknown,
    sessionId: string | undefined
  ): Promise<void> {
    const output = discussOutputSchema.parse(rawOutput);
    switch (output.status) {
      case 'context_complete':
        await this.markCompleted(
          agent,
          { success: true, message: output.summary },
          'context_complete'
        );
        break;
      case 'questions':
        await this.markWaiting(agent, output.questions, sessionId);
        break;
      case 'unrecoverable_error':
        await this.markCrashed(agent, output.error);
        break;
    }
  }

  /**
   * Handle output for breakdown mode.
   * Completes with a phase count once initiative decomposition is done.
   */
  private async handleBreakdownOutput(
    agent: AgentRef,
    rawOutput: unknown,
    sessionId?: string
  ): Promise<void> {
    const output = breakdownOutputSchema.parse(rawOutput);
    switch (output.status) {
      case 'breakdown_complete':
        await this.markCompleted(
          agent,
          {
            success: true,
            message: `Breakdown complete with ${output.phases.length} phases`,
          },
          'breakdown_complete'
        );
        break;
      case 'questions':
        await this.markWaiting(agent, output.questions, sessionId);
        break;
      case 'unrecoverable_error':
        await this.markCrashed(agent, output.error);
        break;
    }
  }

  /**
   * Handle output for decompose mode.
   * Completes with a task count once phase decomposition is done.
   */
  private async handleDecomposeOutput(
    agent: AgentRef,
    rawOutput: unknown,
    sessionId?: string
  ): Promise<void> {
    const output = decomposeOutputSchema.parse(rawOutput);
    switch (output.status) {
      case 'decompose_complete':
        await this.markCompleted(
          agent,
          {
            success: true,
            message: `Decompose complete with ${output.tasks.length} tasks`,
          },
          'decompose_complete'
        );
        break;
      case 'questions':
        await this.markWaiting(agent, output.questions, sessionId);
        break;
      case 'unrecoverable_error':
        await this.markCrashed(agent, output.error);
        break;
    }
  }

  /**
   * Record a successful run: store the result, set status 'idle', emit
   * `agent:stopped` with the given reason.
   */
  private async markCompleted(
    agent: AgentRef,
    result: AgentResult,
    reason: AgentStoppedEvent['payload']['reason']
  ): Promise<void> {
    const active = this.activeAgents.get(agent.id);
    if (active) {
      active.result = result;
    }
    await this.repository.updateStatus(agent.id, 'idle');
    this.emitStopped(agent, reason);
  }

  /**
   * Record that the agent needs input: store the questions, set status
   * 'waiting_for_input', emit `agent:waiting` with the session to resume.
   */
  private async markWaiting(
    agent: AgentRef,
    questions: WaitingQuestions,
    sessionId: string | undefined
  ): Promise<void> {
    const active = this.activeAgents.get(agent.id);
    if (active) {
      active.pendingQuestions = { questions };
    }
    await this.repository.updateStatus(agent.id, 'waiting_for_input');
    if (this.eventBus) {
      const event: AgentWaitingEvent = {
        type: 'agent:waiting',
        timestamp: new Date(),
        payload: {
          agentId: agent.id,
          name: agent.name,
          taskId: agent.taskId ?? '',
          sessionId: sessionId ?? agent.sessionId ?? '',
          questions,
        },
      };
      this.eventBus.emit(event);
    }
  }

  /**
   * Record a failed run: store a failed result, set status 'crashed', emit
   * `agent:crashed`. The result is stored first so event listeners can read it.
   */
  private async markCrashed(agent: AgentRef, error: string): Promise<void> {
    const active = this.activeAgents.get(agent.id);
    if (active) {
      active.result = {
        success: false,
        message: error,
      };
    }
    await this.repository.updateStatus(agent.id, 'crashed');
    if (this.eventBus) {
      const event: AgentCrashedEvent = {
        type: 'agent:crashed',
        timestamp: new Date(),
        payload: {
          agentId: agent.id,
          name: agent.name,
          taskId: agent.taskId ?? '',
          error,
        },
      };
      this.eventBus.emit(event);
    }
  }

  /** Emit `agent:stopped` for the agent, if an event bus is configured. */
  private emitStopped(
    agent: AgentRef,
    reason: AgentStoppedEvent['payload']['reason']
  ): void {
    if (!this.eventBus) return;
    const event: AgentStoppedEvent = {
      type: 'agent:stopped',
      timestamp: new Date(),
      payload: {
        agentId: agent.id,
        name: agent.name,
        taskId: agent.taskId ?? '',
        reason,
      },
    };
    this.eventBus.emit(event);
  }

  /**
   * Handle agent errors - actual crashes (not waiting for input).
   * With structured output via --json-schema, question status is handled in
   * handleAgentCompletion. This method covers subprocess failures and
   * unparseable/invalid output.
   */
  private async handleAgentError(
    agentId: string,
    error: unknown
  ): Promise<void> {
    const errorMessage = error instanceof Error ? error.message : String(error);
    const agent = await this.repository.findById(agentId);
    if (!agent) return;
    await this.markCrashed(agent, errorMessage);
  }

  /**
   * Stop a running agent.
   * Sends SIGTERM, drops in-memory run state and sets status 'stopped'.
   *
   * @throws Error if the agent does not exist
   */
  async stop(agentId: string): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) {
      throw new Error(`Agent '${agentId}' not found`);
    }

    const active = this.activeAgents.get(agentId);
    if (active) {
      // Untrack before killing so the resulting rejection is recognised as
      // deliberate by handleAgentCompletion rather than reported as a crash.
      this.activeAgents.delete(agentId);
      active.subprocess.kill('SIGTERM');
    }

    await this.repository.updateStatus(agentId, 'stopped');
    this.emitStopped(agent, 'user_requested');
  }

  /**
   * List all agents with their current status.
   */
  async list(): Promise<AgentInfo[]> {
    const agents = await this.repository.findAll();
    return agents.map((a) => this.toAgentInfo(a));
  }

  /**
   * Get a specific agent by ID.
   */
  async get(agentId: string): Promise<AgentInfo | null> {
    const agent = await this.repository.findById(agentId);
    return agent ? this.toAgentInfo(agent) : null;
  }

  /**
   * Get a specific agent by name.
   */
  async getByName(name: string): Promise<AgentInfo | null> {
    const agent = await this.repository.findByName(name);
    return agent ? this.toAgentInfo(agent) : null;
  }

  /**
   * Resume an agent that's waiting for input.
   * Uses stored session ID to continue with full context.
   *
   * @param agentId - Agent to resume
   * @param answers - Map of question ID to user's answer
   * @throws Error if the agent is missing, not waiting for input, has no
   *   session, or its worktree cannot be found
   */
  async resume(agentId: string, answers: Record<string, string>): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) {
      throw new Error(`Agent '${agentId}' not found`);
    }
    if (agent.status !== 'waiting_for_input') {
      throw new Error(
        `Agent '${agent.name}' is not waiting for input (status: ${agent.status})`
      );
    }
    if (!agent.sessionId) {
      throw new Error(`Agent '${agent.name}' has no session to resume`);
    }
    const sessionId = agent.sessionId;

    const worktree = await this.worktreeManager.get(agent.worktreeId);
    if (!worktree) {
      throw new Error(`Worktree '${agent.worktreeId}' not found`);
    }

    const prompt = this.formatAnswersAsPrompt(answers);
    await this.repository.updateStatus(agentId, 'running');

    const subprocess = this.startCli(
      prompt,
      agent.mode as AgentMode,
      worktree.path,
      sessionId
    );
    // Replacing the entry clears any previous result and pending questions.
    this.activeAgents.set(agentId, { subprocess });

    if (this.eventBus) {
      const event: AgentResumedEvent = {
        type: 'agent:resumed',
        timestamp: new Date(),
        payload: {
          agentId,
          name: agent.name,
          taskId: agent.taskId ?? '',
          sessionId,
        },
      };
      this.eventBus.emit(event);
    }

    this.watchCompletion(agentId, subprocess);
  }

  /**
   * Format answers map as structured prompt for Claude.
   * One line per answer in format: "[id]: answer"
   */
  private formatAnswersAsPrompt(answers: Record<string, string>): string {
    const lines = Object.entries(answers).map(
      ([questionId, answer]) => `[${questionId}]: ${answer}`
    );
    return `Here are my answers to your questions:\n${lines.join('\n')}`;
  }

  /**
   * Get the result of an agent's most recent run, or null if none is recorded
   * in memory (still running, waiting for input, or stopped).
   */
  async getResult(agentId: string): Promise<AgentResult | null> {
    return this.activeAgents.get(agentId)?.result ?? null;
  }

  /**
   * Get pending questions for an agent waiting for input, or null if none.
   */
  async getPendingQuestions(agentId: string): Promise<PendingQuestions | null> {
    return this.activeAgents.get(agentId)?.pendingQuestions ?? null;
  }

  /**
   * Convert database agent record to AgentInfo.
   */
  private toAgentInfo(agent: {
    id: string;
    name: string;
    taskId: string | null;
    sessionId: string | null;
    worktreeId: string;
    status: string;
    mode: string;
    createdAt: Date;
    updatedAt: Date;
  }): AgentInfo {
    return {
      id: agent.id,
      name: agent.name,
      taskId: agent.taskId ?? '',
      sessionId: agent.sessionId,
      worktreeId: agent.worktreeId,
      status: agent.status as AgentStatus,
      mode: agent.mode as AgentMode,
      createdAt: agent.createdAt,
      updatedAt: agent.updatedAt,
    };
  }
}