feat(08.1-01): update ClaudeAgentManager to use structured schema
- Pass --json-schema flag to Claude CLI for validated output - Parse discriminated union (done/question/unrecoverable_error) in handleAgentCompletion - Add getPendingQuestion method to AgentManager interface - Add PendingQuestion type for structured question data - Store pending question in ActiveAgent for later retrieval - Remove hacky string matching for waiting_for_input detection - Update MockAgentManager with getPendingQuestion and options support - Update tests for new CLI arguments and result format
This commit is contained in:
@@ -102,7 +102,7 @@ describe('ClaudeAgentManager', () => {
|
||||
then: () =>
|
||||
Promise.resolve({
|
||||
stdout:
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}',
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}',
|
||||
stderr: '',
|
||||
}),
|
||||
catch: () => mockSubprocess,
|
||||
@@ -144,7 +144,7 @@ describe('ClaudeAgentManager', () => {
|
||||
then: () =>
|
||||
Promise.resolve({
|
||||
stdout:
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}',
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}',
|
||||
stderr: '',
|
||||
}),
|
||||
catch: () => mockSubprocess,
|
||||
@@ -173,7 +173,7 @@ describe('ClaudeAgentManager', () => {
|
||||
then: () =>
|
||||
Promise.resolve({
|
||||
stdout:
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"done"}',
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Task completed\\"}"}',
|
||||
stderr: '',
|
||||
}),
|
||||
catch: () => mockSubprocess,
|
||||
@@ -189,7 +189,7 @@ describe('ClaudeAgentManager', () => {
|
||||
|
||||
expect(mockExeca).toHaveBeenCalledWith(
|
||||
'claude',
|
||||
['-p', 'Test task', '--output-format', 'json'],
|
||||
expect.arrayContaining(['-p', 'Test task', '--output-format', 'json', '--json-schema']),
|
||||
expect.objectContaining({ cwd: '/custom/path' })
|
||||
);
|
||||
});
|
||||
@@ -328,7 +328,7 @@ describe('ClaudeAgentManager', () => {
|
||||
then: () =>
|
||||
Promise.resolve({
|
||||
stdout:
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"continued"}',
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Continued successfully\\"}"}',
|
||||
stderr: '',
|
||||
}),
|
||||
catch: () => mockSubprocess,
|
||||
@@ -339,7 +339,7 @@ describe('ClaudeAgentManager', () => {
|
||||
|
||||
expect(mockExeca).toHaveBeenCalledWith(
|
||||
'claude',
|
||||
['-p', 'User response', '--resume', 'session-789', '--output-format', 'json'],
|
||||
expect.arrayContaining(['-p', 'User response', '--resume', 'session-789', '--output-format', 'json', '--json-schema']),
|
||||
expect.any(Object)
|
||||
);
|
||||
});
|
||||
@@ -391,7 +391,7 @@ describe('ClaudeAgentManager', () => {
|
||||
then: () =>
|
||||
Promise.resolve({
|
||||
stdout:
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"continued"}',
|
||||
'{"type":"result","subtype":"success","session_id":"sess-123","result":"{\\"status\\":\\"done\\",\\"result\\":\\"Continued successfully\\"}"}',
|
||||
stderr: '',
|
||||
}),
|
||||
catch: () => mockSubprocess,
|
||||
|
||||
@@ -13,6 +13,7 @@ import type {
|
||||
SpawnAgentOptions,
|
||||
AgentResult,
|
||||
AgentStatus,
|
||||
PendingQuestion,
|
||||
} from './types.js';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { WorktreeManager } from '../git/types.js';
|
||||
@@ -24,6 +25,7 @@ import type {
|
||||
AgentResumedEvent,
|
||||
AgentWaitingEvent,
|
||||
} from '../events/index.js';
|
||||
import { agentOutputSchema, agentOutputJsonSchema } from './schema.js';
|
||||
|
||||
/**
|
||||
* Result structure from Claude CLI with --output-format json
|
||||
@@ -38,11 +40,12 @@ interface ClaudeCliResult {
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracks an active agent subprocess and its result
|
||||
* Tracks an active agent subprocess, its result, and any pending question
|
||||
*/
|
||||
interface ActiveAgent {
|
||||
subprocess: ResultPromise;
|
||||
result?: AgentResult;
|
||||
pendingQuestion?: PendingQuestion;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -92,10 +95,17 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
// Use agent.id from repository for all tracking
|
||||
const agentId = agent.id;
|
||||
|
||||
// 3. Start Claude CLI in background
|
||||
// 3. Start Claude CLI in background with JSON schema for structured output
|
||||
const subprocess = execa(
|
||||
'claude',
|
||||
['-p', prompt, '--output-format', 'json'],
|
||||
[
|
||||
'-p',
|
||||
prompt,
|
||||
'--output-format',
|
||||
'json',
|
||||
'--json-schema',
|
||||
JSON.stringify(agentOutputJsonSchema),
|
||||
],
|
||||
{
|
||||
cwd: cwd ?? worktree.path,
|
||||
detached: true,
|
||||
@@ -128,7 +138,7 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
|
||||
/**
|
||||
* Handle agent subprocess completion.
|
||||
* Parses JSON result, updates session ID, emits events.
|
||||
* Parses structured JSON result with discriminated union, updates session ID, emits events.
|
||||
*/
|
||||
private async handleAgentCompletion(
|
||||
agentId: string,
|
||||
@@ -139,38 +149,101 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
const agent = await this.repository.findById(agentId);
|
||||
if (!agent) return;
|
||||
|
||||
// Parse JSON result (stdout is string when stdio is 'pipe')
|
||||
const result: ClaudeCliResult = JSON.parse(stdout as string);
|
||||
// Parse CLI result wrapper (stdout is string when stdio is 'pipe')
|
||||
const cliResult: ClaudeCliResult = JSON.parse(stdout as string);
|
||||
|
||||
// Store session_id for potential resume
|
||||
if (result.session_id) {
|
||||
await this.repository.updateSessionId(agentId, result.session_id);
|
||||
// Store session_id for resume capability
|
||||
if (cliResult.session_id) {
|
||||
await this.repository.updateSessionId(agentId, cliResult.session_id);
|
||||
}
|
||||
|
||||
// Store result
|
||||
// Parse the agent's structured output from result field
|
||||
const agentOutput = agentOutputSchema.parse(JSON.parse(cliResult.result));
|
||||
const active = this.activeAgents.get(agentId);
|
||||
if (active) {
|
||||
active.result = {
|
||||
success: result.subtype === 'success',
|
||||
message: result.result,
|
||||
};
|
||||
}
|
||||
|
||||
// Update status to idle (ready for next prompt or resume)
|
||||
await this.repository.updateStatus(agentId, 'idle');
|
||||
switch (agentOutput.status) {
|
||||
case 'done': {
|
||||
// Success path
|
||||
if (active) {
|
||||
active.result = {
|
||||
success: true,
|
||||
message: agentOutput.result,
|
||||
filesModified: agentOutput.filesModified,
|
||||
};
|
||||
}
|
||||
await this.repository.updateStatus(agentId, 'idle');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentStoppedEvent = {
|
||||
type: 'agent:stopped',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
reason: 'task_complete',
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
if (this.eventBus) {
|
||||
const event: AgentStoppedEvent = {
|
||||
type: 'agent:stopped',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
reason: 'task_complete',
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'question': {
|
||||
// Question path - agent needs input
|
||||
if (active) {
|
||||
active.pendingQuestion = {
|
||||
question: agentOutput.question,
|
||||
options: agentOutput.options,
|
||||
multiSelect: agentOutput.multiSelect,
|
||||
};
|
||||
}
|
||||
await this.repository.updateStatus(agentId, 'waiting_for_input');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentWaitingEvent = {
|
||||
type: 'agent:waiting',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
sessionId: cliResult.session_id ?? agent.sessionId ?? '',
|
||||
question: agentOutput.question,
|
||||
options: agentOutput.options,
|
||||
multiSelect: agentOutput.multiSelect,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'unrecoverable_error': {
|
||||
// Error path
|
||||
if (active) {
|
||||
active.result = {
|
||||
success: false,
|
||||
message: agentOutput.error,
|
||||
};
|
||||
}
|
||||
await this.repository.updateStatus(agentId, 'crashed');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentCrashedEvent = {
|
||||
type: 'agent:crashed',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
error: agentOutput.error,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
await this.handleAgentError(agentId, error);
|
||||
@@ -178,7 +251,9 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle agent errors - either waiting_for_input or crash.
|
||||
* Handle agent errors - actual crashes (not waiting for input).
|
||||
* With structured output via --json-schema, question status is handled in
|
||||
* handleAgentCompletion. This method only handles real subprocess errors.
|
||||
*/
|
||||
private async handleAgentError(
|
||||
agentId: string,
|
||||
@@ -188,32 +263,7 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
const agent = await this.repository.findById(agentId);
|
||||
if (!agent) return;
|
||||
|
||||
// Check if this is a "waiting for input" scenario (agent asked AskUserQuestion)
|
||||
// The CLI exits with a specific pattern when waiting for user input
|
||||
if (
|
||||
errorMessage.includes('waiting for input') ||
|
||||
errorMessage.includes('user_question')
|
||||
) {
|
||||
await this.repository.updateStatus(agentId, 'waiting_for_input');
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentWaitingEvent = {
|
||||
type: 'agent:waiting',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: agent.name,
|
||||
taskId: agent.taskId ?? '',
|
||||
sessionId: agent.sessionId ?? '',
|
||||
question: errorMessage,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Actual crash
|
||||
// Actual crash - structured output failed or subprocess error
|
||||
await this.repository.updateStatus(agentId, 'crashed');
|
||||
|
||||
if (this.eventBus) {
|
||||
@@ -324,10 +374,19 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
|
||||
await this.repository.updateStatus(agentId, 'running');
|
||||
|
||||
// Start CLI with --resume flag
|
||||
// Start CLI with --resume flag and same JSON schema
|
||||
const subprocess = execa(
|
||||
'claude',
|
||||
['-p', prompt, '--resume', agent.sessionId, '--output-format', 'json'],
|
||||
[
|
||||
'-p',
|
||||
prompt,
|
||||
'--resume',
|
||||
agent.sessionId,
|
||||
'--output-format',
|
||||
'json',
|
||||
'--json-schema',
|
||||
JSON.stringify(agentOutputJsonSchema),
|
||||
],
|
||||
{
|
||||
cwd: worktree.path,
|
||||
detached: true,
|
||||
@@ -335,6 +394,7 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
}
|
||||
);
|
||||
|
||||
// Clear any previous pending question when resuming
|
||||
this.activeAgents.set(agentId, { subprocess });
|
||||
|
||||
if (this.eventBus) {
|
||||
@@ -362,6 +422,14 @@ export class ClaudeAgentManager implements AgentManager {
|
||||
return active?.result ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pending question for an agent waiting for input.
|
||||
*/
|
||||
async getPendingQuestion(agentId: string): Promise<PendingQuestion | null> {
|
||||
const active = this.activeAgents.get(agentId);
|
||||
return active?.pendingQuestion ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert database agent record to AgentInfo.
|
||||
*/
|
||||
|
||||
@@ -13,6 +13,7 @@ import type {
|
||||
SpawnAgentOptions,
|
||||
AgentResult,
|
||||
AgentStatus,
|
||||
PendingQuestion,
|
||||
} from './types.js';
|
||||
import type {
|
||||
EventBus,
|
||||
@@ -37,6 +38,10 @@ export interface MockAgentScenario {
|
||||
filesModified?: string[];
|
||||
/** Question to surface (for waiting_for_input) */
|
||||
question?: string;
|
||||
/** Options for question (for waiting_for_input) */
|
||||
options?: Array<{ label: string; description?: string }>;
|
||||
/** Whether multiple options can be selected (for waiting_for_input) */
|
||||
multiSelect?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -46,6 +51,7 @@ interface MockAgentRecord {
|
||||
info: AgentInfo;
|
||||
scenario: MockAgentScenario;
|
||||
result?: AgentResult;
|
||||
pendingQuestion?: PendingQuestion;
|
||||
completionTimer?: ReturnType<typeof setTimeout>;
|
||||
}
|
||||
|
||||
@@ -231,6 +237,11 @@ export class MockAgentManager implements AgentManager {
|
||||
case 'waiting_for_input':
|
||||
record.info.status = 'waiting_for_input';
|
||||
record.info.updatedAt = new Date();
|
||||
record.pendingQuestion = {
|
||||
question: scenario.question ?? 'User input required',
|
||||
options: scenario.options,
|
||||
multiSelect: scenario.multiSelect,
|
||||
};
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentWaitingEvent = {
|
||||
@@ -242,6 +253,8 @@ export class MockAgentManager implements AgentManager {
|
||||
taskId: info.taskId,
|
||||
sessionId: info.sessionId ?? '',
|
||||
question: scenario.question ?? 'User input required',
|
||||
options: scenario.options,
|
||||
multiSelect: scenario.multiSelect,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
@@ -334,9 +347,10 @@ export class MockAgentManager implements AgentManager {
|
||||
throw new Error(`Agent '${record.info.name}' has no session to resume`);
|
||||
}
|
||||
|
||||
// Update status to running
|
||||
// Update status to running, clear pending question
|
||||
record.info.status = 'running';
|
||||
record.info.updatedAt = new Date();
|
||||
record.pendingQuestion = undefined;
|
||||
|
||||
// Emit resumed event
|
||||
if (this.eventBus) {
|
||||
@@ -375,6 +389,14 @@ export class MockAgentManager implements AgentManager {
|
||||
return record?.result ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pending question for an agent waiting for input.
|
||||
*/
|
||||
async getPendingQuestion(agentId: string): Promise<PendingQuestion | null> {
|
||||
const record = this.agents.get(agentId);
|
||||
return record?.pendingQuestion ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all agents and pending timers.
|
||||
* Useful for test cleanup.
|
||||
|
||||
@@ -55,6 +55,18 @@ export interface AgentResult {
|
||||
filesModified?: string[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Pending question when agent is waiting for input
|
||||
*/
|
||||
export interface PendingQuestion {
|
||||
/** The question being asked */
|
||||
question: string;
|
||||
/** Optional predefined options for the question */
|
||||
options?: Array<{ label: string; description?: string }>;
|
||||
/** Whether multiple options can be selected */
|
||||
multiSelect?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* AgentManager Port Interface
|
||||
*
|
||||
@@ -133,4 +145,14 @@ export interface AgentManager {
|
||||
* @returns Result if available, null if agent still running
|
||||
*/
|
||||
getResult(agentId: string): Promise<AgentResult | null>;
|
||||
|
||||
/**
|
||||
* Get pending question for an agent waiting for input.
|
||||
*
|
||||
* Only available when agent status is 'waiting_for_input'.
|
||||
*
|
||||
* @param agentId - Agent ID
|
||||
* @returns Pending question if available, null otherwise
|
||||
*/
|
||||
getPendingQuestion(agentId: string): Promise<PendingQuestion | null>;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user