From 81934237cae5836476473e554ee2aeceafa7ac39 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Fri, 30 Jan 2026 20:05:03 +0100 Subject: [PATCH] feat(04-03): implement ClaudeAgentManager adapter - Use Claude CLI with --output-format json for agent spawning - Extract session_id from JSON result for resume capability - Emit lifecycle events: spawned, stopped, crashed, resumed, waiting - Handle waiting_for_input status for AskUserQuestion pauses - Uses WorktreeManager for isolated agent workspaces --- src/agent/index.ts | 3 + src/agent/manager.ts | 386 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 389 insertions(+) create mode 100644 src/agent/manager.ts diff --git a/src/agent/index.ts b/src/agent/index.ts index fb4f985..9e2e848 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -13,3 +13,6 @@ export type { AgentResult, AgentManager, } from './types.js'; + +// Adapter implementation +export { ClaudeAgentManager } from './manager.js'; diff --git a/src/agent/manager.ts b/src/agent/manager.ts new file mode 100644 index 0000000..44a885a --- /dev/null +++ b/src/agent/manager.ts @@ -0,0 +1,386 @@ +/** + * Claude Agent Manager Adapter + * + * Implementation of AgentManager port using Claude CLI with JSON output. + * Spawns real Claude agents via `claude -p "prompt" --output-format json`. + */ + +import { execa, type ResultPromise } from 'execa'; +import { randomUUID } from 'crypto'; +import type { + AgentManager, + AgentInfo, + SpawnAgentOptions, + AgentResult, + AgentStatus, +} from './types.js'; +import type { AgentRepository } from '../db/repositories/agent-repository.js'; +import type { WorktreeManager } from '../git/types.js'; +import type { + EventBus, + AgentSpawnedEvent, + AgentStoppedEvent, + AgentCrashedEvent, + AgentResumedEvent, + AgentWaitingEvent, +} from '../events/index.js'; + +/** + * Result structure from Claude CLI with --output-format json + */ +interface ClaudeCliResult { + type: 'result'; + subtype: 'success' | 'error'; + is_error: boolean; + session_id: string; + result: string; + total_cost_usd?: number; +} + +/** + * Tracks an active agent subprocess and its result + */ +interface ActiveAgent { + subprocess: ResultPromise; + result?: AgentResult; +} + +/** + * ClaudeAgentManager - Adapter implementing AgentManager port + * + * Uses Claude CLI in JSON mode to spawn agents. Each agent gets: + * - Isolated worktree (via WorktreeManager) + * - Persisted state (via AgentRepository) + * - Lifecycle events (via EventBus) + */ +export class ClaudeAgentManager implements AgentManager { + private activeAgents: Map = new Map(); + + constructor( + private repository: AgentRepository, + private worktreeManager: WorktreeManager, + private eventBus?: EventBus + ) {} + + /** + * Spawn a new agent to work on a task. + * Creates isolated worktree, starts Claude CLI, persists state. + */ + async spawn(options: SpawnAgentOptions): Promise { + const { name, taskId, prompt, cwd } = options; + const agentId = randomUUID(); + const branchName = `agent/${name}`; + + // Check name uniqueness + const existing = await this.repository.findByName(name); + if (existing) { + throw new Error(`Agent with name '${name}' already exists`); + } + + // 1. Create isolated worktree + const worktree = await this.worktreeManager.create(agentId, branchName); + + // 2. Create agent record (session ID null until first run completes) + const agent = await this.repository.create({ + name, + taskId, + sessionId: null, + worktreeId: worktree.id, + status: 'running', + }); + + // 3. Start Claude CLI in background + const subprocess = execa( + 'claude', + ['-p', prompt, '--output-format', 'json'], + { + cwd: cwd ?? worktree.path, + detached: true, + stdio: ['ignore', 'pipe', 'pipe'], + } + ); + + this.activeAgents.set(agentId, { subprocess }); + + // Emit spawned event + if (this.eventBus) { + const event: AgentSpawnedEvent = { + type: 'agent:spawned', + timestamp: new Date(), + payload: { + agentId, + name, + taskId, + worktreeId: worktree.id, + }, + }; + this.eventBus.emit(event); + } + + // Handle completion in background + this.handleAgentCompletion(agentId, subprocess); + + return this.toAgentInfo(agent); + } + + /** + * Handle agent subprocess completion. + * Parses JSON result, updates session ID, emits events. + */ + private async handleAgentCompletion( + agentId: string, + subprocess: ResultPromise + ): Promise { + try { + const { stdout } = await subprocess; + const agent = await this.repository.findById(agentId); + if (!agent) return; + + // Parse JSON result (stdout is string when stdio is 'pipe') + const result: ClaudeCliResult = JSON.parse(stdout as string); + + // Store session_id for potential resume + if (result.session_id) { + await this.repository.updateSessionId(agentId, result.session_id); + } + + // Store result + const active = this.activeAgents.get(agentId); + if (active) { + active.result = { + success: result.subtype === 'success', + message: result.result, + }; + } + + // Update status to idle (ready for next prompt or resume) + await this.repository.updateStatus(agentId, 'idle'); + + if (this.eventBus) { + const event: AgentStoppedEvent = { + type: 'agent:stopped', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? '', + reason: 'task_complete', + }, + }; + this.eventBus.emit(event); + } + } catch (error) { + await this.handleAgentError(agentId, error); + } + } + + /** + * Handle agent errors - either waiting_for_input or crash. + */ + private async handleAgentError( + agentId: string, + error: unknown + ): Promise { + const errorMessage = error instanceof Error ? error.message : String(error); + const agent = await this.repository.findById(agentId); + if (!agent) return; + + // Check if this is a "waiting for input" scenario (agent asked AskUserQuestion) + // The CLI exits with a specific pattern when waiting for user input + if ( + errorMessage.includes('waiting for input') || + errorMessage.includes('user_question') + ) { + await this.repository.updateStatus(agentId, 'waiting_for_input'); + + if (this.eventBus) { + const event: AgentWaitingEvent = { + type: 'agent:waiting', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? '', + sessionId: agent.sessionId ?? '', + question: errorMessage, + }, + }; + this.eventBus.emit(event); + } + return; + } + + // Actual crash + await this.repository.updateStatus(agentId, 'crashed'); + + if (this.eventBus) { + const event: AgentCrashedEvent = { + type: 'agent:crashed', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? '', + error: errorMessage, + }, + }; + this.eventBus.emit(event); + } + + const active = this.activeAgents.get(agentId); + if (active) { + active.result = { + success: false, + message: errorMessage, + }; + } + } + + /** + * Stop a running agent. + * Sends SIGTERM and updates status. + */ + async stop(agentId: string): Promise { + const agent = await this.repository.findById(agentId); + if (!agent) { + throw new Error(`Agent '${agentId}' not found`); + } + + const active = this.activeAgents.get(agentId); + if (active) { + active.subprocess.kill('SIGTERM'); + this.activeAgents.delete(agentId); + } + + await this.repository.updateStatus(agentId, 'stopped'); + + if (this.eventBus) { + const event: AgentStoppedEvent = { + type: 'agent:stopped', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? '', + reason: 'user_requested', + }, + }; + this.eventBus.emit(event); + } + } + + /** + * List all agents with their current status. + */ + async list(): Promise { + const agents = await this.repository.findAll(); + return agents.map((a) => this.toAgentInfo(a)); + } + + /** + * Get a specific agent by ID. + */ + async get(agentId: string): Promise { + const agent = await this.repository.findById(agentId); + return agent ? this.toAgentInfo(agent) : null; + } + + /** + * Get a specific agent by name. + */ + async getByName(name: string): Promise { + const agent = await this.repository.findByName(name); + return agent ? this.toAgentInfo(agent) : null; + } + + /** + * Resume an agent that's waiting for input. + * Uses stored session ID to continue with full context. + */ + async resume(agentId: string, prompt: string): Promise { + const agent = await this.repository.findById(agentId); + if (!agent) { + throw new Error(`Agent '${agentId}' not found`); + } + + if (agent.status !== 'waiting_for_input') { + throw new Error( + `Agent '${agent.name}' is not waiting for input (status: ${agent.status})` + ); + } + + if (!agent.sessionId) { + throw new Error(`Agent '${agent.name}' has no session to resume`); + } + + // Get worktree path + const worktree = await this.worktreeManager.get(agent.worktreeId); + if (!worktree) { + throw new Error(`Worktree '${agent.worktreeId}' not found`); + } + + await this.repository.updateStatus(agentId, 'running'); + + // Start CLI with --resume flag + const subprocess = execa( + 'claude', + ['-p', prompt, '--resume', agent.sessionId, '--output-format', 'json'], + { + cwd: worktree.path, + detached: true, + stdio: ['ignore', 'pipe', 'pipe'], + } + ); + + this.activeAgents.set(agentId, { subprocess }); + + if (this.eventBus) { + const event: AgentResumedEvent = { + type: 'agent:resumed', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? '', + sessionId: agent.sessionId, + }, + }; + this.eventBus.emit(event); + } + + this.handleAgentCompletion(agentId, subprocess); + } + + /** + * Get the result of an agent's work. + */ + async getResult(agentId: string): Promise { + const active = this.activeAgents.get(agentId); + return active?.result ?? null; + } + + /** + * Convert database agent record to AgentInfo. + */ + private toAgentInfo(agent: { + id: string; + name: string; + taskId: string | null; + sessionId: string | null; + worktreeId: string; + status: string; + createdAt: Date; + updatedAt: Date; + }): AgentInfo { + return { + id: agent.id, + name: agent.name, + taskId: agent.taskId ?? '', + sessionId: agent.sessionId, + worktreeId: agent.worktreeId, + status: agent.status as AgentStatus, + createdAt: agent.createdAt, + updatedAt: agent.updatedAt, + }; + } +}