From bac133db7ab9d17afb0ac440239f7f86e5f29fcc Mon Sep 17 00:00:00 2001 From: Lukas May Date: Fri, 30 Jan 2026 14:03:24 +0100 Subject: [PATCH] feat(01.1-04): add event emission to ProcessLogWriter - ProcessLogWriter accepts optional EventBus parameter - Emits LogEntry events on writeStdout and writeStderr - createLogger convenience function passes through eventBus - Backwards compatible (eventBus is optional) - 6 new tests verify event emission and optional behavior --- src/logging/index.ts | 16 ++++- src/logging/writer.test.ts | 132 ++++++++++++++++++++++++++++++++++++- src/logging/writer.ts | 48 +++++++++++++- 3 files changed, 190 insertions(+), 6 deletions(-) diff --git a/src/logging/index.ts b/src/logging/index.ts index 8195298..6413b0d 100644 --- a/src/logging/index.ts +++ b/src/logging/index.ts @@ -14,6 +14,7 @@ export { ProcessLogWriter } from './writer.js'; // Convenience functions import { LogManager } from './manager.js'; import { ProcessLogWriter } from './writer.js'; +import type { EventBus } from '../events/index.js'; /** * Creates a new ProcessLogWriter with default configuration. @@ -22,6 +23,7 @@ import { ProcessLogWriter } from './writer.js'; * for a specific process using default log directory (~/.cw/logs). * * @param processId - Unique identifier for the process + * @param eventBus - Optional EventBus for emitting log entry events * @returns A new ProcessLogWriter instance (call open() before writing) * * @example @@ -31,8 +33,18 @@ import { ProcessLogWriter } from './writer.js'; * await writer.writeStdout('Hello from agent\n'); * await writer.close(); * ``` + * + * @example + * ```typescript + * // With event bus for real-time streaming + * const bus = createEventBus(); + * const writer = createLogger('agent-001', bus); + * bus.on('log:entry', (event) => console.log(event.payload)); + * await writer.open(); + * await writer.writeStdout('Hello from agent\n'); + * ``` */ -export function createLogger(processId: string): ProcessLogWriter { +export function createLogger(processId: string, eventBus?: EventBus): ProcessLogWriter { const manager = new LogManager(); - return new ProcessLogWriter(processId, manager); + return new ProcessLogWriter(processId, manager, eventBus); } diff --git a/src/logging/writer.test.ts b/src/logging/writer.test.ts index f2c203c..d1d4402 100644 --- a/src/logging/writer.test.ts +++ b/src/logging/writer.test.ts @@ -5,13 +5,15 @@ * Uses temporary directories to avoid polluting the real log directory. */ -import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; import { rm, readFile } from 'node:fs/promises'; import { existsSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { LogManager } from './manager.js'; import { ProcessLogWriter } from './writer.js'; +import { createEventBus } from '../events/index.js'; +import type { EventBus, LogEntryEvent } from '../events/index.js'; describe('ProcessLogWriter', () => { let testDir: string; @@ -263,3 +265,131 @@ describe('ProcessLogWriter', () => { }); }); }); + +describe('ProcessLogWriter with EventBus', () => { + let testDir: string; + let manager: LogManager; + let eventBus: EventBus; + let writerWithBus: ProcessLogWriter; + const processId = 'event-test-process'; + + beforeEach(async () => { + testDir = join(tmpdir(), `cw-event-test-${Date.now()}-${Math.random().toString(36).slice(2)}`); + manager = new LogManager({ baseDir: testDir }); + eventBus = createEventBus(); + writerWithBus = new ProcessLogWriter(processId, manager, eventBus); + }); + + afterEach(async () => { + try { + await writerWithBus.close(); + } catch { + // Ignore if already closed + } + + try { + await rm(testDir, { recursive: true, force: true }); + } catch { + // Ignore cleanup errors + } + }); + + describe('event emission', () => { + it('should emit log:entry event on writeStdout', async () => { + const handler = vi.fn(); + eventBus.on('log:entry', handler); + + await writerWithBus.open(); + await writerWithBus.writeStdout('Hello stdout\n'); + + expect(handler).toHaveBeenCalledOnce(); + + const event = handler.mock.calls[0][0] as LogEntryEvent; + expect(event.type).toBe('log:entry'); + expect(event.payload.processId).toBe(processId); + expect(event.payload.stream).toBe('stdout'); + expect(event.payload.data).toBe('Hello stdout\n'); + expect(event.timestamp).toBeInstanceOf(Date); + }); + + it('should emit log:entry event on writeStderr', async () => { + const handler = vi.fn(); + eventBus.on('log:entry', handler); + + await writerWithBus.open(); + await writerWithBus.writeStderr('Error message\n'); + + expect(handler).toHaveBeenCalledOnce(); + + const event = handler.mock.calls[0][0] as LogEntryEvent; + expect(event.type).toBe('log:entry'); + expect(event.payload.processId).toBe(processId); + expect(event.payload.stream).toBe('stderr'); + expect(event.payload.data).toBe('Error message\n'); + }); + + it('should emit events with Buffer data converted to string', async () => { + const handler = vi.fn(); + eventBus.on('log:entry', handler); + + await writerWithBus.open(); + await writerWithBus.writeStdout(Buffer.from('Buffer data\n')); + + const event = handler.mock.calls[0][0] as LogEntryEvent; + expect(event.payload.data).toBe('Buffer data\n'); + }); + + it('should emit event for each write call', async () => { + const handler = vi.fn(); + eventBus.on('log:entry', handler); + + await writerWithBus.open(); + await writerWithBus.writeStdout('Line 1\n'); + await writerWithBus.writeStdout('Line 2\n'); + await writerWithBus.writeStderr('Error\n'); + + expect(handler).toHaveBeenCalledTimes(3); + + // Verify each call + expect(handler.mock.calls[0][0].payload.stream).toBe('stdout'); + expect(handler.mock.calls[0][0].payload.data).toBe('Line 1\n'); + expect(handler.mock.calls[1][0].payload.stream).toBe('stdout'); + expect(handler.mock.calls[1][0].payload.data).toBe('Line 2\n'); + expect(handler.mock.calls[2][0].payload.stream).toBe('stderr'); + expect(handler.mock.calls[2][0].payload.data).toBe('Error\n'); + }); + }); + + describe('without eventBus', () => { + it('should NOT emit events when eventBus is not provided', async () => { + // Create a writer without eventBus + const writerNoBus = new ProcessLogWriter(processId + '-nobus', manager); + const handler = vi.fn(); + eventBus.on('log:entry', handler); + + await writerNoBus.open(); + await writerNoBus.writeStdout('Hello\n'); + await writerNoBus.writeStderr('Error\n'); + await writerNoBus.close(); + + // Handler should not have been called because writerNoBus has no eventBus + expect(handler).not.toHaveBeenCalled(); + }); + }); + + describe('backwards compatibility', () => { + it('should work with two-argument constructor', async () => { + const writerCompat = new ProcessLogWriter(processId + '-compat', manager); + + await writerCompat.open(); + await writerCompat.writeStdout('Compat test\n'); + await writerCompat.close(); + + const content = await readFile( + manager.getLogPath(processId + '-compat', 'stdout'), + 'utf-8' + ); + expect(content).toContain('Compat test'); + }); + }); +}); diff --git a/src/logging/writer.ts b/src/logging/writer.ts index 5c76e44..cc83b94 100644 --- a/src/logging/writer.ts +++ b/src/logging/writer.ts @@ -2,10 +2,12 @@ * Process Log Writer * * Handles per-process stdout/stderr capture to individual log files. + * Optionally emits log events to an EventBus for real-time streaming. */ import { createWriteStream, type WriteStream } from 'node:fs'; import type { LogManager } from './manager.js'; +import type { EventBus, LogEntryEvent } from '../events/index.js'; /** * Formats a timestamp for log output. @@ -32,12 +34,20 @@ function formatTimestamp(date: Date): string { export class ProcessLogWriter { private readonly processId: string; private readonly logManager: LogManager; + private readonly eventBus: EventBus | undefined; private stdoutStream: WriteStream | null = null; private stderrStream: WriteStream | null = null; - constructor(processId: string, logManager: LogManager) { + /** + * Creates a new ProcessLogWriter. + * @param processId - Unique identifier for the process + * @param logManager - LogManager instance for directory management + * @param eventBus - Optional EventBus for emitting log entry events + */ + constructor(processId: string, logManager: LogManager, eventBus?: EventBus) { this.processId = processId; this.logManager = logManager; + this.eventBus = eventBus; } /** @@ -70,6 +80,7 @@ export class ProcessLogWriter { /** * Writes data to the stdout log file with timestamps. + * Also emits a LogEntry event if an EventBus was provided. * @param data - String or Buffer to write * @returns Promise that resolves when write is complete (including drain if needed) */ @@ -77,11 +88,27 @@ export class ProcessLogWriter { if (!this.stdoutStream) { throw new Error('Log writer not open. Call open() first.'); } - return this.writeWithTimestamp(this.stdoutStream, data); + await this.writeWithTimestamp(this.stdoutStream, data); + + // Emit log entry event for real-time streaming + if (this.eventBus) { + const content = typeof data === 'string' ? data : data.toString('utf-8'); + const event: LogEntryEvent = { + type: 'log:entry', + timestamp: new Date(), + payload: { + processId: this.processId, + stream: 'stdout', + data: content, + }, + }; + this.eventBus.emit(event); + } } /** * Writes data to the stderr log file with timestamps. + * Also emits a LogEntry event if an EventBus was provided. * @param data - String or Buffer to write * @returns Promise that resolves when write is complete (including drain if needed) */ @@ -89,7 +116,22 @@ export class ProcessLogWriter { if (!this.stderrStream) { throw new Error('Log writer not open. Call open() first.'); } - return this.writeWithTimestamp(this.stderrStream, data); + await this.writeWithTimestamp(this.stderrStream, data); + + // Emit log entry event for real-time streaming + if (this.eventBus) { + const content = typeof data === 'string' ? data : data.toString('utf-8'); + const event: LogEntryEvent = { + type: 'log:entry', + timestamp: new Date(), + payload: { + processId: this.processId, + stream: 'stderr', + data: content, + }, + }; + this.eventBus.emit(event); + } } /**