/** * File Tailer * * Watches an output file and emits parsed events in real-time. * Used for crash-resilient agent spawning where subprocesses write * directly to files instead of using pipes. * * Uses fs.watch() for efficient change detection with a poll fallback * since fs.watch isn't 100% reliable on all platforms. */ import { watch, type FSWatcher } from 'node:fs'; import { open, stat } from 'node:fs/promises'; import type { FileHandle } from 'node:fs/promises'; import type { StreamParser, StreamEvent } from './providers/stream-types.js'; import { createModuleLogger } from '../logger/index.js'; const log = createModuleLogger('file-tailer'); /** Poll interval for fallback polling (ms) */ const POLL_INTERVAL_MS = 500; /** Read buffer size (bytes) */ const READ_BUFFER_SIZE = 64 * 1024; export interface FileTailerOptions { /** Path to the output file to watch */ filePath: string; /** Agent ID for logging */ agentId: string; /** Parser to convert lines to stream events */ parser: StreamParser; /** Optional callback for each stream event */ onEvent?: (event: StreamEvent) => void; /** If true, read from beginning of file; otherwise tail only new content (default: false) */ startFromBeginning?: boolean; /** Callback for raw file content chunks (for DB persistence + event emission) */ onRawContent?: (content: string) => void; } /** * FileTailer watches a file for changes and emits parsed stream events. * * Behavior: * - Uses fs.watch() for efficient change detection * - Falls back to polling every 500ms (fs.watch misses events sometimes) * - Reads new content incrementally, splits into lines * - Feeds each line to the parser, emits resulting events * - Handles partial lines at buffer boundaries */ export class FileTailer { private position = 0; private watcher: FSWatcher | null = null; private pollInterval: NodeJS.Timeout | null = null; private fileHandle: FileHandle | null = null; private stopped = false; private partialLine = ''; private reading = false; private readonly filePath: string; private readonly agentId: string; private readonly parser: StreamParser; private readonly onEvent?: (event: StreamEvent) => void; private readonly startFromBeginning: boolean; private readonly onRawContent?: (content: string) => void; constructor(options: FileTailerOptions) { this.filePath = options.filePath; this.agentId = options.agentId; this.parser = options.parser; this.onEvent = options.onEvent; this.startFromBeginning = options.startFromBeginning ?? false; this.onRawContent = options.onRawContent; } /** * Start watching the file for changes. * Initializes position, starts fs.watch, and begins poll fallback. */ async start(): Promise { if (this.stopped) return; log.debug({ filePath: this.filePath, agentId: this.agentId }, 'starting file tailer'); try { // Open file for reading this.fileHandle = await open(this.filePath, 'r'); // Set initial position if (this.startFromBeginning) { this.position = 0; } else { // Seek to end const stats = await stat(this.filePath); this.position = stats.size; } // Start fs.watch for efficient change detection this.watcher = watch(this.filePath, (eventType) => { if (eventType === 'change' && !this.stopped) { this.readNewContent().catch((err) => { log.warn({ err: err instanceof Error ? err.message : String(err), agentId: this.agentId }, 'error reading new content'); }); } }); this.watcher.on('error', (err) => { log.warn({ err: err instanceof Error ? err.message : String(err), agentId: this.agentId }, 'watcher error'); }); // Start poll fallback (fs.watch misses events sometimes) this.pollInterval = setInterval(() => { if (!this.stopped) { this.readNewContent().catch((err) => { log.warn({ err: err instanceof Error ? err.message : String(err), agentId: this.agentId }, 'poll read error'); }); } }, POLL_INTERVAL_MS); // If starting from beginning, do initial read if (this.startFromBeginning) { await this.readNewContent(); } } catch (err) { log.error({ err: err instanceof Error ? err.message : String(err), filePath: this.filePath }, 'failed to start file tailer'); } } /** * Read new content from the file since last position. * Splits into lines, feeds to parser, emits events. */ private async readNewContent(): Promise { if (this.stopped || !this.fileHandle || this.reading) return; this.reading = true; try { // Check current file size const stats = await stat(this.filePath); if (stats.size <= this.position) { return; // No new content } // Read new bytes const bytesToRead = stats.size - this.position; const buffer = Buffer.alloc(Math.min(bytesToRead, READ_BUFFER_SIZE)); const { bytesRead } = await this.fileHandle.read(buffer, 0, buffer.length, this.position); if (bytesRead === 0) return; this.position += bytesRead; // Fire raw content callback for DB persistence (before line splitting) const rawChunk = buffer.toString('utf-8', 0, bytesRead); if (this.onRawContent) { this.onRawContent(rawChunk); } // Convert to string and process lines const content = this.partialLine + rawChunk; const lines = content.split('\n'); // Last element is either empty (if content ended with \n) or a partial line this.partialLine = lines.pop() ?? ''; // Process complete lines for (const line of lines) { if (line.trim()) { this.processLine(line); } } // If there's more content to read, schedule another read if (stats.size > this.position) { setImmediate(() => { this.readNewContent().catch(() => {}); }); } } finally { this.reading = false; } } /** * Process a single line through the parser and emit events. */ private processLine(line: string): void { const events = this.parser.parseLine(line); for (const event of events) { if (this.onEvent) { this.onEvent(event); } } } /** * Stop watching the file. * Cleans up watcher, poll timer, and file handle. */ async stop(): Promise { if (this.stopped) return; this.stopped = true; log.debug({ filePath: this.filePath, agentId: this.agentId }, 'stopping file tailer'); // Close watcher if (this.watcher) { this.watcher.close(); this.watcher = null; } // Clear poll timer if (this.pollInterval) { clearInterval(this.pollInterval); this.pollInterval = null; } // Do one final read to catch any remaining content try { await this.readNewContent(); // Process any remaining partial line if (this.partialLine.trim()) { this.processLine(this.partialLine); this.partialLine = ''; } // Signal end of stream to parser const endEvents = this.parser.end(); for (const event of endEvents) { if (this.onEvent) { this.onEvent(event); } } } catch { // Ignore errors during cleanup } // Close file handle if (this.fileHandle) { try { await this.fileHandle.close(); } catch { // Ignore close errors } this.fileHandle = null; } } /** * Check if the tailer has been stopped. */ get isStopped(): boolean { return this.stopped; } }