feat(08.1-01): update ClaudeAgentManager to use structured schema

- Pass --json-schema flag to Claude CLI for validated output
- Parse discriminated union (done/question/unrecoverable_error) in handleAgentCompletion
- Add getPendingQuestion method to AgentManager interface
- Add PendingQuestion type for structured question data
- Store pending question in ActiveAgent for later retrieval
- Remove hacky string matching for waiting_for_input detection
- Update MockAgentManager with getPendingQuestion and options support
- Update tests for new CLI arguments and result format
This commit is contained in:
Lukas May
2026-01-31 15:25:07 +01:00
parent 41598f577f
commit d9673d54a3
4 changed files with 179 additions and 67 deletions

View File

@@ -102,7 +102,7 @@ describe('ClaudeAgentManager', () => {
then: () =>
Promise.resolve({
stdout:
'{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}',
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}',
stderr: '',
}),
catch: () => mockSubprocess,
@@ -144,7 +144,7 @@ describe('ClaudeAgentManager', () => {
then: () =>
Promise.resolve({
stdout:
'{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}',
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}',
stderr: '',
}),
catch: () => mockSubprocess,
@@ -173,7 +173,7 @@ describe('ClaudeAgentManager', () => {
then: () =>
Promise.resolve({
stdout:
'{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}',
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}',
stderr: '',
}),
catch: () => mockSubprocess,
@@ -189,7 +189,7 @@ describe('ClaudeAgentManager', () => {
expect(mockExeca).toHaveBeenCalledWith(
'claude',
['-p', 'Test task', '--output-format', 'json'],
expect.arrayContaining(['-p', 'Test task', '--output-format', 'json', '--json-schema']),
expect.objectContaining({ cwd: '/custom/path' })
);
});
@@ -328,7 +328,7 @@ describe('ClaudeAgentManager', () => {
then: () =>
Promise.resolve({
stdout:
'{"type":"result","subtype":"success","session_id":"sess-123","result":"continued"}',
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Continued successfully\\"}"}',
stderr: '',
}),
catch: () => mockSubprocess,
@@ -339,7 +339,7 @@ describe('ClaudeAgentManager', () => {
expect(mockExeca).toHaveBeenCalledWith(
'claude',
['-p', 'User response', '--resume', 'session-789', '--output-format', 'json'],
expect.arrayContaining(['-p', 'User response', '--resume', 'session-789', '--output-format', 'json', '--json-schema']),
expect.any(Object)
);
});
@@ -391,7 +391,7 @@ describe('ClaudeAgentManager', () => {
then: () =>
Promise.resolve({
stdout:
'{"type":"result","subtype":"success","session_id":"sess-123","result":"continued"}',
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Continued successfully\\"}"}',
stderr: '',
}),
catch: () => mockSubprocess,

View File

@@ -13,6 +13,7 @@ import type {
SpawnAgentOptions,
AgentResult,
AgentStatus,
PendingQuestion,
} from './types.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { WorktreeManager } from '../git/types.js';
@@ -24,6 +25,7 @@ import type {
AgentResumedEvent,
AgentWaitingEvent,
} from '../events/index.js';
import { agentOutputSchema, agentOutputJsonSchema } from './schema.js';
/**
* Result structure from Claude CLI with --output-format json
@@ -38,11 +40,12 @@ interface ClaudeCliResult {
}
/**
* Tracks an active agent subprocess and its result
* Tracks an active agent subprocess, its result, and any pending question
*/
interface ActiveAgent {
subprocess: ResultPromise;
result?: AgentResult;
pendingQuestion?: PendingQuestion;
}
/**
@@ -92,10 +95,17 @@ export class ClaudeAgentManager implements AgentManager {
// Use agent.id from repository for all tracking
const agentId = agent.id;
// 3. Start Claude CLI in background
// 3. Start Claude CLI in background with JSON schema for structured output
const subprocess = execa(
'claude',
['-p', prompt, '--output-format', 'json'],
[
'-p',
prompt,
'--output-format',
'json',
'--json-schema',
JSON.stringify(agentOutputJsonSchema),
],
{
cwd: cwd ?? worktree.path,
detached: true,
@@ -128,7 +138,7 @@ export class ClaudeAgentManager implements AgentManager {
/**
* Handle agent subprocess completion.
* Parses JSON result, updates session ID, emits events.
* Parses structured JSON result with discriminated union, updates session ID, emits events.
*/
private async handleAgentCompletion(
agentId: string,
@@ -139,38 +149,101 @@ export class ClaudeAgentManager implements AgentManager {
const agent = await this.repository.findById(agentId);
if (!agent) return;
// Parse JSON result (stdout is string when stdio is 'pipe')
const result: ClaudeCliResult = JSON.parse(stdout as string);
// Parse CLI result wrapper (stdout is string when stdio is 'pipe')
const cliResult: ClaudeCliResult = JSON.parse(stdout as string);
// Store session_id for potential resume
if (result.session_id) {
await this.repository.updateSessionId(agentId, result.session_id);
// Store session_id for resume capability
if (cliResult.session_id) {
await this.repository.updateSessionId(agentId, cliResult.session_id);
}
// Store result
// Parse the agent's structured output from result field
const agentOutput = agentOutputSchema.parse(JSON.parse(cliResult.result));
const active = this.activeAgents.get(agentId);
if (active) {
active.result = {
success: result.subtype === 'success',
message: result.result,
};
}
// Update status to idle (ready for next prompt or resume)
await this.repository.updateStatus(agentId, 'idle');
switch (agentOutput.status) {
case 'done': {
// Success path
if (active) {
active.result = {
success: true,
message: agentOutput.result,
filesModified: agentOutput.filesModified,
};
}
await this.repository.updateStatus(agentId, 'idle');
if (this.eventBus) {
const event: AgentStoppedEvent = {
type: 'agent:stopped',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
reason: 'task_complete',
},
};
this.eventBus.emit(event);
if (this.eventBus) {
const event: AgentStoppedEvent = {
type: 'agent:stopped',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
reason: 'task_complete',
},
};
this.eventBus.emit(event);
}
break;
}
case 'question': {
// Question path - agent needs input
if (active) {
active.pendingQuestion = {
question: agentOutput.question,
options: agentOutput.options,
multiSelect: agentOutput.multiSelect,
};
}
await this.repository.updateStatus(agentId, 'waiting_for_input');
if (this.eventBus) {
const event: AgentWaitingEvent = {
type: 'agent:waiting',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
sessionId: cliResult.session_id ?? agent.sessionId ?? '',
question: agentOutput.question,
options: agentOutput.options,
multiSelect: agentOutput.multiSelect,
},
};
this.eventBus.emit(event);
}
break;
}
case 'unrecoverable_error': {
// Error path
if (active) {
active.result = {
success: false,
message: agentOutput.error,
};
}
await this.repository.updateStatus(agentId, 'crashed');
if (this.eventBus) {
const event: AgentCrashedEvent = {
type: 'agent:crashed',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
error: agentOutput.error,
},
};
this.eventBus.emit(event);
}
break;
}
}
} catch (error) {
await this.handleAgentError(agentId, error);
@@ -178,7 +251,9 @@ export class ClaudeAgentManager implements AgentManager {
}
/**
* Handle agent errors - either waiting_for_input or crash.
* Handle agent errors - actual crashes (not waiting for input).
* With structured output via --json-schema, question status is handled in
* handleAgentCompletion. This method only handles real subprocess errors.
*/
private async handleAgentError(
agentId: string,
@@ -188,32 +263,7 @@ export class ClaudeAgentManager implements AgentManager {
const agent = await this.repository.findById(agentId);
if (!agent) return;
// Check if this is a "waiting for input" scenario (agent asked AskUserQuestion)
// The CLI exits with a specific pattern when waiting for user input
if (
errorMessage.includes('waiting for input') ||
errorMessage.includes('user_question')
) {
await this.repository.updateStatus(agentId, 'waiting_for_input');
if (this.eventBus) {
const event: AgentWaitingEvent = {
type: 'agent:waiting',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
sessionId: agent.sessionId ?? '',
question: errorMessage,
},
};
this.eventBus.emit(event);
}
return;
}
// Actual crash
// Actual crash - structured output failed or subprocess error
await this.repository.updateStatus(agentId, 'crashed');
if (this.eventBus) {
@@ -324,10 +374,19 @@ export class ClaudeAgentManager implements AgentManager {
await this.repository.updateStatus(agentId, 'running');
// Start CLI with --resume flag
// Start CLI with --resume flag and same JSON schema
const subprocess = execa(
'claude',
['-p', prompt, '--resume', agent.sessionId, '--output-format', 'json'],
[
'-p',
prompt,
'--resume',
agent.sessionId,
'--output-format',
'json',
'--json-schema',
JSON.stringify(agentOutputJsonSchema),
],
{
cwd: worktree.path,
detached: true,
@@ -335,6 +394,7 @@ export class ClaudeAgentManager implements AgentManager {
}
);
// Clear any previous pending question when resuming
this.activeAgents.set(agentId, { subprocess });
if (this.eventBus) {
@@ -362,6 +422,14 @@ export class ClaudeAgentManager implements AgentManager {
return active?.result ?? null;
}
/**
* Get pending question for an agent waiting for input.
*/
async getPendingQuestion(agentId: string): Promise<PendingQuestion | null> {
const active = this.activeAgents.get(agentId);
return active?.pendingQuestion ?? null;
}
/**
* Convert database agent record to AgentInfo.
*/

View File

@@ -13,6 +13,7 @@ import type {
SpawnAgentOptions,
AgentResult,
AgentStatus,
PendingQuestion,
} from './types.js';
import type {
EventBus,
@@ -37,6 +38,10 @@ export interface MockAgentScenario {
filesModified?: string[];
/** Question to surface (for waiting_for_input) */
question?: string;
/** Options for question (for waiting_for_input) */
options?: Array<{ label: string; description?: string }>;
/** Whether multiple options can be selected (for waiting_for_input) */
multiSelect?: boolean;
}
/**
@@ -46,6 +51,7 @@ interface MockAgentRecord {
info: AgentInfo;
scenario: MockAgentScenario;
result?: AgentResult;
pendingQuestion?: PendingQuestion;
completionTimer?: ReturnType<typeof setTimeout>;
}
@@ -231,6 +237,11 @@ export class MockAgentManager implements AgentManager {
case 'waiting_for_input':
record.info.status = 'waiting_for_input';
record.info.updatedAt = new Date();
record.pendingQuestion = {
question: scenario.question ?? 'User input required',
options: scenario.options,
multiSelect: scenario.multiSelect,
};
if (this.eventBus) {
const event: AgentWaitingEvent = {
@@ -242,6 +253,8 @@ export class MockAgentManager implements AgentManager {
taskId: info.taskId,
sessionId: info.sessionId ?? '',
question: scenario.question ?? 'User input required',
options: scenario.options,
multiSelect: scenario.multiSelect,
},
};
this.eventBus.emit(event);
@@ -334,9 +347,10 @@ export class MockAgentManager implements AgentManager {
throw new Error(`Agent '${record.info.name}' has no session to resume`);
}
// Update status to running
// Update status to running, clear pending question
record.info.status = 'running';
record.info.updatedAt = new Date();
record.pendingQuestion = undefined;
// Emit resumed event
if (this.eventBus) {
@@ -375,6 +389,14 @@ export class MockAgentManager implements AgentManager {
return record?.result ?? null;
}
/**
* Get pending question for an agent waiting for input.
*/
async getPendingQuestion(agentId: string): Promise<PendingQuestion | null> {
const record = this.agents.get(agentId);
return record?.pendingQuestion ?? null;
}
/**
* Clear all agents and pending timers.
* Useful for test cleanup.

View File

@@ -55,6 +55,18 @@ export interface AgentResult {
filesModified?: string[];
}
/**
* Pending question when agent is waiting for input
*/
export interface PendingQuestion {
/** The question being asked */
question: string;
/** Optional predefined options for the question */
options?: Array<{ label: string; description?: string }>;
/** Whether multiple options can be selected */
multiSelect?: boolean;
}
/**
* AgentManager Port Interface
*
@@ -133,4 +145,14 @@ export interface AgentManager {
* @returns Result if available, null if agent still running
*/
getResult(agentId: string): Promise<AgentResult | null>;
/**
* Get pending question for an agent waiting for input.
*
* Only available when agent status is 'waiting_for_input'.
*
* @param agentId - Agent ID
* @returns Pending question if available, null otherwise
*/
getPendingQuestion(agentId: string): Promise<PendingQuestion | null>;
}