Files
Codewalkers/apps/server/agent/output-handler.ts
Lukas May 73a4c6cb0c fix: Convert sync file I/O to async in read path to unblock event loop
readFrontmatterFile, readFrontmatterDir, readSummary, readPhaseFiles,
readTaskFiles, readDecisionFiles, and readPageFiles all used readFileSync
and readdirSync which block the Node.js event loop during agent completion
handling. Converted to async using readFile/readdir from fs/promises and
added await at all call sites in output-handler.ts.
2026-03-04 12:25:34 +01:00

1152 lines
46 KiB
TypeScript

/**
* OutputHandler — Stream event processing, signal parsing, file reading, result capture.
*
* Extracted from MultiProviderAgentManager. Processes all output from agent
* subprocesses: stream events, agent signals, output files, and result/question
* retrieval.
*/
import { readFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { join } from 'node:path';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ChangeSetRepository, CreateChangeSetEntryData } from '../db/repositories/change-set-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
import type {
EventBus,
AgentStoppedEvent,
AgentCrashedEvent,
AgentWaitingEvent,
} from '../events/index.js';
import type {
AgentResult,
AgentMode,
PendingQuestions,
QuestionItem,
} from './types.js';
import type { StreamEvent } from './providers/parsers/index.js';
import type { AgentProviderConfig } from './providers/types.js';
import { agentSignalSchema } from './schema.js';
import {
readSummary,
readPhaseFiles,
readTaskFiles,
readDecisionFiles,
readPageFiles,
readFrontmatterFile,
} from './file-io.js';
import { getProvider } from './providers/registry.js';
import { markdownToTiptapJson } from './markdown-to-tiptap.js';
import type { SignalManager } from './lifecycle/signal-manager.js';
import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('output-handler');
/**
* Tracks an active agent with its PID and file tailer.
*/
export interface ActiveAgent {
agentId: string;
pid: number;
tailer: import('./file-tailer.js').FileTailer;
outputFilePath: string;
/** Actual working directory the agent process runs in (may differ from getAgentWorkdir for standalone agents) */
agentCwd?: string;
result?: AgentResult;
pendingQuestions?: PendingQuestions;
streamResultText?: string;
streamSessionId?: string;
streamCostUsd?: number;
/** True when the stream result indicated an error (e.g. auth failure) */
streamIsError?: boolean;
/** Cancel handle for polling timer — call to stop polling on cleanup */
cancelPoll?: () => void;
}
/**
* Result structure from Claude CLI with --output-format json.
*/
interface ClaudeCliResult {
type: 'result';
subtype: 'success' | 'error';
is_error: boolean;
session_id: string;
result: string;
structured_output?: unknown;
total_cost_usd?: number;
}
export class OutputHandler {
private filePositions = new Map<string, number>();
private completionLocks = new Set<string>(); // Track agents currently being processed
constructor(
private repository: AgentRepository,
private eventBus?: EventBus,
private changeSetRepository?: ChangeSetRepository,
private phaseRepository?: PhaseRepository,
private taskRepository?: TaskRepository,
private pageRepository?: PageRepository,
private signalManager?: SignalManager,
private chatSessionRepository?: ChatSessionRepository,
) {}
/**
* Validate that a signal file is complete and properly formatted.
*/
private async validateSignalFile(filePath: string): Promise<boolean> {
try {
const content = await readFile(filePath, 'utf-8');
const trimmed = content.trim();
if (!trimmed) return false;
// Check if JSON is complete (ends with } or ])
const endsCorrectly = trimmed.endsWith('}') || trimmed.endsWith(']');
if (!endsCorrectly) return false;
// Try to parse as JSON to ensure it's valid
JSON.parse(trimmed);
return true;
} catch {
return false;
}
}
/**
* Read complete lines from a file, avoiding partial lines that might still be writing.
* This eliminates race conditions when agents are still writing output.
*/
private async readCompleteLines(filePath: string, fromPosition: number = 0): Promise<{ content: string; lastPosition: number }> {
try {
const content = await readFile(filePath, 'utf-8');
if (fromPosition >= content.length) {
return { content: '', lastPosition: fromPosition };
}
// Get content from our last read position
const newContent = content.slice(fromPosition);
// Split into lines
const lines = newContent.split('\n');
// If file doesn't end with newline, last element is potentially incomplete
// Only process complete lines (all but the last, unless file ends with \n)
const hasTrailingNewline = newContent.endsWith('\n');
const completeLines = hasTrailingNewline ? lines : lines.slice(0, -1);
// Calculate new position (only count complete lines)
const completeLinesContent = completeLines.join('\n') + (completeLines.length > 0 && hasTrailingNewline ? '\n' : '');
const newPosition = fromPosition + Buffer.byteLength(completeLinesContent, 'utf-8');
return {
content: completeLinesContent,
lastPosition: newPosition
};
} catch (err) {
log.debug({ filePath, err: err instanceof Error ? err.message : String(err) }, 'failed to read output file lines');
return { content: '', lastPosition: fromPosition };
}
}
/**
* Handle a standardized stream event from a parser.
*/
handleStreamEvent(
agentId: string,
event: StreamEvent,
active: ActiveAgent | undefined,
): void {
switch (event.type) {
case 'init':
if (active && event.sessionId) {
active.streamSessionId = event.sessionId;
this.repository.update(agentId, { sessionId: event.sessionId }).catch((err) => {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to update session ID');
});
}
break;
case 'text_delta':
// Text deltas are now streamed via DB log chunks + EventBus in manager.createLogChunkCallback
break;
case 'tool_use_start':
log.debug({ agentId, tool: event.name, toolId: event.id }, 'tool use started');
break;
case 'result':
if (active) {
active.streamResultText = event.text;
active.streamCostUsd = event.costUsd;
active.streamIsError = event.isError === true;
if (!active.streamSessionId && event.sessionId) {
active.streamSessionId = event.sessionId;
}
}
break;
case 'error':
log.error({ agentId, error: event.message }, 'stream error event');
break;
case 'turn_end':
log.debug({ agentId, stopReason: event.stopReason }, 'turn ended');
break;
}
}
/**
* Handle completion of a detached agent.
* Processes the final result from the stream data captured by the tailer.
*
* RACE CONDITION FIX: Uses a completion lock to prevent duplicate processing.
* Both the polling handler (handleDetachedAgentCompletion) and crash handler
* (handleProcessCrashed) can call this method when a process exits with non-zero code.
* The mutex ensures only one handler processes the completion per agent.
*/
async handleCompletion(
agentId: string,
active: ActiveAgent | undefined,
getAgentWorkdir: (alias: string) => string,
): Promise<void> {
// CRITICAL: Prevent race condition - only one completion handler per agent
if (this.completionLocks.has(agentId)) {
log.debug({ agentId }, 'completion already being processed - skipping duplicate');
return;
}
this.completionLocks.add(agentId);
try {
const agent = await this.repository.findById(agentId);
if (!agent) return;
const provider = getProvider(agent.provider);
if (!provider) return;
log.debug({ agentId }, 'detached agent completed');
// Resolve actual agent working directory — standalone agents run in a
// "workspace/" subdirectory inside getAgentWorkdir, so prefer agentCwd
// recorded at spawn time when available.
const agentWorkdir = active?.agentCwd ?? getAgentWorkdir(agent.worktreeId);
const outputDir = join(agentWorkdir, '.cw', 'output');
const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
const outputDirExists = existsSync(outputDir);
const expectedPwdExists = existsSync(expectedPwdFile);
const diagnosticExists = existsSync(diagnosticFile);
log.info({
agentId,
agentWorkdir,
outputDirExists,
expectedPwdExists,
diagnosticExists,
verification: outputDirExists ? 'PASS' : 'FAIL'
}, 'agent workdir verification completed');
if (!outputDirExists) {
log.warn({
agentId,
agentWorkdir
}, 'No output files found in agent workdir! Agent may have run in wrong location.');
}
let signalText = active?.streamResultText;
// If the stream result indicated an error (e.g. auth failure, usage limit),
// route directly to error handling instead of trying to parse as signal JSON
if (signalText && active?.streamIsError) {
log.warn({ agentId, error: signalText }, 'agent returned error result');
await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
return;
}
if (!signalText) {
try {
const outputFilePath = active?.outputFilePath ?? '';
if (outputFilePath) {
// First, check for robust signal.json completion before attempting incremental reading
log.debug({ agentId, worktreeId: agent.worktreeId, agentWorkdir }, 'checking signal completion');
const hasSignalCompletion = await this.readSignalCompletion(agentWorkdir);
log.debug({ agentId, agentWorkdir, hasSignalCompletion }, 'signal completion check result');
if (hasSignalCompletion) {
const signalPath = join(agentWorkdir, '.cw/output/signal.json');
const signalContent = await readFile(signalPath, 'utf-8');
log.debug({ agentId, signalPath }, 'detected completion via signal.json, processing');
await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId);
return;
} else {
log.debug({ agentId, agentWorkdir }, 'no signal completion found, proceeding with raw output');
}
// Read only complete lines from the file, avoiding race conditions
const lastPosition = this.filePositions.get(agentId) || 0;
const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition);
if (fileContent.trim()) {
this.filePositions.set(agentId, newPosition);
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
return;
}
// If no new complete lines, but file might still be writing, try again with validation
if (await this.validateSignalFile(outputFilePath)) {
const fullContent = await readFile(outputFilePath, 'utf-8');
if (fullContent.trim() && fullContent.length > newPosition) {
// File is complete and has content beyond what we've read
await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir);
return;
}
}
}
} catch { /* file empty or missing */ }
log.debug({ agentId }, 'no result from stream or file, marking as error');
await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir);
return;
}
// Check for signal.json file first, then fall back to stream text
if (await this.readSignalCompletion(agentWorkdir)) {
const signalPath = join(agentWorkdir, '.cw/output/signal.json');
const signalContent = await readFile(signalPath, 'utf-8');
log.debug({ agentId, signalPath }, 'using signal.json content for completion');
await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId);
} else {
log.debug({ agentId }, 'using stream text for completion (no signal.json found)');
await this.processSignalAndFiles(
agentId,
signalText,
agent.mode as AgentMode,
getAgentWorkdir,
active?.streamSessionId,
);
}
} finally {
this.completionLocks.delete(agentId);
this.filePositions.delete(agentId);
}
}
/**
* Process agent signal JSON and read output files.
* Universal handler for all providers and modes.
*/
async processSignalAndFiles(
agentId: string,
signalText: string,
mode: AgentMode,
getAgentWorkdir: (alias: string) => string,
sessionId?: string,
): Promise<void> {
const agent = await this.repository.findById(agentId);
if (!agent) return;
let signal;
let parsed;
// Step 1: JSON parsing
try {
parsed = JSON.parse(signalText.trim());
log.debug({ agentId }, 'signal JSON parsing successful');
} catch (jsonError) {
log.error({
agentId,
signalText: signalText.trim(),
error: jsonError instanceof Error ? jsonError.message : String(jsonError),
stack: jsonError instanceof Error ? jsonError.stack : undefined
}, 'signal JSON parsing failed');
await this.repository.update(agentId, { status: 'crashed' });
this.emitCrashed(agent, 'Failed to parse agent signal JSON');
return;
}
// Step 2: Schema validation
try {
signal = agentSignalSchema.parse(parsed);
log.debug({ agentId, signalStatus: signal.status }, 'signal schema validation passed');
} catch (schemaError) {
log.error({
agentId,
signalText: signalText.trim(),
parsed,
error: schemaError instanceof Error ? schemaError.message : String(schemaError),
stack: schemaError instanceof Error ? schemaError.stack : undefined
}, 'signal schema validation failed');
await this.repository.update(agentId, { status: 'crashed' });
this.emitCrashed(agent, 'Failed to validate agent signal schema');
return;
}
switch (signal.status) {
case 'done':
await this.processOutputFiles(agentId, agent, mode, getAgentWorkdir);
break;
case 'questions':
// Chat mode: process output files before handling questions
if (mode === 'chat') {
await this.processOutputFiles(agentId, agent, mode, getAgentWorkdir);
}
await this.handleQuestions(agentId, agent, signal.questions, sessionId);
break;
case 'error':
await this.handleSignalError(agentId, agent, signal.error);
break;
}
}
/**
* Process output files from agent workdir after successful completion.
* Performs direct writes to entities and records change sets.
*/
private async processOutputFiles(
agentId: string,
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string; initiativeId?: string | null },
mode: AgentMode,
getAgentWorkdir: (alias: string) => string,
): Promise<void> {
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
const summary = await readSummary(agentWorkdir);
const initiativeId = agent.initiativeId;
const canWriteChangeSets = this.changeSetRepository && initiativeId;
let resultMessage = summary?.body ?? 'Task completed';
switch (mode) {
case 'plan': {
const phases = await readPhaseFiles(agentWorkdir);
if (canWriteChangeSets && this.phaseRepository && phases.length > 0) {
const entries: CreateChangeSetEntryData[] = [];
// First pass: create phases
for (const [i, p] of phases.entries()) {
try {
const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
const created = await this.phaseRepository.create({
id: p.id ?? undefined,
initiativeId,
name: p.title,
content: tiptapContent,
});
entries.push({
entityType: 'phase',
entityId: created.id,
action: 'create',
newState: JSON.stringify(created),
sortOrder: i,
});
this.eventBus?.emit({
type: 'phase:started' as const,
timestamp: new Date(),
payload: { phaseId: created.id, initiativeId },
});
} catch (err) {
log.warn({ agentId, phase: p.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase');
}
}
// Second pass: create phase dependencies
let depSortOrder = entries.length;
for (const p of phases) {
const phaseId = p.id;
if (!phaseId || !Array.isArray(p.dependencies)) continue;
for (const depFileId of p.dependencies) {
try {
await this.phaseRepository.createDependency(phaseId, depFileId);
entries.push({
entityType: 'phase_dependency',
entityId: `${phaseId}:${depFileId}`,
action: 'create',
newState: JSON.stringify({ phaseId, dependsOnPhaseId: depFileId }),
sortOrder: depSortOrder++,
});
} catch (err) {
log.warn({ agentId, phaseId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase dependency');
}
}
}
if (entries.length > 0) {
try {
const cs = await this.changeSetRepository!.createWithEntries({
agentId,
agentName: agent.name,
initiativeId,
mode: 'plan',
summary: summary?.body ?? `Created ${phases.length} phases`,
}, entries);
this.eventBus?.emit({
type: 'changeset:created' as const,
timestamp: new Date(),
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'plan', entryCount: entries.length },
});
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
}
}
resultMessage = summary?.body ?? `${phases.length} phases created`;
} else {
resultMessage = JSON.stringify({ summary: summary?.body, phases });
}
break;
}
case 'detail': {
const tasks = await readTaskFiles(agentWorkdir);
if (canWriteChangeSets && this.taskRepository && tasks.length > 0) {
const phaseInput = await readFrontmatterFile(join(agentWorkdir, '.cw', 'input', 'phase.md'));
const phaseId = (phaseInput?.data?.id as string) ?? null;
const entries: CreateChangeSetEntryData[] = [];
const fileIdToDbId = new Map<string, string>();
// Load existing tasks for dedup — prevents duplicates when multiple agents finish concurrently
const existingTasks = phaseId ? await this.taskRepository.findByPhaseId(phaseId) : [];
const existingNames = new Set(existingTasks.map(t => t.name));
for (const [i, t] of tasks.entries()) {
if (existingNames.has(t.title)) {
log.info({ agentId, task: t.title, phaseId }, 'skipped duplicate task');
// Map deduped file ID to existing DB ID for dependency resolution
const existing = existingTasks.find(et => et.name === t.title);
if (existing) fileIdToDbId.set(t.id, existing.id);
continue;
}
try {
const created = await this.taskRepository.create({
initiativeId,
phaseId,
parentTaskId: agent.taskId ?? null,
name: t.title,
description: t.body ?? undefined,
category: (t.category as any) ?? 'execute',
type: (t.type as any) ?? 'auto',
});
fileIdToDbId.set(t.id, created.id);
existingNames.add(t.title); // prevent dupes within same agent output
entries.push({
entityType: 'task',
entityId: created.id,
action: 'create',
newState: JSON.stringify(created),
sortOrder: i,
});
this.eventBus?.emit({
type: 'task:completed' as const,
timestamp: new Date(),
payload: { taskId: created.id, agentId, success: true, message: 'Task created by detail' },
});
} catch (err) {
log.warn({ agentId, task: t.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create task');
}
}
// Second pass: create task dependencies
let depSortOrder = entries.length;
for (const t of tasks) {
const taskDbId = fileIdToDbId.get(t.id);
if (!taskDbId || t.dependencies.length === 0) continue;
for (const depFileId of t.dependencies) {
const depDbId = fileIdToDbId.get(depFileId);
if (!depDbId) continue;
try {
await this.taskRepository.createDependency(taskDbId, depDbId);
entries.push({
entityType: 'task_dependency',
entityId: `${taskDbId}:${depDbId}`,
action: 'create',
newState: JSON.stringify({ taskId: taskDbId, dependsOnTaskId: depDbId }),
sortOrder: depSortOrder++,
});
} catch (err) {
log.warn({ agentId, taskDbId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create task dependency');
}
}
}
if (entries.length > 0) {
try {
const cs = await this.changeSetRepository!.createWithEntries({
agentId,
agentName: agent.name,
initiativeId,
mode: 'detail',
summary: summary?.body ?? `Created ${tasks.length} tasks`,
}, entries);
this.eventBus?.emit({
type: 'changeset:created' as const,
timestamp: new Date(),
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'detail', entryCount: entries.length },
});
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
}
}
resultMessage = summary?.body ?? `${tasks.length} tasks created`;
} else {
resultMessage = JSON.stringify({ summary: summary?.body, tasks });
}
break;
}
case 'discuss': {
const decisions = await readDecisionFiles(agentWorkdir);
resultMessage = JSON.stringify({ summary: summary?.body, decisions });
break;
}
case 'refine': {
const pages = await readPageFiles(agentWorkdir);
if (canWriteChangeSets && this.pageRepository && pages.length > 0) {
const entries: CreateChangeSetEntryData[] = [];
for (const [i, p] of pages.entries()) {
try {
if (!p.pageId) continue;
const existing = await this.pageRepository.findById(p.pageId);
if (!existing) {
log.warn({ agentId, pageId: p.pageId }, 'page not found for refine update');
continue;
}
const previousState = JSON.stringify(existing);
const tiptapJson = markdownToTiptapJson(p.body || '');
await this.pageRepository.update(p.pageId, {
content: JSON.stringify(tiptapJson),
title: p.title,
});
const updated = await this.pageRepository.findById(p.pageId);
entries.push({
entityType: 'page',
entityId: p.pageId,
action: 'update',
previousState,
newState: JSON.stringify(updated),
sortOrder: i,
});
this.eventBus?.emit({
type: 'page:updated' as const,
timestamp: new Date(),
payload: { pageId: p.pageId, initiativeId, title: p.title },
});
} catch (err) {
log.warn({ agentId, pageId: p.pageId, err: err instanceof Error ? err.message : String(err) }, 'failed to update page');
}
}
if (entries.length > 0) {
try {
const cs = await this.changeSetRepository!.createWithEntries({
agentId,
agentName: agent.name,
initiativeId,
mode: 'refine',
summary: summary?.body ?? `Updated ${entries.length} pages`,
}, entries);
this.eventBus?.emit({
type: 'changeset:created' as const,
timestamp: new Date(),
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'refine', entryCount: entries.length },
});
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
}
}
resultMessage = summary?.body ?? `${entries.length} pages updated`;
} else {
resultMessage = JSON.stringify({ summary: summary?.body, pages });
}
break;
}
case 'chat': {
const chatPhases = await readPhaseFiles(agentWorkdir);
const chatTasks = await readTaskFiles(agentWorkdir);
const chatPages = await readPageFiles(agentWorkdir);
if (canWriteChangeSets) {
const entries: CreateChangeSetEntryData[] = [];
let sortOrd = 0;
// Process phases
if (this.phaseRepository) {
for (const p of chatPhases) {
try {
const action = p.action ?? 'create';
if (action === 'create') {
const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
const created = await this.phaseRepository.create({
id: p.id ?? undefined,
initiativeId: initiativeId!,
name: p.title,
content: tiptapContent,
});
entries.push({ entityType: 'phase', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
} else if (action === 'update') {
const existing = await this.phaseRepository.findById(p.id);
if (!existing) continue;
const previousState = JSON.stringify(existing);
const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
await this.phaseRepository.update(p.id, { name: p.title, content: tiptapContent });
const updated = await this.phaseRepository.findById(p.id);
entries.push({ entityType: 'phase', entityId: p.id, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
} else if (action === 'delete') {
const existing = await this.phaseRepository.findById(p.id);
if (!existing) continue;
const previousState = JSON.stringify(existing);
await this.phaseRepository.delete(p.id);
entries.push({ entityType: 'phase', entityId: p.id, action: 'delete', previousState, sortOrder: sortOrd++ });
}
} catch (err) {
log.warn({ agentId, phase: p.title, action: p.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat phase');
}
}
}
// Process tasks — two pass (create first, then deps)
if (this.taskRepository) {
const fileIdToDbId = new Map<string, string>();
for (const t of chatTasks) {
try {
const action = t.action ?? 'create';
if (action === 'create') {
const created = await this.taskRepository.create({
initiativeId: initiativeId!,
phaseId: t.phaseId ?? null,
parentTaskId: t.parentTaskId ?? null,
name: t.title,
description: t.body ?? undefined,
category: (t.category as any) ?? 'execute',
type: (t.type as any) ?? 'auto',
});
fileIdToDbId.set(t.id, created.id);
entries.push({ entityType: 'task', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
} else if (action === 'update') {
const existing = await this.taskRepository.findById(t.id);
if (!existing) continue;
const previousState = JSON.stringify(existing);
await this.taskRepository.update(t.id, {
name: t.title,
description: t.body ?? undefined,
category: (t.category as any) ?? existing.category,
type: (t.type as any) ?? existing.type,
});
const updated = await this.taskRepository.findById(t.id);
fileIdToDbId.set(t.id, t.id);
entries.push({ entityType: 'task', entityId: t.id, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
} else if (action === 'delete') {
const existing = await this.taskRepository.findById(t.id);
if (!existing) continue;
const previousState = JSON.stringify(existing);
await this.taskRepository.delete(t.id);
entries.push({ entityType: 'task', entityId: t.id, action: 'delete', previousState, sortOrder: sortOrd++ });
}
} catch (err) {
log.warn({ agentId, task: t.title, action: t.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat task');
}
}
// Second pass: deps for created tasks
for (const t of chatTasks) {
if (t.action !== 'create' || t.dependencies.length === 0) continue;
const taskDbId = fileIdToDbId.get(t.id);
if (!taskDbId) continue;
for (const depFileId of t.dependencies) {
const depDbId = fileIdToDbId.get(depFileId) ?? depFileId;
try {
await this.taskRepository.createDependency(taskDbId, depDbId);
entries.push({ entityType: 'task_dependency', entityId: `${taskDbId}:${depDbId}`, action: 'create', newState: JSON.stringify({ taskId: taskDbId, dependsOnTaskId: depDbId }), sortOrder: sortOrd++ });
} catch (err) {
log.warn({ agentId, taskDbId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create chat task dependency');
}
}
}
}
// Process pages
if (this.pageRepository) {
for (const p of chatPages) {
try {
const action = p.action ?? 'create';
if (action === 'create') {
const tiptapJson = markdownToTiptapJson(p.body || '');
const created = await this.pageRepository.create({
id: p.pageId ?? undefined,
initiativeId: initiativeId!,
title: p.title,
content: JSON.stringify(tiptapJson),
});
entries.push({ entityType: 'page', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
} else if (action === 'update') {
const existing = await this.pageRepository.findById(p.pageId);
if (!existing) continue;
const previousState = JSON.stringify(existing);
const tiptapJson = markdownToTiptapJson(p.body || '');
await this.pageRepository.update(p.pageId, { content: JSON.stringify(tiptapJson), title: p.title });
const updated = await this.pageRepository.findById(p.pageId);
entries.push({ entityType: 'page', entityId: p.pageId, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
} else if (action === 'delete') {
const existing = await this.pageRepository.findById(p.pageId);
if (!existing) continue;
const previousState = JSON.stringify(existing);
await this.pageRepository.delete(p.pageId);
entries.push({ entityType: 'page', entityId: p.pageId, action: 'delete', previousState, sortOrder: sortOrd++ });
}
} catch (err) {
log.warn({ agentId, pageId: p.pageId, action: p.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat page');
}
}
}
// Create change set
let changeSetId: string | null = null;
if (entries.length > 0) {
try {
const cs = await this.changeSetRepository!.createWithEntries({
agentId,
agentName: agent.name,
initiativeId: initiativeId!,
mode: 'chat' as 'plan' | 'detail' | 'refine',
summary: summary?.body ?? `Chat: ${entries.length} changes applied`,
}, entries);
changeSetId = cs.id;
this.eventBus?.emit({
type: 'changeset:created' as const,
timestamp: new Date(),
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'chat', entryCount: entries.length },
});
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record chat change set');
}
}
// Store assistant message in chat session
if (this.chatSessionRepository) {
try {
const session = await this.chatSessionRepository.findActiveSessionByAgentId(agentId);
if (session) {
const assistantContent = summary?.body ?? `Applied ${entries.length} changes`;
await this.chatSessionRepository.createMessage({
chatSessionId: session.id,
role: 'assistant',
content: assistantContent,
changeSetId,
});
this.eventBus?.emit({
type: 'chat:message_created' as const,
timestamp: new Date(),
payload: { chatSessionId: session.id, role: 'assistant' },
});
}
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to store chat assistant message');
}
}
resultMessage = summary?.body ?? `${entries.length} changes applied`;
} else {
resultMessage = JSON.stringify({ summary: summary?.body, phases: chatPhases, tasks: chatTasks, pages: chatPages });
}
break;
}
}
const resultPayload: AgentResult = {
success: true,
message: resultMessage,
filesModified: summary?.filesModified,
};
await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' });
const reason = this.getStoppedReason(mode);
if (this.eventBus) {
const event: AgentStoppedEvent = {
type: 'agent:stopped',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
reason,
},
};
this.eventBus.emit(event);
}
return;
}
/**
* Handle questions signal from agent.
*/
async handleQuestions(
agentId: string,
agent: { id: string; name: string; taskId: string | null; sessionId: string | null },
questions: QuestionItem[],
sessionId?: string,
): Promise<void> {
const questionsPayload: PendingQuestions = { questions };
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload), status: 'waiting_for_input' });
if (this.eventBus) {
const event: AgentWaitingEvent = {
type: 'agent:waiting',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
sessionId: sessionId ?? agent.sessionId ?? '',
questions,
},
};
this.eventBus.emit(event);
}
}
/**
* Handle error signal from agent.
*/
async handleSignalError(
agentId: string,
agent: { id: string; name: string; taskId: string | null },
error: string,
): Promise<void> {
const errorResult: AgentResult = { success: false, message: error };
await this.repository.update(agentId, {
result: JSON.stringify(errorResult),
status: 'crashed'
});
this.emitCrashed(agent, error);
}
/**
* Map agent mode to stopped event reason.
*/
getStoppedReason(mode: AgentMode): AgentStoppedEvent['payload']['reason'] {
switch (mode) {
case 'discuss': return 'context_complete';
case 'plan': return 'plan_complete';
case 'detail': return 'detail_complete';
case 'refine': return 'refine_complete';
case 'chat': return 'chat_complete';
default: return 'task_complete';
}
}
/**
* Process raw output from an agent (from file or direct).
*/
async processAgentOutput(
agentId: string,
rawOutput: string,
provider: AgentProviderConfig,
getAgentWorkdir: (alias: string) => string,
): Promise<void> {
const agent = await this.repository.findById(agentId);
if (!agent) return;
// Extract session ID using provider's extraction config
let sessionId: string | null = null;
if (provider.sessionId) {
const outputLines = rawOutput.trim().split('\n');
if (provider.sessionId.extractFrom === 'result') {
// Find the result line in JSONL output
for (const line of outputLines) {
try {
const parsed = JSON.parse(line);
if (parsed.type === 'result' || parsed[provider.sessionId.field]) {
sessionId = parsed[provider.sessionId.field] ?? null;
if (sessionId) break;
}
} catch { /* intentional: skip non-JSON JSONL lines */ }
}
} else if (provider.sessionId.extractFrom === 'event') {
for (const line of outputLines) {
try {
const event = JSON.parse(line);
if (event.type === provider.sessionId.eventType) {
sessionId = event[provider.sessionId.field] ?? null;
}
} catch { /* intentional: skip non-JSON JSONL lines */ }
}
}
}
if (sessionId) {
await this.repository.update(agentId, { sessionId });
}
log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output');
if (provider.name === 'claude') {
// rawOutput may be a single JSON object or multi-line JSONL — find the result line
let cliResult: ClaudeCliResult | null = null;
const lines = rawOutput.trim().split('\n');
for (const line of lines) {
try {
const parsed = JSON.parse(line);
if (parsed.type === 'result') {
cliResult = parsed;
}
} catch { /* intentional: skip non-JSON JSONL lines */ }
}
if (!cliResult) {
log.error({ agentId }, 'no result event found in agent output');
await this.handleAgentError(agentId, new Error('No result event in output'), provider, getAgentWorkdir);
return;
}
// Handle error results (auth failure, usage limits, etc.)
if (cliResult.is_error) {
log.warn({ agentId, error: cliResult.result }, 'agent returned error result from file');
await this.handleAgentError(agentId, new Error(cliResult.result), provider, getAgentWorkdir);
return;
}
let signalText: string;
try {
const signal = cliResult.structured_output ?? JSON.parse(cliResult.result);
signalText = JSON.stringify(signal);
} catch (parseErr) {
log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent signal from result');
await this.handleAgentError(agentId, new Error('Failed to parse agent signal'), provider, getAgentWorkdir);
return;
}
await this.processSignalAndFiles(agentId, signalText, agent.mode as AgentMode, getAgentWorkdir, sessionId ?? undefined);
} else {
await this.processSignalAndFiles(agentId, rawOutput, agent.mode as AgentMode, getAgentWorkdir, sessionId ?? undefined);
}
}
/**
* Handle agent errors. Detects usage limit exhaustion patterns.
* Returns true if error was an exhaustion error (caller should attempt failover).
*/
async handleAgentError(
agentId: string,
error: unknown,
provider: AgentProviderConfig,
_getAgentWorkdir: (alias: string) => string,
): Promise<void> {
const errorMessage = error instanceof Error ? error.message : String(error);
const agent = await this.repository.findById(agentId);
if (!agent) return;
log.error({ agentId, err: errorMessage }, 'agent error');
const errorResult: AgentResult = {
success: false,
message: errorMessage,
};
await this.repository.update(agentId, {
status: 'crashed',
result: JSON.stringify(errorResult)
});
if (this.eventBus) {
const event: AgentCrashedEvent = {
type: 'agent:crashed',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
error: errorMessage,
},
};
this.eventBus.emit(event);
}
}
/**
* Format answers map as structured prompt.
* Handles special __instruction__ key for retry scenarios.
*/
formatAnswersAsPrompt(answers: Record<string, string>): string {
const instruction = answers['__instruction__'];
const realAnswers = { ...answers };
delete realAnswers['__instruction__'];
const lines = Object.entries(realAnswers).map(
([questionId, answer]) => `[${questionId}]: ${answer}`,
);
const basePrompt = `Here are my answers to your questions:\n${lines.join('\n')}`;
return instruction ? `${instruction.trim()}\n\n${basePrompt}` : basePrompt;
}
/**
* Get the result of an agent's work.
*/
async getResult(agentId: string, active?: ActiveAgent): Promise<AgentResult | null> {
if (active?.result) return active.result;
const agent = await this.repository.findById(agentId);
return agent?.result ? JSON.parse(agent.result) : null;
}
/**
* Get pending questions for an agent waiting for input.
*/
async getPendingQuestions(agentId: string, active?: ActiveAgent): Promise<PendingQuestions | null> {
if (active?.pendingQuestions) return active.pendingQuestions;
const agent = await this.repository.findById(agentId);
return agent?.pendingQuestions ? JSON.parse(agent.pendingQuestions) : null;
}
// =========================================================================
// Private Helpers
// =========================================================================
/**
* Read signal.json and return its content if the agent completed successfully.
* Uses SignalManager for atomic read-and-validate when available.
* Returns the raw JSON string on success, null if missing/invalid.
*/
private async readSignalCompletion(agentWorkdir: string): Promise<string | null> {
// Prefer SignalManager (unified implementation with proper validation)
if (this.signalManager) {
const signal = await this.signalManager.readSignal(agentWorkdir);
return signal ? JSON.stringify(signal) : null;
}
// Fallback: inline read (for tests that don't inject SignalManager)
try {
const signalPath = join(agentWorkdir, '.cw/output/signal.json');
if (!existsSync(signalPath)) return null;
const signalContent = await readFile(signalPath, 'utf-8');
const signal = JSON.parse(signalContent);
if (signal.status === 'done' || signal.status === 'questions' || signal.status === 'error') {
return signalContent;
}
return null;
} catch (err) {
log.debug({ agentWorkdir, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json');
return null;
}
}
private emitCrashed(agent: { id: string; name: string; taskId: string | null }, error: string): void {
if (this.eventBus) {
const event: AgentCrashedEvent = {
type: 'agent:crashed',
timestamp: new Date(),
payload: {
agentId: agent.id,
name: agent.name,
taskId: agent.taskId ?? '',
error,
},
};
this.eventBus.emit(event);
}
}
}