feat(04-03): implement ClaudeAgentManager adapter
- Use Claude CLI with --output-format json for agent spawning - Extract session_id from JSON result for resume capability - Emit lifecycle events: spawned, stopped, crashed, resumed, waiting - Handle waiting_for_input status for AskUserQuestion pauses - Uses WorktreeManager for isolated agent workspaces
This commit is contained in:
@@ -13,3 +13,6 @@ export type {
|
||||
AgentResult,
|
||||
AgentManager,
|
||||
} from './types.js';
|
||||
|
||||
// Adapter implementation
|
||||
export { ClaudeAgentManager } from './manager.js';
|
||||
|
||||
386
src/agent/manager.ts
Normal file
386
src/agent/manager.ts
Normal file
@@ -0,0 +1,386 @@
|
||||
/**
|
||||
* Claude Agent Manager Adapter
|
||||
*
|
||||
* Implementation of AgentManager port using Claude CLI with JSON output.
|
||||
* Spawns real Claude agents via `claude -p "prompt" --output-format json`.
|
||||
*/
|
||||
|
||||
import { execa, type ResultPromise } from 'execa';
|
||||
import { randomUUID } from 'crypto';
|
||||
import type {
|
||||
AgentManager,
|
||||
AgentInfo,
|
||||
SpawnAgentOptions,
|
||||
AgentResult,
|
||||
AgentStatus,
|
||||
} from './types.js';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { WorktreeManager } from '../git/types.js';
|
||||
import type {
|
||||
EventBus,
|
||||
AgentSpawnedEvent,
|
||||
AgentStoppedEvent,
|
||||
AgentCrashedEvent,
|
||||
AgentResumedEvent,
|
||||
AgentWaitingEvent,
|
||||
} from '../events/index.js';
|
||||
|
||||
/**
|
||||
* Result structure from Claude CLI with --output-format json
|
||||
*/
|
||||
interface ClaudeCliResult {
|
||||
type: 'result';
|
||||
subtype: 'success' | 'error';
|
||||
is_error: boolean;
|
||||
session_id: string;
|
||||
result: string;
|
||||
total_cost_usd?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracks an active agent subprocess and its result
|
||||
*/
|
||||
interface ActiveAgent {
|
||||
subprocess: ResultPromise;
|
||||
result?: AgentResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* ClaudeAgentManager - Adapter implementing AgentManager port
|
||||
*
|
||||
* Uses Claude CLI in JSON mode to spawn agents. Each agent gets:
|
||||
* - Isolated worktree (via WorktreeManager)
|
||||
* - Persisted state (via AgentRepository)
|
||||
* - Lifecycle events (via EventBus)
|
||||
*/
|
||||
export class ClaudeAgentManager implements AgentManager {
|
||||
private activeAgents: Map<string, ActiveAgent> = new Map();
|
||||
|
||||
constructor(
|
||||
private repository: AgentRepository,
|
||||
private worktreeManager: WorktreeManager,
|
||||
private eventBus?: EventBus
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Spawn a new agent to work on a task.
|
||||
* Creates isolated worktree, starts Claude CLI, persists state.
|
||||
*/
|
||||
async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
|
||||
const { name, taskId, prompt, cwd } = options;
|
||||
const agentId = randomUUID();
|
||||
const branchName = `agent/${name}`;
|
||||
|
||||
// Check name uniqueness
|
||||
const existing = await this.repository.findByName(name);
|
||||
if (existing) {
|
||||
throw new Error(`Agent with name '${name}' already exists`);
|
||||
}
|
||||
|
||||
// 1. Create isolated worktree
|
||||
const worktree = await this.worktreeManager.create(agentId, branchName);
|
||||
|
||||
// 2. Create agent record (session ID null until first run completes)
|
||||
const agent = await this.repository.create({
|
||||
name,
|
||||
taskId,
|
||||
sessionId: null,
|
||||
worktreeId: worktree.id,
|
||||
status: 'running',
|
||||
});
|
||||
|
||||
// 3. Start Claude CLI in background
|
||||
const subprocess = execa(
|
||||
'claude',
|
||||
['-p', prompt, '--output-format', 'json'],
|
||||
{
|
||||
cwd: cwd ?? worktree.path,
|
||||
detached: true,
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
}
|
||||
);
|
||||
|
||||
this.activeAgents.set(agentId, { subprocess });
|
||||
|
||||
// Emit spawned event
|
||||
if (this.eventBus) {
|
||||
const event: AgentSpawnedEvent = {
|
||||
type: 'agent:spawned',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name,
|
||||
taskId,
|
||||
worktreeId: worktree.id,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
// Handle completion in background
|
||||
this.handleAgentCompletion(agentId, subprocess);
|
||||
|
||||
return this.toAgentInfo(agent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle agent subprocess completion.
|
||||
* Parses JSON result, updates session ID, emits events.
|
||||
*/
|
||||
private async handleAgentCompletion(
|
||||
agentId: string,
|
||||
subprocess: ResultPromise
|
||||
): Promise<void> {
|
||||
try {
|
||||
const { stdout } = await subprocess;
|
||||
const agent = await this.repository.findById(agentId);
|
||||
if (!agent) return;
|
||||
|
||||
// Parse JSON result (stdout is string when stdio is 'pipe')
|
||||
const result: ClaudeCliResult = JSON.parse(stdout as string);
|
||||
|
||||
// Store session_id for potential resume
|
||||
if (result.session_id) {
|
||||
await this.repository.updateSessionId(agentId, result.session_id);
|
||||
}
|
||||
|
||||
// Store result
|
||||
const active = this.activeAgents.get(agentId);
|
||||
if (active) {
|
||||
active.result = {
|
||||
success: result.subtype === 'success',
|
||||
message: result.result,
|
||||
};
|
||||
}
|
||||
|
||||
// Update status to idle (ready for next prompt or resume)
|
||||
await this.repository.updateStatus(agentId, 'idle');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentStoppedEvent = {
|
||||
type: 'agent:stopped',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
reason: 'task_complete',
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
} catch (error) {
|
||||
await this.handleAgentError(agentId, error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle agent errors - either waiting_for_input or crash.
|
||||
*/
|
||||
private async handleAgentError(
|
||||
agentId: string,
|
||||
error: unknown
|
||||
): Promise<void> {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
const agent = await this.repository.findById(agentId);
|
||||
if (!agent) return;
|
||||
|
||||
// Check if this is a "waiting for input" scenario (agent asked AskUserQuestion)
|
||||
// The CLI exits with a specific pattern when waiting for user input
|
||||
if (
|
||||
errorMessage.includes('waiting for input') ||
|
||||
errorMessage.includes('user_question')
|
||||
) {
|
||||
await this.repository.updateStatus(agentId, 'waiting_for_input');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentWaitingEvent = {
|
||||
type: 'agent:waiting',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
sessionId: agent.sessionId ?? '',
|
||||
question: errorMessage,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Actual crash
|
||||
await this.repository.updateStatus(agentId, 'crashed');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentCrashedEvent = {
|
||||
type: 'agent:crashed',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
error: errorMessage,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
const active = this.activeAgents.get(agentId);
|
||||
if (active) {
|
||||
active.result = {
|
||||
success: false,
|
||||
message: errorMessage,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop a running agent.
|
||||
* Sends SIGTERM and updates status.
|
||||
*/
|
||||
async stop(agentId: string): Promise<void> {
|
||||
const agent = await this.repository.findById(agentId);
|
||||
if (!agent) {
|
||||
throw new Error(`Agent '${agentId}' not found`);
|
||||
}
|
||||
|
||||
const active = this.activeAgents.get(agentId);
|
||||
if (active) {
|
||||
active.subprocess.kill('SIGTERM');
|
||||
this.activeAgents.delete(agentId);
|
||||
}
|
||||
|
||||
await this.repository.updateStatus(agentId, 'stopped');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentStoppedEvent = {
|
||||
type: 'agent:stopped',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
reason: 'user_requested',
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List all agents with their current status.
|
||||
*/
|
||||
async list(): Promise<AgentInfo[]> {
|
||||
const agents = await this.repository.findAll();
|
||||
return agents.map((a) => this.toAgentInfo(a));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a specific agent by ID.
|
||||
*/
|
||||
async get(agentId: string): Promise<AgentInfo | null> {
|
||||
const agent = await this.repository.findById(agentId);
|
||||
return agent ? this.toAgentInfo(agent) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a specific agent by name.
|
||||
*/
|
||||
async getByName(name: string): Promise<AgentInfo | null> {
|
||||
const agent = await this.repository.findByName(name);
|
||||
return agent ? this.toAgentInfo(agent) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume an agent that's waiting for input.
|
||||
* Uses stored session ID to continue with full context.
|
||||
*/
|
||||
async resume(agentId: string, prompt: string): Promise<void> {
|
||||
const agent = await this.repository.findById(agentId);
|
||||
if (!agent) {
|
||||
throw new Error(`Agent '${agentId}' not found`);
|
||||
}
|
||||
|
||||
if (agent.status !== 'waiting_for_input') {
|
||||
throw new Error(
|
||||
`Agent '${agent.name}' is not waiting for input (status: ${agent.status})`
|
||||
);
|
||||
}
|
||||
|
||||
if (!agent.sessionId) {
|
||||
throw new Error(`Agent '${agent.name}' has no session to resume`);
|
||||
}
|
||||
|
||||
// Get worktree path
|
||||
const worktree = await this.worktreeManager.get(agent.worktreeId);
|
||||
if (!worktree) {
|
||||
throw new Error(`Worktree '${agent.worktreeId}' not found`);
|
||||
}
|
||||
|
||||
await this.repository.updateStatus(agentId, 'running');
|
||||
|
||||
// Start CLI with --resume flag
|
||||
const subprocess = execa(
|
||||
'claude',
|
||||
['-p', prompt, '--resume', agent.sessionId, '--output-format', 'json'],
|
||||
{
|
||||
cwd: worktree.path,
|
||||
detached: true,
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
}
|
||||
);
|
||||
|
||||
this.activeAgents.set(agentId, { subprocess });
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentResumedEvent = {
|
||||
type: 'agent:resumed',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
sessionId: agent.sessionId,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
this.handleAgentCompletion(agentId, subprocess);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the result of an agent's work.
|
||||
*/
|
||||
async getResult(agentId: string): Promise<AgentResult | null> {
|
||||
const active = this.activeAgents.get(agentId);
|
||||
return active?.result ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert database agent record to AgentInfo.
|
||||
*/
|
||||
private toAgentInfo(agent: {
|
||||
id: string;
|
||||
name: string;
|
||||
taskId: string | null;
|
||||
sessionId: string | null;
|
||||
worktreeId: string;
|
||||
status: string;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
}): AgentInfo {
|
||||
return {
|
||||
id: agent.id,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
sessionId: agent.sessionId,
|
||||
worktreeId: agent.worktreeId,
|
||||
status: agent.status as AgentStatus,
|
||||
createdAt: agent.createdAt,
|
||||
updatedAt: agent.updatedAt,
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user