/** * OutputHandler — Stream event processing, signal parsing, file reading, result capture. * * Extracted from MultiProviderAgentManager. Processes all output from agent * subprocesses: stream events, agent signals, output files, and result/question * retrieval. */ import { readFile } from 'node:fs/promises'; import { existsSync } from 'node:fs'; import { join } from 'node:path'; import type { AgentRepository } from '../db/repositories/agent-repository.js'; import type { ChangeSetRepository, CreateChangeSetEntryData } from '../db/repositories/change-set-repository.js'; import type { PhaseRepository } from '../db/repositories/phase-repository.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { PageRepository } from '../db/repositories/page-repository.js'; import type { EventBus, AgentStoppedEvent, AgentCrashedEvent, AgentWaitingEvent, } from '../events/index.js'; import type { AgentResult, AgentMode, PendingQuestions, QuestionItem, } from './types.js'; import type { StreamEvent } from './providers/parsers/index.js'; import type { AgentProviderConfig } from './providers/types.js'; import { agentSignalSchema } from './schema.js'; import { readSummary, readPhaseFiles, readTaskFiles, readDecisionFiles, readPageFiles, readFrontmatterFile, } from './file-io.js'; import { getProvider } from './providers/registry.js'; import { markdownToTiptapJson } from './markdown-to-tiptap.js'; import type { SignalManager } from './lifecycle/signal-manager.js'; import { createModuleLogger } from '../logger/index.js'; const log = createModuleLogger('output-handler'); /** * Tracks an active agent with its PID and file tailer. */ export interface ActiveAgent { agentId: string; pid: number; tailer: import('./file-tailer.js').FileTailer; outputFilePath: string; /** Actual working directory the agent process runs in (may differ from getAgentWorkdir for standalone agents) */ agentCwd?: string; result?: AgentResult; pendingQuestions?: PendingQuestions; streamResultText?: string; streamSessionId?: string; streamCostUsd?: number; /** True when the stream result indicated an error (e.g. auth failure) */ streamIsError?: boolean; /** Cancel handle for polling timer — call to stop polling on cleanup */ cancelPoll?: () => void; } /** * Result structure from Claude CLI with --output-format json. */ interface ClaudeCliResult { type: 'result'; subtype: 'success' | 'error'; is_error: boolean; session_id: string; result: string; structured_output?: unknown; total_cost_usd?: number; } export class OutputHandler { private filePositions = new Map(); private completionLocks = new Set(); // Track agents currently being processed constructor( private repository: AgentRepository, private eventBus?: EventBus, private changeSetRepository?: ChangeSetRepository, private phaseRepository?: PhaseRepository, private taskRepository?: TaskRepository, private pageRepository?: PageRepository, private signalManager?: SignalManager, ) {} /** * Validate that a signal file is complete and properly formatted. */ private async validateSignalFile(filePath: string): Promise { try { const content = await readFile(filePath, 'utf-8'); const trimmed = content.trim(); if (!trimmed) return false; // Check if JSON is complete (ends with } or ]) const endsCorrectly = trimmed.endsWith('}') || trimmed.endsWith(']'); if (!endsCorrectly) return false; // Try to parse as JSON to ensure it's valid JSON.parse(trimmed); return true; } catch { return false; } } /** * Read complete lines from a file, avoiding partial lines that might still be writing. * This eliminates race conditions when agents are still writing output. */ private async readCompleteLines(filePath: string, fromPosition: number = 0): Promise<{ content: string; lastPosition: number }> { try { const content = await readFile(filePath, 'utf-8'); if (fromPosition >= content.length) { return { content: '', lastPosition: fromPosition }; } // Get content from our last read position const newContent = content.slice(fromPosition); // Split into lines const lines = newContent.split('\n'); // If file doesn't end with newline, last element is potentially incomplete // Only process complete lines (all but the last, unless file ends with \n) const hasTrailingNewline = newContent.endsWith('\n'); const completeLines = hasTrailingNewline ? lines : lines.slice(0, -1); // Calculate new position (only count complete lines) const completeLinesContent = completeLines.join('\n') + (completeLines.length > 0 && hasTrailingNewline ? '\n' : ''); const newPosition = fromPosition + Buffer.byteLength(completeLinesContent, 'utf-8'); return { content: completeLinesContent, lastPosition: newPosition }; } catch (err) { log.debug({ filePath, err: err instanceof Error ? err.message : String(err) }, 'failed to read output file lines'); return { content: '', lastPosition: fromPosition }; } } /** * Handle a standardized stream event from a parser. */ handleStreamEvent( agentId: string, event: StreamEvent, active: ActiveAgent | undefined, ): void { switch (event.type) { case 'init': if (active && event.sessionId) { active.streamSessionId = event.sessionId; this.repository.update(agentId, { sessionId: event.sessionId }).catch((err) => { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to update session ID'); }); } break; case 'text_delta': // Text deltas are now streamed via DB log chunks + EventBus in manager.createLogChunkCallback break; case 'tool_use_start': log.debug({ agentId, tool: event.name, toolId: event.id }, 'tool use started'); break; case 'result': if (active) { active.streamResultText = event.text; active.streamCostUsd = event.costUsd; active.streamIsError = event.isError === true; if (!active.streamSessionId && event.sessionId) { active.streamSessionId = event.sessionId; } } break; case 'error': log.error({ agentId, error: event.message }, 'stream error event'); break; case 'turn_end': log.debug({ agentId, stopReason: event.stopReason }, 'turn ended'); break; } } /** * Handle completion of a detached agent. * Processes the final result from the stream data captured by the tailer. * * RACE CONDITION FIX: Uses a completion lock to prevent duplicate processing. * Both the polling handler (handleDetachedAgentCompletion) and crash handler * (handleProcessCrashed) can call this method when a process exits with non-zero code. * The mutex ensures only one handler processes the completion per agent. */ async handleCompletion( agentId: string, active: ActiveAgent | undefined, getAgentWorkdir: (alias: string) => string, ): Promise { // CRITICAL: Prevent race condition - only one completion handler per agent if (this.completionLocks.has(agentId)) { log.debug({ agentId }, 'completion already being processed - skipping duplicate'); return; } this.completionLocks.add(agentId); try { const agent = await this.repository.findById(agentId); if (!agent) return; const provider = getProvider(agent.provider); if (!provider) return; log.debug({ agentId }, 'detached agent completed'); // Resolve actual agent working directory — standalone agents run in a // "workspace/" subdirectory inside getAgentWorkdir, so prefer agentCwd // recorded at spawn time when available. const agentWorkdir = active?.agentCwd ?? getAgentWorkdir(agent.worktreeId); const outputDir = join(agentWorkdir, '.cw', 'output'); const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt'); const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json'); const outputDirExists = existsSync(outputDir); const expectedPwdExists = existsSync(expectedPwdFile); const diagnosticExists = existsSync(diagnosticFile); log.info({ agentId, agentWorkdir, outputDirExists, expectedPwdExists, diagnosticExists, verification: outputDirExists ? 'PASS' : 'FAIL' }, 'agent workdir verification completed'); if (!outputDirExists) { log.warn({ agentId, agentWorkdir }, 'No output files found in agent workdir! Agent may have run in wrong location.'); } let signalText = active?.streamResultText; // If the stream result indicated an error (e.g. auth failure, usage limit), // route directly to error handling instead of trying to parse as signal JSON if (signalText && active?.streamIsError) { log.warn({ agentId, error: signalText }, 'agent returned error result'); await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir); return; } if (!signalText) { try { const outputFilePath = active?.outputFilePath ?? ''; if (outputFilePath) { // First, check for robust signal.json completion before attempting incremental reading log.debug({ agentId, worktreeId: agent.worktreeId, agentWorkdir }, 'checking signal completion'); const hasSignalCompletion = await this.readSignalCompletion(agentWorkdir); log.debug({ agentId, agentWorkdir, hasSignalCompletion }, 'signal completion check result'); if (hasSignalCompletion) { const signalPath = join(agentWorkdir, '.cw/output/signal.json'); const signalContent = await readFile(signalPath, 'utf-8'); log.debug({ agentId, signalPath }, 'detected completion via signal.json, processing'); await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId); return; } else { log.debug({ agentId, agentWorkdir }, 'no signal completion found, proceeding with raw output'); } // Read only complete lines from the file, avoiding race conditions const lastPosition = this.filePositions.get(agentId) || 0; const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition); if (fileContent.trim()) { this.filePositions.set(agentId, newPosition); await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir); return; } // If no new complete lines, but file might still be writing, try again with validation if (await this.validateSignalFile(outputFilePath)) { const fullContent = await readFile(outputFilePath, 'utf-8'); if (fullContent.trim() && fullContent.length > newPosition) { // File is complete and has content beyond what we've read await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir); return; } } } } catch { /* file empty or missing */ } log.debug({ agentId }, 'no result from stream or file, marking as error'); await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir); return; } // Check for signal.json file first, then fall back to stream text if (await this.readSignalCompletion(agentWorkdir)) { const signalPath = join(agentWorkdir, '.cw/output/signal.json'); const signalContent = await readFile(signalPath, 'utf-8'); log.debug({ agentId, signalPath }, 'using signal.json content for completion'); await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId); } else { log.debug({ agentId }, 'using stream text for completion (no signal.json found)'); await this.processSignalAndFiles( agentId, signalText, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId, ); } } finally { this.completionLocks.delete(agentId); this.filePositions.delete(agentId); } } /** * Process agent signal JSON and read output files. * Universal handler for all providers and modes. */ async processSignalAndFiles( agentId: string, signalText: string, mode: AgentMode, getAgentWorkdir: (alias: string) => string, sessionId?: string, ): Promise { const agent = await this.repository.findById(agentId); if (!agent) return; let signal; let parsed; // Step 1: JSON parsing try { parsed = JSON.parse(signalText.trim()); log.debug({ agentId }, 'signal JSON parsing successful'); } catch (jsonError) { log.error({ agentId, signalText: signalText.trim(), error: jsonError instanceof Error ? jsonError.message : String(jsonError), stack: jsonError instanceof Error ? jsonError.stack : undefined }, 'signal JSON parsing failed'); await this.repository.update(agentId, { status: 'crashed' }); this.emitCrashed(agent, 'Failed to parse agent signal JSON'); return; } // Step 2: Schema validation try { signal = agentSignalSchema.parse(parsed); log.debug({ agentId, signalStatus: signal.status }, 'signal schema validation passed'); } catch (schemaError) { log.error({ agentId, signalText: signalText.trim(), parsed, error: schemaError instanceof Error ? schemaError.message : String(schemaError), stack: schemaError instanceof Error ? schemaError.stack : undefined }, 'signal schema validation failed'); await this.repository.update(agentId, { status: 'crashed' }); this.emitCrashed(agent, 'Failed to validate agent signal schema'); return; } switch (signal.status) { case 'done': await this.processOutputFiles(agentId, agent, mode, getAgentWorkdir); break; case 'questions': await this.handleQuestions(agentId, agent, signal.questions, sessionId); break; case 'error': await this.handleSignalError(agentId, agent, signal.error); break; } } /** * Process output files from agent workdir after successful completion. * Performs direct writes to entities and records change sets. */ private async processOutputFiles( agentId: string, agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string; initiativeId?: string | null }, mode: AgentMode, getAgentWorkdir: (alias: string) => string, ): Promise { const agentWorkdir = getAgentWorkdir(agent.worktreeId); const summary = readSummary(agentWorkdir); const initiativeId = agent.initiativeId; const canWriteChangeSets = this.changeSetRepository && initiativeId; let resultMessage = summary?.body ?? 'Task completed'; switch (mode) { case 'plan': { const phases = readPhaseFiles(agentWorkdir); if (canWriteChangeSets && this.phaseRepository && phases.length > 0) { const entries: CreateChangeSetEntryData[] = []; // First pass: create phases for (const [i, p] of phases.entries()) { try { const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined; const created = await this.phaseRepository.create({ id: p.id ?? undefined, initiativeId, name: p.title, content: tiptapContent, }); entries.push({ entityType: 'phase', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: i, }); this.eventBus?.emit({ type: 'phase:started' as const, timestamp: new Date(), payload: { phaseId: created.id, initiativeId }, }); } catch (err) { log.warn({ agentId, phase: p.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase'); } } // Second pass: create phase dependencies let depSortOrder = entries.length; for (const p of phases) { const phaseId = p.id; if (!phaseId || !Array.isArray(p.dependencies)) continue; for (const depFileId of p.dependencies) { try { await this.phaseRepository.createDependency(phaseId, depFileId); entries.push({ entityType: 'phase_dependency', entityId: `${phaseId}:${depFileId}`, action: 'create', newState: JSON.stringify({ phaseId, dependsOnPhaseId: depFileId }), sortOrder: depSortOrder++, }); } catch (err) { log.warn({ agentId, phaseId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase dependency'); } } } if (entries.length > 0) { try { const cs = await this.changeSetRepository!.createWithEntries({ agentId, agentName: agent.name, initiativeId, mode: 'plan', summary: summary?.body ?? `Created ${phases.length} phases`, }, entries); this.eventBus?.emit({ type: 'changeset:created' as const, timestamp: new Date(), payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'plan', entryCount: entries.length }, }); } catch (err) { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes'); } } resultMessage = summary?.body ?? `${phases.length} phases created`; } else { resultMessage = JSON.stringify({ summary: summary?.body, phases }); } break; } case 'detail': { const tasks = readTaskFiles(agentWorkdir); if (canWriteChangeSets && this.taskRepository && tasks.length > 0) { const phaseInput = readFrontmatterFile(join(agentWorkdir, '.cw', 'input', 'phase.md')); const phaseId = (phaseInput?.data?.id as string) ?? null; const entries: CreateChangeSetEntryData[] = []; const fileIdToDbId = new Map(); // Load existing tasks for dedup — prevents duplicates when multiple agents finish concurrently const existingTasks = phaseId ? await this.taskRepository.findByPhaseId(phaseId) : []; const existingNames = new Set(existingTasks.map(t => t.name)); for (const [i, t] of tasks.entries()) { if (existingNames.has(t.title)) { log.info({ agentId, task: t.title, phaseId }, 'skipped duplicate task'); // Map deduped file ID to existing DB ID for dependency resolution const existing = existingTasks.find(et => et.name === t.title); if (existing) fileIdToDbId.set(t.id, existing.id); continue; } try { const created = await this.taskRepository.create({ initiativeId, phaseId, parentTaskId: agent.taskId ?? null, name: t.title, description: t.body ?? undefined, category: (t.category as any) ?? 'execute', type: (t.type as any) ?? 'auto', }); fileIdToDbId.set(t.id, created.id); existingNames.add(t.title); // prevent dupes within same agent output entries.push({ entityType: 'task', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: i, }); this.eventBus?.emit({ type: 'task:completed' as const, timestamp: new Date(), payload: { taskId: created.id, agentId, success: true, message: 'Task created by detail' }, }); } catch (err) { log.warn({ agentId, task: t.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create task'); } } // Second pass: create task dependencies let depSortOrder = entries.length; for (const t of tasks) { const taskDbId = fileIdToDbId.get(t.id); if (!taskDbId || t.dependencies.length === 0) continue; for (const depFileId of t.dependencies) { const depDbId = fileIdToDbId.get(depFileId); if (!depDbId) continue; try { await this.taskRepository.createDependency(taskDbId, depDbId); entries.push({ entityType: 'task_dependency', entityId: `${taskDbId}:${depDbId}`, action: 'create', newState: JSON.stringify({ taskId: taskDbId, dependsOnTaskId: depDbId }), sortOrder: depSortOrder++, }); } catch (err) { log.warn({ agentId, taskDbId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create task dependency'); } } } if (entries.length > 0) { try { const cs = await this.changeSetRepository!.createWithEntries({ agentId, agentName: agent.name, initiativeId, mode: 'detail', summary: summary?.body ?? `Created ${tasks.length} tasks`, }, entries); this.eventBus?.emit({ type: 'changeset:created' as const, timestamp: new Date(), payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'detail', entryCount: entries.length }, }); } catch (err) { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes'); } } resultMessage = summary?.body ?? `${tasks.length} tasks created`; } else { resultMessage = JSON.stringify({ summary: summary?.body, tasks }); } break; } case 'discuss': { const decisions = readDecisionFiles(agentWorkdir); resultMessage = JSON.stringify({ summary: summary?.body, decisions }); break; } case 'refine': { const pages = readPageFiles(agentWorkdir); if (canWriteChangeSets && this.pageRepository && pages.length > 0) { const entries: CreateChangeSetEntryData[] = []; for (const [i, p] of pages.entries()) { try { if (!p.pageId) continue; const existing = await this.pageRepository.findById(p.pageId); if (!existing) { log.warn({ agentId, pageId: p.pageId }, 'page not found for refine update'); continue; } const previousState = JSON.stringify(existing); const tiptapJson = markdownToTiptapJson(p.body || ''); await this.pageRepository.update(p.pageId, { content: JSON.stringify(tiptapJson), title: p.title, }); const updated = await this.pageRepository.findById(p.pageId); entries.push({ entityType: 'page', entityId: p.pageId, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: i, }); this.eventBus?.emit({ type: 'page:updated' as const, timestamp: new Date(), payload: { pageId: p.pageId, initiativeId, title: p.title }, }); } catch (err) { log.warn({ agentId, pageId: p.pageId, err: err instanceof Error ? err.message : String(err) }, 'failed to update page'); } } if (entries.length > 0) { try { const cs = await this.changeSetRepository!.createWithEntries({ agentId, agentName: agent.name, initiativeId, mode: 'refine', summary: summary?.body ?? `Updated ${entries.length} pages`, }, entries); this.eventBus?.emit({ type: 'changeset:created' as const, timestamp: new Date(), payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'refine', entryCount: entries.length }, }); } catch (err) { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes'); } } resultMessage = summary?.body ?? `${entries.length} pages updated`; } else { resultMessage = JSON.stringify({ summary: summary?.body, pages }); } break; } } const resultPayload: AgentResult = { success: true, message: resultMessage, filesModified: summary?.filesModified, }; await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' }); const reason = this.getStoppedReason(mode); if (this.eventBus) { const event: AgentStoppedEvent = { type: 'agent:stopped', timestamp: new Date(), payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', reason, }, }; this.eventBus.emit(event); } return; } /** * Handle questions signal from agent. */ async handleQuestions( agentId: string, agent: { id: string; name: string; taskId: string | null; sessionId: string | null }, questions: QuestionItem[], sessionId?: string, ): Promise { const questionsPayload: PendingQuestions = { questions }; await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload), status: 'waiting_for_input' }); if (this.eventBus) { const event: AgentWaitingEvent = { type: 'agent:waiting', timestamp: new Date(), payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', sessionId: sessionId ?? agent.sessionId ?? '', questions, }, }; this.eventBus.emit(event); } } /** * Handle error signal from agent. */ async handleSignalError( agentId: string, agent: { id: string; name: string; taskId: string | null }, error: string, ): Promise { const errorResult: AgentResult = { success: false, message: error }; await this.repository.update(agentId, { result: JSON.stringify(errorResult), status: 'crashed' }); this.emitCrashed(agent, error); } /** * Map agent mode to stopped event reason. */ getStoppedReason(mode: AgentMode): AgentStoppedEvent['payload']['reason'] { switch (mode) { case 'discuss': return 'context_complete'; case 'plan': return 'plan_complete'; case 'detail': return 'detail_complete'; case 'refine': return 'refine_complete'; default: return 'task_complete'; } } /** * Process raw output from an agent (from file or direct). */ async processAgentOutput( agentId: string, rawOutput: string, provider: AgentProviderConfig, getAgentWorkdir: (alias: string) => string, ): Promise { const agent = await this.repository.findById(agentId); if (!agent) return; // Extract session ID using provider's extraction config let sessionId: string | null = null; if (provider.sessionId) { const outputLines = rawOutput.trim().split('\n'); if (provider.sessionId.extractFrom === 'result') { // Find the result line in JSONL output for (const line of outputLines) { try { const parsed = JSON.parse(line); if (parsed.type === 'result' || parsed[provider.sessionId.field]) { sessionId = parsed[provider.sessionId.field] ?? null; if (sessionId) break; } } catch { /* intentional: skip non-JSON JSONL lines */ } } } else if (provider.sessionId.extractFrom === 'event') { for (const line of outputLines) { try { const event = JSON.parse(line); if (event.type === provider.sessionId.eventType) { sessionId = event[provider.sessionId.field] ?? null; } } catch { /* intentional: skip non-JSON JSONL lines */ } } } } if (sessionId) { await this.repository.update(agentId, { sessionId }); } log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output'); if (provider.name === 'claude') { // rawOutput may be a single JSON object or multi-line JSONL — find the result line let cliResult: ClaudeCliResult | null = null; const lines = rawOutput.trim().split('\n'); for (const line of lines) { try { const parsed = JSON.parse(line); if (parsed.type === 'result') { cliResult = parsed; } } catch { /* intentional: skip non-JSON JSONL lines */ } } if (!cliResult) { log.error({ agentId }, 'no result event found in agent output'); await this.handleAgentError(agentId, new Error('No result event in output'), provider, getAgentWorkdir); return; } // Handle error results (auth failure, usage limits, etc.) if (cliResult.is_error) { log.warn({ agentId, error: cliResult.result }, 'agent returned error result from file'); await this.handleAgentError(agentId, new Error(cliResult.result), provider, getAgentWorkdir); return; } let signalText: string; try { const signal = cliResult.structured_output ?? JSON.parse(cliResult.result); signalText = JSON.stringify(signal); } catch (parseErr) { log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent signal from result'); await this.handleAgentError(agentId, new Error('Failed to parse agent signal'), provider, getAgentWorkdir); return; } await this.processSignalAndFiles(agentId, signalText, agent.mode as AgentMode, getAgentWorkdir, sessionId ?? undefined); } else { await this.processSignalAndFiles(agentId, rawOutput, agent.mode as AgentMode, getAgentWorkdir, sessionId ?? undefined); } } /** * Handle agent errors. Detects usage limit exhaustion patterns. * Returns true if error was an exhaustion error (caller should attempt failover). */ async handleAgentError( agentId: string, error: unknown, provider: AgentProviderConfig, _getAgentWorkdir: (alias: string) => string, ): Promise { const errorMessage = error instanceof Error ? error.message : String(error); const agent = await this.repository.findById(agentId); if (!agent) return; log.error({ agentId, err: errorMessage }, 'agent error'); const errorResult: AgentResult = { success: false, message: errorMessage, }; await this.repository.update(agentId, { status: 'crashed', result: JSON.stringify(errorResult) }); if (this.eventBus) { const event: AgentCrashedEvent = { type: 'agent:crashed', timestamp: new Date(), payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', error: errorMessage, }, }; this.eventBus.emit(event); } } /** * Format answers map as structured prompt. * Handles special __instruction__ key for retry scenarios. */ formatAnswersAsPrompt(answers: Record): string { const instruction = answers['__instruction__']; const realAnswers = { ...answers }; delete realAnswers['__instruction__']; const lines = Object.entries(realAnswers).map( ([questionId, answer]) => `[${questionId}]: ${answer}`, ); const basePrompt = `Here are my answers to your questions:\n${lines.join('\n')}`; return instruction ? `${instruction.trim()}\n\n${basePrompt}` : basePrompt; } /** * Get the result of an agent's work. */ async getResult(agentId: string, active?: ActiveAgent): Promise { if (active?.result) return active.result; const agent = await this.repository.findById(agentId); return agent?.result ? JSON.parse(agent.result) : null; } /** * Get pending questions for an agent waiting for input. */ async getPendingQuestions(agentId: string, active?: ActiveAgent): Promise { if (active?.pendingQuestions) return active.pendingQuestions; const agent = await this.repository.findById(agentId); return agent?.pendingQuestions ? JSON.parse(agent.pendingQuestions) : null; } // ========================================================================= // Private Helpers // ========================================================================= /** * Read signal.json and return its content if the agent completed successfully. * Uses SignalManager for atomic read-and-validate when available. * Returns the raw JSON string on success, null if missing/invalid. */ private async readSignalCompletion(agentWorkdir: string): Promise { // Prefer SignalManager (unified implementation with proper validation) if (this.signalManager) { const signal = await this.signalManager.readSignal(agentWorkdir); return signal ? JSON.stringify(signal) : null; } // Fallback: inline read (for tests that don't inject SignalManager) try { const signalPath = join(agentWorkdir, '.cw/output/signal.json'); if (!existsSync(signalPath)) return null; const signalContent = await readFile(signalPath, 'utf-8'); const signal = JSON.parse(signalContent); if (signal.status === 'done' || signal.status === 'questions' || signal.status === 'error') { return signalContent; } return null; } catch (err) { log.debug({ agentWorkdir, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json'); return null; } } private emitCrashed(agent: { id: string; name: string; taskId: string | null }, error: string): void { if (this.eventBus) { const event: AgentCrashedEvent = { type: 'agent:crashed', timestamp: new Date(), payload: { agentId: agent.id, name: agent.name, taskId: agent.taskId ?? '', error, }, }; this.eventBus.emit(event); } } }