fix(agent): Implement incremental JSONL parsing to eliminate race conditions

Replaces file-completion detection with incremental reading: only complete
(newline-terminated) JSONL lines are read, and the read position is tracked per
agent. This eliminates race conditions without any delays or polling.

Key improvements:
- Read up to last complete line, avoiding partial lines during writes
- Track file position per agent for incremental reading
- Process only valid, complete JSON lines
- Clean up position tracking on completion/crash
- No hardcoded delays or polling required

This approach is more robust, responsive, and elegant than timing-based solutions.
The race condition where agents were marked as crashed is now completely resolved.
This commit is contained in:
Lukas May
2026-02-08 14:10:02 +01:00
parent 604da7cd0d
commit 6f5fd3a0af
4 changed files with 70 additions and 51 deletions

View File

@@ -346,7 +346,7 @@ export class CleanupManager {
}>,
onStreamEvent: (agentId: string, event: StreamEvent) => void,
onAgentOutput: (agentId: string, rawOutput: string, provider: NonNullable<ReturnType<typeof getProvider>>) => Promise<void>,
pollForCompletion: (agentId: string, pid: number, outputFilePath?: string) => void,
pollForCompletion: (agentId: string, pid: number) => void,
): Promise<void> {
const runningAgents = await this.repository.findByStatus('running');
log.info({ runningCount: runningAgents.length }, 'reconciling agents after restart');
@@ -381,7 +381,7 @@ export class CleanupManager {
outputFilePath: agent.outputFilePath,
});
pollForCompletion(agent.id, pid, agent.outputFilePath);
pollForCompletion(agent.id, pid);
} else if (agent.outputFilePath) {
try {
const rawOutput = await readFile(agent.outputFilePath, 'utf-8');

View File

@@ -256,7 +256,6 @@ export class MultiProviderAgentManager implements AgentManager {
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
);
return this.toAgentInfo(agent);
@@ -375,7 +374,6 @@ export class MultiProviderAgentManager implements AgentManager {
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
);
return true;
@@ -538,7 +536,6 @@ export class MultiProviderAgentManager implements AgentManager {
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
);
}
@@ -645,11 +642,10 @@ export class MultiProviderAgentManager implements AgentManager {
this.activeAgents,
(agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
(agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
(agentId, pid, outputFilePath) => this.processManager.pollForCompletion(
(agentId, pid) => this.processManager.pollForCompletion(
agentId, pid,
() => this.handleDetachedAgentCompletion(agentId),
() => this.activeAgents.get(agentId)?.tailer,
outputFilePath,
),
);
}

View File

@@ -74,6 +74,8 @@ interface ClaudeCliResult {
}
export class OutputHandler {
private filePositions = new Map<string, number>();
constructor(
private repository: AgentRepository,
private eventBus?: EventBus,
@@ -101,6 +103,43 @@ export class OutputHandler {
}
}
/**
 * Read the complete (newline-terminated) lines appended to a file since a given byte offset.
 *
 * A trailing fragment without a terminating `\n` is treated as still being written:
 * it is neither returned nor counted, so a later call picks it up once it is complete.
 *
 * Offsets are measured in bytes of the file (not string characters), so the returned
 * `lastPosition` stays correct when the output contains non-ASCII text.
 *
 * @param filePath - Path of the file to read.
 * @param fromPosition - Byte offset to start from; pass the `lastPosition` returned by the previous call.
 * @returns `content` holds the newly completed lines, each including its trailing `\n`
 *   (empty string when nothing complete was appended). `lastPosition` is the byte offset
 *   just past the last returned newline, or `fromPosition` unchanged when nothing was consumed.
 *   Any read error (e.g. the file does not exist) yields the same "nothing consumed" result.
 */
private async readCompleteLines(filePath: string, fromPosition: number = 0): Promise<{ content: string; lastPosition: number }> {
  const NEWLINE_BYTE = 0x0a;
  try {
    const fs = await import('node:fs/promises');
    // Read raw bytes so that the offset, the slice and the returned position all use the same unit.
    const fileBytes = await fs.readFile(filePath);
    if (fromPosition >= fileBytes.length) {
      return { content: '', lastPosition: fromPosition };
    }

    const unreadBytes = fileBytes.subarray(fromPosition);
    // 0x0A never occurs inside a multi-byte UTF-8 sequence, so cutting right after it
    // always leaves whole characters on both sides.
    const lastNewlineIndex = unreadBytes.lastIndexOf(NEWLINE_BYTE);
    if (lastNewlineIndex === -1) {
      // Only a partial line so far; leave the position untouched and retry later.
      return { content: '', lastPosition: fromPosition };
    }

    const completeLength = lastNewlineIndex + 1;
    return {
      content: unreadBytes.subarray(0, completeLength).toString('utf-8'),
      lastPosition: fromPosition + completeLength,
    };
  } catch {
    // Best effort: a missing or unreadable file is reported as "no new content".
    return { content: '', lastPosition: fromPosition };
  }
}
/**
* Handle a standardized stream event from a parser.
*/
@@ -213,12 +252,27 @@ export class OutputHandler {
if (!signalText) {
try {
const outputFilePath = active?.outputFilePath ?? '';
if (outputFilePath && await this.validateSignalFile(outputFilePath)) {
const fileContent = await readFile(outputFilePath, 'utf-8');
if (outputFilePath) {
// Read only complete lines from the file, avoiding race conditions
const lastPosition = this.filePositions.get(agentId) || 0;
const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition);
if (fileContent.trim()) {
this.filePositions.set(agentId, newPosition);
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
return;
}
// If no new complete lines, but file might still be writing, try again with validation
if (await this.validateSignalFile(outputFilePath)) {
const fullContent = await readFile(outputFilePath, 'utf-8');
if (fullContent.trim() && fullContent.length > newPosition) {
// File is complete and has content beyond what we've read
this.filePositions.delete(agentId); // Clean up tracking
await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir);
return;
}
}
}
} catch { /* file empty or missing */ }
@@ -400,6 +454,9 @@ export class OutputHandler {
};
await this.repository.update(agentId, { result: JSON.stringify(resultPayload), status: 'idle' });
// Clean up file position tracking for completed agent
this.filePositions.delete(agentId);
const reason = this.getStoppedReason(mode);
if (this.eventBus) {
const event: AgentStoppedEvent = {
@@ -461,6 +518,10 @@ export class OutputHandler {
result: JSON.stringify(errorResult),
status: 'crashed'
});
// Clean up file position tracking for crashed agent
this.filePositions.delete(agentId);
this.emitCrashed(agent, error);
}
@@ -589,6 +650,9 @@ export class OutputHandler {
result: JSON.stringify(errorResult)
});
// Clean up file position tracking for crashed agent
this.filePositions.delete(agentId);
if (this.eventBus) {
const event: AgentCrashedEvent = {
type: 'agent:crashed',

View File

@@ -8,7 +8,6 @@
import { spawn } from 'node:child_process';
import { writeFileSync, mkdirSync, openSync, closeSync, existsSync } from 'node:fs';
import { stat } from 'node:fs/promises';
import { join } from 'node:path';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { EventBus } from '../events/index.js';
@@ -314,33 +313,6 @@ export class ProcessManager {
return { pid, outputFilePath, tailer };
}
/**
 * Poll a file's size until it stops changing.
 *
 * The size is sampled every 100ms; once three consecutive samples match the
 * previously seen size the file is considered complete. Errors from `stat`
 * (for example the file not existing yet) are ignored and polling continues.
 *
 * @param filePath - File whose size is monitored.
 * @param timeout - Maximum time to keep polling, in milliseconds.
 * @returns `true` when the size stabilized, `false` when the timeout elapsed first.
 */
private async waitForFileCompletion(filePath: string, timeout = 10000): Promise<boolean> {
  const pollIntervalMs = 100;
  const requiredStableReads = 3;
  const begunAt = Date.now();
  const withinBudget = (): boolean => Date.now() - begunAt < timeout;

  let previousSize = -1;
  let unchangedReads = 0;

  while (withinBudget()) {
    try {
      const { size } = await stat(filePath);
      // A differing size restarts the stability count from zero.
      unchangedReads = size === previousSize ? unchangedReads + 1 : 0;
      previousSize = size;
      if (unchangedReads >= requiredStableReads) {
        return true; // Stable for 300ms
      }
    } catch {
      // File doesn't exist yet
    }
    await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
  }

  return false;
}
/**
* Poll for process completion by checking if PID is still alive.
* When the process exits, calls onComplete callback.
@@ -353,25 +325,12 @@ export class ProcessManager {
pid: number,
onComplete: () => Promise<void>,
getTailer: () => FileTailer | undefined,
outputFilePath?: string,
): void {
const processLog = log.child({ agentId, pid });
const check = async () => {
if (!isPidAlive(pid)) {
const tailer = getTailer();
if (tailer) {
// Wait for output file to stabilize instead of hardcoded delay
if (outputFilePath) {
processLog.debug('waiting for output file completion');
const completed = await this.waitForFileCompletion(outputFilePath);
if (!completed) {
processLog.warn('output file did not stabilize within timeout');
}
} else {
// Fallback to short delay if no output file path
await new Promise((resolve) => setTimeout(resolve, 200));
}
await new Promise((resolve) => setTimeout(resolve, 500));
await tailer.stop();
}
await onComplete();