feat(04-03): implement ClaudeAgentManager adapter

- Use Claude CLI with --output-format json for agent spawning
- Extract session_id from JSON result for resume capability
- Emit lifecycle events: spawned, stopped, crashed, resumed, waiting
- Handle waiting_for_input status for AskUserQuestion pauses
- Uses WorktreeManager for isolated agent workspaces
This commit is contained in:
Lukas May
2026-01-30 20:05:03 +01:00
parent 9ab6e5eb28
commit 81934237ca
2 changed files with 389 additions and 0 deletions

View File

@@ -13,3 +13,6 @@ export type {
AgentResult,
AgentManager,
} from './types.js';
// Adapter implementation
export { ClaudeAgentManager } from './manager.js';

386
src/agent/manager.ts Normal file
View File

@@ -0,0 +1,386 @@
/**
* Claude Agent Manager Adapter
*
* Implementation of AgentManager port using Claude CLI with JSON output.
* Spawns real Claude agents via `claude -p "prompt" --output-format json`.
*/
import { execa, type ResultPromise } from 'execa';
import { randomUUID } from 'crypto';
import type {
AgentManager,
AgentInfo,
SpawnAgentOptions,
AgentResult,
AgentStatus,
} from './types.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { WorktreeManager } from '../git/types.js';
import type {
EventBus,
AgentSpawnedEvent,
AgentStoppedEvent,
AgentCrashedEvent,
AgentResumedEvent,
AgentWaitingEvent,
} from '../events/index.js';
/**
* Result structure from Claude CLI with --output-format json
*/
interface ClaudeCliResult {
type: 'result';
subtype: 'success' | 'error';
is_error: boolean;
session_id: string;
result: string;
total_cost_usd?: number;
}
/**
* Tracks an active agent subprocess and its result
*/
interface ActiveAgent {
subprocess: ResultPromise;
result?: AgentResult;
}
/**
* ClaudeAgentManager - Adapter implementing AgentManager port
*
* Uses Claude CLI in JSON mode to spawn agents. Each agent gets:
* - Isolated worktree (via WorktreeManager)
* - Persisted state (via AgentRepository)
* - Lifecycle events (via EventBus)
*/
export class ClaudeAgentManager implements AgentManager {
private activeAgents: Map<string, ActiveAgent> = new Map();
constructor(
private repository: AgentRepository,
private worktreeManager: WorktreeManager,
private eventBus?: EventBus
) {}
/**
* Spawn a new agent to work on a task.
* Creates isolated worktree, starts Claude CLI, persists state.
*/
async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
const { name, taskId, prompt, cwd } = options;
const agentId = randomUUID();
const branchName = `agent/${name}`;
// Check name uniqueness
const existing = await this.repository.findByName(name);
if (existing) {
throw new Error(`Agent with name '${name}' already exists`);
}
// 1. Create isolated worktree
const worktree = await this.worktreeManager.create(agentId, branchName);
// 2. Create agent record (session ID null until first run completes)
const agent = await this.repository.create({
name,
taskId,
sessionId: null,
worktreeId: worktree.id,
status: 'running',
});
// 3. Start Claude CLI in background
const subprocess = execa(
'claude',
['-p', prompt, '--output-format', 'json'],
{
cwd: cwd ?? worktree.path,
detached: true,
stdio: ['ignore', 'pipe', 'pipe'],
}
);
this.activeAgents.set(agentId, { subprocess });
// Emit spawned event
if (this.eventBus) {
const event: AgentSpawnedEvent = {
type: 'agent:spawned',
timestamp: new Date(),
payload: {
agentId,
name,
taskId,
worktreeId: worktree.id,
},
};
this.eventBus.emit(event);
}
// Handle completion in background
this.handleAgentCompletion(agentId, subprocess);
return this.toAgentInfo(agent);
}
/**
* Handle agent subprocess completion.
* Parses JSON result, updates session ID, emits events.
*/
private async handleAgentCompletion(
agentId: string,
subprocess: ResultPromise
): Promise<void> {
try {
const { stdout } = await subprocess;
const agent = await this.repository.findById(agentId);
if (!agent) return;
// Parse JSON result (stdout is string when stdio is 'pipe')
const result: ClaudeCliResult = JSON.parse(stdout as string);
// Store session_id for potential resume
if (result.session_id) {
await this.repository.updateSessionId(agentId, result.session_id);
}
// Store result
const active = this.activeAgents.get(agentId);
if (active) {
active.result = {
success: result.subtype === 'success',
message: result.result,
};
}
// Update status to idle (ready for next prompt or resume)
await this.repository.updateStatus(agentId, 'idle');
if (this.eventBus) {
const event: AgentStoppedEvent = {
type: 'agent:stopped',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
reason: 'task_complete',
},
};
this.eventBus.emit(event);
}
} catch (error) {
await this.handleAgentError(agentId, error);
}
}
/**
* Handle agent errors - either waiting_for_input or crash.
*/
private async handleAgentError(
agentId: string,
error: unknown
): Promise<void> {
const errorMessage = error instanceof Error ? error.message : String(error);
const agent = await this.repository.findById(agentId);
if (!agent) return;
// Check if this is a "waiting for input" scenario (agent asked AskUserQuestion)
// The CLI exits with a specific pattern when waiting for user input
if (
errorMessage.includes('waiting for input') ||
errorMessage.includes('user_question')
) {
await this.repository.updateStatus(agentId, 'waiting_for_input');
if (this.eventBus) {
const event: AgentWaitingEvent = {
type: 'agent:waiting',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
sessionId: agent.sessionId ?? '',
question: errorMessage,
},
};
this.eventBus.emit(event);
}
return;
}
// Actual crash
await this.repository.updateStatus(agentId, 'crashed');
if (this.eventBus) {
const event: AgentCrashedEvent = {
type: 'agent:crashed',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
error: errorMessage,
},
};
this.eventBus.emit(event);
}
const active = this.activeAgents.get(agentId);
if (active) {
active.result = {
success: false,
message: errorMessage,
};
}
}
/**
* Stop a running agent.
* Sends SIGTERM and updates status.
*/
async stop(agentId: string): Promise<void> {
const agent = await this.repository.findById(agentId);
if (!agent) {
throw new Error(`Agent '${agentId}' not found`);
}
const active = this.activeAgents.get(agentId);
if (active) {
active.subprocess.kill('SIGTERM');
this.activeAgents.delete(agentId);
}
await this.repository.updateStatus(agentId, 'stopped');
if (this.eventBus) {
const event: AgentStoppedEvent = {
type: 'agent:stopped',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
reason: 'user_requested',
},
};
this.eventBus.emit(event);
}
}
/**
* List all agents with their current status.
*/
async list(): Promise<AgentInfo[]> {
const agents = await this.repository.findAll();
return agents.map((a) => this.toAgentInfo(a));
}
/**
* Get a specific agent by ID.
*/
async get(agentId: string): Promise<AgentInfo | null> {
const agent = await this.repository.findById(agentId);
return agent ? this.toAgentInfo(agent) : null;
}
/**
* Get a specific agent by name.
*/
async getByName(name: string): Promise<AgentInfo | null> {
const agent = await this.repository.findByName(name);
return agent ? this.toAgentInfo(agent) : null;
}
/**
* Resume an agent that's waiting for input.
* Uses stored session ID to continue with full context.
*/
async resume(agentId: string, prompt: string): Promise<void> {
const agent = await this.repository.findById(agentId);
if (!agent) {
throw new Error(`Agent '${agentId}' not found`);
}
if (agent.status !== 'waiting_for_input') {
throw new Error(
`Agent '${agent.name}' is not waiting for input (status: ${agent.status})`
);
}
if (!agent.sessionId) {
throw new Error(`Agent '${agent.name}' has no session to resume`);
}
// Get worktree path
const worktree = await this.worktreeManager.get(agent.worktreeId);
if (!worktree) {
throw new Error(`Worktree '${agent.worktreeId}' not found`);
}
await this.repository.updateStatus(agentId, 'running');
// Start CLI with --resume flag
const subprocess = execa(
'claude',
['-p', prompt, '--resume', agent.sessionId, '--output-format', 'json'],
{
cwd: worktree.path,
detached: true,
stdio: ['ignore', 'pipe', 'pipe'],
}
);
this.activeAgents.set(agentId, { subprocess });
if (this.eventBus) {
const event: AgentResumedEvent = {
type: 'agent:resumed',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
sessionId: agent.sessionId,
},
};
this.eventBus.emit(event);
}
this.handleAgentCompletion(agentId, subprocess);
}
/**
* Get the result of an agent's work.
*/
async getResult(agentId: string): Promise<AgentResult | null> {
const active = this.activeAgents.get(agentId);
return active?.result ?? null;
}
/**
* Convert database agent record to AgentInfo.
*/
private toAgentInfo(agent: {
id: string;
name: string;
taskId: string | null;
sessionId: string | null;
worktreeId: string;
status: string;
createdAt: Date;
updatedAt: Date;
}): AgentInfo {
return {
id: agent.id,
name: agent.name,
taskId: agent.taskId ?? '',
sessionId: agent.sessionId,
worktreeId: agent.worktreeId,
status: agent.status as AgentStatus,
createdAt: agent.createdAt,
updatedAt: agent.updatedAt,
};
}
}