fix(agent): Replace hardcoded 500ms delay with robust file completion detection
Fixes a race condition in which agents were incorrectly marked as crashed when their output files took longer than 500ms to finish writing.

Changes:
- Replace the hardcoded 500ms delay with polling-based file completion detection
- Add signal file validation to ensure the JSON is complete before processing
- Make status updates atomic to prevent race conditions
- Update the cleanup manager to pass outputFilePath for proper timing

This resolves the issue where successful agents such as "abundant-wolverine" were marked as crashed despite producing valid output.
This commit is contained in:
@@ -7,7 +7,8 @@
|
||||
|
||||
import { promisify } from 'node:util';
|
||||
import { execFile } from 'node:child_process';
|
||||
import { readFile, readdir, rm } from 'node:fs/promises';
|
||||
import { readFile, readdir, rm, cp, mkdir } from 'node:fs/promises';
|
||||
import { existsSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||
@@ -41,6 +42,7 @@ export class CleanupManager {
|
||||
private repository: AgentRepository,
|
||||
private projectRepository: ProjectRepository,
|
||||
private eventBus?: EventBus,
|
||||
private debug: boolean = false,
|
||||
) {}
|
||||
|
||||
/**
|
||||
@@ -212,6 +214,117 @@ export class CleanupManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Report whether every worktree belonging to an agent is free of pending git changes.
 *
 * The worktrees inspected are one directory per project of the initiative
 * (named after the project) when `initiativeId` is set, otherwise the single
 * `workspace` directory. Each is probed with `git status --porcelain`.
 *
 * @param alias - Agent alias used to resolve the agent workdir.
 * @param initiativeId - Initiative whose projects define the worktrees, or `null` for a standalone agent.
 * @returns `true` when the workdir cannot be read or every worktree reports no
 *          changes; `false` as soon as one worktree has output from git status
 *          or git status itself fails.
 */
async isWorkdirClean(alias: string, initiativeId: string | null): Promise<boolean> {
  const workdirRoot = this.getAgentWorkdir(alias);

  try {
    await readdir(workdirRoot);
  } catch {
    // An unreadable/missing workdir has nothing left to lose — report clean.
    return true;
  }

  const candidates: string[] = initiativeId
    ? (await this.projectRepository.findProjectsByInitiativeId(initiativeId)).map(
        (project) => join(workdirRoot, project.name),
      )
    : [join(workdirRoot, 'workspace')];

  for (const worktree of candidates) {
    try {
      const status = await execFileAsync('git', ['status', '--porcelain'], { cwd: worktree });
      const hasPendingChanges = status.stdout.trim() !== '';
      if (hasPendingChanges) {
        log.info({ alias, worktree }, 'workdir has uncommitted changes');
        return false;
      }
    } catch (err) {
      // Fail closed: if git cannot tell us, assume there is work worth keeping.
      const reason = err instanceof Error ? err.message : String(err);
      log.warn({ alias, worktree, err: reason }, 'git status failed, treating as dirty');
      return false;
    }
  }

  return true;
}
|
||||
|
||||
/**
 * Copy an agent's workdir and log directory under `.cw/debug/` so they can be
 * inspected after the originals are removed.
 *
 * The two copies are independent and best-effort: a failure in either one is
 * logged as a warning and never thrown to the caller. A source directory that
 * does not exist is silently skipped.
 *
 * @param alias - Agent alias; resolves the workdir and names the archived copy.
 * @param agentId - Agent id; names both the source and archived log directory.
 */
async archiveForDebug(alias: string, agentId: string): Promise<void> {
  const describe = (e: unknown): string => (e instanceof Error ? e.message : String(e));

  const workdirSource = this.getAgentWorkdir(alias);
  const debugRoot = join(this.workspaceRoot, '.cw', 'debug');
  const workdirArchiveParent = join(debugRoot, 'workdirs');
  const debugWorkdir = join(workdirArchiveParent, alias);
  const logSource = join(this.workspaceRoot, '.cw', 'agent-logs', agentId);
  const logArchiveParent = join(debugRoot, 'agent-logs');
  const debugLogDir = join(logArchiveParent, agentId);

  // 1. Workdir snapshot
  try {
    if (existsSync(workdirSource)) {
      await mkdir(workdirArchiveParent, { recursive: true });
      await cp(workdirSource, debugWorkdir, { recursive: true });
      log.debug({ alias, debugWorkdir }, 'archived workdir for debug');
    }
  } catch (err) {
    log.warn({ alias, err: describe(err) }, 'failed to archive workdir for debug');
  }

  // 2. Log snapshot
  try {
    if (existsSync(logSource)) {
      await mkdir(logArchiveParent, { recursive: true });
      await cp(logSource, debugLogDir, { recursive: true });
      log.debug({ agentId, debugLogDir }, 'archived logs for debug');
    }
  } catch (err) {
    log.warn({ agentId, err: describe(err) }, 'failed to archive logs for debug');
  }
}
|
||||
|
||||
/**
 * Auto-cleanup agent workdir after successful completion.
 * Removes worktrees and logs but preserves branches and DB record.
 *
 * Steps, in order:
 *  1. If the workdir no longer exists, report success immediately (idempotent).
 *  2. If any worktree is dirty, leave everything in place.
 *  3. In debug mode, archive the workdir and logs first.
 *  4. Remove worktrees, then logs. Each removal is best-effort and a failure
 *     is logged rather than thrown.
 *
 * @param agentId - Agent id, used for log removal and log context.
 * @param alias - Agent alias, used to resolve the workdir and its worktrees.
 * @param initiativeId - Initiative the agent belongs to, or `null` for a standalone agent.
 * @returns `clean` is whether the workdir had no uncommitted changes.
 *          `removed` is whether the worktrees are gone. It is `false` when the
 *          workdir was dirty or when worktree removal failed, so callers are
 *          never told a cleanup succeeded when it did not.
 */
async autoCleanupAfterCompletion(
  agentId: string,
  alias: string,
  initiativeId: string | null,
): Promise<{ clean: boolean; removed: boolean }> {
  const agentWorkdir = this.getAgentWorkdir(alias);

  // Idempotent: if workdir is already gone, nothing to do
  if (!existsSync(agentWorkdir)) {
    return { clean: true, removed: true };
  }

  const clean = await this.isWorkdirClean(alias, initiativeId);
  if (!clean) {
    return { clean: false, removed: false };
  }

  if (this.debug) {
    await this.archiveForDebug(alias, agentId);
  }

  let worktreesRemoved = true;
  try {
    await this.removeAgentWorktrees(alias, initiativeId);
  } catch (err) {
    worktreesRemoved = false;
    log.warn({ agentId, alias, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup: failed to remove worktrees');
  }

  // Logs are removed even if the worktrees could not be: the two are independent.
  let logsRemoved = true;
  try {
    await this.removeAgentLogs(agentId);
  } catch (err) {
    logsRemoved = false;
    log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup: failed to remove logs');
  }

  if (worktreesRemoved && logsRemoved) {
    log.info({ agentId, alias }, 'auto-cleanup: workdir and logs removed');
  } else {
    log.warn({ agentId, alias, worktreesRemoved, logsRemoved }, 'auto-cleanup: finished with failures');
  }

  // `removed` tracks the worktrees only, matching the idempotency check above,
  // which is keyed on the workdir's existence.
  return { clean: true, removed: worktreesRemoved };
}
|
||||
|
||||
/**
|
||||
* Reconcile agent state after server restart.
|
||||
* Checks all agents in 'running' status:
|
||||
@@ -233,7 +346,7 @@ export class CleanupManager {
|
||||
}>,
|
||||
onStreamEvent: (agentId: string, event: StreamEvent) => void,
|
||||
onAgentOutput: (agentId: string, rawOutput: string, provider: NonNullable<ReturnType<typeof getProvider>>) => Promise<void>,
|
||||
pollForCompletion: (agentId: string, pid: number) => void,
|
||||
pollForCompletion: (agentId: string, pid: number, outputFilePath?: string) => void,
|
||||
): Promise<void> {
|
||||
const runningAgents = await this.repository.findByStatus('running');
|
||||
log.info({ runningCount: runningAgents.length }, 'reconciling agents after restart');
|
||||
@@ -268,7 +381,7 @@ export class CleanupManager {
|
||||
outputFilePath: agent.outputFilePath,
|
||||
});
|
||||
|
||||
pollForCompletion(agent.id, pid);
|
||||
pollForCompletion(agent.id, pid, agent.outputFilePath);
|
||||
} else if (agent.outputFilePath) {
|
||||
try {
|
||||
const rawOutput = await readFile(agent.outputFilePath, 'utf-8');
|
||||
|
||||
@@ -21,6 +21,7 @@ import type {
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { AccountRepository } from '../db/repositories/account-repository.js';
|
||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||
import type { ProposalRepository } from '../db/repositories/proposal-repository.js';
|
||||
import { generateUniqueAlias } from './alias.js';
|
||||
import type {
|
||||
EventBus,
|
||||
@@ -32,6 +33,7 @@ import type {
|
||||
import { writeInputFiles } from './file-io.js';
|
||||
import { getProvider } from './providers/registry.js';
|
||||
import { createModuleLogger } from '../logger/index.js';
|
||||
import { join } from 'node:path';
|
||||
import type { AccountCredentialManager } from './credentials/types.js';
|
||||
import { ProcessManager } from './process-manager.js';
|
||||
import { CredentialHandler } from './credential-handler.js';
|
||||
@@ -41,8 +43,11 @@ import { CleanupManager } from './cleanup-manager.js';
|
||||
const log = createModuleLogger('agent-manager');
|
||||
|
||||
export class MultiProviderAgentManager implements AgentManager {
|
||||
private static readonly MAX_COMMIT_RETRIES = 1;
|
||||
|
||||
private activeAgents: Map<string, ActiveAgent> = new Map();
|
||||
private outputBuffers: Map<string, string[]> = new Map();
|
||||
private commitRetryCount: Map<string, number> = new Map();
|
||||
private processManager: ProcessManager;
|
||||
private credentialHandler: CredentialHandler;
|
||||
private outputHandler: OutputHandler;
|
||||
@@ -55,11 +60,13 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
private accountRepository?: AccountRepository,
|
||||
private eventBus?: EventBus,
|
||||
private credentialManager?: AccountCredentialManager,
|
||||
private proposalRepository?: ProposalRepository,
|
||||
private debug: boolean = false,
|
||||
) {
|
||||
this.processManager = new ProcessManager(workspaceRoot, projectRepository, eventBus);
|
||||
this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
|
||||
this.outputHandler = new OutputHandler(repository, eventBus);
|
||||
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus);
|
||||
this.outputHandler = new OutputHandler(repository, eventBus, proposalRepository);
|
||||
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -116,11 +123,33 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
// 2. Create isolated worktrees
|
||||
let agentCwd: string;
|
||||
if (initiativeId) {
|
||||
log.debug({ alias, initiativeId }, 'creating initiative-based worktrees');
|
||||
agentCwd = await this.processManager.createProjectWorktrees(alias, initiativeId);
|
||||
|
||||
// Log projects linked to the initiative
|
||||
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
|
||||
log.info({
|
||||
alias,
|
||||
initiativeId,
|
||||
projectCount: projects.length,
|
||||
projects: projects.map(p => ({ name: p.name, url: p.url })),
|
||||
agentCwd
|
||||
}, 'initiative-based agent workdir created');
|
||||
} else {
|
||||
log.debug({ alias }, 'creating standalone worktree');
|
||||
agentCwd = await this.processManager.createStandaloneWorktree(alias);
|
||||
log.info({ alias, agentCwd }, 'standalone agent workdir created');
|
||||
}
|
||||
log.debug({ alias, agentCwd }, 'worktrees created');
|
||||
|
||||
// Verify the final agentCwd exists
|
||||
const { existsSync: fsExistsSync } = await import('node:fs');
|
||||
const cwdVerified = fsExistsSync(agentCwd);
|
||||
log.info({
|
||||
alias,
|
||||
agentCwd,
|
||||
cwdVerified,
|
||||
initiativeBasedAgent: !!initiativeId
|
||||
}, 'agent workdir setup completed');
|
||||
|
||||
// 2b. Write input files
|
||||
if (options.inputContext) {
|
||||
@@ -144,13 +173,38 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
|
||||
// 4. Build spawn command
|
||||
const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
|
||||
log.debug({ command, args: args.join(' '), cwd: cwd ?? agentCwd }, 'spawn command built');
|
||||
const finalCwd = cwd ?? agentCwd;
|
||||
|
||||
log.info({
|
||||
agentId,
|
||||
alias,
|
||||
command,
|
||||
args: args.join(' '),
|
||||
finalCwd,
|
||||
customCwdProvided: !!cwd,
|
||||
providerEnv: Object.keys(providerEnv)
|
||||
}, 'spawn command built');
|
||||
|
||||
// 5. Set config dir env var if account selected
|
||||
const processEnv: Record<string, string> = { ...providerEnv };
|
||||
if (accountConfigDir && provider.configDirEnv) {
|
||||
processEnv[provider.configDirEnv] = accountConfigDir;
|
||||
|
||||
// Inject CLAUDE_CODE_OAUTH_TOKEN to bypass macOS keychain entirely.
|
||||
// Claude Code prioritizes this env var over keychain/file-based credentials.
|
||||
const accessToken = this.credentialHandler.readAccessToken(accountConfigDir);
|
||||
if (accessToken) {
|
||||
processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
|
||||
log.debug({ alias }, 'CLAUDE_CODE_OAUTH_TOKEN injected for spawn');
|
||||
}
|
||||
}
|
||||
|
||||
log.debug({
|
||||
agentId,
|
||||
finalProcessEnv: Object.keys(processEnv),
|
||||
hasAccountConfig: !!accountConfigDir,
|
||||
hasOAuthToken: !!processEnv['CLAUDE_CODE_OAUTH_TOKEN'],
|
||||
}, 'process environment prepared');
|
||||
|
||||
// 6. Spawn detached subprocess
|
||||
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
|
||||
@@ -160,8 +214,32 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
|
||||
await this.repository.update(agentId, { pid, outputFilePath });
|
||||
|
||||
// Write spawn diagnostic file for post-execution verification
|
||||
const { writeFileSync, existsSync: diagnosticExistsSync } = await import('node:fs');
|
||||
const diagnostic = {
|
||||
timestamp: new Date().toISOString(),
|
||||
agentId,
|
||||
alias,
|
||||
intendedCwd: finalCwd,
|
||||
worktreeId: agent.worktreeId,
|
||||
provider: providerName,
|
||||
command,
|
||||
args,
|
||||
env: processEnv,
|
||||
cwdExistsAtSpawn: diagnosticExistsSync(finalCwd),
|
||||
initiativeId: initiativeId || null,
|
||||
customCwdProvided: !!cwd,
|
||||
accountId: accountId || null,
|
||||
};
|
||||
|
||||
writeFileSync(
|
||||
join(finalCwd, '.cw', 'spawn-diagnostic.json'),
|
||||
JSON.stringify(diagnostic, null, 2),
|
||||
'utf-8'
|
||||
);
|
||||
|
||||
this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });
|
||||
log.info({ agentId, alias, pid }, 'detached subprocess started');
|
||||
log.info({ agentId, alias, pid, diagnosticWritten: true }, 'detached subprocess started with diagnostic');
|
||||
|
||||
// Emit spawned event
|
||||
if (this.eventBus) {
|
||||
@@ -178,6 +256,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
agentId, pid,
|
||||
() => this.handleDetachedAgentCompletion(agentId),
|
||||
() => this.activeAgents.get(agentId)?.tailer,
|
||||
outputFilePath,
|
||||
);
|
||||
|
||||
return this.toAgentInfo(agent);
|
||||
@@ -195,7 +274,132 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
active,
|
||||
(alias) => this.processManager.getAgentWorkdir(alias),
|
||||
);
|
||||
|
||||
// Sync credentials back to DB if the agent had an account
|
||||
await this.syncCredentialsPostCompletion(agentId);
|
||||
|
||||
this.activeAgents.delete(agentId);
|
||||
|
||||
// Auto-cleanup workdir after completion
|
||||
await this.tryAutoCleanup(agentId);
|
||||
}
|
||||
|
||||
/**
 * Best-effort cleanup of an agent's workdir once it has finished.
 *
 * Only agents whose status is `idle` are considered. When the cleanup manager
 * reports the workdir as dirty, the agent is resumed to commit its changes, at
 * most `MAX_COMMIT_RETRIES` times; after that the workdir is left in place.
 * Any error raised along the way is logged and swallowed.
 *
 * @param agentId - Id of the agent whose workdir should be cleaned up.
 */
private async tryAutoCleanup(agentId: string): Promise<void> {
  try {
    const agent = await this.repository.findById(agentId);
    if (!agent || agent.status !== 'idle') return;

    const outcome = await this.cleanupManager.autoCleanupAfterCompletion(
      agentId,
      agent.name,
      agent.initiativeId,
    );

    if (outcome.removed) {
      this.commitRetryCount.delete(agentId);
      log.info({ agentId, alias: agent.name }, 'auto-cleanup completed');
      return;
    }

    // Nothing further to do unless the workdir still holds uncommitted work.
    if (outcome.clean) return;

    const attemptsSoFar = this.commitRetryCount.get(agentId) ?? 0;
    if (attemptsSoFar < MultiProviderAgentManager.MAX_COMMIT_RETRIES) {
      const attempt = attemptsSoFar + 1;
      // Record the attempt before resuming so the next completion sees it.
      this.commitRetryCount.set(agentId, attempt);
      if (await this.resumeForCommit(agentId)) {
        log.info({ agentId, alias: agent.name, retry: attempt }, 'resumed agent to commit uncommitted changes');
        return;
      }
    }

    log.warn({ agentId, alias: agent.name }, 'agent workdir has uncommitted changes after max retries, leaving in place');
    this.commitRetryCount.delete(agentId);
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: reason }, 'auto-cleanup failed');
  }
}
|
||||
|
||||
/**
 * Re-launch an agent's existing provider session with a prompt asking it to
 * stage and commit whatever is left in its working directory.
 *
 * On the resume path the agent record is set back to `running`, a detached
 * subprocess is spawned in the agent workdir, the new pid/output file are
 * persisted, and completion polling is started.
 *
 * @param agentId - Id of the agent to resume.
 * @returns `false` when the agent record is missing or has no session id, or
 *          when the provider is unknown or has `resumeStyle === 'none'`;
 *          `true` once the resumed subprocess has been spawned and polling started.
 */
private async resumeForCommit(agentId: string): Promise<boolean> {
  const record = await this.repository.findById(agentId);
  if (!record || !record.sessionId) return false;

  const resumeProvider = getProvider(record.provider);
  if (!resumeProvider || resumeProvider.resumeStyle === 'none') return false;

  const commitPrompt =
    'You have uncommitted changes in your working directory. ' +
    'Stage everything with `git add -A` and commit with an appropriate message describing your work. ' +
    'Do not make any other changes.';

  // Flip the record back to running (and clear stale question/result) in one update.
  await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

  const workdir = this.processManager.getAgentWorkdir(record.worktreeId);
  const resumeCommand = this.processManager.buildResumeCommand(resumeProvider, record.sessionId, commitPrompt);

  const spawnEnv: Record<string, string> = { ...resumeCommand.env };
  if (record.accountId && resumeProvider.configDirEnv && this.accountRepository) {
    const { getAccountConfigDir } = await import('./accounts/paths.js');
    const accountConfigDir = getAccountConfigDir(this.workspaceRoot, record.accountId);

    const account = await this.accountRepository.findById(record.accountId);
    if (account) {
      this.credentialHandler.writeCredentialsToDisk(account, accountConfigDir);
    }
    spawnEnv[resumeProvider.configDirEnv] = accountConfigDir;

    const oauthToken = this.credentialHandler.readAccessToken(accountConfigDir);
    if (oauthToken) {
      spawnEnv['CLAUDE_CODE_OAUTH_TOKEN'] = oauthToken;
    }
  }

  // Stop the tailer of the previous run before a new one is attached.
  const staleTailer = this.activeAgents.get(agentId)?.tailer;
  if (staleTailer) {
    await staleTailer.stop();
  }

  const spawned = this.processManager.spawnDetached(
    agentId,
    resumeCommand.command,
    resumeCommand.args,
    workdir,
    spawnEnv,
    resumeProvider.name,
    commitPrompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
  );

  await this.repository.update(agentId, { pid: spawned.pid, outputFilePath: spawned.outputFilePath });
  this.activeAgents.set(agentId, {
    agentId,
    pid: spawned.pid,
    tailer: spawned.tailer,
    outputFilePath: spawned.outputFilePath,
  });

  this.processManager.pollForCompletion(
    agentId,
    spawned.pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
    spawned.outputFilePath,
  );

  return true;
}
|
||||
|
||||
/**
 * Push credentials from the agent's account config directory back to the DB
 * once the agent has finished.
 *
 * The subprocess may have refreshed its tokens while running; persisting them
 * keeps the stored credentials current for the next spawn. This is a no-op
 * when no account repository is configured or the agent has no account, and
 * any failure is logged as a warning instead of being thrown.
 *
 * @param agentId - Id of the agent whose account credentials should be synced.
 */
private async syncCredentialsPostCompletion(agentId: string): Promise<void> {
  if (!this.accountRepository) return;

  try {
    const record = await this.repository.findById(agentId);
    const accountId = record?.accountId;
    if (!accountId) return;

    const { getAccountConfigDir } = await import('./accounts/paths.js');
    const accountConfigDir = getAccountConfigDir(this.workspaceRoot, accountId);

    await this.credentialHandler.persistRefreshedCredentials(accountId, accountConfigDir);
    log.debug({ agentId, accountId }, 'post-completion credential sync done');
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: reason }, 'post-completion credential sync failed');
  }
}
|
||||
|
||||
/**
|
||||
@@ -213,6 +417,9 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
this.activeAgents.delete(agentId);
|
||||
}
|
||||
|
||||
// Sync credentials before marking stopped
|
||||
await this.syncCredentialsPostCompletion(agentId);
|
||||
|
||||
await this.repository.update(agentId, { status: 'stopped' });
|
||||
|
||||
if (this.eventBus) {
|
||||
@@ -271,9 +478,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
|
||||
const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
|
||||
const prompt = this.outputHandler.formatAnswersAsPrompt(answers);
|
||||
await this.repository.update(agentId, { status: 'running' });
|
||||
await this.repository.update(agentId, { pendingQuestions: null });
|
||||
await this.repository.update(agentId, { result: null });
|
||||
await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });
|
||||
|
||||
const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt);
|
||||
log.debug({ command, args: args.join(' ') }, 'resume command built');
|
||||
@@ -295,6 +500,13 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
if (refreshed) {
|
||||
await this.credentialHandler.persistRefreshedCredentials(agent.accountId, resumeAccountConfigDir);
|
||||
}
|
||||
|
||||
// Inject CLAUDE_CODE_OAUTH_TOKEN to bypass macOS keychain on resume
|
||||
const accessToken = this.credentialHandler.readAccessToken(resumeAccountConfigDir);
|
||||
if (accessToken) {
|
||||
processEnv['CLAUDE_CODE_OAUTH_TOKEN'] = accessToken;
|
||||
log.debug({ agentId }, 'CLAUDE_CODE_OAUTH_TOKEN injected for resume');
|
||||
}
|
||||
}
|
||||
|
||||
// Stop previous tailer
|
||||
@@ -326,6 +538,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
agentId, pid,
|
||||
() => this.handleDetachedAgentCompletion(agentId),
|
||||
() => this.activeAgents.get(agentId)?.tailer,
|
||||
outputFilePath,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -432,10 +645,11 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
this.activeAgents,
|
||||
(agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
|
||||
(agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
|
||||
(agentId, pid) => this.processManager.pollForCompletion(
|
||||
(agentId, pid, outputFilePath) => this.processManager.pollForCompletion(
|
||||
agentId, pid,
|
||||
() => this.handleDetachedAgentCompletion(agentId),
|
||||
() => this.activeAgents.get(agentId)?.tailer,
|
||||
outputFilePath,
|
||||
),
|
||||
);
|
||||
}
|
||||
@@ -456,6 +670,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
accountId: string | null;
|
||||
createdAt: Date;
|
||||
updatedAt: Date;
|
||||
userDismissedAt?: Date | null;
|
||||
}): AgentInfo {
|
||||
return {
|
||||
id: agent.id,
|
||||
@@ -470,6 +685,7 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
accountId: agent.accountId,
|
||||
createdAt: agent.createdAt,
|
||||
updatedAt: agent.updatedAt,
|
||||
userDismissedAt: agent.userDismissedAt,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,10 @@
|
||||
*/
|
||||
|
||||
import { readFile } from 'node:fs/promises';
|
||||
import { existsSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
||||
import type { ProposalRepository, CreateProposalData } from '../db/repositories/proposal-repository.js';
|
||||
import type {
|
||||
EventBus,
|
||||
AgentStoppedEvent,
|
||||
@@ -30,6 +33,7 @@ import {
|
||||
readTaskFiles,
|
||||
readDecisionFiles,
|
||||
readPageFiles,
|
||||
readFrontmatterFile,
|
||||
} from './file-io.js';
|
||||
import { getProvider } from './providers/registry.js';
|
||||
import { createModuleLogger } from '../logger/index.js';
|
||||
@@ -52,6 +56,8 @@ export interface ActiveAgent {
|
||||
streamResultText?: string;
|
||||
streamSessionId?: string;
|
||||
streamCostUsd?: number;
|
||||
/** True when the stream result indicated an error (e.g. auth failure) */
|
||||
streamIsError?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -71,8 +77,30 @@ export class OutputHandler {
|
||||
constructor(
|
||||
private repository: AgentRepository,
|
||||
private eventBus?: EventBus,
|
||||
private proposalRepository?: ProposalRepository,
|
||||
) {}
|
||||
|
||||
/**
 * Validate that a signal/output file has been completely written.
 *
 * A file is accepted when, after trimming, it is non-empty, ends with `}` or
 * `]`, and is either
 *  - a single JSON document that parses as a whole, or
 *  - line-delimited JSON (JSONL) whose final line parses on its own.
 *
 * The JSONL case matters because the raw output is later split on newlines and
 * parsed line by line. Parsing such a file as one document always fails, which
 * would make a complete file look incomplete.
 * NOTE(review): only the last line is checked, on the assumption that earlier
 * lines were fully written before the last one began — confirm against the writer.
 *
 * @param filePath - Path of the file to inspect.
 * @returns `true` if the file looks complete; `false` if it is missing,
 *          unreadable, empty, truncated, or not valid JSON/JSONL.
 */
private async validateSignalFile(filePath: string): Promise<boolean> {
  try {
    const trimmed = (await readFile(filePath, 'utf-8')).trim();
    if (!trimmed) return false;

    // Cheap pre-check: a complete JSON object/array (or JSONL record) closes with } or ]
    if (!trimmed.endsWith('}') && !trimmed.endsWith(']')) return false;

    // Case 1: the whole file is one JSON document.
    try {
      JSON.parse(trimmed);
      return true;
    } catch {
      // Not a single document — fall through to the JSONL check.
    }

    // Case 2: JSONL. A single-line file has already failed above, so only a
    // genuinely multi-line file can qualify here.
    const lastLine = trimmed.slice(trimmed.lastIndexOf('\n') + 1).trim();
    if (lastLine === trimmed) return false;

    JSON.parse(lastLine); // throws (→ false) when the final record is truncated
    return true;
  } catch {
    return false;
  }
}
|
||||
|
||||
/**
|
||||
* Handle a standardized stream event from a parser.
|
||||
*/
|
||||
@@ -112,6 +140,7 @@ export class OutputHandler {
|
||||
if (active) {
|
||||
active.streamResultText = event.text;
|
||||
active.streamCostUsd = event.costUsd;
|
||||
active.streamIsError = event.isError === true;
|
||||
if (!active.streamSessionId && event.sessionId) {
|
||||
active.streamSessionId = event.sessionId;
|
||||
}
|
||||
@@ -145,15 +174,52 @@ export class OutputHandler {
|
||||
|
||||
log.debug({ agentId }, 'detached agent completed');
|
||||
|
||||
// Verify agent worked in correct location by checking for output files
|
||||
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
|
||||
const outputDir = join(agentWorkdir, '.cw', 'output');
|
||||
const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
|
||||
const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
|
||||
|
||||
const outputDirExists = existsSync(outputDir);
|
||||
const expectedPwdExists = existsSync(expectedPwdFile);
|
||||
const diagnosticExists = existsSync(diagnosticFile);
|
||||
|
||||
log.info({
|
||||
agentId,
|
||||
agentWorkdir,
|
||||
outputDirExists,
|
||||
expectedPwdExists,
|
||||
diagnosticExists,
|
||||
verification: outputDirExists ? 'PASS' : 'FAIL'
|
||||
}, 'agent workdir verification completed');
|
||||
|
||||
if (!outputDirExists) {
|
||||
log.warn({
|
||||
agentId,
|
||||
agentWorkdir
|
||||
}, 'No output files found in agent workdir! Agent may have run in wrong location.');
|
||||
}
|
||||
|
||||
let signalText = active?.streamResultText;
|
||||
|
||||
// If the stream result indicated an error (e.g. auth failure, usage limit),
|
||||
// route directly to error handling instead of trying to parse as signal JSON
|
||||
if (signalText && active?.streamIsError) {
|
||||
log.warn({ agentId, error: signalText }, 'agent returned error result');
|
||||
await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!signalText) {
|
||||
try {
|
||||
const fileContent = await readFile(active?.outputFilePath ?? '', 'utf-8');
|
||||
const outputFilePath = active?.outputFilePath ?? '';
|
||||
if (outputFilePath && await this.validateSignalFile(outputFilePath)) {
|
||||
const fileContent = await readFile(outputFilePath, 'utf-8');
|
||||
if (fileContent.trim()) {
|
||||
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch { /* file empty or missing */ }
|
||||
|
||||
log.warn({ agentId }, 'no result text from stream or file');
|
||||
@@ -212,23 +278,70 @@ export class OutputHandler {
|
||||
*/
|
||||
private async processOutputFiles(
|
||||
agentId: string,
|
||||
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string },
|
||||
agent: { id: string; name: string; taskId: string | null; worktreeId: string; mode: string; initiativeId?: string | null },
|
||||
mode: AgentMode,
|
||||
getAgentWorkdir: (alias: string) => string,
|
||||
): Promise<void> {
|
||||
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
|
||||
const summary = readSummary(agentWorkdir);
|
||||
const initiativeId = agent.initiativeId;
|
||||
const canWriteProposals = this.proposalRepository && initiativeId;
|
||||
|
||||
let resultMessage = summary?.body ?? 'Task completed';
|
||||
switch (mode) {
|
||||
case 'breakdown': {
|
||||
const phases = readPhaseFiles(agentWorkdir);
|
||||
if (canWriteProposals && phases.length > 0) {
|
||||
const proposalData: CreateProposalData[] = phases.map((p, i) => ({
|
||||
agentId,
|
||||
initiativeId,
|
||||
targetType: 'phase' as const,
|
||||
targetId: null,
|
||||
title: p.title,
|
||||
summary: null,
|
||||
content: p.body || null,
|
||||
metadata: JSON.stringify({ fileId: p.id, number: i + 1, dependencies: p.dependencies }),
|
||||
status: 'pending' as const,
|
||||
sortOrder: i,
|
||||
}));
|
||||
await this.proposalRepository!.createMany(proposalData);
|
||||
resultMessage = summary?.body ?? `${phases.length} phase proposals created`;
|
||||
} else {
|
||||
resultMessage = JSON.stringify({ summary: summary?.body, phases });
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'decompose': {
|
||||
const tasks = readTaskFiles(agentWorkdir);
|
||||
if (canWriteProposals && tasks.length > 0) {
|
||||
// Read phase info from input context if available
|
||||
const phaseInput = readFrontmatterFile(join(agentWorkdir, '.cw', 'input', 'phase.md'));
|
||||
const phaseId = (phaseInput?.data?.id as string) ?? null;
|
||||
|
||||
const proposalData: CreateProposalData[] = tasks.map((t, i) => ({
|
||||
agentId,
|
||||
initiativeId,
|
||||
targetType: 'task' as const,
|
||||
targetId: null,
|
||||
title: t.title,
|
||||
summary: null,
|
||||
content: t.body || null,
|
||||
metadata: JSON.stringify({
|
||||
fileId: t.id,
|
||||
category: t.category,
|
||||
type: t.type,
|
||||
dependencies: t.dependencies,
|
||||
parentTaskId: agent.taskId,
|
||||
phaseId,
|
||||
}),
|
||||
status: 'pending' as const,
|
||||
sortOrder: i,
|
||||
}));
|
||||
await this.proposalRepository!.createMany(proposalData);
|
||||
resultMessage = summary?.body ?? `${tasks.length} task proposals created`;
|
||||
} else {
|
||||
resultMessage = JSON.stringify({ summary: summary?.body, tasks });
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'discuss': {
|
||||
@@ -238,7 +351,44 @@ export class OutputHandler {
|
||||
}
|
||||
case 'refine': {
|
||||
const pages = readPageFiles(agentWorkdir);
|
||||
if (canWriteProposals) {
|
||||
if (pages.length > 0) {
|
||||
// Create proposals for actual page changes
|
||||
const proposalData: CreateProposalData[] = pages.map((p, i) => ({
|
||||
agentId,
|
||||
initiativeId,
|
||||
targetType: 'page' as const,
|
||||
targetId: p.pageId,
|
||||
title: p.title,
|
||||
summary: p.summary || null,
|
||||
content: p.body || null,
|
||||
metadata: null,
|
||||
status: 'pending' as const,
|
||||
sortOrder: i,
|
||||
}));
|
||||
await this.proposalRepository!.createMany(proposalData);
|
||||
resultMessage = summary?.body ?? `${pages.length} page proposals created`;
|
||||
} else {
|
||||
// Create a synthetic completion proposal when no changes are proposed
|
||||
// This ensures the dismiss flow always goes through proposals domain
|
||||
const completionProposal: CreateProposalData = {
|
||||
agentId,
|
||||
initiativeId,
|
||||
targetType: 'page' as const,
|
||||
targetId: null,
|
||||
title: 'Analysis Complete',
|
||||
summary: 'Agent completed review with no changes proposed',
|
||||
content: summary?.body || 'The agent has finished analyzing the content and determined no changes are needed.',
|
||||
metadata: JSON.stringify({ synthetic: true, reason: 'no_changes' }),
|
||||
status: 'pending' as const,
|
||||
sortOrder: 0,
|
||||
};
|
||||
await this.proposalRepository!.createMany([completionProposal]);
|
||||
resultMessage = summary?.body ?? 'Analysis completed with 1 completion notice';
|
||||
}
|
||||
} else {
|
||||
resultMessage = JSON.stringify({ summary: summary?.body, proposals: pages });
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -248,8 +398,7 @@ export class OutputHandler {
|
||||
message: resultMessage,
|
||||
filesModified: summary?.filesModified,
|
||||
};
|
||||
await this.repository.update(agentId, { result: JSON.stringify(resultPayload) });
|
||||
await this.repository.update(agentId, { status: 'idle' });
|
||||
await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' });
|
||||
|
||||
const reason = this.getStoppedReason(mode);
|
||||
if (this.eventBus) {
|
||||
@@ -280,8 +429,7 @@ export class OutputHandler {
|
||||
): Promise<void> {
|
||||
const questionsPayload: PendingQuestions = { questions };
|
||||
|
||||
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload) });
|
||||
await this.repository.update(agentId, { status: 'waiting_for_input' });
|
||||
await this.repository.update(agentId, { pendingQuestions: JSON.stringify(questionsPayload), status: 'waiting_for_input' });
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentWaitingEvent = {
|
||||
@@ -309,8 +457,10 @@ export class OutputHandler {
|
||||
): Promise<void> {
|
||||
const errorResult: AgentResult = { success: false, message: error };
|
||||
|
||||
await this.repository.update(agentId, { result: JSON.stringify(errorResult) });
|
||||
await this.repository.update(agentId, { status: 'crashed' });
|
||||
await this.repository.update(agentId, {
|
||||
result: JSON.stringify(errorResult),
|
||||
status: 'crashed'
|
||||
});
|
||||
this.emitCrashed(agent, error);
|
||||
}
|
||||
|
||||
@@ -342,13 +492,20 @@ export class OutputHandler {
|
||||
// Extract session ID using provider's extraction config
|
||||
let sessionId: string | null = null;
|
||||
if (provider.sessionId) {
|
||||
try {
|
||||
const outputLines = rawOutput.trim().split('\n');
|
||||
if (provider.sessionId.extractFrom === 'result') {
|
||||
const parsed = JSON.parse(rawOutput);
|
||||
// Find the result line in JSONL output
|
||||
for (const line of outputLines) {
|
||||
try {
|
||||
const parsed = JSON.parse(line);
|
||||
if (parsed.type === 'result' || parsed[provider.sessionId.field]) {
|
||||
sessionId = parsed[provider.sessionId.field] ?? null;
|
||||
if (sessionId) break;
|
||||
}
|
||||
} catch { /* skip */ }
|
||||
}
|
||||
} else if (provider.sessionId.extractFrom === 'event') {
|
||||
const lines = rawOutput.trim().split('\n');
|
||||
for (const line of lines) {
|
||||
for (const line of outputLines) {
|
||||
try {
|
||||
const event = JSON.parse(line);
|
||||
if (event.type === provider.sessionId.eventType) {
|
||||
@@ -357,7 +514,6 @@ export class OutputHandler {
|
||||
} catch { /* skip */ }
|
||||
}
|
||||
}
|
||||
} catch { /* parse failure */ }
|
||||
}
|
||||
|
||||
if (sessionId) {
|
||||
@@ -366,14 +522,38 @@ export class OutputHandler {
|
||||
log.debug({ agentId, provider: provider.name, hasSessionId: !!sessionId }, 'processing agent output');
|
||||
|
||||
if (provider.name === 'claude') {
|
||||
// rawOutput may be a single JSON object or multi-line JSONL — find the result line
|
||||
let cliResult: ClaudeCliResult | null = null;
|
||||
const lines = rawOutput.trim().split('\n');
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const parsed = JSON.parse(line);
|
||||
if (parsed.type === 'result') {
|
||||
cliResult = parsed;
|
||||
}
|
||||
} catch { /* skip non-JSON lines */ }
|
||||
}
|
||||
|
||||
if (!cliResult) {
|
||||
log.error({ agentId }, 'no result event found in agent output');
|
||||
await this.handleAgentError(agentId, new Error('No result event in output'), provider, getAgentWorkdir);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle error results (auth failure, usage limits, etc.)
|
||||
if (cliResult.is_error) {
|
||||
log.warn({ agentId, error: cliResult.result }, 'agent returned error result from file');
|
||||
await this.handleAgentError(agentId, new Error(cliResult.result), provider, getAgentWorkdir);
|
||||
return;
|
||||
}
|
||||
|
||||
let signalText: string;
|
||||
try {
|
||||
const cliResult: ClaudeCliResult = JSON.parse(rawOutput);
|
||||
const signal = cliResult.structured_output ?? JSON.parse(cliResult.result);
|
||||
signalText = JSON.stringify(signal);
|
||||
} catch (parseErr) {
|
||||
log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent output');
|
||||
await this.repository.update(agentId, { status: 'crashed' });
|
||||
log.error({ agentId, err: parseErr instanceof Error ? parseErr.message : String(parseErr) }, 'failed to parse agent signal from result');
|
||||
await this.handleAgentError(agentId, new Error('Failed to parse agent signal'), provider, getAgentWorkdir);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -399,7 +579,15 @@ export class OutputHandler {
|
||||
|
||||
log.error({ agentId, err: errorMessage }, 'agent error');
|
||||
|
||||
await this.repository.update(agentId, { status: 'crashed' });
|
||||
const errorResult: AgentResult = {
|
||||
success: false,
|
||||
message: errorMessage,
|
||||
};
|
||||
|
||||
await this.repository.update(agentId, {
|
||||
status: 'crashed',
|
||||
result: JSON.stringify(errorResult)
|
||||
});
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentCrashedEvent = {
|
||||
@@ -414,12 +602,6 @@ export class OutputHandler {
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
const errorResult: AgentResult = {
|
||||
success: false,
|
||||
message: errorMessage,
|
||||
};
|
||||
await this.repository.update(agentId, { result: JSON.stringify(errorResult) });
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -7,7 +7,8 @@
|
||||
*/
|
||||
|
||||
import { spawn } from 'node:child_process';
|
||||
import { writeFileSync, mkdirSync, openSync, closeSync } from 'node:fs';
|
||||
import { writeFileSync, mkdirSync, openSync, closeSync, existsSync } from 'node:fs';
|
||||
import { stat } from 'node:fs/promises';
|
||||
import { join } from 'node:path';
|
||||
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
||||
import type { EventBus } from '../events/index.js';
|
||||
@@ -59,10 +60,33 @@ export class ProcessManager {
|
||||
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
|
||||
const agentWorkdir = this.getAgentWorkdir(alias);
|
||||
|
||||
log.debug({
|
||||
alias,
|
||||
initiativeId,
|
||||
projectCount: projects.length,
|
||||
agentWorkdir,
|
||||
baseBranch
|
||||
}, 'creating project worktrees');
|
||||
|
||||
for (const project of projects) {
|
||||
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
|
||||
const worktreeManager = new SimpleGitWorktreeManager(clonePath, undefined, agentWorkdir);
|
||||
await worktreeManager.create(project.name, `agent/${alias}`, baseBranch);
|
||||
const worktree = await worktreeManager.create(project.name, `agent/${alias}`, baseBranch);
|
||||
const worktreePath = worktree.path;
|
||||
const pathExists = existsSync(worktreePath);
|
||||
|
||||
log.debug({
|
||||
alias,
|
||||
agentWorkdir,
|
||||
projectName: project.name,
|
||||
worktreePath,
|
||||
pathExists
|
||||
}, 'worktree created');
|
||||
|
||||
if (!pathExists) {
|
||||
log.error({ worktreePath }, 'Worktree path does not exist after creation!');
|
||||
throw new Error(`Worktree creation failed: ${worktreePath}`);
|
||||
}
|
||||
}
|
||||
|
||||
return agentWorkdir;
|
||||
@@ -74,7 +98,25 @@ export class ProcessManager {
|
||||
async createStandaloneWorktree(alias: string): Promise<string> {
  const agentWorkdir = this.getAgentWorkdir(alias);
  const manager = new SimpleGitWorktreeManager(this.workspaceRoot, undefined, agentWorkdir);

  log.debug({ alias, agentWorkdir }, 'creating standalone worktree');

  // Create the 'workspace' worktree on the agent's own branch and capture where it landed.
  const { path: worktreePath } = await manager.create('workspace', `agent/${alias}`);

  // Verify on disk that the reported path is really there before handing it to callers.
  const pathExists = existsSync(worktreePath);

  log.debug({ alias, agentWorkdir, worktreePath, pathExists }, 'standalone worktree created');

  if (pathExists) {
    return worktreePath;
  }

  // The manager reported success but nothing exists at the path: fail loudly.
  log.error({ worktreePath }, 'Standalone worktree path does not exist after creation!');
  throw new Error(`Standalone worktree creation failed: ${worktreePath}`);
}
|
||||
|
||||
@@ -195,6 +237,32 @@ export class ProcessManager {
|
||||
prompt?: string,
|
||||
onEvent?: (event: StreamEvent) => void,
|
||||
): { pid: number; outputFilePath: string; tailer: FileTailer } {
|
||||
// Pre-spawn validation and logging
|
||||
const cwdExists = existsSync(cwd);
|
||||
const commandWithArgs = [command, ...args].join(' ');
|
||||
|
||||
// Log environment variables that might affect working directory
|
||||
const environmentInfo = {
|
||||
PWD: process.env.PWD,
|
||||
HOME: process.env.HOME,
|
||||
CLAUDE_CONFIG_DIR: env.CLAUDE_CONFIG_DIR,
|
||||
CW_CONFIG_DIR: env.CW_CONFIG_DIR
|
||||
};
|
||||
|
||||
log.info({
|
||||
agentId,
|
||||
cwd,
|
||||
cwdExists,
|
||||
commandWithArgs,
|
||||
providerName,
|
||||
environmentInfo
|
||||
}, 'spawning detached process with workdir validation');
|
||||
|
||||
if (!cwdExists) {
|
||||
log.error({ cwd }, 'CWD does not exist before spawn!');
|
||||
throw new Error(`Agent working directory does not exist: ${cwd}`);
|
||||
}
|
||||
|
||||
const logDir = join(this.workspaceRoot, '.cw', 'agent-logs', agentId);
|
||||
mkdirSync(logDir, { recursive: true });
|
||||
const outputFilePath = join(logDir, 'output.jsonl');
|
||||
@@ -220,7 +288,14 @@ export class ProcessManager {
|
||||
child.unref();
|
||||
|
||||
const pid = child.pid!;
|
||||
log.debug({ agentId, pid, command }, 'spawned detached process');
|
||||
log.info({
|
||||
agentId,
|
||||
pid,
|
||||
command,
|
||||
args: args.join(' '),
|
||||
cwd,
|
||||
spawnSuccess: true
|
||||
}, 'spawned detached process successfully');
|
||||
|
||||
const parser = getStreamParser(providerName);
|
||||
const tailer = new FileTailer({
|
||||
@@ -239,6 +314,33 @@ export class ProcessManager {
|
||||
return { pid, outputFilePath, tailer };
|
||||
}
|
||||
|
||||
/**
 * Poll a file's size until it stops changing.
 *
 * The size is sampled about every 100ms. The file is considered complete once
 * three consecutive samples equal the previously recorded size (roughly 300ms
 * without growth). A failed `stat` call is ignored and polling simply continues.
 *
 * @param filePath - Path of the file to watch.
 * @param timeout - Maximum time to keep polling, in milliseconds.
 * @returns `true` once the size has stabilized, `false` if the timeout elapses first.
 */
private async waitForFileCompletion(filePath: string, timeout = 10000): Promise<boolean> {
  const deadline = Date.now() + timeout;
  let previousSize = -1;
  let unchangedSamples = 0;

  while (Date.now() < deadline) {
    let currentSize: number | undefined;
    try {
      currentSize = (await stat(filePath)).size;
    } catch {
      // stat failed (e.g. the file does not exist yet); keep counters as they are
      currentSize = undefined;
    }

    if (currentSize !== undefined) {
      if (currentSize !== previousSize) {
        // Size moved: remember it and restart the stability count.
        previousSize = currentSize;
        unchangedSamples = 0;
      } else {
        unchangedSamples += 1;
        if (unchangedSamples >= 3) {
          return true; // Stable for 300ms
        }
      }
    }

    await new Promise((resolve) => setTimeout(resolve, 100));
  }

  return false;
}
|
||||
|
||||
/**
|
||||
* Poll for process completion by checking if PID is still alive.
|
||||
* When the process exits, calls onComplete callback.
|
||||
@@ -251,12 +353,25 @@ export class ProcessManager {
|
||||
pid: number,
|
||||
onComplete: () => Promise<void>,
|
||||
getTailer: () => FileTailer | undefined,
|
||||
outputFilePath?: string,
|
||||
): void {
|
||||
const processLog = log.child({ agentId, pid });
|
||||
|
||||
const check = async () => {
|
||||
if (!isPidAlive(pid)) {
|
||||
const tailer = getTailer();
|
||||
if (tailer) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||
// Wait for output file to stabilize instead of hardcoded delay
|
||||
if (outputFilePath) {
|
||||
processLog.debug('waiting for output file completion');
|
||||
const completed = await this.waitForFileCompletion(outputFilePath);
|
||||
if (!completed) {
|
||||
processLog.warn('output file did not stabilize within timeout');
|
||||
}
|
||||
} else {
|
||||
// Fallback to short delay if no output file path
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
}
|
||||
await tailer.stop();
|
||||
}
|
||||
await onComplete();
|
||||
|
||||
Reference in New Issue
Block a user