fix(agent): Replace hardcoded 500ms delay with robust file completion detection
Fixes a race condition where agents were incorrectly marked as crashed when their output files took longer than 500ms to finish writing.

Changes:
- Replace the hardcoded 500ms delay with polling-based file completion detection
- Add signal file validation to ensure the JSON is complete before processing
- Make status updates atomic to prevent race conditions
- Update the cleanup manager to pass outputFilePath for proper timing

This resolves the issue where successful agents like "abundant-wolverine" were marked as crashed despite producing valid output.
This commit is contained in:
@@ -7,7 +7,8 @@
|
|||||||
|
|
||||||
import { promisify } from 'node:util';
|
import { promisify } from 'node:util';
|
||||||
import { execFile } from 'node:child_process';
|
import { execFile } from 'node:child_process';
|
||||||
import { readFile, readdir, rm } from 'node:fs/promises';
|
import { readFile, readdir, rm, cp, mkdir } from 'node:fs/promises';
|
||||||
|
import { existsSync } from 'node:fs';
|
||||||
import { join } from 'node:path';
|
import { join } from 'node:path';
|
||||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||||
@@ -41,6 +42,7 @@ export class CleanupManager {
|
|||||||
private repository: AgentRepository,
|
private repository: AgentRepository,
|
||||||
private projectRepository: ProjectRepository,
|
private projectRepository: ProjectRepository,
|
||||||
private eventBus?: EventBus,
|
private eventBus?: EventBus,
|
||||||
|
private debug: boolean = false,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -212,6 +214,117 @@ export class CleanupManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Report whether every project worktree of an agent is clean, i.e.
 * `git status --porcelain` prints nothing (no uncommitted or untracked files).
 *
 * @param alias - Agent alias used to resolve the agent's workdir.
 * @param initiativeId - When set, one worktree per project linked to the
 *   initiative is checked; otherwise the single `workspace` worktree is checked.
 * @returns `true` when the workdir cannot be listed or all worktrees are clean;
 *   `false` at the first dirty worktree or the first failing `git status`.
 */
async isWorkdirClean(alias: string, initiativeId: string | null): Promise<boolean> {
  const workdir = this.getAgentWorkdir(alias);

  // A workdir that cannot be listed (typically because it is gone) counts as clean.
  let listable = true;
  try {
    await readdir(workdir);
  } catch {
    listable = false;
  }
  if (!listable) {
    return true;
  }

  // Initiative agents get one worktree per linked project; standalone agents get `workspace`.
  const targets: string[] = initiativeId
    ? (await this.projectRepository.findProjectsByInitiativeId(initiativeId)).map((project) =>
        join(workdir, project.name),
      )
    : [join(workdir, 'workspace')];

  // Checked one at a time so the first dirty or failing worktree ends the scan.
  for (const worktree of targets) {
    try {
      const status = await execFileAsync('git', ['status', '--porcelain'], { cwd: worktree });
      if (status.stdout.trim() !== '') {
        log.info({ alias, worktree }, 'workdir has uncommitted changes');
        return false;
      }
    } catch (err) {
      // Err on the side of keeping data: an unreadable worktree is treated as dirty.
      const message = err instanceof Error ? err.message : String(err);
      log.warn({ alias, worktree, err: message }, 'git status failed, treating as dirty');
      return false;
    }
  }

  return true;
}
|
||||||
|
|
||||||
|
/**
 * Copy an agent's workdir and log directory under `.cw/debug/` before they are removed.
 *
 * Best effort: the two copies are attempted independently, and a failure is
 * logged as a warning instead of being thrown.
 *
 * @param alias - Agent alias; resolves the workdir and names its archive folder.
 * @param agentId - Agent id; names the log directory and its archive folder.
 */
async archiveForDebug(alias: string, agentId: string): Promise<void> {
  const sourceWorkdir = this.getAgentWorkdir(alias);
  const cwRoot = join(this.workspaceRoot, '.cw');
  const workdirArchiveParent = join(cwRoot, 'debug', 'workdirs');
  const logArchiveParent = join(cwRoot, 'debug', 'agent-logs');
  const debugWorkdir = join(workdirArchiveParent, alias);
  const sourceLogDir = join(cwRoot, 'agent-logs', agentId);
  const debugLogDir = join(logArchiveParent, agentId);

  // Workdir snapshot.
  try {
    if (existsSync(sourceWorkdir)) {
      await mkdir(workdirArchiveParent, { recursive: true });
      await cp(sourceWorkdir, debugWorkdir, { recursive: true });
      log.debug({ alias, debugWorkdir }, 'archived workdir for debug');
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ alias, err: message }, 'failed to archive workdir for debug');
  }

  // Log snapshot; attempted even when the workdir copy failed.
  try {
    if (existsSync(sourceLogDir)) {
      await mkdir(logArchiveParent, { recursive: true });
      await cp(sourceLogDir, debugLogDir, { recursive: true });
      log.debug({ agentId, debugLogDir }, 'archived logs for debug');
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'failed to archive logs for debug');
  }
}
|
||||||
|
|
||||||
|
/**
 * Auto-cleanup agent workdir after successful completion.
 *
 * Removes the agent's worktrees and logs. Nothing is removed while any worktree
 * has uncommitted changes. When the manager was constructed with `debug`, the
 * workdir and logs are archived first.
 * NOTE(review): the original contract says branches and the DB record are
 * preserved; that depends on `removeAgentWorktrees` / `removeAgentLogs`, which
 * are not visible here — confirm.
 *
 * @param agentId - Agent id; selects the log directory to remove.
 * @param alias - Agent alias; selects the workdir and worktrees to remove.
 * @param initiativeId - Initiative whose project worktrees belong to the agent, or null.
 * @returns `clean` — whether the workdir had no uncommitted changes;
 *   `removed` — whether the worktrees are gone: either the workdir was already
 *   absent, or worktree removal finished without throwing.
 */
async autoCleanupAfterCompletion(
  agentId: string,
  alias: string,
  initiativeId: string | null,
): Promise<{ clean: boolean; removed: boolean }> {
  const agentWorkdir = this.getAgentWorkdir(alias);

  // Idempotent: if workdir is already gone, nothing to do
  if (!existsSync(agentWorkdir)) {
    return { clean: true, removed: true };
  }

  const clean = await this.isWorkdirClean(alias, initiativeId);
  if (!clean) {
    return { clean: false, removed: false };
  }

  if (this.debug) {
    await this.archiveForDebug(alias, agentId);
  }

  // Removal stays best-effort (failures are logged, not thrown), but each outcome is
  // tracked so the result and the final log line do not claim a removal that failed.
  let worktreesRemoved = true;
  try {
    await this.removeAgentWorktrees(alias, initiativeId);
  } catch (err) {
    worktreesRemoved = false;
    log.warn({ agentId, alias, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup: failed to remove worktrees');
  }

  let logsRemoved = true;
  try {
    await this.removeAgentLogs(agentId);
  } catch (err) {
    logsRemoved = false;
    log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup: failed to remove logs');
  }

  if (worktreesRemoved && logsRemoved) {
    log.info({ agentId, alias }, 'auto-cleanup: workdir and logs removed');
  } else {
    log.warn({ agentId, alias, worktreesRemoved, logsRemoved }, 'auto-cleanup: finished with failures');
  }

  return { clean: true, removed: worktreesRemoved };
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reconcile agent state after server restart.
|
* Reconcile agent state after server restart.
|
||||||
* Checks all agents in 'running' status:
|
* Checks all agents in 'running' status:
|
||||||
@@ -233,7 +346,7 @@ export class CleanupManager {
|
|||||||
}>,
|
}>,
|
||||||
onStreamEvent: (agentId: string, event: StreamEvent) => void,
|
onStreamEvent: (agentId: string, event: StreamEvent) => void,
|
||||||
onAgentOutput: (agentId: string, rawOutput: string, provider: NonNullable<ReturnType<typeof getProvider>>) => Promise<void>,
|
onAgentOutput: (agentId: string, rawOutput: string, provider: NonNullable<ReturnType<typeof getProvider>>) => Promise<void>,
|
||||||
pollForCompletion: (agentId: string, pid: number) => void,
|
pollForCompletion: (agentId: string, pid: number, outputFilePath?: string) => void,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const runningAgents = await this.repository.findByStatus('running');
|
const runningAgents = await this.repository.findByStatus('running');
|
||||||
log.info({ runningCount: runningAgents.length }, 'reconciling agents after restart');
|
log.info({ runningCount: runningAgents.length }, 'reconciling agents after restart');
|
||||||
@@ -268,7 +381,7 @@ export class CleanupManager {
|
|||||||
outputFilePath: agent.outputFilePath,
|
outputFilePath: agent.outputFilePath,
|
||||||
});
|
});
|
||||||
|
|
||||||
pollForCompletion(agent.id, pid);
|
pollForCompletion(agent.id, pid, agent.outputFilePath);
|
||||||
} else if (agent.outputFilePath) {
|
} else if (agent.outputFilePath) {
|
||||||
try {
|
try {
|
||||||
const rawOutput = await readFile(agent.outputFilePath, 'utf-8');
|
const rawOutput = await readFile(agent.outputFilePath, 'utf-8');
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import type {
|
|||||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||||
import type { AccountRepository } from '../db/repositories/account-repository.js';
|
import type { AccountRepository } from '../db/repositories/account-repository.js';
|
||||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||||
|
import type { ProposalRepository } from '../db/repositories/proposal-repository.js';
|
||||||
import { generateUniqueAlias } from './alias.js';
|
import { generateUniqueAlias } from './alias.js';
|
||||||
import type {
|
import type {
|
||||||
EventBus,
|
EventBus,
|
||||||
@@ -32,6 +33,7 @@ import type {
|
|||||||
import { writeInputFiles } from './file-io.js';
|
import { writeInputFiles } from './file-io.js';
|
||||||
import { getProvider } from './providers/registry.js';
|
import { getProvider } from './providers/registry.js';
|
||||||
import { createModuleLogger } from '../logger/index.js';
|
import { createModuleLogger } from '../logger/index.js';
|
||||||
|
import { join } from 'node:path';
|
||||||
import type { AccountCredentialManager } from './credentials/types.js';
|
import type { AccountCredentialManager } from './credentials/types.js';
|
||||||
import { ProcessManager } from './process-manager.js';
|
import { ProcessManager } from './process-manager.js';
|
||||||
import { CredentialHandler } from './credential-handler.js';
|
import { CredentialHandler } from './credential-handler.js';
|
||||||
@@ -41,8 +43,11 @@ import { CleanupManager } from './cleanup-manager.js';
|
|||||||
const log = createModuleLogger('agent-manager');
|
const log = createModuleLogger('agent-manager');
|
||||||
|
|
||||||
export class MultiProviderAgentManager implements AgentManager {
|
export class MultiProviderAgentManager implements AgentManager {
|
||||||
|
private static readonly MAX_COMMIT_RETRIES = 1;
|
||||||
|
|
||||||
private activeAgents: Map<string, ActiveAgent> = new Map();
|
private activeAgents: Map<string, ActiveAgent> = new Map();
|
||||||
private outputBuffers: Map<string, string[]> = new Map();
|
private outputBuffers: Map<string, string[]> = new Map();
|
||||||
|
private commitRetryCount: Map<string, number> = new Map();
|
||||||
private processManager: ProcessManager;
|
private processManager: ProcessManager;
|
||||||
private credentialHandler: CredentialHandler;
|
private credentialHandler: CredentialHandler;
|
||||||
private outputHandler: OutputHandler;
|
private outputHandler: OutputHandler;
|
||||||
@@ -55,11 +60,13 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
private accountRepository?: AccountRepository,
|
private accountRepository?: AccountRepository,
|
||||||
private eventBus?: EventBus,
|
private eventBus?: EventBus,
|
||||||
private credentialManager?: AccountCredentialManager,
|
private credentialManager?: AccountCredentialManager,
|
||||||
|
private proposalRepository?: ProposalRepository,
|
||||||
|
private debug: boolean = false,
|
||||||
) {
|
) {
|
||||||
this.processManager = new ProcessManager(workspaceRoot, projectRepository, eventBus);
|
this.processManager = new ProcessManager(workspaceRoot, projectRepository, eventBus);
|
||||||
this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
|
this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
|
||||||
this.outputHandler = new OutputHandler(repository, eventBus);
|
this.outputHandler = new OutputHandler(repository, eventBus, proposalRepository);
|
||||||
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus);
|
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -116,11 +123,33 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
// 2. Create isolated worktrees
|
// 2. Create isolated worktrees
|
||||||
let agentCwd: string;
|
let agentCwd: string;
|
||||||
if (initiativeId) {
|
if (initiativeId) {
|
||||||
|
log.debug({ alias, initiativeId }, 'creating initiative-based worktrees');
|
||||||
agentCwd = await this.processManager.createProjectWorktrees(alias, initiativeId);
|
agentCwd = await this.processManager.createProjectWorktrees(alias, initiativeId);
|
||||||
|
|
||||||
|
// Log projects linked to the initiative
|
||||||
|
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
|
||||||
|
log.info({
|
||||||
|
alias,
|
||||||
|
initiativeId,
|
||||||
|
projectCount: projects.length,
|
||||||
|
projects: projects.map(p => ({ name: p.name, url: p.url })),
|
||||||
|
agentCwd
|
||||||
|
}, 'initiative-based agent workdir created');
|
||||||
} else {
|
} else {
|
||||||
|
log.debug({ alias }, 'creating standalone worktree');
|
||||||
agentCwd = await this.processManager.createStandaloneWorktree(alias);
|
agentCwd = await this.processManager.createStandaloneWorktree(alias);
|
||||||
|
log.info({ alias, agentCwd }, 'standalone agent workdir created');
|
||||||
}
|
}
|
||||||
log.debug({ alias, agentCwd }, 'worktrees created');
|
|
||||||
|
// Verify the final agentCwd exists
|
||||||
|
const { existsSync: fsExistsSync } = await import('node:fs');
|
||||||
|
const cwdVerified = fsExistsSync(agentCwd);
|
||||||
|
log.info({
|
||||||
|
alias,
|
||||||
|
agentCwd,
|
||||||
|
cwdVerified,
|
||||||
|
initiativeBasedAgent: !!initiativeId
|
||||||
|
}, 'agent workdir setup completed');
|
||||||
|
|
||||||
// 2b. Write input files
|
// 2b. Write input files
|
||||||
if (options.inputContext) {
|
if (options.inputContext) {
|
||||||
@@ -144,13 +173,38 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
|
|
||||||
// 4. Build spawn command
|
// 4. Build spawn command
|
||||||
const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
|
const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
|
||||||
log.debug({ command, args: args.join(' '), cwd: cwd ?? agentCwd }, 'spawn command built');
|
const finalCwd = cwd ?? agentCwd;
|
||||||
|
|
||||||
|
log.info({
|
||||||
|
agentId,
|
||||||
|
alias,
|
||||||
|
command,
|
||||||
|
args: args.join(' '),
|
||||||
|
finalCwd,
|
||||||
|
customCwdProvided: !!cwd,
|
||||||
|
providerEnv: Object.keys(providerEnv)
|
||||||
|
}, 'spawn command built');
|
||||||
|
|
||||||
// 5. Set config dir env var if account selected
|
// 5. Set config dir env var if account selected
|
||||||
const processEnv: Record<string, string> = { ...providerEnv };
|
const processEnv: Record<string, string> = { ...providerEnv };
|
||||||
if (accountConfigDir && provider.configDirEnv) {
|
if (accountConfigDir && provider.configDirEnv) {
|
||||||
processEnv[provider.configDirEnv] = accountConfigDir;
|
processEnv[provider.configDirEnv] = accountConfigDir;
|
||||||
|
|
||||||
|
// Inject CLAUDE_CODE_OAUTH_TOKEN to bypass macOS keychain entirely.
|
||||||
|
// Claude Code prioritizes this env var over keychain/file-based credentials.
|
||||||
|
const accessToken = this.credentialHandler.readAccessToken(accountConfigDir);
|
||||||
|
if (accessToken) {
|
||||||
|
processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
|
||||||
|
log.debug({ alias }, 'CLAUDE_CODE_OAUTH_TOKEN injected for spawn');
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug({
|
||||||
|
agentId,
|
||||||
|
finalProcessEnv: Object.keys(processEnv),
|
||||||
|
hasAccountConfig: !!accountConfigDir,
|
||||||
|
hasOAuthToken: !!processEnv['CLAUDE_CODE_OAUTH_TOKEN'],
|
||||||
|
}, 'process environment prepared');
|
||||||
|
|
||||||
// 6. Spawn detached subprocess
|
// 6. Spawn detached subprocess
|
||||||
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
|
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
|
||||||
@@ -160,8 +214,32 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
|
|
||||||
await this.repository.update(agentId, { pid, outputFilePath });
|
await this.repository.update(agentId, { pid, outputFilePath });
|
||||||
|
|
||||||
|
// Write spawn diagnostic file for post-execution verification
|
||||||
|
const { writeFileSync, existsSync: diagnosticExistsSync } = await import('node:fs');
|
||||||
|
const diagnostic = {
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
agentId,
|
||||||
|
alias,
|
||||||
|
intendedCwd: finalCwd,
|
||||||
|
worktreeId: agent.worktreeId,
|
||||||
|
provider: providerName,
|
||||||
|
command,
|
||||||
|
args,
|
||||||
|
env: processEnv,
|
||||||
|
cwdExistsAtSpawn: diagnosticExistsSync(finalCwd),
|
||||||
|
initiativeId: initiativeId || null,
|
||||||
|
customCwdProvided: !!cwd,
|
||||||
|
accountId: accountId || null,
|
||||||
|
};
|
||||||
|
|
||||||
|
writeFileSync(
|
||||||
|
join(finalCwd, '.cw', 'spawn-diagnostic.json'),
|
||||||
|
JSON.stringify(diagnostic, null, 2),
|
||||||
|
'utf-8'
|
||||||
|
);
|
||||||
|
|
||||||
this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });
|
this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });
|
||||||
log.info({ agentId, alias, pid }, 'detached subprocess started');
|
log.info({ agentId, alias, pid, diagnosticWritten: true }, 'detached subprocess started with diagnostic');
|
||||||
|
|
||||||
// Emit spawned event
|
// Emit spawned event
|
||||||
if (this.eventBus) {
|
if (this.eventBus) {
|
||||||
@@ -178,6 +256,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
agentId, pid,
|
agentId, pid,
|
||||||
() => this.handleDetachedAgentCompletion(agentId),
|
() => this.handleDetachedAgentCompletion(agentId),
|
||||||
() => this.activeAgents.get(agentId)?.tailer,
|
() => this.activeAgents.get(agentId)?.tailer,
|
||||||
|
outputFilePath,
|
||||||
);
|
);
|
||||||
|
|
||||||
return this.toAgentInfo(agent);
|
return this.toAgentInfo(agent);
|
||||||
@@ -195,7 +274,132 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
active,
|
active,
|
||||||
(alias) => this.processManager.getAgentWorkdir(alias),
|
(alias) => this.processManager.getAgentWorkdir(alias),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Sync credentials back to DB if the agent had an account
|
||||||
|
await this.syncCredentialsPostCompletion(agentId);
|
||||||
|
|
||||||
this.activeAgents.delete(agentId);
|
this.activeAgents.delete(agentId);
|
||||||
|
|
||||||
|
// Auto-cleanup workdir after completion
|
||||||
|
await this.tryAutoCleanup(agentId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Run post-completion auto-cleanup for an agent, giving it a bounded number of
 * chances to commit leftover changes first.
 *
 * Only agents whose status is `idle` are considered. If the workdir is dirty and
 * fewer than MAX_COMMIT_RETRIES resumes have been attempted, the agent is resumed
 * with a commit prompt; otherwise the workdir is left in place and the retry
 * counter is cleared. Errors are logged as warnings and never propagate.
 *
 * @param agentId - Id of the agent that just completed.
 */
private async tryAutoCleanup(agentId: string): Promise<void> {
  try {
    const record = await this.repository.findById(agentId);
    if (!record) return;
    if (record.status !== 'idle') return;

    const outcome = await this.cleanupManager.autoCleanupAfterCompletion(
      agentId,
      record.name,
      record.initiativeId,
    );

    if (outcome.removed) {
      this.commitRetryCount.delete(agentId);
      log.info({ agentId, alias: record.name }, 'auto-cleanup completed');
      return;
    }

    // Clean but not removed: nothing further to do here.
    if (outcome.clean) return;

    // Dirty workdir: ask the agent to commit, at most MAX_COMMIT_RETRIES times.
    const attempts = this.commitRetryCount.get(agentId) ?? 0;
    const mayRetry = attempts < MultiProviderAgentManager.MAX_COMMIT_RETRIES;
    if (mayRetry) {
      const attemptNumber = attempts + 1;
      this.commitRetryCount.set(agentId, attemptNumber);
      if (await this.resumeForCommit(agentId)) {
        log.info({ agentId, alias: record.name, retry: attemptNumber }, 'resumed agent to commit uncommitted changes');
        return;
      }
    }

    // Reached when retries are exhausted or the agent could not be resumed.
    log.warn({ agentId, alias: record.name }, 'agent workdir has uncommitted changes after max retries, leaving in place');
    this.commitRetryCount.delete(agentId);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'auto-cleanup failed');
  }
}
|
||||||
|
|
||||||
|
/**
 * Resume an agent's session with a prompt asking it to commit its uncommitted changes.
 *
 * The agent is marked `running` (pending questions and result cleared) before the
 * resume command is built and a new detached process is spawned and polled.
 *
 * @param agentId - Id of the agent to resume.
 * @returns `false` when the agent has no session id, its provider is unknown, or
 *   the provider's `resumeStyle` is `'none'`; `true` once the resume process has
 *   been spawned and completion polling has started.
 */
private async resumeForCommit(agentId: string): Promise<boolean> {
  const agent = await this.repository.findById(agentId);
  const sessionId = agent?.sessionId;
  if (!agent || !sessionId) return false;

  const provider = getProvider(agent.provider);
  if (!provider) return false;
  if (provider.resumeStyle === 'none') return false;

  const commitPrompt = [
    'You have uncommitted changes in your working directory. ',
    'Stage everything with `git add -A` and commit with an appropriate message describing your work. ',
    'Do not make any other changes.',
  ].join('');

  await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

  const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
  const resume = this.processManager.buildResumeCommand(provider, sessionId, commitPrompt);

  // Start from the provider's env, then layer account-specific settings on top.
  const processEnv: Record<string, string> = { ...resume.env };
  const { accountId } = agent;
  const configDirEnv = provider.configDirEnv;
  if (accountId && configDirEnv && this.accountRepository) {
    const { getAccountConfigDir } = await import('./accounts/paths.js');
    const configDir = getAccountConfigDir(this.workspaceRoot, accountId);

    const account = await this.accountRepository.findById(accountId);
    if (account) {
      this.credentialHandler.writeCredentialsToDisk(account, configDir);
    }
    processEnv[configDirEnv] = configDir;

    const accessToken = this.credentialHandler.readAccessToken(configDir);
    if (accessToken) {
      processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
    }
  }

  // Stop the previous run's tailer before the new process is spawned.
  const previousTailer = this.activeAgents.get(agentId)?.tailer;
  if (previousTailer) {
    await previousTailer.stop();
  }

  const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
    agentId,
    resume.command,
    resume.args,
    agentCwd,
    processEnv,
    provider.name,
    commitPrompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
  );

  await this.repository.update(agentId, { pid, outputFilePath });
  this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });

  this.processManager.pollForCompletion(
    agentId,
    pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
    outputFilePath,
  );

  return true;
}
|
||||||
|
|
||||||
|
/**
 * Persist credentials from the agent's account config dir back to the DB after completion.
 *
 * Does nothing when no account repository is configured or the agent has no
 * account. Per the original note, the intent is to pick up tokens the subprocess
 * may have refreshed mid-session so the next spawn uses fresh ones.
 * Failures are logged as warnings and never propagate.
 *
 * @param agentId - Id of the agent whose account credentials should be synced.
 */
private async syncCredentialsPostCompletion(agentId: string): Promise<void> {
  if (!this.accountRepository) {
    return;
  }

  try {
    const accountId = (await this.repository.findById(agentId))?.accountId;
    if (!accountId) {
      return;
    }

    const pathsModule = await import('./accounts/paths.js');
    const configDir = pathsModule.getAccountConfigDir(this.workspaceRoot, accountId);
    await this.credentialHandler.persistRefreshedCredentials(accountId, configDir);
    log.debug({ agentId, accountId }, 'post-completion credential sync done');
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'post-completion credential sync failed');
  }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -213,6 +417,9 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
this.activeAgents.delete(agentId);
|
this.activeAgents.delete(agentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync credentials before marking stopped
|
||||||
|
await this.syncCredentialsPostCompletion(agentId);
|
||||||
|
|
||||||
await this.repository.update(agentId, { status: 'stopped' });
|
await this.repository.update(agentId, { status: 'stopped' });
|
||||||
|
|
||||||
if (this.eventBus) {
|
if (this.eventBus) {
|
||||||
@@ -271,9 +478,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
|
|
||||||
const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
|
const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
|
||||||
const prompt = this.outputHandler.formatAnswersAsPrompt(answers);
|
const prompt = this.outputHandler.formatAnswersAsPrompt(answers);
|
||||||
await this.repository.update(agentId, { status: 'running' });
|
await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });
|
||||||
await this.repository.update(agentId, { pendingQuestions: null });
|
|
||||||
await this.repository.update(agentId, { result: null });
|
|
||||||
|
|
||||||
const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt);
|
const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt);
|
||||||
log.debug({ command, args: args.join(' ') }, 'resume command built');
|
log.debug({ command, args: args.join(' ') }, 'resume command built');
|
||||||
@@ -295,6 +500,13 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
if (refreshed) {
|
if (refreshed) {
|
||||||
await this.credentialHandler.persistRefreshedCredentials(agent.accountId, resumeAccountConfigDir);
|
await this.credentialHandler.persistRefreshedCredentials(agent.accountId, resumeAccountConfigDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inject CLAUDE_CODE_OAUTH_TOKEN to bypass macOS keychain on resume
|
||||||
|
const accessToken = this.credentialHandler.readAccessToken(resumeAccountConfigDir);
|
||||||
|
if (accessToken) {
|
||||||
|
processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
|
||||||
|
log.debug({ agentId }, 'CLAUDE_CODE_OAUTH_TOKEN injected for resume');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop previous tailer
|
// Stop previous tailer
|
||||||
@@ -326,6 +538,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
agentId, pid,
|
agentId, pid,
|
||||||
() => this.handleDetachedAgentCompletion(agentId),
|
() => this.handleDetachedAgentCompletion(agentId),
|
||||||
() => this.activeAgents.get(agentId)?.tailer,
|
() => this.activeAgents.get(agentId)?.tailer,
|
||||||
|
outputFilePath,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -432,10 +645,11 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
this.activeAgents,
|
this.activeAgents,
|
||||||
(agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
|
(agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
|
||||||
(agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
|
(agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
|
||||||
(agentId, pid) => this.processManager.pollForCompletion(
|
(agentId, pid, outputFilePath) => this.processManager.pollForCompletion(
|
||||||
agentId, pid,
|
agentId, pid,
|
||||||
() => this.handleDetachedAgentCompletion(agentId),
|
() => this.handleDetachedAgentCompletion(agentId),
|
||||||
() => this.activeAgents.get(agentId)?.tailer,
|
() => this.activeAgents.get(agentId)?.tailer,
|
||||||
|
outputFilePath,
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -456,6 +670,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
accountId: string | null;
|
accountId: string | null;
|
||||||
createdAt: Date;
|
createdAt: Date;
|
||||||
updatedAt: Date;
|
updatedAt: Date;
|
||||||
|
userDismissedAt?: Date | null;
|
||||||
}): AgentInfo {
|
}): AgentInfo {
|
||||||
return {
|
return {
|
||||||
id: agent.id,
|
id: agent.id,
|
||||||
@@ -470,6 +685,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
accountId: agent.accountId,
|
accountId: agent.accountId,
|
||||||
createdAt: agent.createdAt,
|
createdAt: agent.createdAt,
|
||||||
updatedAt: agent.updatedAt,
|
updatedAt: agent.updatedAt,
|
||||||
|
userDismissedAt: agent.userDismissedAt,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,10 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { readFile } from 'node:fs/promises';
|
import { readFile } from 'node:fs/promises';
|
||||||
|
import { existsSync } from 'node:fs';
|
||||||
|
import { join } from 'node:path';
|
||||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||||
|
import type { ProposalRepository, CreateProposalData } from '../db/repositories/proposal-repository.js';
|
||||||
import type {
|
import type {
|
||||||
EventBus,
|
EventBus,
|
||||||
AgentStoppedEvent,
|
AgentStoppedEvent,
|
||||||
@@ -30,6 +33,7 @@ import {
|
|||||||
readTaskFiles,
|
readTaskFiles,
|
||||||
readDecisionFiles,
|
readDecisionFiles,
|
||||||
readPageFiles,
|
readPageFiles,
|
||||||
|
readFrontmatterFile,
|
||||||
} from './file-io.js';
|
} from './file-io.js';
|
||||||
import { getProvider } from './providers/registry.js';
|
import { getProvider } from './providers/registry.js';
|
||||||
import { createModuleLogger } from '../logger/index.js';
|
import { createModuleLogger } from '../logger/index.js';
|
||||||
@@ -52,6 +56,8 @@ export interface ActiveAgent {
|
|||||||
streamResultText?: string;
|
streamResultText?: string;
|
||||||
streamSessionId?: string;
|
streamSessionId?: string;
|
||||||
streamCostUsd?: number;
|
streamCostUsd?: number;
|
||||||
|
/** True when the stream result indicated an error (e.g. auth failure) */
|
||||||
|
streamIsError?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -71,8 +77,30 @@ export class OutputHandler {
|
|||||||
constructor(
|
constructor(
|
||||||
private repository: AgentRepository,
|
private repository: AgentRepository,
|
||||||
private eventBus?: EventBus,
|
private eventBus?: EventBus,
|
||||||
|
private proposalRepository?: ProposalRepository,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
/**
 * Check that a signal file has been completely written and holds valid JSON.
 *
 * @param filePath - Path of the signal file to inspect.
 * @returns `true` only when the file can be read, its trimmed content is
 *   non-empty, ends with `}` or `]`, and parses as JSON; `false` otherwise
 *   (including on any read error).
 */
private async validateSignalFile(filePath: string): Promise<boolean> {
  let text: string;
  try {
    text = (await readFile(filePath, 'utf-8')).trim();
  } catch {
    return false;
  }

  // Cheap completeness probe: a finished JSON object/array ends with a closing
  // bracket. An empty string yields '' here and is rejected as well.
  const lastChar = text.charAt(text.length - 1);
  if (lastChar !== '}' && lastChar !== ']') {
    return false;
  }

  // Full parse to catch truncated or malformed content.
  try {
    JSON.parse(text);
  } catch {
    return false;
  }
  return true;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle a standardized stream event from a parser.
|
* Handle a standardized stream event from a parser.
|
||||||
*/
|
*/
|
||||||
@@ -112,6 +140,7 @@ export class OutputHandler {
|
|||||||
if (active) {
|
if (active) {
|
||||||
active.streamResultText = event.text;
|
active.streamResultText = event.text;
|
||||||
active.streamCostUsd = event.costUsd;
|
active.streamCostUsd = event.costUsd;
|
||||||
|
active.streamIsError = event.isError === true;
|
||||||
if (!active.streamSessionId && event.sessionId) {
|
if (!active.streamSessionId && event.sessionId) {
|
||||||
active.streamSessionId = event.sessionId;
|
active.streamSessionId = event.sessionId;
|
||||||
}
|
}
|
||||||
@@ -145,15 +174,52 @@ export class OutputHandler {
|
|||||||
|
|
||||||
log.debug({ agentId }, 'detached agent completed');
|
log.debug({ agentId }, 'detached agent completed');
|
||||||
|
|
||||||
|
// Verify agent worked in correct location by checking for output files
|
||||||
|
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
|
||||||
|
const outputDir = join(agentWorkdir, '.cw', 'output');
|
||||||
|
const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
|
||||||
|
const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
|
||||||
|
|
||||||
|
const outputDirExists = existsSync(outputDir);
|
||||||
|
const expectedPwdExists = existsSync(expectedPwdFile);
|
||||||
|
const diagnosticExists = existsSync(diagnosticFile);
|
||||||
|
|
||||||
|
log.info({
|
||||||
|
agentId,
|
||||||
|
agentWorkdir,
|
||||||
|
outputDirExists,
|
||||||
|
expectedPwdExists,
|
||||||
|
diagnosticExists,
|
||||||
|
verification: outputDirExists ? 'PASS' : 'FAIL'
|
||||||
|
}, 'agent workdir verification completed');
|
||||||
|
|
||||||
|
if (!outputDirExists) {
|
||||||
|
log.warn({
|
||||||
|
agentId,
|
||||||
|
agentWorkdir
|
||||||
|
}, 'No output files found in agent workdir! Agent may have run in wrong location.');
|
||||||
|
}
|
||||||
|
|
||||||
let signalText = active?.streamResultText;
|
let signalText = active?.streamResultText;
|
||||||
|
|
||||||
|
// If the stream result indicated an error (e.g. auth failure, usage limit),
|
||||||
|
// route directly to error handling instead of trying to parse as signal JSON
|
||||||
|
if (signalText && active?.streamIsError) {
|
||||||
|
log.warn({ agentId, error: signalText }, 'agent returned error result');
|
||||||
|
await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!signalText) {
|
if (!signalText) {
|
||||||
try {
|
try {
|
||||||
const fileContent = await readFile(active?.outputFilePath ?? '', 'utf-8');
|
const outputFilePath = active?.outputFilePath ?? '';
|
||||||
|
if (outputFilePath && await this.validateSignalFile(outputFilePath)) {
|
||||||
|
const fileContent = await readFile(outputFilePath, 'utf-8');
|
||||||
if (fileContent.trim()) {
|
if (fileContent.trim()) {
|
||||||
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
|
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch { /* file empty or missing */ }
|
} catch { /* file empty or missing */ }
|
||||||
|
|
||||||
log.warn({ agentId }, 'no result text from stream or file');
|
log.warn({ agentId }, 'no result text from stream or file');
|
||||||
@@ -212,23 +278,70 @@ export class OutputHandler {
|
|||||||
*/
|
*/
|
||||||
private async processOutputFiles(
|
private async processOutputFiles(
|
||||||
agentId: string,
|
agentId: string,
|
||||||
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string },
|
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string; initiativeId?: string | null },
|
||||||
mode: AgentMode,
|
mode: AgentMode,
|
||||||
getAgentWorkdir: (alias: string) => string,
|
getAgentWorkdir: (alias: string) => string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
|
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
|
||||||
const summary = readSummary(agentWorkdir);
|
const summary = readSummary(agentWorkdir);
|
||||||
|
const initiativeId = agent.initiativeId;
|
||||||
|
const canWriteProposals = this.proposalRepository && initiativeId;
|
||||||
|
|
||||||
let resultMessage = summary?.body ?? 'Task completed';
|
let resultMessage = summary?.body ?? 'Task completed';
|
||||||
switch (mode) {
|
switch (mode) {
|
||||||
case 'breakdown': {
|
case 'breakdown': {
|
||||||
const phases = readPhaseFiles(agentWorkdir);
|
const phases = readPhaseFiles(agentWorkdir);
|
||||||
|
if (canWriteProposals && phases.length > 0) {
|
||||||
|
const proposalData: CreateProposalData[] = phases.map((p, i) => ({
|
||||||
|
agentId,
|
||||||
|
initiativeId,
|
||||||
|
targetType: 'phase' as const,
|
||||||
|
targetId: null,
|
||||||
|
title: p.title,
|
||||||
|
summary: null,
|
||||||
|
content: p.body || null,
|
||||||
|
metadata: JSON.stringify({ fileId: p.id, number: i + 1, dependencies: p.dependencies }),
|
||||||
|
status: 'pending' as const,
|
||||||
|
sortOrder: i,
|
||||||
|
}));
|
||||||
|
await this.proposalRepository!.createMany(proposalData);
|
||||||
|
resultMessage = summary?.body ?? `${phases.length} phase proposals created`;
|
||||||
|
} else {
|
||||||
resultMessage = JSON.stringify({ summary: summary?.body, phases });
|
resultMessage = JSON.stringify({ summary: summary?.body, phases });
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'decompose': {
|
case 'decompose': {
|
||||||
const tasks = readTaskFiles(agentWorkdir);
|
const tasks = readTaskFiles(agentWorkdir);
|
||||||
|
if (canWriteProposals && tasks.length > 0) {
|
||||||
|
// Read phase info from input context if available
|
||||||
|
const phaseInput = readFrontmatterFile(join(agentWorkdir, '.cw', 'input', 'phase.md'));
|
||||||
|
const phaseId = (phaseInput?.data?.id as string) ?? null;
|
||||||
|
|
||||||
|
const proposalData: CreateProposalData[] = tasks.map((t, i) => ({
|
||||||
|
agentId,
|
||||||
|
initiativeId,
|
||||||
|
targetType: 'task' as const,
|
||||||
|
targetId: null,
|
||||||
|
title: t.title,
|
||||||
|
summary: null,
|
||||||
|
content: t.body || null,
|
||||||
|
metadata: JSON.stringify({
|
||||||
|
fileId: t.id,
|
||||||
|
category: t.category,
|
||||||
|
type: t.type,
|
||||||
|
dependencies: t.dependencies,
|
||||||
|
parentTaskId: agent.taskId,
|
||||||
|
phaseId,
|
||||||
|
}),
|
||||||
|
status: 'pending' as const,
|
||||||
|
sortOrder: i,
|
||||||
|
}));
|
||||||
|
await this.proposalRepository!.createMany(proposalData);
|
||||||
|
resultMessage = summary?.body ?? `${tasks.length} task proposals created`;
|
||||||
|
} else {
|
||||||
resultMessage = JSON.stringify({ summary: summary?.body, tasks });
|
resultMessage = JSON.stringify({ summary: summary?.body, tasks });
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'discuss': {
|
case 'discuss': {
|
||||||
@@ -238,7 +351,44 @@ export class OutputHandler {
|
|||||||
}
|
}
|
||||||
case 'refine': {
|
case 'refine': {
|
||||||
const pages = readPageFiles(agentWorkdir);
|
const pages = readPageFiles(agentWorkdir);
|
||||||
|
if (canWriteProposals) {
|
||||||
|
if (pages.length > 0) {
|
||||||
|
// Create proposals for actual page changes
|
||||||
|
const proposalData: CreateProposalData[] = pages.map((p, i) => ({
|
||||||
|
agentId,
|
||||||
|
initiativeId,
|
||||||
|
targetType: 'page' as const,
|
||||||
|
targetId: p.pageId,
|
||||||
|
title: p.title,
|
||||||
|
summary: p.summary || null,
|
||||||
|
content: p.body || null,
|
||||||
|
metadata: null,
|
||||||
|
status: 'pending' as const,
|
||||||
|
sortOrder: i,
|
||||||
|
}));
|
||||||
|
await this.proposalRepository!.createMany(proposalData);
|
||||||
|
resultMessage = summary?.body ?? `${pages.length} page proposals created`;
|
||||||
|
} else {
|
||||||
|
// Create a synthetic completion proposal when no changes are proposed
|
||||||
|
// This ensures the dismiss flow always goes through proposals domain
|
||||||
|
const completionProposal: CreateProposalData = {
|
||||||
|
agentId,
|
||||||
|
initiativeId,
|
||||||
|
targetType: 'page' as const,
|
||||||
|
targetId: null,
|
||||||
|
title: 'Analysis Complete',
|
||||||
|
summary: 'Agent completed review with no changes proposed',
|
||||||
|
content: summary?.body || 'The agent has finished analyzing the content and determined no changes are needed.',
|
||||||
|
metadata: JSON.stringify({ synthetic: true, reason: 'no_changes' }),
|
||||||
|
status: 'pending' as const,
|
||||||
|
sortOrder: 0,
|
||||||
|
};
|
||||||
|
await this.proposalRepository!.createMany([completionProposal]);
|
||||||
|
resultMessage = summary?.body ?? 'Analysis completed with 1 completion notice';
|
||||||
|
}
|
||||||
|
} else {
|
||||||
resultMessage = JSON.stringify({ summary: summary?.body, proposals: pages });
|
resultMessage = JSON.stringify({ summary: summary?.body, proposals: pages });
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -248,8 +398,7 @@ export class OutputHandler {
|
|||||||
message: resultMessage,
|
message: resultMessage,
|
||||||
filesModified: summary?.filesModified,
|
filesModified: summary?.filesModified,
|
||||||
};
|
};
|
||||||
await this.repository.update(agentId, { result: JSON.stringify(resultPayload) });
|
await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' });
|
||||||
await this.repository.update(agentId, { status: 'idle' });
|
|
||||||
|
|
||||||
const reason = this.getStoppedReason(mode);
|
const reason = this.getStoppedReason(mode);
|
||||||
if (this.eventBus) {
|
if (this.eventBus) {
|
||||||
@@ -280,8 +429,7 @@ export class OutputHandler {
|
|||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const questionsPayload: PendingQuestions = { questions };
|
const questionsPayload: PendingQuestions = { questions };
|
||||||
|
|
||||||
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload) });
|
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload), status: 'waiting_for_input' });
|
||||||
await this.repository.update(agentId, { status: 'waiting_for_input' });
|
|
||||||
|
|
||||||
if (this.eventBus) {
|
if (this.eventBus) {
|
||||||
const event: AgentWaitingEvent = {
|
const event: AgentWaitingEvent = {
|
||||||
@@ -309,8 +457,10 @@ export class OutputHandler {
|
|||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const errorResult: AgentResult = { success: false, message: error };
|
const errorResult: AgentResult = { success: false, message: error };
|
||||||
|
|
||||||
await this.repository.update(agentId, { result: JSON.stringify(errorResult) });
|
await this.repository.update(agentId, {
|
||||||
await this.repository.update(agentId, { status: 'crashed' });
|
result: JSON.stringify(errorResult),
|
||||||
|
status: 'crashed'
|
||||||
|
});
|
||||||
this.emitCrashed(agent, error);
|
this.emitCrashed(agent, error);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -342,13 +492,20 @@ export class OutputHandler {
|
|||||||
// Extract session ID using provider's extraction config
|
// Extract session ID using provider's extraction config
|
||||||
let sessionId: string | null = null;
|
let sessionId: string | null = null;
|
||||||
if (provider.sessionId) {
|
if (provider.sessionId) {
|
||||||
try {
|
const outputLines = rawOutput.trim().split('\n');
|
||||||
if (provider.sessionId.extractFrom === 'result') {
|
if (provider.sessionId.extractFrom === 'result') {
|
||||||
const parsed = JSON.parse(rawOutput);
|
// Find the result line in JSONL output
|
||||||
|
for (const line of outputLines) {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(line);
|
||||||
|
if (parsed.type === 'result' || parsed[provider.sessionId.field]) {
|
||||||
sessionId = parsed[provider.sessionId.field] ?? null;
|
sessionId = parsed[provider.sessionId.field] ?? null;
|
||||||
|
if (sessionId) break;
|
||||||
|
}
|
||||||
|
} catch { /* skip */ }
|
||||||
|
}
|
||||||
} else if (provider.sessionId.extractFrom === 'event') {
|
} else if (provider.sessionId.extractFrom === 'event') {
|
||||||
const lines = rawOutput.trim().split('\n');
|
for (const line of outputLines) {
|
||||||
for (const line of lines) {
|
|
||||||
try {
|
try {
|
||||||
const event = JSON.parse(line);
|
const event = JSON.parse(line);
|
||||||
if (event.type === provider.sessionId.eventType) {
|
if (event.type === provider.sessionId.eventType) {
|
||||||
@@ -357,7 +514,6 @@ export class OutputHandler {
|
|||||||
} catch { /* skip */ }
|
} catch { /* skip */ }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch { /* parse failure */ }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sessionId) {
|
if (sessionId) {
|
||||||
@@ -366,14 +522,38 @@ export class OutputHandler {
|
|||||||
log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output');
|
log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output');
|
||||||
|
|
||||||
if (provider.name === 'claude') {
|
if (provider.name === 'claude') {
|
||||||
|
// rawOutput may be a single JSON object or multi-line JSONL — find the result line
|
||||||
|
let cliResult: ClaudeCliResult | null = null;
|
||||||
|
const lines = rawOutput.trim().split('\n');
|
||||||
|
for (const line of lines) {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(line);
|
||||||
|
if (parsed.type === 'result') {
|
||||||
|
cliResult = parsed;
|
||||||
|
}
|
||||||
|
} catch { /* skip non-JSON lines */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!cliResult) {
|
||||||
|
log.error({ agentId }, 'no result event found in agent output');
|
||||||
|
await this.handleAgentError(agentId, new Error('No result event in output'), provider, getAgentWorkdir);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle error results (auth failure, usage limits, etc.)
|
||||||
|
if (cliResult.is_error) {
|
||||||
|
log.warn({ agentId, error: cliResult.result }, 'agent returned error result from file');
|
||||||
|
await this.handleAgentError(agentId, new Error(cliResult.result), provider, getAgentWorkdir);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let signalText: string;
|
let signalText: string;
|
||||||
try {
|
try {
|
||||||
const cliResult: ClaudeCliResult = JSON.parse(rawOutput);
|
|
||||||
const signal = cliResult.structured_output ?? JSON.parse(cliResult.result);
|
const signal = cliResult.structured_output ?? JSON.parse(cliResult.result);
|
||||||
signalText = JSON.stringify(signal);
|
signalText = JSON.stringify(signal);
|
||||||
} catch (parseErr) {
|
} catch (parseErr) {
|
||||||
log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent output');
|
log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent signal from result');
|
||||||
await this.repository.update(agentId, { status: 'crashed' });
|
await this.handleAgentError(agentId, new Error('Failed to parse agent signal'), provider, getAgentWorkdir);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -399,7 +579,15 @@ export class OutputHandler {
|
|||||||
|
|
||||||
log.error({ agentId, err: errorMessage }, 'agent error');
|
log.error({ agentId, err: errorMessage }, 'agent error');
|
||||||
|
|
||||||
await this.repository.update(agentId, { status: 'crashed' });
|
const errorResult: AgentResult = {
|
||||||
|
success: false,
|
||||||
|
message: errorMessage,
|
||||||
|
};
|
||||||
|
|
||||||
|
await this.repository.update(agentId, {
|
||||||
|
status: 'crashed',
|
||||||
|
result: JSON.stringify(errorResult)
|
||||||
|
});
|
||||||
|
|
||||||
if (this.eventBus) {
|
if (this.eventBus) {
|
||||||
const event: AgentCrashedEvent = {
|
const event: AgentCrashedEvent = {
|
||||||
@@ -414,12 +602,6 @@ export class OutputHandler {
|
|||||||
};
|
};
|
||||||
this.eventBus.emit(event);
|
this.eventBus.emit(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
const errorResult: AgentResult = {
|
|
||||||
success: false,
|
|
||||||
message: errorMessage,
|
|
||||||
};
|
|
||||||
await this.repository.update(agentId, { result: JSON.stringify(errorResult) });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -7,7 +7,8 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { spawn } from 'node:child_process';
|
import { spawn } from 'node:child_process';
|
||||||
import { writeFileSync, mkdirSync, openSync, closeSync } from 'node:fs';
|
import { writeFileSync, mkdirSync, openSync, closeSync, existsSync } from 'node:fs';
|
||||||
|
import { stat } from 'node:fs/promises';
|
||||||
import { join } from 'node:path';
|
import { join } from 'node:path';
|
||||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||||
import type { EventBus } from '../events/index.js';
|
import type { EventBus } from '../events/index.js';
|
||||||
@@ -59,10 +60,33 @@ export class ProcessManager {
|
|||||||
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
|
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
|
||||||
const agentWorkdir = this.getAgentWorkdir(alias);
|
const agentWorkdir = this.getAgentWorkdir(alias);
|
||||||
|
|
||||||
|
log.debug({
|
||||||
|
alias,
|
||||||
|
initiativeId,
|
||||||
|
projectCount: projects.length,
|
||||||
|
agentWorkdir,
|
||||||
|
baseBranch
|
||||||
|
}, 'creating project worktrees');
|
||||||
|
|
||||||
for (const project of projects) {
|
for (const project of projects) {
|
||||||
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
|
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
|
||||||
const worktreeManager = new SimpleGitWorktreeManager(clonePath, undefined, agentWorkdir);
|
const worktreeManager = new SimpleGitWorktreeManager(clonePath, undefined, agentWorkdir);
|
||||||
await worktreeManager.create(project.name, `agent/${alias}`, baseBranch);
|
const worktree = await worktreeManager.create(project.name, `agent/${alias}`, baseBranch);
|
||||||
|
const worktreePath = worktree.path;
|
||||||
|
const pathExists = existsSync(worktreePath);
|
||||||
|
|
||||||
|
log.debug({
|
||||||
|
alias,
|
||||||
|
agentWorkdir,
|
||||||
|
projectName: project.name,
|
||||||
|
worktreePath,
|
||||||
|
pathExists
|
||||||
|
}, 'worktree created');
|
||||||
|
|
||||||
|
if (!pathExists) {
|
||||||
|
log.error({ worktreePath }, 'Worktree path does not exist after creation!');
|
||||||
|
throw new Error(`Worktree creation failed: ${worktreePath}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return agentWorkdir;
|
return agentWorkdir;
|
||||||
@@ -74,7 +98,25 @@ export class ProcessManager {
|
|||||||
async createStandaloneWorktree(alias: string): Promise<string> {
|
async createStandaloneWorktree(alias: string): Promise<string> {
|
||||||
const agentWorkdir = this.getAgentWorkdir(alias);
|
const agentWorkdir = this.getAgentWorkdir(alias);
|
||||||
const worktreeManager = new SimpleGitWorktreeManager(this.workspaceRoot, undefined, agentWorkdir);
|
const worktreeManager = new SimpleGitWorktreeManager(this.workspaceRoot, undefined, agentWorkdir);
|
||||||
|
|
||||||
|
log.debug({ alias, agentWorkdir }, 'creating standalone worktree');
|
||||||
|
|
||||||
const worktree = await worktreeManager.create('workspace', `agent/${alias}`);
|
const worktree = await worktreeManager.create('workspace', `agent/${alias}`);
|
||||||
|
const worktreePath = worktree.path;
|
||||||
|
const pathExists = existsSync(worktreePath);
|
||||||
|
|
||||||
|
log.debug({
|
||||||
|
alias,
|
||||||
|
agentWorkdir,
|
||||||
|
worktreePath,
|
||||||
|
pathExists
|
||||||
|
}, 'standalone worktree created');
|
||||||
|
|
||||||
|
if (!pathExists) {
|
||||||
|
log.error({ worktreePath }, 'Standalone worktree path does not exist after creation!');
|
||||||
|
throw new Error(`Standalone worktree creation failed: ${worktreePath}`);
|
||||||
|
}
|
||||||
|
|
||||||
return worktree.path;
|
return worktree.path;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -195,6 +237,32 @@ export class ProcessManager {
|
|||||||
prompt?: string,
|
prompt?: string,
|
||||||
onEvent?: (event: StreamEvent) => void,
|
onEvent?: (event: StreamEvent) => void,
|
||||||
): { pid: number; outputFilePath: string; tailer: FileTailer } {
|
): { pid: number; outputFilePath: string; tailer: FileTailer } {
|
||||||
|
// Pre-spawn validation and logging
|
||||||
|
const cwdExists = existsSync(cwd);
|
||||||
|
const commandWithArgs = [command, ...args].join(' ');
|
||||||
|
|
||||||
|
// Log environment variables that might affect working directory
|
||||||
|
const environmentInfo = {
|
||||||
|
PWD: process.env.PWD,
|
||||||
|
HOME: process.env.HOME,
|
||||||
|
CLAUDE_CONFIG_DIR: env.CLAUDE_CONFIG_DIR,
|
||||||
|
CW_CONFIG_DIR: env.CW_CONFIG_DIR
|
||||||
|
};
|
||||||
|
|
||||||
|
log.info({
|
||||||
|
agentId,
|
||||||
|
cwd,
|
||||||
|
cwdExists,
|
||||||
|
commandWithArgs,
|
||||||
|
providerName,
|
||||||
|
environmentInfo
|
||||||
|
}, 'spawning detached process with workdir validation');
|
||||||
|
|
||||||
|
if (!cwdExists) {
|
||||||
|
log.error({ cwd }, 'CWD does not exist before spawn!');
|
||||||
|
throw new Error(`Agent working directory does not exist: ${cwd}`);
|
||||||
|
}
|
||||||
|
|
||||||
const logDir = join(this.workspaceRoot, '.cw', 'agent-logs', agentId);
|
const logDir = join(this.workspaceRoot, '.cw', 'agent-logs', agentId);
|
||||||
mkdirSync(logDir, { recursive: true });
|
mkdirSync(logDir, { recursive: true });
|
||||||
const outputFilePath = join(logDir, 'output.jsonl');
|
const outputFilePath = join(logDir, 'output.jsonl');
|
||||||
@@ -220,7 +288,14 @@ export class ProcessManager {
|
|||||||
child.unref();
|
child.unref();
|
||||||
|
|
||||||
const pid = child.pid!;
|
const pid = child.pid!;
|
||||||
log.debug({ agentId, pid, command }, 'spawned detached process');
|
log.info({
|
||||||
|
agentId,
|
||||||
|
pid,
|
||||||
|
command,
|
||||||
|
args: args.join(' '),
|
||||||
|
cwd,
|
||||||
|
spawnSuccess: true
|
||||||
|
}, 'spawned detached process successfully');
|
||||||
|
|
||||||
const parser = getStreamParser(providerName);
|
const parser = getStreamParser(providerName);
|
||||||
const tailer = new FileTailer({
|
const tailer = new FileTailer({
|
||||||
@@ -239,6 +314,33 @@ export class ProcessManager {
|
|||||||
return { pid, outputFilePath, tailer };
|
return { pid, outputFilePath, tailer };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Poll a file's size until it stops changing.
 *
 * The file is checked every 100ms. It is considered complete once three
 * consecutive checks report the same size as the check before them. Checks
 * that fail (for example because the file does not exist yet) are ignored
 * and polling simply continues.
 *
 * @param filePath - Path of the file to watch.
 * @param timeout - Maximum time to keep polling, in milliseconds.
 * @returns `true` if the size stabilized before the timeout, otherwise `false`.
 */
private async waitForFileCompletion(filePath: string, timeout = 10000): Promise<boolean> {
  const deadline = Date.now() + timeout;
  const pause = (): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, 100));

  let previousSize = -1;
  let unchangedPolls = 0;

  while (Date.now() < deadline) {
    let currentSize: number | undefined;
    try {
      currentSize = (await stat(filePath)).size;
    } catch {
      // File doesn't exist yet
    }

    if (currentSize !== undefined) {
      if (currentSize !== previousSize) {
        // Size moved: remember it and restart the stability count.
        previousSize = currentSize;
        unchangedPolls = 0;
      } else if (++unchangedPolls >= 3) {
        return true; // Stable for 300ms
      }
    }

    await pause();
  }

  return false;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Poll for process completion by checking if PID is still alive.
|
* Poll for process completion by checking if PID is still alive.
|
||||||
* When the process exits, calls onComplete callback.
|
* When the process exits, calls onComplete callback.
|
||||||
@@ -251,12 +353,25 @@ export class ProcessManager {
|
|||||||
pid: number,
|
pid: number,
|
||||||
onComplete: () => Promise<void>,
|
onComplete: () => Promise<void>,
|
||||||
getTailer: () => FileTailer | undefined,
|
getTailer: () => FileTailer | undefined,
|
||||||
|
outputFilePath?: string,
|
||||||
): void {
|
): void {
|
||||||
|
const processLog = log.child({ agentId, pid });
|
||||||
|
|
||||||
const check = async () => {
|
const check = async () => {
|
||||||
if (!isPidAlive(pid)) {
|
if (!isPidAlive(pid)) {
|
||||||
const tailer = getTailer();
|
const tailer = getTailer();
|
||||||
if (tailer) {
|
if (tailer) {
|
||||||
await new Promise((resolve) => setTimeout(resolve, 500));
|
// Wait for output file to stabilize instead of hardcoded delay
|
||||||
|
if (outputFilePath) {
|
||||||
|
processLog.debug('waiting for output file completion');
|
||||||
|
const completed = await this.waitForFileCompletion(outputFilePath);
|
||||||
|
if (!completed) {
|
||||||
|
processLog.warn('output file did not stabilize within timeout');
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Fallback to short delay if no output file path
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||||
|
}
|
||||||
await tailer.stop();
|
await tailer.stop();
|
||||||
}
|
}
|
||||||
await onComplete();
|
await onComplete();
|
||||||
|
|||||||
Reference in New Issue
Block a user