From d9673d54a360171242386d3b8075173768f8d756 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Sat, 31 Jan 2026 15:25:07 +0100 Subject: [PATCH] feat(08.1-01): update ClaudeAgentManager to use structured schema - Pass --json-schema flag to Claude CLI for validated output - Parse discriminated union (done/question/unrecoverable_error) in handleAgentCompletion - Add getPendingQuestion method to AgentManager interface - Add PendingQuestion type for structured question data - Store pending question in ActiveAgent for later retrieval - Remove hacky string matching for waiting_for_input detection - Update MockAgentManager with getPendingQuestion and options support - Update tests for new CLI arguments and result format --- src/agent/manager.test.ts | 14 +-- src/agent/manager.ts | 186 ++++++++++++++++++++++++++------------ src/agent/mock-manager.ts | 24 ++++- src/agent/types.ts | 22 +++++ 4 files changed, 179 insertions(+), 67 deletions(-) diff --git a/src/agent/manager.test.ts b/src/agent/manager.test.ts index bba6248..fcda62d 100644 --- a/src/agent/manager.test.ts +++ b/src/agent/manager.test.ts @@ -102,7 +102,7 @@ describe('ClaudeAgentManager', () => { then: () => Promise.resolve({ stdout: - '{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}', + '{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}', stderr: '', }), catch: () => mockSubprocess, @@ -144,7 +144,7 @@ describe('ClaudeAgentManager', () => { then: () => Promise.resolve({ stdout: - '{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}', + '{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}', stderr: '', }), catch: () => mockSubprocess, @@ -173,7 +173,7 @@ describe('ClaudeAgentManager', () => { then: () => Promise.resolve({ stdout: - 
'{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}', + '{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}', stderr: '', }), catch: () => mockSubprocess, @@ -189,7 +189,7 @@ describe('ClaudeAgentManager', () => { expect(mockExeca).toHaveBeenCalledWith( 'claude', - ['-p', 'Test task', '--output-format', 'json'], + expect.arrayContaining(['-p', 'Test task', '--output-format', 'json', '--json-schema']), expect.objectContaining({ cwd: '/custom/path' }) ); }); @@ -328,7 +328,7 @@ describe('ClaudeAgentManager', () => { then: () => Promise.resolve({ stdout: - '{"type":"result","subtype":"success","session_id":"sess-123","result":"continued"}', + '{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Continued successfully\\"}"}', stderr: '', }), catch: () => mockSubprocess, @@ -339,7 +339,7 @@ describe('ClaudeAgentManager', () => { expect(mockExeca).toHaveBeenCalledWith( 'claude', - ['-p', 'User response', '--resume', 'session-789', '--output-format', 'json'], + expect.arrayContaining(['-p', 'User response', '--resume', 'session-789', '--output-format', 'json', '--json-schema']), expect.any(Object) ); }); @@ -391,7 +391,7 @@ describe('ClaudeAgentManager', () => { then: () => Promise.resolve({ stdout: - '{"type":"result","subtype":"success","session_id":"sess-123","result":"continued"}', + '{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Continued successfully\\"}"}', stderr: '', }), catch: () => mockSubprocess, diff --git a/src/agent/manager.ts b/src/agent/manager.ts index 97fcfbd..192d59d 100644 --- a/src/agent/manager.ts +++ b/src/agent/manager.ts @@ -13,6 +13,7 @@ import type { SpawnAgentOptions, AgentResult, AgentStatus, + PendingQuestion, } from './types.js'; import type { AgentRepository } from '../db/repositories/agent-repository.js'; 
import type { WorktreeManager } from '../git/types.js'; @@ -24,6 +25,7 @@ import type { AgentResumedEvent, AgentWaitingEvent, } from '../events/index.js'; +import { agentOutputSchema, agentOutputJsonSchema } from './schema.js'; /** * Result structure from Claude CLI with --output-format json @@ -38,11 +40,12 @@ interface ClaudeCliResult { } /** - * Tracks an active agent subprocess and its result + * Tracks an active agent subprocess, its result, and any pending question */ interface ActiveAgent { subprocess: ResultPromise; result?: AgentResult; + pendingQuestion?: PendingQuestion; } /** @@ -92,10 +95,17 @@ export class ClaudeAgentManager implements AgentManager { // Use agent.id from repository for all tracking const agentId = agent.id; - // 3. Start Claude CLI in background + // 3. Start Claude CLI in background with JSON schema for structured output const subprocess = execa( 'claude', - ['-p', prompt, '--output-format', 'json'], + [ + '-p', + prompt, + '--output-format', + 'json', + '--json-schema', + JSON.stringify(agentOutputJsonSchema), + ], { cwd: cwd ?? worktree.path, detached: true, @@ -128,7 +138,7 @@ export class ClaudeAgentManager implements AgentManager { /** * Handle agent subprocess completion. - * Parses JSON result, updates session ID, emits events. + * Parses structured JSON result with discriminated union, updates session ID, emits events. 
*/ private async handleAgentCompletion( agentId: string, @@ -139,38 +149,101 @@ export class ClaudeAgentManager implements AgentManager { const agent = await this.repository.findById(agentId); if (!agent) return; - // Parse JSON result (stdout is string when stdio is 'pipe') - const result: ClaudeCliResult = JSON.parse(stdout as string); + // Parse CLI result wrapper (stdout is string when stdio is 'pipe') + const cliResult: ClaudeCliResult = JSON.parse(stdout as string); - // Store session_id for potential resume - if (result.session_id) { - await this.repository.updateSessionId(agentId, result.session_id); + // Store session_id for resume capability + if (cliResult.session_id) { + await this.repository.updateSessionId(agentId, cliResult.session_id); } - // Store result + // Parse the agent's structured output from result field + const agentOutput = agentOutputSchema.parse(JSON.parse(cliResult.result)); const active = this.activeAgents.get(agentId); - if (active) { - active.result = { - success: result.subtype === 'success', - message: result.result, - }; - } - // Update status to idle (ready for next prompt or resume) - await this.repository.updateStatus(agentId, 'idle'); + switch (agentOutput.status) { + case 'done': { + // Success path + if (active) { + active.result = { + success: true, + message: agentOutput.result, + filesModified: agentOutput.filesModified, + }; + } + await this.repository.updateStatus(agentId, 'idle'); - if (this.eventBus) { - const event: AgentStoppedEvent = { - type: 'agent:stopped', - timestamp: new Date(), - payload: { - agentId, - name: agent.name, - taskId: agent.taskId ?? '', - reason: 'task_complete', - }, - }; - this.eventBus.emit(event); + if (this.eventBus) { + const event: AgentStoppedEvent = { + type: 'agent:stopped', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? 
'', + reason: 'task_complete', + }, + }; + this.eventBus.emit(event); + } + break; + } + + case 'question': { + // Question path - agent needs input + if (active) { + active.pendingQuestion = { + question: agentOutput.question, + options: agentOutput.options, + multiSelect: agentOutput.multiSelect, + }; + } + await this.repository.updateStatus(agentId, 'waiting_for_input'); + + if (this.eventBus) { + const event: AgentWaitingEvent = { + type: 'agent:waiting', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? '', + sessionId: cliResult.session_id ?? agent.sessionId ?? '', + question: agentOutput.question, + options: agentOutput.options, + multiSelect: agentOutput.multiSelect, + }, + }; + this.eventBus.emit(event); + } + break; + } + + case 'unrecoverable_error': { + // Error path + if (active) { + active.result = { + success: false, + message: agentOutput.error, + }; + } + await this.repository.updateStatus(agentId, 'crashed'); + + if (this.eventBus) { + const event: AgentCrashedEvent = { + type: 'agent:crashed', + timestamp: new Date(), + payload: { + agentId, + name: agent.name, + taskId: agent.taskId ?? '', + error: agentOutput.error, + }, + }; + this.eventBus.emit(event); + } + break; + } } } catch (error) { await this.handleAgentError(agentId, error); @@ -178,7 +251,9 @@ export class ClaudeAgentManager implements AgentManager { } /** - * Handle agent errors - either waiting_for_input or crash. + * Handle agent errors - actual crashes (not waiting for input). + * With structured output via --json-schema, question status is handled in + * handleAgentCompletion. This method only handles real subprocess errors. 
*/ private async handleAgentError( agentId: string, @@ -188,32 +263,7 @@ export class ClaudeAgentManager implements AgentManager { const agent = await this.repository.findById(agentId); if (!agent) return; - // Check if this is a "waiting for input" scenario (agent asked AskUserQuestion) - // The CLI exits with a specific pattern when waiting for user input - if ( - errorMessage.includes('waiting for input') || - errorMessage.includes('user_question') - ) { - await this.repository.updateStatus(agentId, 'waiting_for_input'); - - if (this.eventBus) { - const event: AgentWaitingEvent = { - type: 'agent:waiting', - timestamp: new Date(), - payload: { - agentId, - name: agent.name, - taskId: agent.taskId ?? '', - sessionId: agent.sessionId ?? '', - question: errorMessage, - }, - }; - this.eventBus.emit(event); - } - return; - } - - // Actual crash + // Actual crash - structured output failed or subprocess error await this.repository.updateStatus(agentId, 'crashed'); if (this.eventBus) { @@ -324,10 +374,19 @@ export class ClaudeAgentManager implements AgentManager { await this.repository.updateStatus(agentId, 'running'); - // Start CLI with --resume flag + // Start CLI with --resume flag and same JSON schema const subprocess = execa( 'claude', - ['-p', prompt, '--resume', agent.sessionId, '--output-format', 'json'], + [ + '-p', + prompt, + '--resume', + agent.sessionId, + '--output-format', + 'json', + '--json-schema', + JSON.stringify(agentOutputJsonSchema), + ], { cwd: worktree.path, detached: true, @@ -335,6 +394,7 @@ export class ClaudeAgentManager implements AgentManager { } ); + // Clear any previous pending question when resuming this.activeAgents.set(agentId, { subprocess }); if (this.eventBus) { @@ -362,6 +422,14 @@ export class ClaudeAgentManager implements AgentManager { return active?.result ?? null; } + /** + * Get pending question for an agent waiting for input. 
+ */ + async getPendingQuestion(agentId: string): Promise<PendingQuestion | null> { + const active = this.activeAgents.get(agentId); + return active?.pendingQuestion ?? null; + } + /** * Convert database agent record to AgentInfo. */ diff --git a/src/agent/mock-manager.ts b/src/agent/mock-manager.ts index bc6ea01..09dccf4 100644 --- a/src/agent/mock-manager.ts +++ b/src/agent/mock-manager.ts @@ -13,6 +13,7 @@ import type { SpawnAgentOptions, AgentResult, AgentStatus, + PendingQuestion, } from './types.js'; import type { EventBus, @@ -37,6 +38,10 @@ export interface MockAgentScenario { filesModified?: string[]; /** Question to surface (for waiting_for_input) */ question?: string; + /** Options for question (for waiting_for_input) */ + options?: Array<{ label: string; description?: string }>; + /** Whether multiple options can be selected (for waiting_for_input) */ + multiSelect?: boolean; } /** @@ -46,6 +51,7 @@ interface MockAgentRecord { info: AgentInfo; scenario: MockAgentScenario; result?: AgentResult; + pendingQuestion?: PendingQuestion; completionTimer?: ReturnType<typeof setTimeout>; } @@ -231,6 +237,11 @@ export class MockAgentManager implements AgentManager { case 'waiting_for_input': record.info.status = 'waiting_for_input'; record.info.updatedAt = new Date(); + record.pendingQuestion = { + question: scenario.question ?? 'User input required', + options: scenario.options, + multiSelect: scenario.multiSelect, + }; if (this.eventBus) { const event: AgentWaitingEvent = { @@ -242,6 +253,8 @@ export class MockAgentManager implements AgentManager { taskId: info.taskId, sessionId: info.sessionId ?? '', question: scenario.question ??
'User input required', + options: scenario.options, + multiSelect: scenario.multiSelect, }, }; this.eventBus.emit(event); @@ -334,9 +347,10 @@ export class MockAgentManager implements AgentManager { throw new Error(`Agent '${record.info.name}' has no session to resume`); } - // Update status to running + // Update status to running, clear pending question record.info.status = 'running'; record.info.updatedAt = new Date(); + record.pendingQuestion = undefined; // Emit resumed event if (this.eventBus) { @@ -375,6 +389,14 @@ export class MockAgentManager implements AgentManager { return record?.result ?? null; } + /** + * Get pending question for an agent waiting for input. + */ + async getPendingQuestion(agentId: string): Promise<PendingQuestion | null> { + const record = this.agents.get(agentId); + return record?.pendingQuestion ?? null; + } + /** * Clear all agents and pending timers. * Useful for test cleanup. diff --git a/src/agent/types.ts b/src/agent/types.ts index 3bc55f5..aa38d42 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -55,6 +55,18 @@ export interface AgentResult { filesModified?: string[]; } +/** + * Pending question when agent is waiting for input + */ +export interface PendingQuestion { + /** The question being asked */ + question: string; + /** Optional predefined options for the question */ + options?: Array<{ label: string; description?: string }>; + /** Whether multiple options can be selected */ + multiSelect?: boolean; +} + /** * AgentManager Port Interface * @@ -133,4 +145,14 @@ export interface AgentManager { * @returns Result if available, null if agent still running */ getResult(agentId: string): Promise<AgentResult | null>; + + /** + * Get pending question for an agent waiting for input. + * + * Only available when agent status is 'waiting_for_input'. + * + * @param agentId - Agent ID + * @returns Pending question if available, null otherwise + */ + getPendingQuestion(agentId: string): Promise<PendingQuestion | null>; }