/**
 * Process Log Writer
 *
 * Handles per-process stdout/stderr capture to individual log files.
 * Optionally emits log events to an EventBus for real-time streaming.
 */

import { createWriteStream, type PathLike, type WriteStream } from 'node:fs';
import { StringDecoder } from 'node:string_decoder';

import type { LogManager } from './manager.js';
import type { EventBus, LogEntryEvent } from '../events/index.js';

/** Names of the two output streams captured for a process. */
type LogStreamName = 'stdout' | 'stderr';

/** Mutable state kept for one open log file. */
interface LogChannel {
  /** Append-mode stream for the log file. */
  readonly stream: WriteStream;
  /** Carries incomplete multi-byte UTF-8 sequences over to the next Buffer chunk. */
  readonly decoder: StringDecoder;
  /** True when the next character written begins a new line (so it needs a timestamp). */
  atLineStart: boolean;
  /** First error emitted by the stream, or null while the stream is healthy. */
  error: Error | null;
}

/**
 * Formats a timestamp for log output using the local time zone.
 * Format: [YYYY-MM-DD HH:mm:ss.SSS]
 */
function formatTimestamp(date: Date): string {
  const pad = (n: number, w = 2): string => n.toString().padStart(w, '0');

  const year = date.getFullYear();
  const month = pad(date.getMonth() + 1); // getMonth() is zero-based
  const day = pad(date.getDate());
  const hours = pad(date.getHours());
  const minutes = pad(date.getMinutes());
  const seconds = pad(date.getSeconds());
  const ms = pad(date.getMilliseconds(), 3);

  return `[${year}-${month}-${day} ${hours}:${minutes}:${seconds}.${ms}]`;
}

/**
 * Prefixes every line that *starts* inside `content` with `timestamp`.
 *
 * @param content - Text to write; may contain zero or more '\n' separators
 * @param timestamp - Already formatted timestamp prefix
 * @param atLineStart - Whether `content` begins at the start of a line. When
 *   false, the first segment continues a line begun by an earlier chunk and is
 *   therefore left unprefixed.
 * @returns The text with timestamps inserted; '' for empty input
 */
function stampLines(content: string, timestamp: string, atLineStart: boolean): string {
  const lines = content.split('\n');
  const lastIndex = lines.length - 1;

  return lines
    .map((line, index) => {
      // The empty segment after a trailing '\n' (or an empty input) is not a line.
      if (index === lastIndex && line === '') {
        return '';
      }
      // Continuation of a line whose start was already timestamped.
      if (index === 0 && !atLineStart) {
        return line;
      }
      return `${timestamp} ${line}`;
    })
    .join('\n');
}

/** Type guard selecting the rejected entries of a Promise.allSettled() result. */
function isRejected(result: PromiseSettledResult<unknown>): result is PromiseRejectedResult {
  return result.status === 'rejected';
}

/**
 * Resolves once `stream` emits `event`; rejects if the stream emits 'error' or
 * 'close' first. All temporary listeners are removed when the promise settles.
 */
function waitForStreamEvent(stream: WriteStream, event: 'open' | 'finish'): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    function cleanup(): void {
      stream.off(event, onSuccess);
      stream.off('error', onError);
      stream.off('close', onClose);
    }
    function onSuccess(): void {
      cleanup();
      resolve();
    }
    function onError(err: Error): void {
      cleanup();
      reject(err);
    }
    function onClose(): void {
      cleanup();
      reject(new Error(`Log stream closed before '${event}' was emitted.`));
    }

    stream.once(event, onSuccess);
    stream.once('error', onError);
    stream.once('close', onClose);
  });
}

/**
 * Creates the append-mode stream for `path` together with its bookkeeping.
 * A persistent 'error' listener records the first failure so that a late I/O
 * error surfaces on the next write instead of crashing the process as an
 * unhandled 'error' event.
 */
function createChannel(path: PathLike): LogChannel {
  const channel: LogChannel = {
    stream: createWriteStream(path, { flags: 'a' }),
    decoder: new StringDecoder('utf8'),
    atLineStart: true,
    error: null,
  };

  channel.stream.on('error', (err: Error) => {
    if (channel.error === null) {
      channel.error = err;
    }
  });

  return channel;
}

/**
 * Flushes any undecoded bytes, ends the stream and waits until it has finished.
 * Streams that are already destroyed or finished are treated as closed.
 */
async function closeChannel(channel: LogChannel): Promise<void> {
  const { stream } = channel;
  if (stream.destroyed || stream.writableFinished) {
    return;
  }

  // Subscribe before calling end() so the 'finish' event cannot be missed.
  const finished = waitForStreamEvent(stream, 'finish');

  if (!stream.writableEnded) {
    // An incomplete trailing UTF-8 sequence is emitted as a replacement character.
    const tail = channel.decoder.end();
    if (tail !== '') {
      stream.write(stampLines(tail, formatTimestamp(new Date()), channel.atLineStart));
    }
    stream.end();
  }

  await finished;
}

/**
 * Writes stdout/stderr output to per-process log files.
 *
 * Each line of output is prefixed with a timestamp. Lines and multi-byte UTF-8
 * characters that are split across several write calls are reassembled, so a
 * timestamp is only ever inserted at the start of a line.
 *
 * Backpressure: the promise returned by writeStdout()/writeStderr() stays
 * pending while the underlying stream's buffer is full, so awaiting each write
 * throttles the producer.
 */
export class ProcessLogWriter {
  private readonly processId: string;
  private readonly logManager: LogManager;
  private readonly eventBus: EventBus | undefined;
  private stdout: LogChannel | null = null;
  private stderr: LogChannel | null = null;

  /**
   * Creates a new ProcessLogWriter.
   * @param processId - Unique identifier for the process
   * @param logManager - LogManager instance for directory management
   * @param eventBus - Optional EventBus for emitting log entry events
   */
  constructor(processId: string, logManager: LogManager, eventBus?: EventBus) {
    this.processId = processId;
    this.logManager = logManager;
    this.eventBus = eventBus;
  }

  /**
   * Opens file handles for stdout and stderr log files.
   * Creates the process log directory if it doesn't exist.
   * Calling open() on an already open writer is a no-op.
   * @throws The underlying error if either file cannot be opened; in that case
   *   both streams are destroyed and the writer stays closed.
   */
  async open(): Promise<void> {
    if (this.stdout !== null || this.stderr !== null) {
      return;
    }

    // Ensure the process directory exists
    await this.logManager.ensureProcessDir(this.processId);

    // Resolve both paths first so a failure here cannot leak an open stream.
    const stdoutPath = this.logManager.getLogPath(this.processId, 'stdout');
    const stderrPath = this.logManager.getLogPath(this.processId, 'stderr');

    // Open write streams in append mode
    const stdout = createChannel(stdoutPath);
    const stderr = createChannel(stderrPath);

    // Wait for both outcomes so a failure on one never leaves the other dangling.
    const results = await Promise.allSettled([
      waitForStreamEvent(stdout.stream, 'open'),
      waitForStreamEvent(stderr.stream, 'open'),
    ]);

    const failure = results.find(isRejected);
    if (failure) {
      stdout.stream.destroy();
      stderr.stream.destroy();
      throw failure.reason;
    }

    this.stdout = stdout;
    this.stderr = stderr;
  }

  /**
   * Writes data to the stdout log file with timestamps.
   * Also emits a LogEntry event if an EventBus was provided.
   * @param data - String or Buffer to write
   * @returns Promise that resolves once the data has been accepted by the
   *   stream, or flushed if the stream's buffer was full
   * @throws If the writer is not open or the stream has failed
   */
  async writeStdout(data: string | Buffer): Promise<void> {
    await this.writeAndEmit('stdout', this.stdout, data);
  }

  /**
   * Writes data to the stderr log file with timestamps.
   * Also emits a LogEntry event if an EventBus was provided.
   * @param data - String or Buffer to write
   * @returns Promise that resolves once the data has been accepted by the
   *   stream, or flushed if the stream's buffer was full
   * @throws If the writer is not open or the stream has failed
   */
  async writeStderr(data: string | Buffer): Promise<void> {
    await this.writeAndEmit('stderr', this.stderr, data);
  }

  /**
   * Shared implementation of writeStdout()/writeStderr(): decodes the chunk,
   * appends it to the log file, then publishes it on the event bus.
   */
  private async writeAndEmit(
    name: LogStreamName,
    channel: LogChannel | null,
    data: string | Buffer
  ): Promise<void> {
    if (channel === null) {
      throw new Error('Log writer not open. Call open() first.');
    }

    // Buffers go through the decoder so a character split across chunks is not
    // corrupted. A string flushes whatever bytes are still pending first, which
    // keeps the output in order when callers mix strings and Buffers.
    const content =
      typeof data === 'string' ? channel.decoder.end() + data : channel.decoder.write(data);

    await this.writeWithTimestamp(channel, content);

    // Emit log entry event for real-time streaming
    if (this.eventBus) {
      const event: LogEntryEvent = {
        type: 'log:entry',
        timestamp: new Date(),
        payload: {
          processId: this.processId,
          stream: name,
          data: content,
        },
      };
      this.eventBus.emit(event);
    }
  }

  /**
   * Writes already decoded text with timestamp prefixes, handling backpressure.
   */
  private async writeWithTimestamp(channel: LogChannel, content: string): Promise<void> {
    if (channel.error !== null) {
      throw channel.error;
    }

    const { stream } = channel;
    if (stream.destroyed || stream.writableEnded) {
      throw new Error('Log stream is no longer writable.');
    }

    // Nothing to write, e.g. the chunk only held part of a multi-byte character.
    if (content === '') {
      return;
    }

    const text = stampLines(content, formatTimestamp(new Date()), channel.atLineStart);
    channel.atLineStart = content.endsWith('\n');

    await new Promise<void>((resolve, reject) => {
      const accepted = stream.write(text, (err) => {
        // Only reached before settling when the buffer was full: wait for the
        // chunk to be flushed. Unlike waiting for 'drain', this callback also
        // fires when the stream fails or is ended, so the promise cannot hang.
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });

      if (accepted) {
        resolve();
      }
    });
  }

  /**
   * Flushes and closes both file handles. Safe to call when the writer is not
   * open. Both streams are always given the chance to close; if either fails,
   * the first failure is rethrown afterwards.
   */
  async close(): Promise<void> {
    const channels = [this.stdout, this.stderr].filter(
      (channel): channel is LogChannel => channel !== null
    );

    // Mark the writer closed up front so new writes fail fast while flushing.
    this.stdout = null;
    this.stderr = null;

    const results = await Promise.allSettled(channels.map((channel) => closeChannel(channel)));

    const failure = results.find(isRejected);
    if (failure) {
      throw failure.reason;
    }
  }

  /**
   * Gets the stdout write stream for direct piping.
   * Data written directly to the stream bypasses timestamping and event emission.
   * @returns The stdout WriteStream or null if not open
   */
  getStdoutStream(): WriteStream | null {
    return this.stdout?.stream ?? null;
  }

  /**
   * Gets the stderr write stream for direct piping.
   * Data written directly to the stream bypasses timestamping and event emission.
   * @returns The stderr WriteStream or null if not open
   */
  getStderrStream(): WriteStream | null {
    return this.stderr?.stream ?? null;
  }

  /**
   * Gets the process ID for this writer.
   */
  getProcessId(): string {
    return this.processId;
  }
}