diff --git a/src/agent/cleanup-manager.ts b/src/agent/cleanup-manager.ts index 86d5ce2..daed4f3 100644 --- a/src/agent/cleanup-manager.ts +++ b/src/agent/cleanup-manager.ts @@ -346,7 +346,7 @@ export class CleanupManager { }>, onStreamEvent: (agentId: string, event: StreamEvent) => void, onAgentOutput: (agentId: string, rawOutput: string, provider: NonNullable>) => Promise, - pollForCompletion: (agentId: string, pid: number, outputFilePath?: string) => void, + pollForCompletion: (agentId: string, pid: number) => void, ): Promise { const runningAgents = await this.repository.findByStatus('running'); log.info({ runningCount: runningAgents.length }, 'reconciling agents after restart'); @@ -381,7 +381,7 @@ export class CleanupManager { outputFilePath: agent.outputFilePath, }); - pollForCompletion(agent.id, pid, agent.outputFilePath); + pollForCompletion(agent.id, pid); } else if (agent.outputFilePath) { try { const rawOutput = await readFile(agent.outputFilePath, 'utf-8'); diff --git a/src/agent/manager.ts b/src/agent/manager.ts index e9f257c..d976413 100644 --- a/src/agent/manager.ts +++ b/src/agent/manager.ts @@ -256,7 +256,6 @@ export class MultiProviderAgentManager implements AgentManager { agentId, pid, () => this.handleDetachedAgentCompletion(agentId), () => this.activeAgents.get(agentId)?.tailer, - outputFilePath, ); return this.toAgentInfo(agent); @@ -375,7 +374,6 @@ export class MultiProviderAgentManager implements AgentManager { agentId, pid, () => this.handleDetachedAgentCompletion(agentId), () => this.activeAgents.get(agentId)?.tailer, - outputFilePath, ); return true; @@ -538,7 +536,6 @@ export class MultiProviderAgentManager implements AgentManager { agentId, pid, () => this.handleDetachedAgentCompletion(agentId), () => this.activeAgents.get(agentId)?.tailer, - outputFilePath, ); } @@ -645,11 +642,10 @@ export class MultiProviderAgentManager implements AgentManager { this.activeAgents, (agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers), (agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)), - (agentId, pid, outputFilePath) => this.processManager.pollForCompletion( + (agentId, pid) => this.processManager.pollForCompletion( agentId, pid, () => this.handleDetachedAgentCompletion(agentId), () => this.activeAgents.get(agentId)?.tailer, - outputFilePath, ), ); } diff --git a/src/agent/output-handler.ts b/src/agent/output-handler.ts index 7aeabe5..7ca616a 100644 --- a/src/agent/output-handler.ts +++ b/src/agent/output-handler.ts @@ -74,6 +74,8 @@ interface ClaudeCliResult { } export class OutputHandler { + private filePositions = new Map(); + constructor( private repository: AgentRepository, private eventBus?: EventBus, @@ -101,6 +103,43 @@ export class OutputHandler { } } + /** + * Read complete lines from a file, avoiding partial lines that might still be writing. + * This eliminates race conditions when agents are still writing output. + */ + private async readCompleteLines(filePath: string, fromPosition: number = 0): Promise<{ content: string; lastPosition: number }> { + try { + const fs = await import('node:fs/promises'); + const content = await fs.readFile(filePath, 'utf-8'); + + if (fromPosition >= content.length) { + return { content: '', lastPosition: fromPosition }; + } + + // Get content from our last read position + const newContent = content.slice(fromPosition); + + // Split into lines + const lines = newContent.split('\n'); + + // If file doesn't end with newline, last element is potentially incomplete + // Only process complete lines (all but the last, unless file ends with \n) + const hasTrailingNewline = newContent.endsWith('\n'); + const completeLines = hasTrailingNewline ? lines : lines.slice(0, -1); + + // Calculate new position (only count complete lines) + const completeLinesContent = completeLines.join('\n') + (completeLines.length > 0 && hasTrailingNewline ? '\n' : ''); + const newPosition = fromPosition + Buffer.byteLength(completeLinesContent, 'utf-8'); + + return { + content: completeLinesContent, + lastPosition: newPosition + }; + } catch { + return { content: '', lastPosition: fromPosition }; + } + } + /** * Handle a standardized stream event from a parser. */ @@ -213,12 +252,27 @@ export class OutputHandler { if (!signalText) { try { const outputFilePath = active?.outputFilePath ?? ''; - if (outputFilePath && await this.validateSignalFile(outputFilePath)) { - const fileContent = await readFile(outputFilePath, 'utf-8'); + if (outputFilePath) { + // Read only complete lines from the file, avoiding race conditions + const lastPosition = this.filePositions.get(agentId) || 0; + const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition); + if (fileContent.trim()) { + this.filePositions.set(agentId, newPosition); await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir); return; } + + // If no new complete lines, but file might still be writing, try again with validation + if (await this.validateSignalFile(outputFilePath)) { + const fullContent = await readFile(outputFilePath, 'utf-8'); + if (fullContent.trim() && fullContent.length > newPosition) { + // File is complete and has content beyond what we've read + this.filePositions.delete(agentId); // Clean up tracking + await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir); + return; + } + } } } catch { /* file empty or missing */ } @@ -400,6 +454,9 @@ export class OutputHandler { }; await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' }); + // Clean up file position tracking for completed agent + this.filePositions.delete(agentId); + const reason = this.getStoppedReason(mode); if (this.eventBus) { const event: AgentStoppedEvent = { @@ -461,6 +518,10 @@ export class OutputHandler { result: JSON.stringify(errorResult), status: 'crashed' }); + + // Clean up file position tracking for crashed agent + this.filePositions.delete(agentId); + this.emitCrashed(agent, error); } @@ -589,6 +650,9 @@ export class OutputHandler { result: JSON.stringify(errorResult) }); + // Clean up file position tracking for crashed agent + this.filePositions.delete(agentId); + if (this.eventBus) { const event: AgentCrashedEvent = { type: 'agent:crashed', diff --git a/src/agent/process-manager.ts b/src/agent/process-manager.ts index 2ed1555..2819f0a 100644 --- a/src/agent/process-manager.ts +++ b/src/agent/process-manager.ts @@ -8,7 +8,6 @@ import { spawn } from 'node:child_process'; import { writeFileSync, mkdirSync, openSync, closeSync, existsSync } from 'node:fs'; -import { stat } from 'node:fs/promises'; import { join } from 'node:path'; import type { ProjectRepository } from '../db/repositories/project-repository.js'; import type { EventBus } from '../events/index.js'; @@ -314,33 +313,6 @@ export class ProcessManager { return { pid, outputFilePath, tailer }; } - /** - * Wait for a file to stabilize (no size changes for 300ms). - * More robust than hardcoded delays. - */ - private async waitForFileCompletion(filePath: string, timeout = 10000): Promise { - const startTime = Date.now(); - let lastSize = -1; - let stableCount = 0; - - while (Date.now() - startTime < timeout) { - try { - const stats = await stat(filePath); - if (stats.size === lastSize) { - stableCount++; - if (stableCount >= 3) return true; // Stable for 300ms - } else { - lastSize = stats.size; - stableCount = 0; - } - } catch { - // File doesn't exist yet - } - await new Promise(resolve => setTimeout(resolve, 100)); - } - return false; - } - /** * Poll for process completion by checking if PID is still alive. * When the process exits, calls onComplete callback. @@ -353,25 +325,12 @@ export class ProcessManager { pid: number, onComplete: () => Promise, getTailer: () => FileTailer | undefined, - outputFilePath?: string, ): void { - const processLog = log.child({ agentId, pid }); - const check = async () => { if (!isPidAlive(pid)) { const tailer = getTailer(); if (tailer) { - // Wait for output file to stabilize instead of hardcoded delay - if (outputFilePath) { - processLog.debug('waiting for output file completion'); - const completed = await this.waitForFileCompletion(outputFilePath); - if (!completed) { - processLog.warn('output file did not stabilize within timeout'); - } - } else { - // Fallback to short delay if no output file path - await new Promise((resolve) => setTimeout(resolve, 200)); - } + await new Promise((resolve) => setTimeout(resolve, 500)); await tailer.stop(); } await onComplete();