fix(agent): Replace hardcoded 500ms delay with robust file completion detection

Fixes race condition where agents were incorrectly marked as crashed when
output files took longer than 500ms to complete writing.

Changes:
- Replace hardcoded 500ms delay with polling-based file completion detection
- Add signal file validation to ensure JSON is complete before processing
- Make status updates atomic to prevent race conditions
- Update cleanup manager to pass outputFilePath for proper timing

This resolves the issue where successful agents like "abundant-wolverine"
were marked as crashed despite producing valid output.
This commit is contained in:
Lukas May
2026-02-08 14:03:47 +01:00
parent 2877484012
commit 604da7cd0d
4 changed files with 681 additions and 55 deletions

View File

@@ -7,7 +7,8 @@
import { promisify } from 'node:util';
import { execFile } from 'node:child_process';
import { readFile, readdir, rm } from 'node:fs/promises';
import { readFile, readdir, rm, cp, mkdir } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { join } from 'node:path';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
@@ -41,6 +42,7 @@ export class CleanupManager {
private repository: AgentRepository,
private projectRepository: ProjectRepository,
private eventBus?: EventBus,
private debug: boolean = false,
) {}
/**
@@ -212,6 +214,117 @@ export class CleanupManager {
}
}
/**
* Check if all project worktrees for an agent are clean (no uncommitted/untracked files).
*/
async isWorkdirClean(alias: string, initiativeId: string | null): Promise<boolean> {
const agentWorkdir = this.getAgentWorkdir(alias);
try {
await readdir(agentWorkdir);
} catch {
// Workdir doesn't exist — treat as clean
return true;
}
const worktreePaths: string[] = [];
if (initiativeId) {
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
for (const project of projects) {
worktreePaths.push(join(agentWorkdir, project.name));
}
} else {
worktreePaths.push(join(agentWorkdir, 'workspace'));
}
for (const wtPath of worktreePaths) {
try {
const { stdout } = await execFileAsync('git', ['status', '--porcelain'], { cwd: wtPath });
if (stdout.trim().length > 0) {
log.info({ alias, worktree: wtPath }, 'workdir has uncommitted changes');
return false;
}
} catch (err) {
log.warn({ alias, worktree: wtPath, err: err instanceof Error ? err.message : String(err) }, 'git status failed, treating as dirty');
return false;
}
}
return true;
}
/**
* Archive agent workdir and logs to .cw/debug/ before removal.
*/
async archiveForDebug(alias: string, agentId: string): Promise<void> {
const agentWorkdir = this.getAgentWorkdir(alias);
const debugWorkdir = join(this.workspaceRoot, '.cw', 'debug', 'workdirs', alias);
const logDir = join(this.workspaceRoot, '.cw', 'agent-logs', agentId);
const debugLogDir = join(this.workspaceRoot, '.cw', 'debug', 'agent-logs', agentId);
try {
if (existsSync(agentWorkdir)) {
await mkdir(join(this.workspaceRoot, '.cw', 'debug', 'workdirs'), { recursive: true });
await cp(agentWorkdir, debugWorkdir, { recursive: true });
log.debug({ alias, debugWorkdir }, 'archived workdir for debug');
}
} catch (err) {
log.warn({ alias, err: err instanceof Error ? err.message : String(err) }, 'failed to archive workdir for debug');
}
try {
if (existsSync(logDir)) {
await mkdir(join(this.workspaceRoot, '.cw', 'debug', 'agent-logs'), { recursive: true });
await cp(logDir, debugLogDir, { recursive: true });
log.debug({ agentId, debugLogDir }, 'archived logs for debug');
}
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to archive logs for debug');
}
}
/**
* Auto-cleanup agent workdir after successful completion.
* Removes worktrees and logs but preserves branches and DB record.
*/
async autoCleanupAfterCompletion(
agentId: string,
alias: string,
initiativeId: string | null,
): Promise<{ clean: boolean; removed: boolean }> {
const agentWorkdir = this.getAgentWorkdir(alias);
// Idempotent: if workdir is already gone, nothing to do
if (!existsSync(agentWorkdir)) {
return { clean: true, removed: true };
}
const clean = await this.isWorkdirClean(alias, initiativeId);
if (!clean) {
return { clean: false, removed: false };
}
if (this.debug) {
await this.archiveForDebug(alias, agentId);
}
try {
await this.removeAgentWorktrees(alias, initiativeId);
} catch (err) {
log.warn({ agentId, alias, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup: failed to remove worktrees');
}
try {
await this.removeAgentLogs(agentId);
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup: failed to remove logs');
}
log.info({ agentId, alias }, 'auto-cleanup: workdir and logs removed');
return { clean: true, removed: true };
}
/**
* Reconcile agent state after server restart.
* Checks all agents in 'running' status:
@@ -233,7 +346,7 @@ export class CleanupManager {
}>,
onStreamEvent: (agentId: string, event: StreamEvent) => void,
onAgentOutput: (agentId: string, rawOutput: string, provider: NonNullable<ReturnType<typeof getProvider>>) => Promise<void>,
pollForCompletion: (agentId: string, pid: number) => void,
pollForCompletion: (agentId: string, pid: number, outputFilePath?: string) => void,
): Promise<void> {
const runningAgents = await this.repository.findByStatus('running');
log.info({ runningCount: runningAgents.length }, 'reconciling agents after restart');
@@ -268,7 +381,7 @@ export class CleanupManager {
outputFilePath: agent.outputFilePath,
});
pollForCompletion(agent.id, pid);
pollForCompletion(agent.id, pid, agent.outputFilePath);
} else if (agent.outputFilePath) {
try {
const rawOutput = await readFile(agent.outputFilePath, 'utf-8');

View File

@@ -21,6 +21,7 @@ import type {
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { ProposalRepository } from '../db/repositories/proposal-repository.js';
import { generateUniqueAlias } from './alias.js';
import type {
EventBus,
@@ -32,6 +33,7 @@ import type {
import { writeInputFiles } from './file-io.js';
import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
import { join } from 'node:path';
import type { AccountCredentialManager } from './credentials/types.js';
import { ProcessManager } from './process-manager.js';
import { CredentialHandler } from './credential-handler.js';
@@ -41,8 +43,11 @@ import { CleanupManager } from './cleanup-manager.js';
const log = createModuleLogger('agent-manager');
export class MultiProviderAgentManager implements AgentManager {
private static readonly MAX_COMMIT_RETRIES = 1;
private activeAgents: Map<string, ActiveAgent> = new Map();
private outputBuffers: Map<string, string[]> = new Map();
private commitRetryCount: Map<string, number> = new Map();
private processManager: ProcessManager;
private credentialHandler: CredentialHandler;
private outputHandler: OutputHandler;
@@ -55,11 +60,13 @@ export class MultiProviderAgentManager implements AgentManager {
private accountRepository?: AccountRepository,
private eventBus?: EventBus,
private credentialManager?: AccountCredentialManager,
private proposalRepository?: ProposalRepository,
private debug: boolean = false,
) {
this.processManager = new ProcessManager(workspaceRoot, projectRepository, eventBus);
this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
this.outputHandler = new OutputHandler(repository, eventBus);
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus);
this.outputHandler = new OutputHandler(repository, eventBus, proposalRepository);
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug);
}
/**
@@ -116,11 +123,33 @@ export class MultiProviderAgentManager implements AgentManager {
// 2. Create isolated worktrees
let agentCwd: string;
if (initiativeId) {
log.debug({ alias, initiativeId }, 'creating initiative-based worktrees');
agentCwd = await this.processManager.createProjectWorktrees(alias, initiativeId);
// Log projects linked to the initiative
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
log.info({
alias,
initiativeId,
projectCount: projects.length,
projects: projects.map(p => ({ name: p.name, url: p.url })),
agentCwd
}, 'initiative-based agent workdir created');
} else {
log.debug({ alias }, 'creating standalone worktree');
agentCwd = await this.processManager.createStandaloneWorktree(alias);
log.info({ alias, agentCwd }, 'standalone agent workdir created');
}
log.debug({ alias, agentCwd }, 'worktrees created');
// Verify the final agentCwd exists
const { existsSync: fsExistsSync } = await import('node:fs');
const cwdVerified = fsExistsSync(agentCwd);
log.info({
alias,
agentCwd,
cwdVerified,
initiativeBasedAgent: !!initiativeId
}, 'agent workdir setup completed');
// 2b. Write input files
if (options.inputContext) {
@@ -144,14 +173,39 @@ export class MultiProviderAgentManager implements AgentManager {
// 4. Build spawn command
const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
log.debug({ command, args: args.join(' '), cwd: cwd ?? agentCwd }, 'spawn command built');
const finalCwd = cwd ?? agentCwd;
log.info({
agentId,
alias,
command,
args: args.join(' '),
finalCwd,
customCwdProvided: !!cwd,
providerEnv: Object.keys(providerEnv)
}, 'spawn command built');
// 5. Set config dir env var if account selected
const processEnv: Record<string, string> = { ...providerEnv };
if (accountConfigDir && provider.configDirEnv) {
processEnv[provider.configDirEnv] = accountConfigDir;
// Inject CLAUDE_CODE_OAUTH_TOKEN to bypass macOS keychain entirely.
// Claude Code prioritizes this env var over keychain/file-based credentials.
const accessToken = this.credentialHandler.readAccessToken(accountConfigDir);
if (accessToken) {
processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
log.debug({ alias }, 'CLAUDE_CODE_OAUTH_TOKEN injected for spawn');
}
}
log.debug({
agentId,
finalProcessEnv: Object.keys(processEnv),
hasAccountConfig: !!accountConfigDir,
hasOAuthToken: !!processEnv['CLAUDE_CODE_OAUTH_TOKEN'],
}, 'process environment prepared');
// 6. Spawn detached subprocess
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
agentId, command, args, cwd ?? agentCwd, processEnv, providerName, prompt,
@@ -160,8 +214,32 @@ export class MultiProviderAgentManager implements AgentManager {
await this.repository.update(agentId, { pid, outputFilePath });
// Write spawn diagnostic file for post-execution verification
const { writeFileSync, existsSync: diagnosticExistsSync } = await import('node:fs');
const diagnostic = {
timestamp: new Date().toISOString(),
agentId,
alias,
intendedCwd: finalCwd,
worktreeId: agent.worktreeId,
provider: providerName,
command,
args,
env: processEnv,
cwdExistsAtSpawn: diagnosticExistsSync(finalCwd),
initiativeId: initiativeId || null,
customCwdProvided: !!cwd,
accountId: accountId || null,
};
writeFileSync(
join(finalCwd, '.cw', 'spawn-diagnostic.json'),
JSON.stringify(diagnostic, null, 2),
'utf-8'
);
this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });
log.info({ agentId, alias, pid }, 'detached subprocess started');
log.info({ agentId, alias, pid, diagnosticWritten: true }, 'detached subprocess started with diagnostic');
// Emit spawned event
if (this.eventBus) {
@@ -178,6 +256,7 @@ export class MultiProviderAgentManager implements AgentManager {
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
);
return this.toAgentInfo(agent);
@@ -195,7 +274,132 @@ export class MultiProviderAgentManager implements AgentManager {
active,
(alias) => this.processManager.getAgentWorkdir(alias),
);
// Sync credentials back to DB if the agent had an account
await this.syncCredentialsPostCompletion(agentId);
this.activeAgents.delete(agentId);
// Auto-cleanup workdir after completion
await this.tryAutoCleanup(agentId);
}
/**
* Attempt auto-cleanup of agent workdir after completion.
* If dirty and retries remain, resumes the agent to commit changes.
*/
private async tryAutoCleanup(agentId: string): Promise<void> {
try {
const agent = await this.repository.findById(agentId);
if (!agent || agent.status !== 'idle') return;
const { clean, removed } = await this.cleanupManager.autoCleanupAfterCompletion(
agentId, agent.name, agent.initiativeId,
);
if (removed) {
this.commitRetryCount.delete(agentId);
log.info({ agentId, alias: agent.name }, 'auto-cleanup completed');
return;
}
if (!clean) {
const retries = this.commitRetryCount.get(agentId) ?? 0;
if (retries < MultiProviderAgentManager.MAX_COMMIT_RETRIES) {
this.commitRetryCount.set(agentId, retries + 1);
const resumed = await this.resumeForCommit(agentId);
if (resumed) {
log.info({ agentId, alias: agent.name, retry: retries + 1 }, 'resumed agent to commit uncommitted changes');
return;
}
}
log.warn({ agentId, alias: agent.name }, 'agent workdir has uncommitted changes after max retries, leaving in place');
this.commitRetryCount.delete(agentId);
}
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup failed');
}
}
/**
* Resume an agent's session with a prompt to commit uncommitted changes.
* Returns false if the agent can't be resumed (no session, provider doesn't support resume).
*/
private async resumeForCommit(agentId: string): Promise<boolean> {
const agent = await this.repository.findById(agentId);
if (!agent?.sessionId) return false;
const provider = getProvider(agent.provider);
if (!provider || provider.resumeStyle === 'none') return false;
const commitPrompt =
'You have uncommitted changes in your working directory. ' +
'Stage everything with `git add -A` and commit with an appropriate message describing your work. ' +
'Do not make any other changes.';
await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });
const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, commitPrompt);
const processEnv: Record<string, string> = { ...providerEnv };
if (agent.accountId && provider.configDirEnv && this.accountRepository) {
const { getAccountConfigDir } = await import('./accounts/paths.js');
const configDir = getAccountConfigDir(this.workspaceRoot, agent.accountId);
const account = await this.accountRepository.findById(agent.accountId);
if (account) {
this.credentialHandler.writeCredentialsToDisk(account, configDir);
}
processEnv[provider.configDirEnv] = configDir;
const accessToken = this.credentialHandler.readAccessToken(configDir);
if (accessToken) {
processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
}
}
const prevActive = this.activeAgents.get(agentId);
if (prevActive?.tailer) {
await prevActive.tailer.stop();
}
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
agentId, command, args, agentCwd, processEnv, provider.name, commitPrompt,
(event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
);
await this.repository.update(agentId, { pid, outputFilePath });
this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });
this.processManager.pollForCompletion(
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
);
return true;
}
/**
* Sync credentials from agent's config dir back to DB after completion.
* The subprocess may have refreshed tokens mid-session; this ensures
* the DB stays current and the next spawn uses fresh tokens.
*/
private async syncCredentialsPostCompletion(agentId: string): Promise<void> {
if (!this.accountRepository) return;
try {
const agent = await this.repository.findById(agentId);
if (!agent?.accountId) return;
const { getAccountConfigDir } = await import('./accounts/paths.js');
const configDir = getAccountConfigDir(this.workspaceRoot, agent.accountId);
await this.credentialHandler.persistRefreshedCredentials(agent.accountId, configDir);
log.debug({ agentId, accountId: agent.accountId }, 'post-completion credential sync done');
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'post-completion credential sync failed');
}
}
/**
@@ -213,6 +417,9 @@ export class MultiProviderAgentManager implements AgentManager {
this.activeAgents.delete(agentId);
}
// Sync credentials before marking stopped
await this.syncCredentialsPostCompletion(agentId);
await this.repository.update(agentId, { status: 'stopped' });
if (this.eventBus) {
@@ -271,9 +478,7 @@ export class MultiProviderAgentManager implements AgentManager {
const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
const prompt = this.outputHandler.formatAnswersAsPrompt(answers);
await this.repository.update(agentId, { status: 'running' });
await this.repository.update(agentId, { pendingQuestions: null });
await this.repository.update(agentId, { result: null });
await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });
const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt);
log.debug({ command, args: args.join(' ') }, 'resume command built');
@@ -295,6 +500,13 @@ export class MultiProviderAgentManager implements AgentManager {
if (refreshed) {
await this.credentialHandler.persistRefreshedCredentials(agent.accountId, resumeAccountConfigDir);
}
// Inject CLAUDE_CODE_OAUTH_TOKEN to bypass macOS keychain on resume
const accessToken = this.credentialHandler.readAccessToken(resumeAccountConfigDir);
if (accessToken) {
processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
log.debug({ agentId }, 'CLAUDE_CODE_OAUTH_TOKEN injected for resume');
}
}
// Stop previous tailer
@@ -326,6 +538,7 @@ export class MultiProviderAgentManager implements AgentManager {
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
);
}
@@ -432,10 +645,11 @@ export class MultiProviderAgentManager implements AgentManager {
this.activeAgents,
(agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
(agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
(agentId, pid) => this.processManager.pollForCompletion(
(agentId, pid, outputFilePath) => this.processManager.pollForCompletion(
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
),
);
}
@@ -456,6 +670,7 @@ export class MultiProviderAgentManager implements AgentManager {
accountId: string | null;
createdAt: Date;
updatedAt: Date;
userDismissedAt?: Date | null;
}): AgentInfo {
return {
id: agent.id,
@@ -470,6 +685,7 @@ export class MultiProviderAgentManager implements AgentManager {
accountId: agent.accountId,
createdAt: agent.createdAt,
updatedAt: agent.updatedAt,
userDismissedAt: agent.userDismissedAt,
};
}
}

View File

@@ -7,7 +7,10 @@
*/
import { readFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { join } from 'node:path';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ProposalRepository, CreateProposalData } from '../db/repositories/proposal-repository.js';
import type {
EventBus,
AgentStoppedEvent,
@@ -30,6 +33,7 @@ import {
readTaskFiles,
readDecisionFiles,
readPageFiles,
readFrontmatterFile,
} from './file-io.js';
import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
@@ -52,6 +56,8 @@ export interface ActiveAgent {
streamResultText?: string;
streamSessionId?: string;
streamCostUsd?: number;
/** True when the stream result indicated an error (e.g. auth failure) */
streamIsError?: boolean;
}
/**
@@ -71,8 +77,30 @@ export class OutputHandler {
constructor(
private repository: AgentRepository,
private eventBus?: EventBus,
private proposalRepository?: ProposalRepository,
) {}
/**
* Validate that a signal file is complete and properly formatted.
*/
private async validateSignalFile(filePath: string): Promise<boolean> {
try {
const content = await readFile(filePath, 'utf-8');
const trimmed = content.trim();
if (!trimmed) return false;
// Check if JSON is complete (ends with } or ])
const endsCorrectly = trimmed.endsWith('}') || trimmed.endsWith(']');
if (!endsCorrectly) return false;
// Try to parse as JSON to ensure it's valid
JSON.parse(trimmed);
return true;
} catch {
return false;
}
}
/**
* Handle a standardized stream event from a parser.
*/
@@ -112,6 +140,7 @@ export class OutputHandler {
if (active) {
active.streamResultText = event.text;
active.streamCostUsd = event.costUsd;
active.streamIsError = event.isError === true;
if (!active.streamSessionId && event.sessionId) {
active.streamSessionId = event.sessionId;
}
@@ -145,14 +174,51 @@ export class OutputHandler {
log.debug({ agentId }, 'detached agent completed');
// Verify agent worked in correct location by checking for output files
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
const outputDir = join(agentWorkdir, '.cw', 'output');
const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
const outputDirExists = existsSync(outputDir);
const expectedPwdExists = existsSync(expectedPwdFile);
const diagnosticExists = existsSync(diagnosticFile);
log.info({
agentId,
agentWorkdir,
outputDirExists,
expectedPwdExists,
diagnosticExists,
verification: outputDirExists ? 'PASS' : 'FAIL'
}, 'agent workdir verification completed');
if (!outputDirExists) {
log.warn({
agentId,
agentWorkdir
}, 'No output files found in agent workdir! Agent may have run in wrong location.');
}
let signalText = active?.streamResultText;
// If the stream result indicated an error (e.g. auth failure, usage limit),
// route directly to error handling instead of trying to parse as signal JSON
if (signalText && active?.streamIsError) {
log.warn({ agentId, error: signalText }, 'agent returned error result');
await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
return;
}
if (!signalText) {
try {
const fileContent = await readFile(active?.outputFilePath ?? '', 'utf-8');
if (fileContent.trim()) {
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
return;
const outputFilePath = active?.outputFilePath ?? '';
if (outputFilePath && await this.validateSignalFile(outputFilePath)) {
const fileContent = await readFile(outputFilePath, 'utf-8');
if (fileContent.trim()) {
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
return;
}
}
} catch { /* file empty or missing */ }
@@ -212,23 +278,70 @@ export class OutputHandler {
*/
private async processOutputFiles(
agentId: string,
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string },
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string; initiativeId?: string | null },
mode: AgentMode,
getAgentWorkdir: (alias: string) => string,
): Promise<void> {
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
const summary = readSummary(agentWorkdir);
const initiativeId = agent.initiativeId;
const canWriteProposals = this.proposalRepository && initiativeId;
let resultMessage = summary?.body ?? 'Task completed';
switch (mode) {
case 'breakdown': {
const phases = readPhaseFiles(agentWorkdir);
resultMessage = JSON.stringify({ summary: summary?.body, phases });
if (canWriteProposals && phases.length > 0) {
const proposalData: CreateProposalData[] = phases.map((p, i) => ({
agentId,
initiativeId,
targetType: 'phase' as const,
targetId: null,
title: p.title,
summary: null,
content: p.body || null,
metadata: JSON.stringify({ fileId: p.id, number: i + 1, dependencies: p.dependencies }),
status: 'pending' as const,
sortOrder: i,
}));
await this.proposalRepository!.createMany(proposalData);
resultMessage = summary?.body ?? `${phases.length} phase proposals created`;
} else {
resultMessage = JSON.stringify({ summary: summary?.body, phases });
}
break;
}
case 'decompose': {
const tasks = readTaskFiles(agentWorkdir);
resultMessage = JSON.stringify({ summary: summary?.body, tasks });
if (canWriteProposals && tasks.length > 0) {
// Read phase info from input context if available
const phaseInput = readFrontmatterFile(join(agentWorkdir, '.cw', 'input', 'phase.md'));
const phaseId = (phaseInput?.data?.id as string) ?? null;
const proposalData: CreateProposalData[] = tasks.map((t, i) => ({
agentId,
initiativeId,
targetType: 'task' as const,
targetId: null,
title: t.title,
summary: null,
content: t.body || null,
metadata: JSON.stringify({
fileId: t.id,
category: t.category,
type: t.type,
dependencies: t.dependencies,
parentTaskId: agent.taskId,
phaseId,
}),
status: 'pending' as const,
sortOrder: i,
}));
await this.proposalRepository!.createMany(proposalData);
resultMessage = summary?.body ?? `${tasks.length} task proposals created`;
} else {
resultMessage = JSON.stringify({ summary: summary?.body, tasks });
}
break;
}
case 'discuss': {
@@ -238,7 +351,44 @@ export class OutputHandler {
}
case 'refine': {
const pages = readPageFiles(agentWorkdir);
resultMessage = JSON.stringify({ summary: summary?.body, proposals: pages });
if (canWriteProposals) {
if (pages.length > 0) {
// Create proposals for actual page changes
const proposalData: CreateProposalData[] = pages.map((p, i) => ({
agentId,
initiativeId,
targetType: 'page' as const,
targetId: p.pageId,
title: p.title,
summary: p.summary || null,
content: p.body || null,
metadata: null,
status: 'pending' as const,
sortOrder: i,
}));
await this.proposalRepository!.createMany(proposalData);
resultMessage = summary?.body ?? `${pages.length} page proposals created`;
} else {
// Create a synthetic completion proposal when no changes are proposed
// This ensures the dismiss flow always goes through proposals domain
const completionProposal: CreateProposalData = {
agentId,
initiativeId,
targetType: 'page' as const,
targetId: null,
title: 'Analysis Complete',
summary: 'Agent completed review with no changes proposed',
content: summary?.body || 'The agent has finished analyzing the content and determined no changes are needed.',
metadata: JSON.stringify({ synthetic: true, reason: 'no_changes' }),
status: 'pending' as const,
sortOrder: 0,
};
await this.proposalRepository!.createMany([completionProposal]);
resultMessage = summary?.body ?? 'Analysis completed with 1 completion notice';
}
} else {
resultMessage = JSON.stringify({ summary: summary?.body, proposals: pages });
}
break;
}
}
@@ -248,8 +398,7 @@ export class OutputHandler {
message: resultMessage,
filesModified: summary?.filesModified,
};
await this.repository.update(agentId, { result: JSON.stringify(resultPayload) });
await this.repository.update(agentId, { status: 'idle' });
await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' });
const reason = this.getStoppedReason(mode);
if (this.eventBus) {
@@ -280,8 +429,7 @@ export class OutputHandler {
): Promise<void> {
const questionsPayload: PendingQuestions = { questions };
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload) });
await this.repository.update(agentId, { status: 'waiting_for_input' });
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload), status: 'waiting_for_input' });
if (this.eventBus) {
const event: AgentWaitingEvent = {
@@ -309,8 +457,10 @@ export class OutputHandler {
): Promise<void> {
const errorResult: AgentResult = { success: false, message: error };
await this.repository.update(agentId, { result: JSON.stringify(errorResult) });
await this.repository.update(agentId, { status: 'crashed' });
await this.repository.update(agentId, {
result: JSON.stringify(errorResult),
status: 'crashed'
});
this.emitCrashed(agent, error);
}
@@ -342,22 +492,28 @@ export class OutputHandler {
// Extract session ID using provider's extraction config
let sessionId: string | null = null;
if (provider.sessionId) {
try {
if (provider.sessionId.extractFrom === 'result') {
const parsed = JSON.parse(rawOutput);
sessionId = parsed[provider.sessionId.field] ?? null;
} else if (provider.sessionId.extractFrom === 'event') {
const lines = rawOutput.trim().split('\n');
for (const line of lines) {
try {
const event = JSON.parse(line);
if (event.type === provider.sessionId.eventType) {
sessionId = event[provider.sessionId.field] ?? null;
}
} catch { /* skip */ }
}
const outputLines = rawOutput.trim().split('\n');
if (provider.sessionId.extractFrom === 'result') {
// Find the result line in JSONL output
for (const line of outputLines) {
try {
const parsed = JSON.parse(line);
if (parsed.type === 'result' || parsed[provider.sessionId.field]) {
sessionId = parsed[provider.sessionId.field] ?? null;
if (sessionId) break;
}
} catch { /* skip */ }
}
} catch { /* parse failure */ }
} else if (provider.sessionId.extractFrom === 'event') {
for (const line of outputLines) {
try {
const event = JSON.parse(line);
if (event.type === provider.sessionId.eventType) {
sessionId = event[provider.sessionId.field] ?? null;
}
} catch { /* skip */ }
}
}
}
if (sessionId) {
@@ -366,14 +522,38 @@ export class OutputHandler {
log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output');
if (provider.name === 'claude') {
// rawOutput may be a single JSON object or multi-line JSONL — find the result line
let cliResult: ClaudeCliResult | null = null;
const lines = rawOutput.trim().split('\n');
for (const line of lines) {
try {
const parsed = JSON.parse(line);
if (parsed.type === 'result') {
cliResult = parsed;
}
} catch { /* skip non-JSON lines */ }
}
if (!cliResult) {
log.error({ agentId }, 'no result event found in agent output');
await this.handleAgentError(agentId, new Error('No result event in output'), provider, getAgentWorkdir);
return;
}
// Handle error results (auth failure, usage limits, etc.)
if (cliResult.is_error) {
log.warn({ agentId, error: cliResult.result }, 'agent returned error result from file');
await this.handleAgentError(agentId, new Error(cliResult.result), provider, getAgentWorkdir);
return;
}
let signalText: string;
try {
const cliResult: ClaudeCliResult = JSON.parse(rawOutput);
const signal = cliResult.structured_output ?? JSON.parse(cliResult.result);
signalText = JSON.stringify(signal);
} catch (parseErr) {
log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent output');
await this.repository.update(agentId, { status: 'crashed' });
log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent signal from result');
await this.handleAgentError(agentId, new Error('Failed to parse agent signal'), provider, getAgentWorkdir);
return;
}
@@ -399,7 +579,15 @@ export class OutputHandler {
log.error({ agentId, err: errorMessage }, 'agent error');
await this.repository.update(agentId, { status: 'crashed' });
const errorResult: AgentResult = {
success: false,
message: errorMessage,
};
await this.repository.update(agentId, {
status: 'crashed',
result: JSON.stringify(errorResult)
});
if (this.eventBus) {
const event: AgentCrashedEvent = {
@@ -414,12 +602,6 @@ export class OutputHandler {
};
this.eventBus.emit(event);
}
const errorResult: AgentResult = {
success: false,
message: errorMessage,
};
await this.repository.update(agentId, { result: JSON.stringify(errorResult) });
}
/**

View File

@@ -7,7 +7,8 @@
*/
import { spawn } from 'node:child_process';
import { writeFileSync, mkdirSync, openSync, closeSync } from 'node:fs';
import { writeFileSync, mkdirSync, openSync, closeSync, existsSync } from 'node:fs';
import { stat } from 'node:fs/promises';
import { join } from 'node:path';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { EventBus } from '../events/index.js';
@@ -59,10 +60,33 @@ export class ProcessManager {
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
const agentWorkdir = this.getAgentWorkdir(alias);
log.debug({
alias,
initiativeId,
projectCount: projects.length,
agentWorkdir,
baseBranch
}, 'creating project worktrees');
for (const project of projects) {
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
const worktreeManager = new SimpleGitWorktreeManager(clonePath, undefined, agentWorkdir);
await worktreeManager.create(project.name, `agent/${alias}`, baseBranch);
const worktree = await worktreeManager.create(project.name, `agent/${alias}`, baseBranch);
const worktreePath = worktree.path;
const pathExists = existsSync(worktreePath);
log.debug({
alias,
agentWorkdir,
projectName: project.name,
worktreePath,
pathExists
}, 'worktree created');
if (!pathExists) {
log.error({ worktreePath }, 'Worktree path does not exist after creation!');
throw new Error(`Worktree creation failed: ${worktreePath}`);
}
}
return agentWorkdir;
@@ -74,7 +98,25 @@ export class ProcessManager {
async createStandaloneWorktree(alias: string): Promise<string> {
const agentWorkdir = this.getAgentWorkdir(alias);
const worktreeManager = new SimpleGitWorktreeManager(this.workspaceRoot, undefined, agentWorkdir);
log.debug({ alias, agentWorkdir }, 'creating standalone worktree');
const worktree = await worktreeManager.create('workspace', `agent/${alias}`);
const worktreePath = worktree.path;
const pathExists = existsSync(worktreePath);
log.debug({
alias,
agentWorkdir,
worktreePath,
pathExists
}, 'standalone worktree created');
if (!pathExists) {
log.error({ worktreePath }, 'Standalone worktree path does not exist after creation!');
throw new Error(`Standalone worktree creation failed: ${worktreePath}`);
}
return worktree.path;
}
@@ -195,6 +237,32 @@ export class ProcessManager {
prompt?: string,
onEvent?: (event: StreamEvent) => void,
): { pid: number; outputFilePath: string; tailer: FileTailer } {
// Pre-spawn validation and logging
const cwdExists = existsSync(cwd);
const commandWithArgs = [command, ...args].join(' ');
// Log environment variables that might affect working directory
const environmentInfo = {
PWD: process.env.PWD,
HOME: process.env.HOME,
CLAUDE_CONFIG_DIR: env.CLAUDE_CONFIG_DIR,
CW_CONFIG_DIR: env.CW_CONFIG_DIR
};
log.info({
agentId,
cwd,
cwdExists,
commandWithArgs,
providerName,
environmentInfo
}, 'spawning detached process with workdir validation');
if (!cwdExists) {
log.error({ cwd }, 'CWD does not exist before spawn!');
throw new Error(`Agent working directory does not exist: ${cwd}`);
}
const logDir = join(this.workspaceRoot, '.cw', 'agent-logs', agentId);
mkdirSync(logDir, { recursive: true });
const outputFilePath = join(logDir, 'output.jsonl');
@@ -220,7 +288,14 @@ export class ProcessManager {
child.unref();
const pid = child.pid!;
log.debug({ agentId, pid, command }, 'spawned detached process');
log.info({
agentId,
pid,
command,
args: args.join(' '),
cwd,
spawnSuccess: true
}, 'spawned detached process successfully');
const parser = getStreamParser(providerName);
const tailer = new FileTailer({
@@ -239,6 +314,33 @@ export class ProcessManager {
return { pid, outputFilePath, tailer };
}
/**
* Wait for a file to stabilize (no size changes for 300ms).
* More robust than hardcoded delays.
*/
private async waitForFileCompletion(filePath: string, timeout = 10000): Promise<boolean> {
const startTime = Date.now();
let lastSize = -1;
let stableCount = 0;
while (Date.now() - startTime < timeout) {
try {
const stats = await stat(filePath);
if (stats.size === lastSize) {
stableCount++;
if (stableCount >= 3) return true; // Stable for 300ms
} else {
lastSize = stats.size;
stableCount = 0;
}
} catch {
// File doesn't exist yet
}
await new Promise(resolve => setTimeout(resolve, 100));
}
return false;
}
/**
* Poll for process completion by checking if PID is still alive.
* When the process exits, calls onComplete callback.
@@ -251,12 +353,25 @@ export class ProcessManager {
pid: number,
onComplete: () => Promise<void>,
getTailer: () => FileTailer | undefined,
outputFilePath?: string,
): void {
const processLog = log.child({ agentId, pid });
const check = async () => {
if (!isPidAlive(pid)) {
const tailer = getTailer();
if (tailer) {
await new Promise((resolve) => setTimeout(resolve, 500));
// Wait for output file to stabilize instead of hardcoded delay
if (outputFilePath) {
processLog.debug('waiting for output file completion');
const completed = await this.waitForFileCompletion(outputFilePath);
if (!completed) {
processLog.warn('output file did not stabilize within timeout');
}
} else {
// Fallback to short delay if no output file path
await new Promise((resolve) => setTimeout(resolve, 200));
}
await tailer.stop();
}
await onComplete();