Introduces GitHub-style threaded comments via a parentCommentId self-reference. Users and agents can reply within comment threads, and review agents receive comment IDs so they can post targeted responses via comment-responses.json.

- Migration 0032: parentCommentId column + index on review_comments
- Repository: createReply() copies parent context, default author 'you' → 'user'
- tRPC: replyToReviewComment procedure, requestPhaseChanges passes threaded comments
- Orchestrator: formats [comment:ID] tags with full reply threads in task description
- Agent IO: readCommentResponses() reads .cw/output/comment-responses.json
- OutputHandler: processes agent comment responses (creates replies, resolves threads)
- Execute prompt: conditional <review_comments> block when task has [comment:] markers
- Frontend: CommentThread renders root+replies with agent-specific styling + reply form
- Sidebar/ReviewTab: root-only comment counts, reply mutation plumbing through DiffViewer chain
1177 lines
47 KiB
TypeScript
1177 lines
47 KiB
TypeScript
/**
|
|
* OutputHandler — Stream event processing, signal parsing, file reading, result capture.
|
|
*
|
|
* Extracted from MultiProviderAgentManager. Processes all output from agent
|
|
* subprocesses: stream events, agent signals, output files, and result/question
|
|
* retrieval.
|
|
*/
|
|
|
|
import { readFile } from 'node:fs/promises';
|
|
import { existsSync } from 'node:fs';
|
|
import { join } from 'node:path';
|
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
|
import type { ChangeSetRepository, CreateChangeSetEntryData } from '../db/repositories/change-set-repository.js';
|
|
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
|
import type { PageRepository } from '../db/repositories/page-repository.js';
|
|
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
|
|
import type { ReviewCommentRepository } from '../db/repositories/review-comment-repository.js';
|
|
import type {
|
|
EventBus,
|
|
AgentStoppedEvent,
|
|
AgentCrashedEvent,
|
|
AgentWaitingEvent,
|
|
} from '../events/index.js';
|
|
import type {
|
|
AgentResult,
|
|
AgentMode,
|
|
PendingQuestions,
|
|
QuestionItem,
|
|
} from './types.js';
|
|
import type { StreamEvent } from './providers/parsers/index.js';
|
|
import type { AgentProviderConfig } from './providers/types.js';
|
|
import { agentSignalSchema } from './schema.js';
|
|
import {
|
|
readSummary,
|
|
readPhaseFiles,
|
|
readTaskFiles,
|
|
readDecisionFiles,
|
|
readPageFiles,
|
|
readFrontmatterFile,
|
|
readCommentResponses,
|
|
} from './file-io.js';
|
|
import { getProvider } from './providers/registry.js';
|
|
import { markdownToTiptapJson } from './markdown-to-tiptap.js';
|
|
import type { SignalManager } from './lifecycle/signal-manager.js';
|
|
import { createModuleLogger } from '../logger/index.js';
|
|
|
|
const log = createModuleLogger('output-handler');
|
|
|
|
/**
 * Bookkeeping record for an active agent: its PID, the file tailer attached
 * to it, and the values captured from its output stream so far.
 */
export interface ActiveAgent {
  /** ID of the agent this record belongs to. */
  agentId: string;
  /** PID recorded for the agent. */
  pid: number;
  /** File tailer associated with this agent. */
  tailer: import('./file-tailer.js').FileTailer;
  /** Path of the agent's output file. */
  outputFilePath: string;
  /** Actual working directory the agent process runs in (may differ from getAgentWorkdir for standalone agents) */
  agentCwd?: string;
  result?: AgentResult;
  pendingQuestions?: PendingQuestions;
  /** Text carried by the 'result' stream event, when one was received. */
  streamResultText?: string;
  /** Session ID reported by an 'init' stream event (or by 'result' if none was seen earlier). */
  streamSessionId?: string;
  /** Cost reported by the 'result' stream event. */
  streamCostUsd?: number;
  /** True when the stream result indicated an error (e.g. auth failure) */
  streamIsError?: boolean;
  /** Cancel handle for polling timer — call to stop polling on cleanup */
  cancelPoll?: () => void;
}
|
|
|
|
/**
 * Shape of the result object produced by the Claude CLI when it is run with
 * `--output-format json`. Field names mirror the CLI output, hence snake_case.
 */
interface ClaudeCliResult {
  /** Discriminator; always the literal 'result'. */
  type: 'result';
  subtype: 'success' | 'error';
  is_error: boolean;
  session_id: string;
  /** Result payload as a string. */
  result: string;
  /** Optional structured payload; untyped here, so narrow before use. */
  structured_output?: unknown;
  total_cost_usd?: number;
}
|
|
|
|
export class OutputHandler {
|
|
/** Last read offset into each agent's output file, keyed by agent ID. */
private filePositions: Map<string, number> = new Map();
/** IDs of agents whose completion is currently being processed (see handleCompletion). */
private completionLocks: Set<string> = new Set();

/**
 * Only the agent repository is mandatory. Every other collaborator is
 * optional, and code paths that need one check for its presence first.
 */
constructor(
  private repository: AgentRepository,
  private eventBus?: EventBus,
  private changeSetRepository?: ChangeSetRepository,
  private phaseRepository?: PhaseRepository,
  private taskRepository?: TaskRepository,
  private pageRepository?: PageRepository,
  private signalManager?: SignalManager,
  private chatSessionRepository?: ChatSessionRepository,
  private reviewCommentRepository?: ReviewCommentRepository,
) {}
|
|
|
|
/**
 * Check that a file holds one complete, well-formed JSON object or array.
 *
 * @param filePath - Path of the file to inspect.
 * @returns `true` only when the trimmed content is non-empty, ends with `}`
 *          or `]`, and parses as JSON. Any read or parse failure yields `false`.
 */
private async validateSignalFile(filePath: string): Promise<boolean> {
  let raw: string;
  try {
    raw = await readFile(filePath, 'utf-8');
  } catch {
    // Unreadable or missing file counts as "not valid".
    return false;
  }

  const text = raw.trim();
  if (text.length === 0) return false;

  // A file that is still being written will usually lack its closing bracket.
  const finalChar = text.charAt(text.length - 1);
  if (finalChar !== '}' && finalChar !== ']') return false;

  try {
    JSON.parse(text);
  } catch {
    return false;
  }
  return true;
}
|
|
|
|
/**
 * Read the newline-terminated lines that were appended to a file since
 * `fromPosition`, leaving any trailing partial line (one that may still be
 * being written) for a later call.
 *
 * All offsets are BYTE offsets into the file. The file is read as raw bytes
 * and cut at the last `\n` byte, so the returned position always sits just
 * past a newline and never splits a multi-byte UTF-8 character.
 *
 * @param filePath - File to read.
 * @param fromPosition - Byte offset at which to start reading (default 0).
 * @returns `content`: the complete lines, including their terminating
 *          newline; `lastPosition`: byte offset just past the last complete
 *          line. On read failure, or when nothing complete is available,
 *          returns empty content and the unchanged `fromPosition`.
 */
private async readCompleteLines(filePath: string, fromPosition: number = 0): Promise<{ content: string; lastPosition: number }> {
  try {
    // No encoding argument: we want a Buffer so slicing and position math share one unit (bytes).
    const fileBytes = await readFile(filePath);

    if (fromPosition >= fileBytes.length) {
      return { content: '', lastPosition: fromPosition };
    }

    const unread = fileBytes.subarray(fromPosition);

    // Everything after the last newline is a potentially incomplete line.
    const lastNewline = unread.lastIndexOf(0x0a);
    if (lastNewline === -1) {
      return { content: '', lastPosition: fromPosition };
    }

    const completeBytes = unread.subarray(0, lastNewline + 1);
    return {
      content: completeBytes.toString('utf-8'),
      lastPosition: fromPosition + completeBytes.length,
    };
  } catch (err) {
    log.debug({ filePath, err: err instanceof Error ? err.message : String(err) }, 'failed to read output file lines');
    return { content: '', lastPosition: fromPosition };
  }
}
|
|
|
|
/**
 * React to one standardized stream event emitted by a provider parser.
 *
 * @param agentId - ID of the agent the event belongs to.
 * @param event - The parsed stream event.
 * @param active - In-memory record for the agent, if it is still tracked;
 *                 captured stream values are stored on it.
 */
handleStreamEvent(
  agentId: string,
  event: StreamEvent,
  active: ActiveAgent | undefined,
): void {
  switch (event.type) {
    case 'init': {
      const { sessionId } = event;
      if (!active || !sessionId) break;

      active.streamSessionId = sessionId;
      // Fire-and-forget persistence; a failure is logged, not propagated.
      this.repository.update(agentId, { sessionId }).catch((updateErr) => {
        log.warn({ agentId, err: updateErr instanceof Error ? updateErr.message : String(updateErr) }, 'failed to update session ID');
      });
      break;
    }

    case 'text_delta':
      // Text deltas are now streamed via DB log chunks + EventBus in manager.createLogChunkCallback
      break;

    case 'tool_use_start':
      log.debug({ agentId, tool: event.name, toolId: event.id }, 'tool use started');
      break;

    case 'result': {
      if (!active) break;

      active.streamResultText = event.text;
      active.streamCostUsd = event.costUsd;
      active.streamIsError = event.isError === true;
      // Keep a session ID from 'init' if there was one; otherwise take it from the result.
      if (!active.streamSessionId && event.sessionId) {
        active.streamSessionId = event.sessionId;
      }
      break;
    }

    case 'error':
      log.error({ agentId, error: event.message }, 'stream error event');
      break;

    case 'turn_end':
      log.debug({ agentId, stopReason: event.stopReason }, 'turn ended');
      break;
  }
}
|
|
|
|
/**
 * Handle completion of a detached agent.
 * Processes the final result from the stream data captured by the tailer,
 * preferring `.cw/output/signal.json` in the agent's working directory when
 * it is present.
 *
 * RACE CONDITION FIX: Uses a completion lock to prevent duplicate processing.
 * Both the polling handler (handleDetachedAgentCompletion) and crash handler
 * (handleProcessCrashed) can call this method when a process exits with
 * non-zero code. The lock ensures only one handler processes the completion
 * per agent; a concurrent duplicate call returns immediately.
 *
 * @param agentId - ID of the agent that finished.
 * @param active - In-memory record for the agent, if still tracked.
 * @param getAgentWorkdir - Resolves a worktree alias to a working directory.
 */
async handleCompletion(
  agentId: string,
  active: ActiveAgent | undefined,
  getAgentWorkdir: (alias: string) => string,
): Promise<void> {
  // CRITICAL: Prevent race condition - only one completion handler per agent.
  // There is no await between the check and the add, so the pair is atomic.
  if (this.completionLocks.has(agentId)) {
    log.debug({ agentId }, 'completion already being processed - skipping duplicate');
    return;
  }
  this.completionLocks.add(agentId);

  try {
    const agent = await this.repository.findById(agentId);
    if (!agent) return;

    const provider = getProvider(agent.provider);
    if (!provider) return;

    log.debug({ agentId }, 'detached agent completed');

    // Resolve actual agent working directory — standalone agents run in a
    // "workspace/" subdirectory inside getAgentWorkdir, so prefer agentCwd
    // recorded at spawn time when available.
    // NOTE(review): the calls below still forward getAgentWorkdir rather than
    // this resolved directory, and processOutputFiles resolves via
    // getAgentWorkdir(agent.worktreeId) — confirm that is intended for
    // agents whose agentCwd differs.
    const agentWorkdir = active?.agentCwd ?? getAgentWorkdir(agent.worktreeId);
    const outputDir = join(agentWorkdir, '.cw', 'output');
    const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
    const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
    const signalPath = join(agentWorkdir, '.cw/output/signal.json');

    const outputDirExists = existsSync(outputDir);
    const expectedPwdExists = existsSync(expectedPwdFile);
    const diagnosticExists = existsSync(diagnosticFile);

    log.info({
      agentId,
      agentWorkdir,
      outputDirExists,
      expectedPwdExists,
      diagnosticExists,
      verification: outputDirExists ? 'PASS' : 'FAIL'
    }, 'agent workdir verification completed');

    if (!outputDirExists) {
      log.warn({
        agentId,
        agentWorkdir
      }, 'No output files found in agent workdir! Agent may have run in wrong location.');
    }

    const signalText = active?.streamResultText;

    // If the stream result indicated an error (e.g. auth failure, usage limit),
    // route directly to error handling instead of trying to parse as signal JSON
    if (signalText && active?.streamIsError) {
      log.warn({ agentId, error: signalText }, 'agent returned error result');
      await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
      return;
    }

    if (!signalText) {
      // No result text was captured from the stream: fall back to files on disk.
      const outputFilePath = active?.outputFilePath;
      if (outputFilePath) {
        try {
          // First, check for robust signal.json completion before attempting incremental reading
          log.debug({ agentId, worktreeId: agent.worktreeId, agentWorkdir }, 'checking signal completion');

          const hasSignalCompletion = await this.readSignalCompletion(agentWorkdir);
          log.debug({ agentId, agentWorkdir, hasSignalCompletion }, 'signal completion check result');

          if (hasSignalCompletion) {
            const signalContent = await readFile(signalPath, 'utf-8');
            log.debug({ agentId, signalPath }, 'detected completion via signal.json, processing');
            await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId);
            return;
          }
          log.debug({ agentId, agentWorkdir }, 'no signal completion found, proceeding with raw output');

          // Read only complete lines from the file, avoiding race conditions
          const lastPosition = this.filePositions.get(agentId) ?? 0;
          const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition);

          if (fileContent.trim()) {
            this.filePositions.set(agentId, newPosition);
            await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
            return;
          }

          // If no new complete lines, but file might still be writing, try again with validation
          if (await this.validateSignalFile(outputFilePath)) {
            const fullContent = await readFile(outputFilePath, 'utf-8');
            if (fullContent.trim() && fullContent.length > newPosition) {
              // File is complete and has content beyond what we've read
              await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir);
              return;
            }
          }
        } catch (err) {
          // Best-effort fallback: the agent is still marked as errored below,
          // but record why, so a real processing failure is not mistaken for
          // a merely empty or missing file.
          log.warn(
            { agentId, outputFilePath, err: err instanceof Error ? err.message : String(err) },
            'failed while reading or processing agent output files during completion',
          );
        }
      }

      log.debug({ agentId }, 'no result from stream or file, marking as error');
      await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir);
      return;
    }

    // Check for signal.json file first, then fall back to stream text
    if (await this.readSignalCompletion(agentWorkdir)) {
      const signalContent = await readFile(signalPath, 'utf-8');
      log.debug({ agentId, signalPath }, 'using signal.json content for completion');
      await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId);
    } else {
      log.debug({ agentId }, 'using stream text for completion (no signal.json found)');
      await this.processSignalAndFiles(
        agentId,
        signalText,
        agent.mode as AgentMode,
        getAgentWorkdir,
        active?.streamSessionId,
      );
    }
  } finally {
    // Always release the lock and drop the read offset, whichever path was taken.
    this.completionLocks.delete(agentId);
    this.filePositions.delete(agentId);
  }
}
|
|
|
|
/**
 * Parse and validate an agent's signal JSON, then act on its status.
 * Universal handler for all providers and modes.
 *
 * - `done`: output files are processed.
 * - `questions`: the questions are handed to handleQuestions; in chat mode
 *   the output files are processed first.
 * - `error`: the reported error is handed to handleSignalError.
 *
 * If the text is not valid JSON, or does not satisfy agentSignalSchema, the
 * agent is marked `crashed` and a crash event is emitted.
 *
 * @param agentId - ID of the agent that produced the signal.
 * @param signalText - Raw signal JSON text.
 * @param mode - Mode the agent ran in.
 * @param getAgentWorkdir - Resolves a worktree alias to a working directory.
 * @param sessionId - Session ID forwarded to question handling, if known.
 */
async processSignalAndFiles(
  agentId: string,
  signalText: string,
  mode: AgentMode,
  getAgentWorkdir: (alias: string) => string,
  sessionId?: string,
): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) return;

  const rawSignal = signalText.trim();

  // Shared failure path: persist the crashed status, then notify listeners.
  const markCrashed = async (reason: string): Promise<void> => {
    await this.repository.update(agentId, { status: 'crashed' });
    this.emitCrashed(agent, reason);
  };
  const describeFailure = (failure: unknown) => ({
    error: failure instanceof Error ? failure.message : String(failure),
    stack: failure instanceof Error ? failure.stack : undefined
  });

  // Step 1: JSON parsing
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawSignal);
    log.debug({ agentId }, 'signal JSON parsing successful');
  } catch (jsonError) {
    log.error({ agentId, signalText: rawSignal, ...describeFailure(jsonError) }, 'signal JSON parsing failed');
    await markCrashed('Failed to parse agent signal JSON');
    return;
  }

  // Step 2: Schema validation
  let signal: ReturnType<typeof agentSignalSchema.parse>;
  try {
    signal = agentSignalSchema.parse(parsed);
    log.debug({ agentId, signalStatus: signal.status }, 'signal schema validation passed');
  } catch (schemaError) {
    log.error({ agentId, signalText: rawSignal, parsed, ...describeFailure(schemaError) }, 'signal schema validation failed');
    await markCrashed('Failed to validate agent signal schema');
    return;
  }

  // Step 3: dispatch on the validated status
  if (signal.status === 'done') {
    await this.processOutputFiles(agentId, agent, mode, getAgentWorkdir);
  } else if (signal.status === 'questions') {
    // Chat mode: process output files before handling questions
    if (mode === 'chat') {
      await this.processOutputFiles(agentId, agent, mode, getAgentWorkdir);
    }
    await this.handleQuestions(agentId, agent, signal.questions, sessionId);
  } else if (signal.status === 'error') {
    await this.handleSignalError(agentId, agent, signal.error);
  }
}
|
|
|
|
/**
|
|
* Process output files from agent workdir after successful completion.
|
|
* Performs direct writes to entities and records change sets.
|
|
*/
|
|
private async processOutputFiles(
|
|
agentId: string,
|
|
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string; initiativeId?: string | null },
|
|
mode: AgentMode,
|
|
getAgentWorkdir: (alias: string) => string,
|
|
): Promise<void> {
|
|
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
|
|
const summary = await readSummary(agentWorkdir);
|
|
const initiativeId = agent.initiativeId;
|
|
const canWriteChangeSets = this.changeSetRepository && initiativeId;
|
|
|
|
let resultMessage = summary?.body ?? 'Task completed';
|
|
switch (mode) {
|
|
case 'plan': {
|
|
const phases = await readPhaseFiles(agentWorkdir);
|
|
if (canWriteChangeSets && this.phaseRepository && phases.length > 0) {
|
|
const entries: CreateChangeSetEntryData[] = [];
|
|
|
|
// First pass: create phases
|
|
for (const [i, p] of phases.entries()) {
|
|
try {
|
|
const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
|
|
const created = await this.phaseRepository.create({
|
|
id: p.id ?? undefined,
|
|
initiativeId,
|
|
name: p.title,
|
|
content: tiptapContent,
|
|
});
|
|
entries.push({
|
|
entityType: 'phase',
|
|
entityId: created.id,
|
|
action: 'create',
|
|
newState: JSON.stringify(created),
|
|
sortOrder: i,
|
|
});
|
|
this.eventBus?.emit({
|
|
type: 'phase:started' as const,
|
|
timestamp: new Date(),
|
|
payload: { phaseId: created.id, initiativeId },
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, phase: p.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase');
|
|
}
|
|
}
|
|
|
|
// Second pass: create phase dependencies
|
|
let depSortOrder = entries.length;
|
|
for (const p of phases) {
|
|
const phaseId = p.id;
|
|
if (!phaseId || !Array.isArray(p.dependencies)) continue;
|
|
for (const depFileId of p.dependencies) {
|
|
try {
|
|
await this.phaseRepository.createDependency(phaseId, depFileId);
|
|
entries.push({
|
|
entityType: 'phase_dependency',
|
|
entityId: `${phaseId}:${depFileId}`,
|
|
action: 'create',
|
|
newState: JSON.stringify({ phaseId, dependsOnPhaseId: depFileId }),
|
|
sortOrder: depSortOrder++,
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, phaseId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create phase dependency');
|
|
}
|
|
}
|
|
}
|
|
|
|
if (entries.length > 0) {
|
|
try {
|
|
const cs = await this.changeSetRepository!.createWithEntries({
|
|
agentId,
|
|
agentName: agent.name,
|
|
initiativeId,
|
|
mode: 'plan',
|
|
summary: summary?.body ?? `Created ${phases.length} phases`,
|
|
}, entries);
|
|
this.eventBus?.emit({
|
|
type: 'changeset:created' as const,
|
|
timestamp: new Date(),
|
|
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'plan', entryCount: entries.length },
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
|
|
}
|
|
}
|
|
resultMessage = summary?.body ?? `${phases.length} phases created`;
|
|
} else {
|
|
resultMessage = JSON.stringify({ summary: summary?.body, phases });
|
|
}
|
|
break;
|
|
}
|
|
case 'detail': {
|
|
const tasks = await readTaskFiles(agentWorkdir);
|
|
if (canWriteChangeSets && this.taskRepository && tasks.length > 0) {
|
|
const phaseInput = await readFrontmatterFile(join(agentWorkdir, '.cw', 'input', 'phase.md'));
|
|
const phaseId = (phaseInput?.data?.id as string) ?? null;
|
|
const entries: CreateChangeSetEntryData[] = [];
|
|
const fileIdToDbId = new Map<string, string>();
|
|
|
|
// Load existing tasks for dedup — prevents duplicates when multiple agents finish concurrently
|
|
const existingTasks = phaseId ? await this.taskRepository.findByPhaseId(phaseId) : [];
|
|
const existingNames = new Set(existingTasks.map(t => t.name));
|
|
|
|
for (const [i, t] of tasks.entries()) {
|
|
if (existingNames.has(t.title)) {
|
|
log.info({ agentId, task: t.title, phaseId }, 'skipped duplicate task');
|
|
// Map deduped file ID to existing DB ID for dependency resolution
|
|
const existing = existingTasks.find(et => et.name === t.title);
|
|
if (existing) fileIdToDbId.set(t.id, existing.id);
|
|
continue;
|
|
}
|
|
try {
|
|
const created = await this.taskRepository.create({
|
|
initiativeId,
|
|
phaseId,
|
|
parentTaskId: agent.taskId ?? null,
|
|
name: t.title,
|
|
description: t.body ?? undefined,
|
|
category: (t.category as any) ?? 'execute',
|
|
type: (t.type as any) ?? 'auto',
|
|
});
|
|
fileIdToDbId.set(t.id, created.id);
|
|
existingNames.add(t.title); // prevent dupes within same agent output
|
|
entries.push({
|
|
entityType: 'task',
|
|
entityId: created.id,
|
|
action: 'create',
|
|
newState: JSON.stringify(created),
|
|
sortOrder: i,
|
|
});
|
|
this.eventBus?.emit({
|
|
type: 'task:completed' as const,
|
|
timestamp: new Date(),
|
|
payload: { taskId: created.id, agentId, success: true, message: 'Task created by detail' },
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, task: t.title, err: err instanceof Error ? err.message : String(err) }, 'failed to create task');
|
|
}
|
|
}
|
|
|
|
// Second pass: create task dependencies
|
|
let depSortOrder = entries.length;
|
|
for (const t of tasks) {
|
|
const taskDbId = fileIdToDbId.get(t.id);
|
|
if (!taskDbId || t.dependencies.length === 0) continue;
|
|
for (const depFileId of t.dependencies) {
|
|
const depDbId = fileIdToDbId.get(depFileId);
|
|
if (!depDbId) continue;
|
|
try {
|
|
await this.taskRepository.createDependency(taskDbId, depDbId);
|
|
entries.push({
|
|
entityType: 'task_dependency',
|
|
entityId: `${taskDbId}:${depDbId}`,
|
|
action: 'create',
|
|
newState: JSON.stringify({ taskId: taskDbId, dependsOnTaskId: depDbId }),
|
|
sortOrder: depSortOrder++,
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, taskDbId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create task dependency');
|
|
}
|
|
}
|
|
}
|
|
|
|
if (entries.length > 0) {
|
|
try {
|
|
const cs = await this.changeSetRepository!.createWithEntries({
|
|
agentId,
|
|
agentName: agent.name,
|
|
initiativeId,
|
|
mode: 'detail',
|
|
summary: summary?.body ?? `Created ${tasks.length} tasks`,
|
|
}, entries);
|
|
this.eventBus?.emit({
|
|
type: 'changeset:created' as const,
|
|
timestamp: new Date(),
|
|
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'detail', entryCount: entries.length },
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
|
|
}
|
|
}
|
|
resultMessage = summary?.body ?? `${tasks.length} tasks created`;
|
|
} else {
|
|
resultMessage = JSON.stringify({ summary: summary?.body, tasks });
|
|
}
|
|
break;
|
|
}
|
|
case 'discuss': {
|
|
const decisions = await readDecisionFiles(agentWorkdir);
|
|
resultMessage = JSON.stringify({ summary: summary?.body, decisions });
|
|
break;
|
|
}
|
|
case 'refine': {
|
|
const pages = await readPageFiles(agentWorkdir);
|
|
if (canWriteChangeSets && this.pageRepository && pages.length > 0) {
|
|
const entries: CreateChangeSetEntryData[] = [];
|
|
|
|
for (const [i, p] of pages.entries()) {
|
|
try {
|
|
if (!p.pageId) continue;
|
|
const existing = await this.pageRepository.findById(p.pageId);
|
|
if (!existing) {
|
|
log.warn({ agentId, pageId: p.pageId }, 'page not found for refine update');
|
|
continue;
|
|
}
|
|
const previousState = JSON.stringify(existing);
|
|
const tiptapJson = markdownToTiptapJson(p.body || '');
|
|
await this.pageRepository.update(p.pageId, {
|
|
content: JSON.stringify(tiptapJson),
|
|
title: p.title,
|
|
});
|
|
const updated = await this.pageRepository.findById(p.pageId);
|
|
entries.push({
|
|
entityType: 'page',
|
|
entityId: p.pageId,
|
|
action: 'update',
|
|
previousState,
|
|
newState: JSON.stringify(updated),
|
|
sortOrder: i,
|
|
});
|
|
this.eventBus?.emit({
|
|
type: 'page:updated' as const,
|
|
timestamp: new Date(),
|
|
payload: { pageId: p.pageId, initiativeId, title: p.title },
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, pageId: p.pageId, err: err instanceof Error ? err.message : String(err) }, 'failed to update page');
|
|
}
|
|
}
|
|
|
|
if (entries.length > 0) {
|
|
try {
|
|
const cs = await this.changeSetRepository!.createWithEntries({
|
|
agentId,
|
|
agentName: agent.name,
|
|
initiativeId,
|
|
mode: 'refine',
|
|
summary: summary?.body ?? `Updated ${entries.length} pages`,
|
|
}, entries);
|
|
this.eventBus?.emit({
|
|
type: 'changeset:created' as const,
|
|
timestamp: new Date(),
|
|
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'refine', entryCount: entries.length },
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record change set after successful writes');
|
|
}
|
|
}
|
|
resultMessage = summary?.body ?? `${entries.length} pages updated`;
|
|
} else {
|
|
resultMessage = JSON.stringify({ summary: summary?.body, pages });
|
|
}
|
|
break;
|
|
}
|
|
case 'chat': {
|
|
const chatPhases = await readPhaseFiles(agentWorkdir);
|
|
const chatTasks = await readTaskFiles(agentWorkdir);
|
|
const chatPages = await readPageFiles(agentWorkdir);
|
|
if (canWriteChangeSets) {
|
|
const entries: CreateChangeSetEntryData[] = [];
|
|
let sortOrd = 0;
|
|
|
|
// Process phases
|
|
if (this.phaseRepository) {
|
|
for (const p of chatPhases) {
|
|
try {
|
|
const action = p.action ?? 'create';
|
|
if (action === 'create') {
|
|
const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
|
|
const created = await this.phaseRepository.create({
|
|
id: p.id ?? undefined,
|
|
initiativeId: initiativeId!,
|
|
name: p.title,
|
|
content: tiptapContent,
|
|
});
|
|
entries.push({ entityType: 'phase', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
|
|
} else if (action === 'update') {
|
|
const existing = await this.phaseRepository.findById(p.id);
|
|
if (!existing) continue;
|
|
const previousState = JSON.stringify(existing);
|
|
const tiptapContent = p.body ? JSON.stringify(markdownToTiptapJson(p.body)) : undefined;
|
|
await this.phaseRepository.update(p.id, { name: p.title, content: tiptapContent });
|
|
const updated = await this.phaseRepository.findById(p.id);
|
|
entries.push({ entityType: 'phase', entityId: p.id, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
|
|
} else if (action === 'delete') {
|
|
const existing = await this.phaseRepository.findById(p.id);
|
|
if (!existing) continue;
|
|
const previousState = JSON.stringify(existing);
|
|
await this.phaseRepository.delete(p.id);
|
|
entries.push({ entityType: 'phase', entityId: p.id, action: 'delete', previousState, sortOrder: sortOrd++ });
|
|
}
|
|
} catch (err) {
|
|
log.warn({ agentId, phase: p.title, action: p.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat phase');
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process tasks — two pass (create first, then deps)
|
|
if (this.taskRepository) {
|
|
const fileIdToDbId = new Map<string, string>();
|
|
for (const t of chatTasks) {
|
|
try {
|
|
const action = t.action ?? 'create';
|
|
if (action === 'create') {
|
|
const created = await this.taskRepository.create({
|
|
initiativeId: initiativeId!,
|
|
phaseId: t.phaseId ?? null,
|
|
parentTaskId: t.parentTaskId ?? null,
|
|
name: t.title,
|
|
description: t.body ?? undefined,
|
|
category: (t.category as any) ?? 'execute',
|
|
type: (t.type as any) ?? 'auto',
|
|
});
|
|
fileIdToDbId.set(t.id, created.id);
|
|
entries.push({ entityType: 'task', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
|
|
} else if (action === 'update') {
|
|
const existing = await this.taskRepository.findById(t.id);
|
|
if (!existing) continue;
|
|
const previousState = JSON.stringify(existing);
|
|
await this.taskRepository.update(t.id, {
|
|
name: t.title,
|
|
description: t.body ?? undefined,
|
|
category: (t.category as any) ?? existing.category,
|
|
type: (t.type as any) ?? existing.type,
|
|
});
|
|
const updated = await this.taskRepository.findById(t.id);
|
|
fileIdToDbId.set(t.id, t.id);
|
|
entries.push({ entityType: 'task', entityId: t.id, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
|
|
} else if (action === 'delete') {
|
|
const existing = await this.taskRepository.findById(t.id);
|
|
if (!existing) continue;
|
|
const previousState = JSON.stringify(existing);
|
|
await this.taskRepository.delete(t.id);
|
|
entries.push({ entityType: 'task', entityId: t.id, action: 'delete', previousState, sortOrder: sortOrd++ });
|
|
}
|
|
} catch (err) {
|
|
log.warn({ agentId, task: t.title, action: t.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat task');
|
|
}
|
|
}
|
|
// Second pass: deps for created tasks
|
|
for (const t of chatTasks) {
|
|
if (t.action !== 'create' || t.dependencies.length === 0) continue;
|
|
const taskDbId = fileIdToDbId.get(t.id);
|
|
if (!taskDbId) continue;
|
|
for (const depFileId of t.dependencies) {
|
|
const depDbId = fileIdToDbId.get(depFileId) ?? depFileId;
|
|
try {
|
|
await this.taskRepository.createDependency(taskDbId, depDbId);
|
|
entries.push({ entityType: 'task_dependency', entityId: `${taskDbId}:${depDbId}`, action: 'create', newState: JSON.stringify({ taskId: taskDbId, dependsOnTaskId: depDbId }), sortOrder: sortOrd++ });
|
|
} catch (err) {
|
|
log.warn({ agentId, taskDbId, depFileId, err: err instanceof Error ? err.message : String(err) }, 'failed to create chat task dependency');
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process pages
|
|
if (this.pageRepository) {
|
|
for (const p of chatPages) {
|
|
try {
|
|
const action = p.action ?? 'create';
|
|
if (action === 'create') {
|
|
const tiptapJson = markdownToTiptapJson(p.body || '');
|
|
const created = await this.pageRepository.create({
|
|
id: p.pageId ?? undefined,
|
|
initiativeId: initiativeId!,
|
|
title: p.title,
|
|
content: JSON.stringify(tiptapJson),
|
|
});
|
|
entries.push({ entityType: 'page', entityId: created.id, action: 'create', newState: JSON.stringify(created), sortOrder: sortOrd++ });
|
|
} else if (action === 'update') {
|
|
const existing = await this.pageRepository.findById(p.pageId);
|
|
if (!existing) continue;
|
|
const previousState = JSON.stringify(existing);
|
|
const tiptapJson = markdownToTiptapJson(p.body || '');
|
|
await this.pageRepository.update(p.pageId, { content: JSON.stringify(tiptapJson), title: p.title });
|
|
const updated = await this.pageRepository.findById(p.pageId);
|
|
entries.push({ entityType: 'page', entityId: p.pageId, action: 'update', previousState, newState: JSON.stringify(updated), sortOrder: sortOrd++ });
|
|
} else if (action === 'delete') {
|
|
const existing = await this.pageRepository.findById(p.pageId);
|
|
if (!existing) continue;
|
|
const previousState = JSON.stringify(existing);
|
|
await this.pageRepository.delete(p.pageId);
|
|
entries.push({ entityType: 'page', entityId: p.pageId, action: 'delete', previousState, sortOrder: sortOrd++ });
|
|
}
|
|
} catch (err) {
|
|
log.warn({ agentId, pageId: p.pageId, action: p.action, err: err instanceof Error ? err.message : String(err) }, 'failed to process chat page');
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create change set
|
|
let changeSetId: string | null = null;
|
|
if (entries.length > 0) {
|
|
try {
|
|
const cs = await this.changeSetRepository!.createWithEntries({
|
|
agentId,
|
|
agentName: agent.name,
|
|
initiativeId: initiativeId!,
|
|
mode: 'chat' as 'plan' | 'detail' | 'refine',
|
|
summary: summary?.body ?? `Chat: ${entries.length} changes applied`,
|
|
}, entries);
|
|
changeSetId = cs.id;
|
|
this.eventBus?.emit({
|
|
type: 'changeset:created' as const,
|
|
timestamp: new Date(),
|
|
payload: { changeSetId: cs.id, initiativeId, agentId, mode: 'chat', entryCount: entries.length },
|
|
});
|
|
} catch (err) {
|
|
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to record chat change set');
|
|
}
|
|
}
|
|
|
|
// Store assistant message in chat session
|
|
if (this.chatSessionRepository) {
|
|
try {
|
|
const session = await this.chatSessionRepository.findActiveSessionByAgentId(agentId);
|
|
if (session) {
|
|
const assistantContent = summary?.body ?? `Applied ${entries.length} changes`;
|
|
await this.chatSessionRepository.createMessage({
|
|
chatSessionId: session.id,
|
|
role: 'assistant',
|
|
content: assistantContent,
|
|
changeSetId,
|
|
});
|
|
this.eventBus?.emit({
|
|
type: 'chat:message_created' as const,
|
|
timestamp: new Date(),
|
|
payload: { chatSessionId: session.id, role: 'assistant' },
|
|
});
|
|
}
|
|
} catch (err) {
|
|
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to store chat assistant message');
|
|
}
|
|
}
|
|
|
|
resultMessage = summary?.body ?? `${entries.length} changes applied`;
|
|
} else {
|
|
resultMessage = JSON.stringify({ summary: summary?.body, phases: chatPhases, tasks: chatTasks, pages: chatPages });
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Process comment responses from agent (for review/execute tasks)
|
|
if (this.reviewCommentRepository) {
|
|
try {
|
|
const commentResponses = await readCommentResponses(agentWorkdir);
|
|
for (const resp of commentResponses) {
|
|
try {
|
|
await this.reviewCommentRepository.createReply(resp.commentId, resp.body, 'agent');
|
|
if (resp.resolved) {
|
|
await this.reviewCommentRepository.resolve(resp.commentId);
|
|
}
|
|
} catch (err) {
|
|
log.warn({ agentId, commentId: resp.commentId, err: err instanceof Error ? err.message : String(err) }, 'failed to process comment response');
|
|
}
|
|
}
|
|
if (commentResponses.length > 0) {
|
|
log.info({ agentId, count: commentResponses.length }, 'processed agent comment responses');
|
|
}
|
|
} catch (err) {
|
|
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to read comment responses');
|
|
}
|
|
}
|
|
|
|
const resultPayload: AgentResult = {
|
|
success: true,
|
|
message: resultMessage,
|
|
filesModified: summary?.filesModified,
|
|
};
|
|
await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' });
|
|
|
|
const reason = this.getStoppedReason(mode);
|
|
if (this.eventBus) {
|
|
const event: AgentStoppedEvent = {
|
|
type: 'agent:stopped',
|
|
timestamp: new Date(),
|
|
payload: {
|
|
agentId,
|
|
name: agent.name,
|
|
taskId: agent.taskId ?? '',
|
|
reason,
|
|
},
|
|
};
|
|
this.eventBus.emit(event);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
/**
 * Record an agent's questions and flag the agent as waiting for input.
 *
 * The questions are serialized as a PendingQuestions payload and stored on the
 * agent row together with the `waiting_for_input` status. When an event bus is
 * configured, an `agent:waiting` event is emitted afterwards.
 *
 * @param agentId - ID of the agent row to update.
 * @param agent - Agent fields used to fill the event payload.
 * @param questions - Questions raised by the agent.
 * @param sessionId - Session ID for the event; falls back to `agent.sessionId`, then ''.
 */
async handleQuestions(
  agentId: string,
  agent: { id: string; name: string; taskId: string | null; sessionId: string | null },
  questions: QuestionItem[],
  sessionId?: string,
): Promise<void> {
  const pending: PendingQuestions = { questions };
  await this.repository.update(agentId, {
    pendingQuestions: JSON.stringify(pending),
    status: 'waiting_for_input',
  });

  // Nothing to notify when no event bus was injected.
  if (!this.eventBus) return;

  const waitingEvent: AgentWaitingEvent = {
    type: 'agent:waiting',
    timestamp: new Date(),
    payload: {
      agentId,
      name: agent.name,
      taskId: agent.taskId ?? '',
      sessionId: sessionId ?? agent.sessionId ?? '',
      questions,
    },
  };
  this.eventBus.emit(waitingEvent);
}
|
|
|
|
/**
 * Handle an error signal reported by an agent.
 *
 * Stores a failed AgentResult carrying the error text, marks the agent as
 * `crashed`, and then emits the crash event via `emitCrashed`.
 *
 * @param agentId - ID of the agent row to update.
 * @param agent - Agent fields forwarded to the crash event.
 * @param error - Error message reported by the agent.
 */
async handleSignalError(
  agentId: string,
  agent: { id: string; name: string; taskId: string | null },
  error: string,
): Promise<void> {
  const failure: AgentResult = { success: false, message: error };
  const serializedFailure = JSON.stringify(failure);

  await this.repository.update(agentId, { result: serializedFailure, status: 'crashed' });
  this.emitCrashed(agent, error);
}
|
|
|
|
/**
 * Translate an agent mode into the `reason` carried by the agent-stopped event.
 *
 * Modes without a dedicated reason fall back to 'task_complete'.
 *
 * @param mode - Mode the agent was running in.
 * @returns The stop reason matching the mode.
 */
getStoppedReason(mode: AgentMode): AgentStoppedEvent['payload']['reason'] {
  type StopReason = AgentStoppedEvent['payload']['reason'];

  // A Map (rather than a plain object) so only the listed keys can ever match.
  const reasonByMode = new Map<AgentMode, StopReason>([
    ['discuss', 'context_complete'],
    ['plan', 'plan_complete'],
    ['detail', 'detail_complete'],
    ['refine', 'refine_complete'],
    ['chat', 'chat_complete'],
  ]);

  return reasonByMode.get(mode) ?? 'task_complete';
}
|
|
|
|
/**
 * Process raw output from an agent (from file or direct).
 *
 * Steps:
 *  1. Load the agent; return silently when it no longer exists.
 *  2. Extract a session ID from the JSONL output according to
 *     `provider.sessionId` and persist it when one is found.
 *  3. For the `claude` provider, locate the last `type: 'result'` line, route
 *     missing/error/unparseable results to `handleAgentError`, and otherwise
 *     forward the signal (structured output, or the parsed `result` string).
 *     For every other provider the raw output is forwarded unchanged.
 *
 * @param agentId - ID of the agent whose output is being processed.
 * @param rawOutput - Raw output text; a single JSON object or multi-line JSONL.
 * @param provider - Provider configuration (name and session ID extraction rules).
 * @param getAgentWorkdir - Resolver passed through to downstream handlers.
 */
async processAgentOutput(
  agentId: string,
  rawOutput: string,
  provider: AgentProviderConfig,
  getAgentWorkdir: (alias: string) => string,
): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) return;

  // Split once; both the session ID scan and the result scan walk these lines.
  const outputLines = rawOutput.trim().split('\n');

  // Extract session ID using provider's extraction config
  let sessionId: string | null = null;
  if (provider.sessionId) {
    if (provider.sessionId.extractFrom === 'result') {
      // Find the result line in JSONL output; stop at the first usable ID.
      for (const line of outputLines) {
        try {
          const parsed = JSON.parse(line);
          if (parsed.type === 'result' || parsed[provider.sessionId.field]) {
            sessionId = parsed[provider.sessionId.field] ?? null;
            if (sessionId) break;
          }
        } catch { /* intentional: skip non-JSON JSONL lines */ }
      }
    } else if (provider.sessionId.extractFrom === 'event') {
      for (const line of outputLines) {
        try {
          const event = JSON.parse(line);
          if (event.type === provider.sessionId.eventType) {
            // The most recent event carrying an ID wins. A matching event
            // without the field must not erase an ID found earlier.
            const candidate = event[provider.sessionId.field];
            if (candidate) sessionId = candidate;
          }
        } catch { /* intentional: skip non-JSON JSONL lines */ }
      }
    }
  }

  if (sessionId) {
    await this.repository.update(agentId, { sessionId });
  }
  log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output');

  // Non-claude providers hand their raw output straight to signal processing.
  let signalText = rawOutput;

  if (provider.name === 'claude') {
    // rawOutput may be a single JSON object or multi-line JSONL — find the result line.
    // No early exit: when several result lines exist, the last one is used.
    let cliResult: ClaudeCliResult | null = null;
    for (const line of outputLines) {
      try {
        const parsed = JSON.parse(line);
        if (parsed.type === 'result') {
          cliResult = parsed;
        }
      } catch { /* intentional: skip non-JSON JSONL lines */ }
    }

    if (!cliResult) {
      log.error({ agentId }, 'no result event found in agent output');
      await this.handleAgentError(agentId, new Error('No result event in output'), provider, getAgentWorkdir);
      return;
    }

    // Handle error results (auth failure, usage limits, etc.)
    if (cliResult.is_error) {
      log.warn({ agentId, error: cliResult.result }, 'agent returned error result from file');
      await this.handleAgentError(agentId, new Error(cliResult.result), provider, getAgentWorkdir);
      return;
    }

    try {
      // Prefer structured output; otherwise the result string must itself be JSON.
      const signal = cliResult.structured_output ?? JSON.parse(cliResult.result);
      signalText = JSON.stringify(signal);
    } catch (parseErr) {
      log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent signal from result');
      await this.handleAgentError(agentId, new Error('Failed to parse agent signal'), provider, getAgentWorkdir);
      return;
    }
  }

  await this.processSignalAndFiles(agentId, signalText, agent.mode as AgentMode, getAgentWorkdir, sessionId ?? undefined);
}
|
|
|
|
/**
 * Handle an agent failure.
 *
 * Normalizes the error to a message, logs it, stores a failed AgentResult on
 * the agent row with status `crashed`, and emits the crash event. Does nothing
 * when the agent can no longer be found.
 *
 * Note: this method resolves to `void`. It does not classify the error (for
 * example as a usage-limit exhaustion), so callers that need failover logic
 * must inspect the error themselves.
 *
 * @param agentId - ID of the agent that failed.
 * @param error - The failure; non-Error values are stringified.
 * @param provider - Provider configuration (currently unused; kept for the caller contract).
 * @param _getAgentWorkdir - Workdir resolver (currently unused; kept for the caller contract).
 */
async handleAgentError(
  agentId: string,
  error: unknown,
  provider: AgentProviderConfig,
  _getAgentWorkdir: (alias: string) => string,
): Promise<void> {
  const errorMessage = error instanceof Error ? error.message : String(error);
  const agent = await this.repository.findById(agentId);
  if (!agent) return;

  log.error({ agentId, err: errorMessage }, 'agent error');

  const errorResult: AgentResult = {
    success: false,
    message: errorMessage,
  };

  await this.repository.update(agentId, {
    status: 'crashed',
    result: JSON.stringify(errorResult),
  });

  // Reuse the shared crash-event helper instead of building the event inline,
  // matching what handleSignalError does.
  this.emitCrashed({ id: agentId, name: agent.name, taskId: agent.taskId ?? null }, errorMessage);
}
|
|
|
|
/**
 * Format an answers map as a structured prompt.
 *
 * Each entry is rendered as `[questionId]: answer`, one per line, under a
 * fixed header. The special `__instruction__` key (used for retry scenarios)
 * is not rendered as an answer; when it contains non-whitespace text, the
 * trimmed instruction is placed before the answers, separated by a blank line.
 * A missing, empty, or whitespace-only instruction is ignored.
 *
 * @param answers - Map of question ID to answer, optionally with `__instruction__`.
 * @returns The prompt text to send back to the agent.
 */
formatAnswersAsPrompt(answers: Record<string, string>): string {
  // Separate the reserved key from the real answers without mutating the input.
  const { __instruction__: rawInstruction, ...realAnswers } = answers;

  const lines = Object.entries(realAnswers).map(
    ([questionId, answer]) => `[${questionId}]: ${answer}`,
  );
  const basePrompt = `Here are my answers to your questions:\n${lines.join('\n')}`;

  // Trim before testing so a whitespace-only instruction does not produce a
  // prompt that starts with blank lines.
  const instruction = rawInstruction?.trim();
  return instruction ? `${instruction}\n\n${basePrompt}` : basePrompt;
}
|
|
|
|
/**
 * Fetch the result of an agent's work.
 *
 * An in-memory result on `active` takes precedence; otherwise the agent row is
 * loaded and its stored `result` JSON is parsed.
 *
 * @param agentId - ID of the agent to look up.
 * @param active - Optional in-memory agent state to check first.
 * @returns The agent result, or null when none is available.
 */
async getResult(agentId: string, active?: ActiveAgent): Promise<AgentResult | null> {
  const cached = active?.result;
  if (cached) return cached;

  const row = await this.repository.findById(agentId);
  if (!row?.result) return null;

  return JSON.parse(row.result);
}
|
|
|
|
/**
 * Fetch the questions an agent is waiting on.
 *
 * In-memory questions on `active` take precedence; otherwise the agent row is
 * loaded and its stored `pendingQuestions` JSON is parsed.
 *
 * @param agentId - ID of the agent to look up.
 * @param active - Optional in-memory agent state to check first.
 * @returns The pending questions, or null when there are none.
 */
async getPendingQuestions(agentId: string, active?: ActiveAgent): Promise<PendingQuestions | null> {
  const cached = active?.pendingQuestions;
  if (cached) return cached;

  const row = await this.repository.findById(agentId);
  if (!row?.pendingQuestions) return null;

  return JSON.parse(row.pendingQuestions);
}
|
|
|
|
// =========================================================================
|
|
// Private Helpers
|
|
// =========================================================================
|
|
|
|
/**
 * Load the agent's completion signal from its working directory.
 *
 * When a SignalManager is injected it performs the read and the result is
 * re-serialized. Otherwise `.cw/output/signal.json` is read directly and its
 * raw text is returned, provided the status is `done`, `questions` or `error`.
 *
 * @param agentWorkdir - Working directory of the agent.
 * @returns The signal as a JSON string, or null when it is missing or invalid.
 */
private async readSignalCompletion(agentWorkdir: string): Promise<string | null> {
  // Prefer SignalManager (unified implementation with proper validation)
  if (this.signalManager) {
    const managed = await this.signalManager.readSignal(agentWorkdir);
    if (!managed) return null;
    return JSON.stringify(managed);
  }

  // Fallback: inline read (for tests that don't inject SignalManager)
  try {
    const signalFile = join(agentWorkdir, '.cw/output/signal.json');
    if (!existsSync(signalFile)) return null;

    const rawSignal = await readFile(signalFile, 'utf-8');
    const { status } = JSON.parse(rawSignal);

    // Only these statuses are accepted as a completion signal.
    const isTerminal = status === 'done' || status === 'questions' || status === 'error';
    return isTerminal ? rawSignal : null;
  } catch (err) {
    log.debug({ agentWorkdir, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json');
    return null;
  }
}
|
|
|
|
/**
 * Emit an `agent:crashed` event for the given agent.
 *
 * Does nothing when no event bus is configured.
 *
 * @param agent - Agent fields used to fill the event payload.
 * @param error - Error message to attach to the event.
 */
private emitCrashed(agent: { id: string; name: string; taskId: string | null }, error: string): void {
  if (!this.eventBus) return;

  const crashedEvent: AgentCrashedEvent = {
    type: 'agent:crashed',
    timestamp: new Date(),
    payload: {
      agentId: agent.id,
      name: agent.name,
      taskId: agent.taskId ?? '',
      error,
    },
  };
  this.eventBus.emit(crashedEvent);
}
|
|
}
|