feat(04-03): implement ClaudeAgentManager adapter
- Use Claude CLI with --output-format json for agent spawning - Extract session_id from JSON result for resume capability - Emit lifecycle events: spawned, stopped, crashed, resumed, waiting - Handle waiting_for_input status for AskUserQuestion pauses - Uses WorktreeManager for isolated agent workspaces
This commit is contained in:
@@ -13,3 +13,6 @@ export type {
|
|||||||
AgentResult,
|
AgentResult,
|
||||||
AgentManager,
|
AgentManager,
|
||||||
} from './types.js';
|
} from './types.js';
|
||||||
|
|
||||||
|
// Adapter implementation
|
||||||
|
export { ClaudeAgentManager } from './manager.js';
|
||||||
|
|||||||
386
src/agent/manager.ts
Normal file
386
src/agent/manager.ts
Normal file
@@ -0,0 +1,386 @@
|
|||||||
|
/**
|
||||||
|
* Claude Agent Manager Adapter
|
||||||
|
*
|
||||||
|
* Implementation of AgentManager port using Claude CLI with JSON output.
|
||||||
|
* Spawns real Claude agents via `claude -p "prompt" --output-format json`.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { execa, type ResultPromise } from 'execa';
|
||||||
|
import { randomUUID } from 'crypto';
|
||||||
|
import type {
|
||||||
|
AgentManager,
|
||||||
|
AgentInfo,
|
||||||
|
SpawnAgentOptions,
|
||||||
|
AgentResult,
|
||||||
|
AgentStatus,
|
||||||
|
} from './types.js';
|
||||||
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||||
|
import type { WorktreeManager } from '../git/types.js';
|
||||||
|
import type {
|
||||||
|
EventBus,
|
||||||
|
AgentSpawnedEvent,
|
||||||
|
AgentStoppedEvent,
|
||||||
|
AgentCrashedEvent,
|
||||||
|
AgentResumedEvent,
|
||||||
|
AgentWaitingEvent,
|
||||||
|
} from '../events/index.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Result structure from Claude CLI with --output-format json
|
||||||
|
*/
|
||||||
|
interface ClaudeCliResult {
|
||||||
|
type: 'result';
|
||||||
|
subtype: 'success' | 'error';
|
||||||
|
is_error: boolean;
|
||||||
|
session_id: string;
|
||||||
|
result: string;
|
||||||
|
total_cost_usd?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tracks an active agent subprocess and its result
|
||||||
|
*/
|
||||||
|
interface ActiveAgent {
|
||||||
|
subprocess: ResultPromise;
|
||||||
|
result?: AgentResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ClaudeAgentManager - Adapter implementing AgentManager port
|
||||||
|
*
|
||||||
|
* Uses Claude CLI in JSON mode to spawn agents. Each agent gets:
|
||||||
|
* - Isolated worktree (via WorktreeManager)
|
||||||
|
* - Persisted state (via AgentRepository)
|
||||||
|
* - Lifecycle events (via EventBus)
|
||||||
|
*/
|
||||||
|
export class ClaudeAgentManager implements AgentManager {
|
||||||
|
private activeAgents: Map<string, ActiveAgent> = new Map();
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private repository: AgentRepository,
|
||||||
|
private worktreeManager: WorktreeManager,
|
||||||
|
private eventBus?: EventBus
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Spawn a new agent to work on a task.
|
||||||
|
* Creates isolated worktree, starts Claude CLI, persists state.
|
||||||
|
*/
|
||||||
|
async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
|
||||||
|
const { name, taskId, prompt, cwd } = options;
|
||||||
|
const agentId = randomUUID();
|
||||||
|
const branchName = `agent/${name}`;
|
||||||
|
|
||||||
|
// Check name uniqueness
|
||||||
|
const existing = await this.repository.findByName(name);
|
||||||
|
if (existing) {
|
||||||
|
throw new Error(`Agent with name '${name}' already exists`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. Create isolated worktree
|
||||||
|
const worktree = await this.worktreeManager.create(agentId, branchName);
|
||||||
|
|
||||||
|
// 2. Create agent record (session ID null until first run completes)
|
||||||
|
const agent = await this.repository.create({
|
||||||
|
name,
|
||||||
|
taskId,
|
||||||
|
sessionId: null,
|
||||||
|
worktreeId: worktree.id,
|
||||||
|
status: 'running',
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3. Start Claude CLI in background
|
||||||
|
const subprocess = execa(
|
||||||
|
'claude',
|
||||||
|
['-p', prompt, '--output-format', 'json'],
|
||||||
|
{
|
||||||
|
cwd: cwd ?? worktree.path,
|
||||||
|
detached: true,
|
||||||
|
stdio: ['ignore', 'pipe', 'pipe'],
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.activeAgents.set(agentId, { subprocess });
|
||||||
|
|
||||||
|
// Emit spawned event
|
||||||
|
if (this.eventBus) {
|
||||||
|
const event: AgentSpawnedEvent = {
|
||||||
|
type: 'agent:spawned',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
agentId,
|
||||||
|
name,
|
||||||
|
taskId,
|
||||||
|
worktreeId: worktree.id,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle completion in background
|
||||||
|
this.handleAgentCompletion(agentId, subprocess);
|
||||||
|
|
||||||
|
return this.toAgentInfo(agent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle agent subprocess completion.
|
||||||
|
* Parses JSON result, updates session ID, emits events.
|
||||||
|
*/
|
||||||
|
private async handleAgentCompletion(
|
||||||
|
agentId: string,
|
||||||
|
subprocess: ResultPromise
|
||||||
|
): Promise<void> {
|
||||||
|
try {
|
||||||
|
const { stdout } = await subprocess;
|
||||||
|
const agent = await this.repository.findById(agentId);
|
||||||
|
if (!agent) return;
|
||||||
|
|
||||||
|
// Parse JSON result (stdout is string when stdio is 'pipe')
|
||||||
|
const result: ClaudeCliResult = JSON.parse(stdout as string);
|
||||||
|
|
||||||
|
// Store session_id for potential resume
|
||||||
|
if (result.session_id) {
|
||||||
|
await this.repository.updateSessionId(agentId, result.session_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store result
|
||||||
|
const active = this.activeAgents.get(agentId);
|
||||||
|
if (active) {
|
||||||
|
active.result = {
|
||||||
|
success: result.subtype === 'success',
|
||||||
|
message: result.result,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update status to idle (ready for next prompt or resume)
|
||||||
|
await this.repository.updateStatus(agentId, 'idle');
|
||||||
|
|
||||||
|
if (this.eventBus) {
|
||||||
|
const event: AgentStoppedEvent = {
|
||||||
|
type: 'agent:stopped',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
agentId,
|
||||||
|
name: agent.name,
|
||||||
|
taskId: agent.taskId ?? '',
|
||||||
|
reason: 'task_complete',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
await this.handleAgentError(agentId, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle agent errors - either waiting_for_input or crash.
|
||||||
|
*/
|
||||||
|
private async handleAgentError(
|
||||||
|
agentId: string,
|
||||||
|
error: unknown
|
||||||
|
): Promise<void> {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
const agent = await this.repository.findById(agentId);
|
||||||
|
if (!agent) return;
|
||||||
|
|
||||||
|
// Check if this is a "waiting for input" scenario (agent asked AskUserQuestion)
|
||||||
|
// The CLI exits with a specific pattern when waiting for user input
|
||||||
|
if (
|
||||||
|
errorMessage.includes('waiting for input') ||
|
||||||
|
errorMessage.includes('user_question')
|
||||||
|
) {
|
||||||
|
await this.repository.updateStatus(agentId, 'waiting_for_input');
|
||||||
|
|
||||||
|
if (this.eventBus) {
|
||||||
|
const event: AgentWaitingEvent = {
|
||||||
|
type: 'agent:waiting',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
agentId,
|
||||||
|
name: agent.name,
|
||||||
|
taskId: agent.taskId ?? '',
|
||||||
|
sessionId: agent.sessionId ?? '',
|
||||||
|
question: errorMessage,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Actual crash
|
||||||
|
await this.repository.updateStatus(agentId, 'crashed');
|
||||||
|
|
||||||
|
if (this.eventBus) {
|
||||||
|
const event: AgentCrashedEvent = {
|
||||||
|
type: 'agent:crashed',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
agentId,
|
||||||
|
name: agent.name,
|
||||||
|
taskId: agent.taskId ?? '',
|
||||||
|
error: errorMessage,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
const active = this.activeAgents.get(agentId);
|
||||||
|
if (active) {
|
||||||
|
active.result = {
|
||||||
|
success: false,
|
||||||
|
message: errorMessage,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop a running agent.
|
||||||
|
* Sends SIGTERM and updates status.
|
||||||
|
*/
|
||||||
|
async stop(agentId: string): Promise<void> {
|
||||||
|
const agent = await this.repository.findById(agentId);
|
||||||
|
if (!agent) {
|
||||||
|
throw new Error(`Agent '${agentId}' not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const active = this.activeAgents.get(agentId);
|
||||||
|
if (active) {
|
||||||
|
active.subprocess.kill('SIGTERM');
|
||||||
|
this.activeAgents.delete(agentId);
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.repository.updateStatus(agentId, 'stopped');
|
||||||
|
|
||||||
|
if (this.eventBus) {
|
||||||
|
const event: AgentStoppedEvent = {
|
||||||
|
type: 'agent:stopped',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
agentId,
|
||||||
|
name: agent.name,
|
||||||
|
taskId: agent.taskId ?? '',
|
||||||
|
reason: 'user_requested',
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all agents with their current status.
|
||||||
|
*/
|
||||||
|
async list(): Promise<AgentInfo[]> {
|
||||||
|
const agents = await this.repository.findAll();
|
||||||
|
return agents.map((a) => this.toAgentInfo(a));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a specific agent by ID.
|
||||||
|
*/
|
||||||
|
async get(agentId: string): Promise<AgentInfo | null> {
|
||||||
|
const agent = await this.repository.findById(agentId);
|
||||||
|
return agent ? this.toAgentInfo(agent) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a specific agent by name.
|
||||||
|
*/
|
||||||
|
async getByName(name: string): Promise<AgentInfo | null> {
|
||||||
|
const agent = await this.repository.findByName(name);
|
||||||
|
return agent ? this.toAgentInfo(agent) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resume an agent that's waiting for input.
|
||||||
|
* Uses stored session ID to continue with full context.
|
||||||
|
*/
|
||||||
|
async resume(agentId: string, prompt: string): Promise<void> {
|
||||||
|
const agent = await this.repository.findById(agentId);
|
||||||
|
if (!agent) {
|
||||||
|
throw new Error(`Agent '${agentId}' not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (agent.status !== 'waiting_for_input') {
|
||||||
|
throw new Error(
|
||||||
|
`Agent '${agent.name}' is not waiting for input (status: ${agent.status})`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!agent.sessionId) {
|
||||||
|
throw new Error(`Agent '${agent.name}' has no session to resume`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get worktree path
|
||||||
|
const worktree = await this.worktreeManager.get(agent.worktreeId);
|
||||||
|
if (!worktree) {
|
||||||
|
throw new Error(`Worktree '${agent.worktreeId}' not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.repository.updateStatus(agentId, 'running');
|
||||||
|
|
||||||
|
// Start CLI with --resume flag
|
||||||
|
const subprocess = execa(
|
||||||
|
'claude',
|
||||||
|
['-p', prompt, '--resume', agent.sessionId, '--output-format', 'json'],
|
||||||
|
{
|
||||||
|
cwd: worktree.path,
|
||||||
|
detached: true,
|
||||||
|
stdio: ['ignore', 'pipe', 'pipe'],
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.activeAgents.set(agentId, { subprocess });
|
||||||
|
|
||||||
|
if (this.eventBus) {
|
||||||
|
const event: AgentResumedEvent = {
|
||||||
|
type: 'agent:resumed',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
agentId,
|
||||||
|
name: agent.name,
|
||||||
|
taskId: agent.taskId ?? '',
|
||||||
|
sessionId: agent.sessionId,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.handleAgentCompletion(agentId, subprocess);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the result of an agent's work.
|
||||||
|
*/
|
||||||
|
async getResult(agentId: string): Promise<AgentResult | null> {
|
||||||
|
const active = this.activeAgents.get(agentId);
|
||||||
|
return active?.result ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert database agent record to AgentInfo.
|
||||||
|
*/
|
||||||
|
private toAgentInfo(agent: {
|
||||||
|
id: string;
|
||||||
|
name: string;
|
||||||
|
taskId: string | null;
|
||||||
|
sessionId: string | null;
|
||||||
|
worktreeId: string;
|
||||||
|
status: string;
|
||||||
|
createdAt: Date;
|
||||||
|
updatedAt: Date;
|
||||||
|
}): AgentInfo {
|
||||||
|
return {
|
||||||
|
id: agent.id,
|
||||||
|
name: agent.name,
|
||||||
|
taskId: agent.taskId ?? '',
|
||||||
|
sessionId: agent.sessionId,
|
||||||
|
worktreeId: agent.worktreeId,
|
||||||
|
status: agent.status as AgentStatus,
|
||||||
|
createdAt: agent.createdAt,
|
||||||
|
updatedAt: agent.updatedAt,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user