Files
Codewalkers/apps/server/agent/output-handler.ts
Lukas May fcf822363c feat: Add persistent chat sessions for iterative phase/task refinement
Introduces a chat loop where users send instructions to an agent that
applies changes (create/update/delete phases, tasks, pages) and stays
alive for follow-up messages. Includes schema + migration, repository
layer, chat prompt, file-io action field extension, output handler chat
mode, revert support for deletes, tRPC procedures, events, frontend
slide-over UI with inline changeset display and revert, and docs.
2026-03-04 10:14:28 +01:00

1152 lines
46 KiB
TypeScript

/**
* OutputHandler — Stream event processing, signal parsing, file reading, result capture.
*
* Extracted from MultiProviderAgentManager. Processes all output from agent
* subprocesses: stream events, agent signals, output files, and result/question
* retrieval.
*/
import { readFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { join } from 'node:path';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ChangeSetRepository, CreateChangeSetEntryData } from '../db/repositories/change-set-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
import type {
EventBus,
AgentStoppedEvent,
AgentCrashedEvent,
AgentWaitingEvent,
} from '../events/index.js';
import type {
AgentResult,
AgentMode,
PendingQuestions,
QuestionItem,
} from './types.js';
import type { StreamEvent } from './providers/parsers/index.js';
import type { AgentProviderConfig } from './providers/types.js';
import { agentSignalSchema } from './schema.js';
import {
readSummary,
readPhaseFiles,
readTaskFiles,
readDecisionFiles,
readPageFiles,
readFrontmatterFile,
} from './file-io.js';
import { getProvider } from './providers/registry.js';
import { markdownToTiptapJson } from './markdown-to-tiptap.js';
import type { SignalManager } from './lifecycle/signal-manager.js';
import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('output-handler');
/**
 * Tracks an active agent with its PID and file tailer.
 */
export interface ActiveAgent {
/** DB id of the agent this record tracks. */
agentId: string;
/** OS process id of the spawned agent subprocess. */
pid: number;
/** Tailer following the agent's output file. */
tailer: import('./file-tailer.js').FileTailer;
/** Path of the output file the tailer follows. */
outputFilePath: string;
/** Actual working directory the agent process runs in (may differ from getAgentWorkdir for standalone agents) */
agentCwd?: string;
/** Final result, once captured. */
result?: AgentResult;
/** Questions awaiting user input, when present. */
pendingQuestions?: PendingQuestions;
/** Result text captured from the stream's 'result' event. */
streamResultText?: string;
/** Session id captured from stream 'init' or 'result' events. */
streamSessionId?: string;
/** Cost (USD) reported by the stream's 'result' event, if provided. */
streamCostUsd?: number;
/** True when the stream result indicated an error (e.g. auth failure) */
streamIsError?: boolean;
/** Cancel handle for polling timer — call to stop polling on cleanup */
cancelPoll?: () => void;
}
/**
 * Result structure from Claude CLI with --output-format json.
 */
interface ClaudeCliResult {
/** Discriminator — always 'result' for this record type. */
type: 'result';
subtype: 'success' | 'error';
/** True when the run failed (e.g. auth failure, usage limits). */
is_error: boolean;
session_id: string;
/** Final result text; on success it is expected to contain the signal JSON. */
result: string;
/** Structured signal payload when emitted — preferred over parsing `result`. */
structured_output?: unknown;
total_cost_usd?: number;
}
/**
 * Processes all output from agent subprocesses: stream events, completion
 * signals, output files, and question/error signals. Collaborators are
 * injected; the optional ones gate optional features (change sets, events,
 * chat sessions) at their call sites.
 */
export class OutputHandler {
// Per-agent read cursor into its output file (offset maintained by readCompleteLines).
private filePositions = new Map<string, number>();
private completionLocks = new Set<string>(); // Track agents currently being processed
constructor(
private repository: AgentRepository,
private eventBus?: EventBus,
private changeSetRepository?: ChangeSetRepository,
private phaseRepository?: PhaseRepository,
private taskRepository?: TaskRepository,
private pageRepository?: PageRepository,
private signalManager?: SignalManager,
private chatSessionRepository?: ChatSessionRepository,
) {}
/**
 * Check whether a signal file on disk holds complete, parseable JSON.
 *
 * Returns false for a missing/unreadable file, an empty file, a write still
 * in flight (content not terminated by '}' or ']'), or invalid JSON.
 */
private async validateSignalFile(filePath: string): Promise<boolean> {
  let body: string;
  try {
    body = (await readFile(filePath, 'utf-8')).trim();
  } catch {
    return false; // missing or unreadable
  }
  if (!body) return false;
  // Signal payloads are JSON objects/arrays, so a complete file must close
  // with '}' or ']' — anything else is a partial write.
  const looksComplete = body.endsWith('}') || body.endsWith(']');
  if (!looksComplete) return false;
  try {
    JSON.parse(body);
    return true;
  } catch {
    return false;
  }
}
/**
 * Read newly appended, fully terminated lines from a file.
 *
 * Only lines ending in '\n' are returned; a trailing partial line that an
 * agent may still be writing is left for the next call. This eliminates
 * race conditions when agents are still writing output.
 *
 * @param filePath     File to read (decoded as UTF-8).
 * @param fromPosition Character offset into the decoded string where the
 *                     previous call stopped; defaults to 0.
 * @returns Complete-line content (each line newline-terminated) plus the new
 *          character offset to resume from. On read failure, returns empty
 *          content with the position unchanged.
 */
private async readCompleteLines(filePath: string, fromPosition: number = 0): Promise<{ content: string; lastPosition: number }> {
  try {
    const content = await readFile(filePath, 'utf-8');
    if (fromPosition >= content.length) {
      return { content: '', lastPosition: fromPosition };
    }
    // Content appended since our last read position.
    const newContent = content.slice(fromPosition);
    const lines = newContent.split('\n');
    // The element after the final '\n' is either a partial line (no trailing
    // newline yet) or an empty string (chunk ended exactly on '\n'); drop it
    // in both cases. Everything kept is a fully terminated line.
    const completeLines = lines.slice(0, -1);
    // Re-attach the terminating '\n' to the joined lines so the cursor lands
    // exactly past the last newline consumed.
    const completeLinesContent = completeLines.length > 0 ? completeLines.join('\n') + '\n' : '';
    // BUGFIX: advance by character count, not Buffer.byteLength — slice()
    // above indexes by characters, so mixing in byte lengths desynchronized
    // the cursor on any multi-byte UTF-8 output. The old code also kept the
    // empty split() tail when the chunk ended with '\n', appending a spurious
    // extra newline and over-advancing the cursor by one per read.
    const newPosition = fromPosition + completeLinesContent.length;
    return {
      content: completeLinesContent,
      lastPosition: newPosition
    };
  } catch (err) {
    log.debug({ filePath, err: err instanceof Error ? err.message : String(err) }, 'failed to read output file lines');
    return { content: '', lastPosition: fromPosition };
  }
}
/**
 * Handle a standardized stream event emitted by a provider parser.
 *
 * Captures session ids and the final result on the in-memory ActiveAgent
 * record; other event types are logged or intentionally ignored.
 */
handleStreamEvent(
  agentId: string,
  event: StreamEvent,
  active: ActiveAgent | undefined,
): void {
  if (event.type === 'init') {
    if (active && event.sessionId) {
      active.streamSessionId = event.sessionId;
      this.repository.update(agentId, { sessionId: event.sessionId }).catch((err) => {
        log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to update session ID');
      });
    }
  } else if (event.type === 'text_delta') {
    // Text deltas are now streamed via DB log chunks + EventBus in manager.createLogChunkCallback
  } else if (event.type === 'tool_use_start') {
    log.debug({ agentId, tool: event.name, toolId: event.id }, 'tool use started');
  } else if (event.type === 'result') {
    if (active) {
      active.streamResultText = event.text;
      active.streamCostUsd = event.costUsd;
      active.streamIsError = event.isError === true;
      // Fall back to the result's session id if 'init' never provided one.
      if (!active.streamSessionId && event.sessionId) {
        active.streamSessionId = event.sessionId;
      }
    }
  } else if (event.type === 'error') {
    log.error({ agentId, error: event.message }, 'stream error event');
  } else if (event.type === 'turn_end') {
    log.debug({ agentId, stopReason: event.stopReason }, 'turn ended');
  }
}
/**
 * Handle completion of a detached agent.
 * Processes the final result from the stream data captured by the tailer.
 *
 * RACE CONDITION FIX: Uses a completion lock to prevent duplicate processing.
 * Both the polling handler (handleDetachedAgentCompletion) and crash handler
 * (handleProcessCrashed) can call this method when a process exits with non-zero code.
 * The mutex ensures only one handler processes the completion per agent.
 *
 * Result resolution order: stream error → signal.json → incremental file
 * read → validated full-file read → 'No output received' error.
 *
 * @param agentId         DB id of the agent whose process finished.
 * @param active          In-memory record (stream capture, cwd), if still tracked.
 * @param getAgentWorkdir Resolves a worktree alias to its working directory.
 */
async handleCompletion(
  agentId: string,
  active: ActiveAgent | undefined,
  getAgentWorkdir: (alias: string) => string,
): Promise<void> {
  // CRITICAL: Prevent race condition - only one completion handler per agent
  if (this.completionLocks.has(agentId)) {
    log.debug({ agentId }, 'completion already being processed - skipping duplicate');
    return;
  }
  this.completionLocks.add(agentId);
  try {
    const agent = await this.repository.findById(agentId);
    if (!agent) return;
    const provider = getProvider(agent.provider);
    if (!provider) return;
    log.debug({ agentId }, 'detached agent completed');
    // Resolve actual agent working directory — standalone agents run in a
    // "workspace/" subdirectory inside getAgentWorkdir, so prefer agentCwd
    // recorded at spawn time when available.
    const agentWorkdir = active?.agentCwd ?? getAgentWorkdir(agent.worktreeId);
    const outputDir = join(agentWorkdir, '.cw', 'output');
    const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
    const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
    // Diagnostics only: confirm the agent actually ran where we expected.
    const outputDirExists = existsSync(outputDir);
    const expectedPwdExists = existsSync(expectedPwdFile);
    const diagnosticExists = existsSync(diagnosticFile);
    log.info({
      agentId,
      agentWorkdir,
      outputDirExists,
      expectedPwdExists,
      diagnosticExists,
      verification: outputDirExists ? 'PASS' : 'FAIL'
    }, 'agent workdir verification completed');
    if (!outputDirExists) {
      log.warn({
        agentId,
        agentWorkdir
      }, 'No output files found in agent workdir! Agent may have run in wrong location.');
    }
    let signalText = active?.streamResultText;
    // If the stream result indicated an error (e.g. auth failure, usage limit),
    // route directly to error handling instead of trying to parse as signal JSON
    if (signalText && active?.streamIsError) {
      log.warn({ agentId, error: signalText }, 'agent returned error result');
      await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
      return;
    }
    if (!signalText) {
      // No result captured from the stream — fall back to files on disk.
      try {
        const outputFilePath = active?.outputFilePath ?? '';
        if (outputFilePath) {
          // First, check for robust signal.json completion before attempting incremental reading
          log.debug({ agentId, worktreeId: agent.worktreeId, agentWorkdir }, 'checking signal completion');
          const hasSignalCompletion = await this.readSignalCompletion(agentWorkdir);
          log.debug({ agentId, agentWorkdir, hasSignalCompletion }, 'signal completion check result');
          if (hasSignalCompletion) {
            const signalPath = join(agentWorkdir, '.cw/output/signal.json');
            const signalContent = await readFile(signalPath, 'utf-8');
            log.debug({ agentId, signalPath }, 'detected completion via signal.json, processing');
            await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId);
            return;
          } else {
            log.debug({ agentId, agentWorkdir }, 'no signal completion found, proceeding with raw output');
          }
          // Read only complete lines from the file, avoiding race conditions
          const lastPosition = this.filePositions.get(agentId) || 0;
          const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition);
          if (fileContent.trim()) {
            this.filePositions.set(agentId, newPosition);
            await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
            return;
          }
          // If no new complete lines, but file might still be writing, try again with validation
          if (await this.validateSignalFile(outputFilePath)) {
            const fullContent = await readFile(outputFilePath, 'utf-8');
            if (fullContent.trim() && fullContent.length > newPosition) {
              // File is complete and has content beyond what we've read
              await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir);
              return;
            }
          }
        }
      } catch { /* file empty or missing */ }
      log.debug({ agentId }, 'no result from stream or file, marking as error');
      await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir);
      return;
    }
    // Check for signal.json file first, then fall back to stream text
    if (await this.readSignalCompletion(agentWorkdir)) {
      const signalPath = join(agentWorkdir, '.cw/output/signal.json');
      const signalContent = await readFile(signalPath, 'utf-8');
      log.debug({ agentId, signalPath }, 'using signal.json content for completion');
      await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId);
    } else {
      log.debug({ agentId }, 'using stream text for completion (no signal.json found)');
      await this.processSignalAndFiles(
        agentId,
        signalText,
        agent.mode as AgentMode,
        getAgentWorkdir,
        active?.streamSessionId,
      );
    }
  } finally {
    // Always release the lock and forget this agent's read cursor.
    this.completionLocks.delete(agentId);
    this.filePositions.delete(agentId);
  }
}
/**
 * Process agent signal JSON and read output files.
 * Universal handler for all providers and modes.
 *
 * Parsing is deliberately two-step (raw JSON, then schema) so each failure
 * mode gets its own log entry and distinct crash reason.
 *
 * @param signalText Raw signal payload (JSON text) from signal.json or the stream result.
 * @param sessionId  Provider session id captured from the stream, if any.
 */
async processSignalAndFiles(
  agentId: string,
  signalText: string,
  mode: AgentMode,
  getAgentWorkdir: (alias: string) => string,
  sessionId?: string,
): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) return;
  let signal;
  let parsed;
  // Step 1: JSON parsing
  try {
    parsed = JSON.parse(signalText.trim());
    log.debug({ agentId }, 'signal JSON parsing successful');
  } catch (jsonError) {
    log.error({
      agentId,
      signalText: signalText.trim(),
      error: jsonError instanceof Error ? jsonError.message : String(jsonError),
      stack: jsonError instanceof Error ? jsonError.stack : undefined
    }, 'signal JSON parsing failed');
    await this.repository.update(agentId, { status: 'crashed' });
    this.emitCrashed(agent, 'Failed to parse agent signal JSON');
    return;
  }
  // Step 2: Schema validation
  try {
    signal = agentSignalSchema.parse(parsed);
    log.debug({ agentId, signalStatus: signal.status }, 'signal schema validation passed');
  } catch (schemaError) {
    log.error({
      agentId,
      signalText: signalText.trim(),
      parsed,
      error: schemaError instanceof Error ? schemaError.message : String(schemaError),
      stack: schemaError instanceof Error ? schemaError.stack : undefined
    }, 'signal schema validation failed');
    await this.repository.update(agentId, { status: 'crashed' });
    this.emitCrashed(agent, 'Failed to validate agent signal schema');
    return;
  }
  // Dispatch on the validated signal status.
  switch (signal.status) {
    case 'done':
      await this.processOutputFiles(agentId, agent, mode, getAgentWorkdir);
      break;
    case 'questions':
      // Chat mode: process output files before handling questions
      if (mode === 'chat') {
        await this.processOutputFiles(agentId, agent, mode, getAgentWorkdir);
      }
      await this.handleQuestions(agentId, agent, signal.questions, sessionId);
      break;
    case 'error':
      await this.handleSignalError(agentId, agent, signal.error);
      break;
  }
}
/**
 * Process output files from agent workdir after successful completion.
 * Performs direct writes to entities and records change sets.
 *
 * Per-entity failures are logged and skipped so one bad file does not abort
 * the batch; the change set records only the writes that succeeded. When
 * change-set prerequisites are missing, falls back to serializing the raw
 * file contents into the result message instead of writing to the DB.
 *
 * @param agent Minimal agent row shape used here (ids, name, mode, worktree).
 */
private async processOutputFiles(
  agentId: string,
  agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string; initiativeId?: string | null },
  mode: AgentMode,
  getAgentWorkdir: (alias: string) => string,
): Promise<void> {
  const agentWorkdir = getAgentWorkdir(agent.worktreeId);
  const summary = readSummary(agentWorkdir);
  const initiativeId = agent.initiativeId;
  // Truthy only when both a change-set repository and an initiative id exist.
  const canWriteChangeSets = this.changeSetRepository && initiativeId;
  let resultMessage = summary?.body ?? 'Task completed';
  switch (mode) {
    case 'plan': {
      const phases = readPhaseFiles(agentWorkdir);
      if (canWriteChangeSets && this.phaseRepository && phases.length > 0) {
        const entries: CreateChangeSetEntryData[] = [];
        // First pass: create phases
        for (const [i, p] of phases.entries()) {
          try {
            const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
            const created = await this.phaseRepository.create({
              id: p.id ?? undefined,
              initiativeId,
              name: p.title,
              content: tiptapContent,
            });
            entries.push({
              entityType: 'phase',
              entityId: created.id,
              action: 'create',
              newState: JSON.stringify(created),
              sortOrder: i,
            });
            this.eventBus?.emit({
              type: 'phase:started' as const,
              timestamp: new Date(),
              payload: { phaseId: created.id, initiativeId },
            });
          } catch (err) {
            log.warn({ agentId, phase: p.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase');
          }
        }
        // Second pass: create phase dependencies
        // NOTE(review): dependencies use file ids directly (no file→DB id map
        // as in 'detail'); this relies on phases being created with their file
        // id via `id: p.id` above — confirm file ids are always DB-valid.
        let depSortOrder = entries.length;
        for (const p of phases) {
          const phaseId = p.id;
          if (!phaseId || !Array.isArray(p.dependencies)) continue;
          for (const depFileId of p.dependencies) {
            try {
              await this.phaseRepository.createDependency(phaseId, depFileId);
              entries.push({
                entityType: 'phase_dependency',
                entityId: `${phaseId}:${depFileId}`,
                action: 'create',
                newState: JSON.stringify({ phaseId, dependsOnPhaseId: depFileId }),
                sortOrder: depSortOrder++,
              });
            } catch (err) {
              log.warn({ agentId, phaseId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase dependency');
            }
          }
        }
        if (entries.length > 0) {
          try {
            const cs = await this.changeSetRepository!.createWithEntries({
              agentId,
              agentName: agent.name,
              initiativeId,
              mode: 'plan',
              summary: summary?.body ?? `Created ${phases.length} phases`,
            }, entries);
            this.eventBus?.emit({
              type: 'changeset:created' as const,
              timestamp: new Date(),
              payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'plan', entryCount: entries.length },
            });
          } catch (err) {
            log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
          }
        }
        resultMessage = summary?.body ?? `${phases.length} phases created`;
      } else {
        resultMessage = JSON.stringify({ summary: summary?.body, phases });
      }
      break;
    }
    case 'detail': {
      const tasks = readTaskFiles(agentWorkdir);
      if (canWriteChangeSets && this.taskRepository && tasks.length > 0) {
        // The phase this detail run belongs to comes from the input file's frontmatter.
        const phaseInput = readFrontmatterFile(join(agentWorkdir, '.cw', 'input', 'phase.md'));
        const phaseId = (phaseInput?.data?.id as string) ?? null;
        const entries: CreateChangeSetEntryData[] = [];
        const fileIdToDbId = new Map<string, string>();
        // Load existing tasks for dedup — prevents duplicates when multiple agents finish concurrently
        const existingTasks = phaseId ? await this.taskRepository.findByPhaseId(phaseId) : [];
        const existingNames = new Set(existingTasks.map(t => t.name));
        for (const [i, t] of tasks.entries()) {
          if (existingNames.has(t.title)) {
            log.info({ agentId, task: t.title, phaseId }, 'skipped duplicate task');
            // Map deduped file ID to existing DB ID for dependency resolution
            const existing = existingTasks.find(et => et.name === t.title);
            if (existing) fileIdToDbId.set(t.id, existing.id);
            continue;
          }
          try {
            const created = await this.taskRepository.create({
              initiativeId,
              phaseId,
              parentTaskId: agent.taskId ?? null,
              name: t.title,
              description: t.body ?? undefined,
              category: (t.category as any) ?? 'execute',
              type: (t.type as any) ?? 'auto',
            });
            fileIdToDbId.set(t.id, created.id);
            existingNames.add(t.title); // prevent dupes within same agent output
            entries.push({
              entityType: 'task',
              entityId: created.id,
              action: 'create',
              newState: JSON.stringify(created),
              sortOrder: i,
            });
            this.eventBus?.emit({
              type: 'task:completed' as const,
              timestamp: new Date(),
              payload: { taskId: created.id, agentId, success: true, message: 'Task created by detail' },
            });
          } catch (err) {
            log.warn({ agentId, task: t.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create task');
          }
        }
        // Second pass: create task dependencies
        let depSortOrder = entries.length;
        for (const t of tasks) {
          const taskDbId = fileIdToDbId.get(t.id);
          if (!taskDbId || t.dependencies.length === 0) continue;
          for (const depFileId of t.dependencies) {
            const depDbId = fileIdToDbId.get(depFileId);
            if (!depDbId) continue;
            try {
              await this.taskRepository.createDependency(taskDbId, depDbId);
              entries.push({
                entityType: 'task_dependency',
                entityId: `${taskDbId}:${depDbId}`,
                action: 'create',
                newState: JSON.stringify({ taskId: taskDbId, dependsOnTaskId: depDbId }),
                sortOrder: depSortOrder++,
              });
            } catch (err) {
              log.warn({ agentId, taskDbId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create task dependency');
            }
          }
        }
        if (entries.length > 0) {
          try {
            const cs = await this.changeSetRepository!.createWithEntries({
              agentId,
              agentName: agent.name,
              initiativeId,
              mode: 'detail',
              summary: summary?.body ?? `Created ${tasks.length} tasks`,
            }, entries);
            this.eventBus?.emit({
              type: 'changeset:created' as const,
              timestamp: new Date(),
              payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'detail', entryCount: entries.length },
            });
          } catch (err) {
            log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
          }
        }
        resultMessage = summary?.body ?? `${tasks.length} tasks created`;
      } else {
        resultMessage = JSON.stringify({ summary: summary?.body, tasks });
      }
      break;
    }
    case 'discuss': {
      // Discuss mode performs no DB writes — decisions are returned verbatim.
      const decisions = readDecisionFiles(agentWorkdir);
      resultMessage = JSON.stringify({ summary: summary?.body, decisions });
      break;
    }
    case 'refine': {
      const pages = readPageFiles(agentWorkdir);
      if (canWriteChangeSets && this.pageRepository && pages.length > 0) {
        const entries: CreateChangeSetEntryData[] = [];
        for (const [i, p] of pages.entries()) {
          try {
            // Refine only updates existing pages; files without a pageId are skipped.
            if (!p.pageId) continue;
            const existing = await this.pageRepository.findById(p.pageId);
            if (!existing) {
              log.warn({ agentId, pageId: p.pageId }, 'page not found for refine update');
              continue;
            }
            // Capture prior state before the write so the change set can revert it.
            const previousState = JSON.stringify(existing);
            const tiptapJson = markdownToTiptapJson(p.body || '');
            await this.pageRepository.update(p.pageId, {
              content: JSON.stringify(tiptapJson),
              title: p.title,
            });
            const updated = await this.pageRepository.findById(p.pageId);
            entries.push({
              entityType: 'page',
              entityId: p.pageId,
              action: 'update',
              previousState,
              newState: JSON.stringify(updated),
              sortOrder: i,
            });
            this.eventBus?.emit({
              type: 'page:updated' as const,
              timestamp: new Date(),
              payload: { pageId: p.pageId, initiativeId, title: p.title },
            });
          } catch (err) {
            log.warn({ agentId, pageId: p.pageId, err: err instanceof Error ? err.message : String(err) }, 'failed to update page');
          }
        }
        if (entries.length > 0) {
          try {
            const cs = await this.changeSetRepository!.createWithEntries({
              agentId,
              agentName: agent.name,
              initiativeId,
              mode: 'refine',
              summary: summary?.body ?? `Updated ${entries.length} pages`,
            }, entries);
            this.eventBus?.emit({
              type: 'changeset:created' as const,
              timestamp: new Date(),
              payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'refine', entryCount: entries.length },
            });
          } catch (err) {
            log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
          }
        }
        resultMessage = summary?.body ?? `${entries.length} pages updated`;
      } else {
        resultMessage = JSON.stringify({ summary: summary?.body, pages });
      }
      break;
    }
    case 'chat': {
      // Chat mode may create/update/delete phases, tasks, and pages in one pass,
      // driven by each file's `action` field (defaulting to 'create').
      const chatPhases = readPhaseFiles(agentWorkdir);
      const chatTasks = readTaskFiles(agentWorkdir);
      const chatPages = readPageFiles(agentWorkdir);
      if (canWriteChangeSets) {
        const entries: CreateChangeSetEntryData[] = [];
        let sortOrd = 0;
        // Process phases
        if (this.phaseRepository) {
          for (const p of chatPhases) {
            try {
              const action = p.action ?? 'create';
              if (action === 'create') {
                const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
                const created = await this.phaseRepository.create({
                  id: p.id ?? undefined,
                  initiativeId: initiativeId!,
                  name: p.title,
                  content: tiptapContent,
                });
                entries.push({ entityType: 'phase', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
              } else if (action === 'update') {
                const existing = await this.phaseRepository.findById(p.id);
                if (!existing) continue;
                const previousState = JSON.stringify(existing);
                const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
                await this.phaseRepository.update(p.id, { name: p.title, content: tiptapContent });
                const updated = await this.phaseRepository.findById(p.id);
                entries.push({ entityType: 'phase', entityId: p.id, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
              } else if (action === 'delete') {
                // previousState enables revert of the delete from the change set.
                const existing = await this.phaseRepository.findById(p.id);
                if (!existing) continue;
                const previousState = JSON.stringify(existing);
                await this.phaseRepository.delete(p.id);
                entries.push({ entityType: 'phase', entityId: p.id, action: 'delete', previousState, sortOrder: sortOrd++ });
              }
            } catch (err) {
              log.warn({ agentId, phase: p.title, action: p.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat phase');
            }
          }
        }
        // Process tasks — two pass (create first, then deps)
        if (this.taskRepository) {
          const fileIdToDbId = new Map<string, string>();
          for (const t of chatTasks) {
            try {
              const action = t.action ?? 'create';
              if (action === 'create') {
                const created = await this.taskRepository.create({
                  initiativeId: initiativeId!,
                  phaseId: t.phaseId ?? null,
                  parentTaskId: t.parentTaskId ?? null,
                  name: t.title,
                  description: t.body ?? undefined,
                  category: (t.category as any) ?? 'execute',
                  type: (t.type as any) ?? 'auto',
                });
                fileIdToDbId.set(t.id, created.id);
                entries.push({ entityType: 'task', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
              } else if (action === 'update') {
                const existing = await this.taskRepository.findById(t.id);
                if (!existing) continue;
                const previousState = JSON.stringify(existing);
                await this.taskRepository.update(t.id, {
                  name: t.title,
                  description: t.body ?? undefined,
                  category: (t.category as any) ?? existing.category,
                  type: (t.type as any) ?? existing.type,
                });
                const updated = await this.taskRepository.findById(t.id);
                // Updated tasks keep their own id for dependency resolution below.
                fileIdToDbId.set(t.id, t.id);
                entries.push({ entityType: 'task', entityId: t.id, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
              } else if (action === 'delete') {
                const existing = await this.taskRepository.findById(t.id);
                if (!existing) continue;
                const previousState = JSON.stringify(existing);
                await this.taskRepository.delete(t.id);
                entries.push({ entityType: 'task', entityId: t.id, action: 'delete', previousState, sortOrder: sortOrd++ });
              }
            } catch (err) {
              log.warn({ agentId, task: t.title, action: t.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat task');
            }
          }
          // Second pass: deps for created tasks
          for (const t of chatTasks) {
            if (t.action !== 'create' || t.dependencies.length === 0) continue;
            const taskDbId = fileIdToDbId.get(t.id);
            if (!taskDbId) continue;
            for (const depFileId of t.dependencies) {
              // Unmapped dep ids fall back to the raw file id (may reference a pre-existing task).
              const depDbId = fileIdToDbId.get(depFileId) ?? depFileId;
              try {
                await this.taskRepository.createDependency(taskDbId, depDbId);
                entries.push({ entityType: 'task_dependency', entityId: `${taskDbId}:${depDbId}`, action: 'create', newState: JSON.stringify({ taskId: taskDbId, dependsOnTaskId: depDbId }), sortOrder: sortOrd++ });
              } catch (err) {
                log.warn({ agentId, taskDbId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create chat task dependency');
              }
            }
          }
        }
        // Process pages
        if (this.pageRepository) {
          for (const p of chatPages) {
            try {
              const action = p.action ?? 'create';
              if (action === 'create') {
                const tiptapJson = markdownToTiptapJson(p.body || '');
                const created = await this.pageRepository.create({
                  id: p.pageId ?? undefined,
                  initiativeId: initiativeId!,
                  title: p.title,
                  content: JSON.stringify(tiptapJson),
                });
                entries.push({ entityType: 'page', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
              } else if (action === 'update') {
                const existing = await this.pageRepository.findById(p.pageId);
                if (!existing) continue;
                const previousState = JSON.stringify(existing);
                const tiptapJson = markdownToTiptapJson(p.body || '');
                await this.pageRepository.update(p.pageId, { content: JSON.stringify(tiptapJson), title: p.title });
                const updated = await this.pageRepository.findById(p.pageId);
                entries.push({ entityType: 'page', entityId: p.pageId, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
              } else if (action === 'delete') {
                const existing = await this.pageRepository.findById(p.pageId);
                if (!existing) continue;
                const previousState = JSON.stringify(existing);
                await this.pageRepository.delete(p.pageId);
                entries.push({ entityType: 'page', entityId: p.pageId, action: 'delete', previousState, sortOrder: sortOrd++ });
              }
            } catch (err) {
              log.warn({ agentId, pageId: p.pageId, action: p.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat page');
            }
          }
        }
        // Create change set
        let changeSetId: string | null = null;
        if (entries.length > 0) {
          try {
            const cs = await this.changeSetRepository!.createWithEntries({
              agentId,
              agentName: agent.name,
              initiativeId: initiativeId!,
              // NOTE(review): cast papers over a changeset mode type that does
              // not yet include 'chat' — widen the repository type instead.
              mode: 'chat' as 'plan' | 'detail' | 'refine',
              summary: summary?.body ?? `Chat: ${entries.length} changes applied`,
            }, entries);
            changeSetId = cs.id;
            this.eventBus?.emit({
              type: 'changeset:created' as const,
              timestamp: new Date(),
              payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'chat', entryCount: entries.length },
            });
          } catch (err) {
            log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record chat change set');
          }
        }
        // Store assistant message in chat session
        if (this.chatSessionRepository) {
          try {
            const session = await this.chatSessionRepository.findActiveSessionByAgentId(agentId);
            if (session) {
              const assistantContent = summary?.body ?? `Applied ${entries.length} changes`;
              await this.chatSessionRepository.createMessage({
                chatSessionId: session.id,
                role: 'assistant',
                content: assistantContent,
                changeSetId,
              });
              this.eventBus?.emit({
                type: 'chat:message_created' as const,
                timestamp: new Date(),
                payload: { chatSessionId: session.id, role: 'assistant' },
              });
            }
          } catch (err) {
            log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to store chat assistant message');
          }
        }
        resultMessage = summary?.body ?? `${entries.length} changes applied`;
      } else {
        resultMessage = JSON.stringify({ summary: summary?.body, phases: chatPhases, tasks: chatTasks, pages: chatPages });
      }
      break;
    }
  }
  // Persist the successful result, return the agent to idle, and notify listeners.
  const resultPayload: AgentResult = {
    success: true,
    message: resultMessage,
    filesModified: summary?.filesModified,
  };
  await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' });
  const reason = this.getStoppedReason(mode);
  if (this.eventBus) {
    const event: AgentStoppedEvent = {
      type: 'agent:stopped',
      timestamp: new Date(),
      payload: {
        agentId,
        name: agent.name,
        taskId: agent.taskId ?? '',
        reason,
      },
    };
    this.eventBus.emit(event);
  }
  return;
}
/**
* Handle questions signal from agent.
*/
async handleQuestions(
agentId: string,
agent: { id: string; name: string; taskId: string | null; sessionId: string | null },
questions: QuestionItem[],
sessionId?: string,
): Promise<void> {
const questionsPayload: PendingQuestions = { questions };
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload), status: 'waiting_for_input' });
if (this.eventBus) {
const event: AgentWaitingEvent = {
type: 'agent:waiting',
timestamp: new Date(),
payload: {
agentId,
name: agent.name,
taskId: agent.taskId ?? '',
sessionId: sessionId ?? agent.sessionId ?? '',
questions,
},
};
this.eventBus.emit(event);
}
}
/**
 * Record an explicit error reported in the agent's signal: persist a failed
 * result, mark the agent crashed, and emit the crash event.
 */
async handleSignalError(
  agentId: string,
  agent: { id: string; name: string; taskId: string | null },
  error: string,
): Promise<void> {
  const failure: AgentResult = { success: false, message: error };
  await this.repository.update(agentId, {
    result: JSON.stringify(failure),
    status: 'crashed'
  });
  this.emitCrashed(agent, error);
}
/**
 * Translate an agent mode into the reason carried by agent:stopped events.
 * Modes without an explicit mapping fall back to 'task_complete'.
 */
getStoppedReason(mode: AgentMode): AgentStoppedEvent['payload']['reason'] {
  const reasonByMode: Partial<Record<AgentMode, AgentStoppedEvent['payload']['reason']>> = {
    discuss: 'context_complete',
    plan: 'plan_complete',
    detail: 'detail_complete',
    refine: 'refine_complete',
    chat: 'chat_complete',
  };
  return reasonByMode[mode] ?? 'task_complete';
}
/**
 * Process raw output from an agent (from file or direct).
 *
 * Steps:
 *   1. Extract the provider session ID (when the provider config describes
 *      how) and persist it on the agent record.
 *   2. For the `claude` provider, locate the final `result` event in the
 *      JSONL output, surface CLI-level errors, and parse the agent signal
 *      out of it.
 *   3. Hand the signal text to `processSignalAndFiles` for signal handling
 *      and output-file capture.
 *
 * Non-claude providers are assumed to emit the signal JSON directly as
 * their raw output.
 */
async processAgentOutput(
  agentId: string,
  rawOutput: string,
  provider: AgentProviderConfig,
  getAgentWorkdir: (alias: string) => string,
): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) return;
  const sessionId = this.extractSessionId(rawOutput, provider);
  if (sessionId) {
    await this.repository.update(agentId, { sessionId });
  }
  log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output');
  if (provider.name !== 'claude') {
    // Non-claude providers: rawOutput already is the signal payload.
    await this.processSignalAndFiles(agentId, rawOutput, agent.mode as AgentMode, getAgentWorkdir, sessionId ?? undefined);
    return;
  }
  // rawOutput may be a single JSON object or multi-line JSONL — find the result line
  const cliResult = this.findClaudeResultEvent(rawOutput);
  if (!cliResult) {
    log.error({ agentId }, 'no result event found in agent output');
    await this.handleAgentError(agentId, new Error('No result event in output'), provider, getAgentWorkdir);
    return;
  }
  // Handle error results (auth failure, usage limits, etc.)
  if (cliResult.is_error) {
    log.warn({ agentId, error: cliResult.result }, 'agent returned error result from file');
    await this.handleAgentError(agentId, new Error(cliResult.result), provider, getAgentWorkdir);
    return;
  }
  let signalText: string;
  try {
    // Prefer the structured output when present; otherwise the result text
    // itself must be the signal JSON.
    const signal = cliResult.structured_output ?? JSON.parse(cliResult.result);
    signalText = JSON.stringify(signal);
  } catch (parseErr) {
    log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent signal from result');
    await this.handleAgentError(agentId, new Error('Failed to parse agent signal'), provider, getAgentWorkdir);
    return;
  }
  await this.processSignalAndFiles(agentId, signalText, agent.mode as AgentMode, getAgentWorkdir, sessionId ?? undefined);
}
/**
 * Extract the provider session ID from JSONL output using the provider's
 * `sessionId` extraction config. Returns null when the provider declares no
 * extraction or no matching line carries the configured field.
 */
private extractSessionId(rawOutput: string, provider: AgentProviderConfig): string | null {
  if (!provider.sessionId) return null;
  let sessionId: string | null = null;
  const outputLines = rawOutput.trim().split('\n');
  if (provider.sessionId.extractFrom === 'result') {
    // First line that is the result event (or carries the field) wins.
    for (const line of outputLines) {
      try {
        const parsed = JSON.parse(line);
        if (parsed.type === 'result' || parsed[provider.sessionId.field]) {
          sessionId = parsed[provider.sessionId.field] ?? null;
          if (sessionId) break;
        }
      } catch { /* intentional: skip non-JSON JSONL lines */ }
    }
  } else if (provider.sessionId.extractFrom === 'event') {
    // Last matching event wins (no break — later events overwrite earlier).
    for (const line of outputLines) {
      try {
        const event = JSON.parse(line);
        if (event.type === provider.sessionId.eventType) {
          sessionId = event[provider.sessionId.field] ?? null;
        }
      } catch { /* intentional: skip non-JSON JSONL lines */ }
    }
  }
  return sessionId;
}
/**
 * Scan JSONL output for claude's terminal `result` event.
 * The last `result` line wins; returns null when none is found.
 */
private findClaudeResultEvent(rawOutput: string): ClaudeCliResult | null {
  let cliResult: ClaudeCliResult | null = null;
  for (const line of rawOutput.trim().split('\n')) {
    try {
      const parsed = JSON.parse(line);
      if (parsed.type === 'result') {
        cliResult = parsed;
      }
    } catch { /* intentional: skip non-JSON JSONL lines */ }
  }
  return cliResult;
}
/**
 * Handle an unexpected agent error: persist a failed result, mark the agent
 * crashed, and broadcast an `agent:crashed` event.
 *
 * NOTE(review): the previous doc claimed this method detects usage-limit
 * exhaustion and "returns true" so the caller can fail over — the method
 * resolves to void and performs no such detection; any failover must be
 * triggered by the caller independently.
 */
async handleAgentError(
  agentId: string,
  error: unknown,
  provider: AgentProviderConfig,
  _getAgentWorkdir: (alias: string) => string,
): Promise<void> {
  const errorMessage = error instanceof Error ? error.message : String(error);
  const agent = await this.repository.findById(agentId);
  // Agent row already gone — nothing to record or announce.
  if (!agent) return;
  log.error({ agentId, err: errorMessage }, 'agent error');
  const errorResult: AgentResult = {
    success: false,
    message: errorMessage,
  };
  await this.repository.update(agentId, {
    status: 'crashed',
    result: JSON.stringify(errorResult),
  });
  // Reuse the shared crash-event helper instead of duplicating the
  // AgentCrashedEvent construction (consistent with handleSignalError).
  this.emitCrashed(agent, errorMessage);
}
/**
 * Format answers map as structured prompt.
 * The reserved `__instruction__` key, when present, is removed from the
 * answer list and its (trimmed) value is prepended for retry scenarios.
 */
formatAnswersAsPrompt(answers: Record<string, string>): string {
  // Split the reserved instruction key off without mutating the input.
  const { __instruction__: instruction, ...realAnswers } = answers;
  const answerLines = Object.entries(realAnswers)
    .map(([questionId, answer]) => `[${questionId}]: ${answer}`)
    .join('\n');
  const basePrompt = `Here are my answers to your questions:\n${answerLines}`;
  return instruction ? `${instruction.trim()}\n\n${basePrompt}` : basePrompt;
}
/**
* Get the result of an agent's work.
*/
async getResult(agentId: string, active?: ActiveAgent): Promise<AgentResult | null> {
if (active?.result) return active.result;
const agent = await this.repository.findById(agentId);
return agent?.result ? JSON.parse(agent.result) : null;
}
/**
* Get pending questions for an agent waiting for input.
*/
async getPendingQuestions(agentId: string, active?: ActiveAgent): Promise<PendingQuestions | null> {
if (active?.pendingQuestions) return active.pendingQuestions;
const agent = await this.repository.findById(agentId);
return agent?.pendingQuestions ? JSON.parse(agent.pendingQuestions) : null;
}
// =========================================================================
// Private Helpers
// =========================================================================
/**
 * Read signal.json and return its content if the agent completed successfully.
 * Uses SignalManager for atomic read-and-validate when available.
 * Returns the raw JSON string on success, null if missing/invalid.
 */
private async readSignalCompletion(agentWorkdir: string): Promise<string | null> {
  // Prefer SignalManager (unified implementation with proper validation)
  if (this.signalManager) {
    const signal = await this.signalManager.readSignal(agentWorkdir);
    return signal ? JSON.stringify(signal) : null;
  }
  // Fallback: inline read (for tests that don't inject SignalManager)
  try {
    const signalPath = join(agentWorkdir, '.cw/output/signal.json');
    if (!existsSync(signalPath)) return null;
    const raw = await readFile(signalPath, 'utf-8');
    const parsed = JSON.parse(raw);
    // Only terminal statuses count as a completed signal.
    const terminalStatuses = ['done', 'questions', 'error'];
    return terminalStatuses.includes(parsed.status) ? raw : null;
  } catch (err) {
    log.debug({ agentWorkdir, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json');
    return null;
  }
}
private emitCrashed(agent: { id: string; name: string; taskId: string | null }, error: string): void {
if (this.eventBus) {
const event: AgentCrashedEvent = {
type: 'agent:crashed',
timestamp: new Date(),
payload: {
agentId: agent.id,
name: agent.name,
taskId: agent.taskId ?? '',
error,
},
};
this.eventBus.emit(event);
}
}
}