feat(01.1-04): add event emission to ProcessLogWriter

- ProcessLogWriter accepts optional EventBus parameter
- Emits LogEntry events on writeStdout and writeStderr
- createLogger convenience function passes through eventBus
- Backwards compatible (eventBus is optional)
- 6 new tests verify event emission and optional behavior
This commit is contained in:
Lukas May
2026-01-30 14:03:24 +01:00
parent 1f255ee8e3
commit bac133db7a
3 changed files with 190 additions and 6 deletions

View File

@@ -14,6 +14,7 @@ export { ProcessLogWriter } from './writer.js';
// Convenience functions
import { LogManager } from './manager.js';
import { ProcessLogWriter } from './writer.js';
import type { EventBus } from '../events/index.js';
/**
* Creates a new ProcessLogWriter with default configuration.
@@ -22,6 +23,7 @@ import { ProcessLogWriter } from './writer.js';
* for a specific process using default log directory (~/.cw/logs).
*
* @param processId - Unique identifier for the process
* @param eventBus - Optional EventBus for emitting log entry events
* @returns A new ProcessLogWriter instance (call open() before writing)
*
* @example
@@ -31,8 +33,18 @@ import { ProcessLogWriter } from './writer.js';
* await writer.writeStdout('Hello from agent\n');
* await writer.close();
* ```
*
* @example
* ```typescript
* // With event bus for real-time streaming
* const bus = createEventBus();
* const writer = createLogger('agent-001', bus);
* bus.on('log:entry', (event) => console.log(event.payload));
* await writer.open();
* await writer.writeStdout('Hello from agent\n');
* ```
*/
export function createLogger(processId: string): ProcessLogWriter {
export function createLogger(processId: string, eventBus?: EventBus): ProcessLogWriter {
const manager = new LogManager();
return new ProcessLogWriter(processId, manager);
return new ProcessLogWriter(processId, manager, eventBus);
}

View File

@@ -5,13 +5,15 @@
* Uses temporary directories to avoid polluting the real log directory.
*/
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { rm, readFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { LogManager } from './manager.js';
import { ProcessLogWriter } from './writer.js';
import { createEventBus } from '../events/index.js';
import type { EventBus, LogEntryEvent } from '../events/index.js';
describe('ProcessLogWriter', () => {
let testDir: string;
@@ -263,3 +265,131 @@ describe('ProcessLogWriter', () => {
});
});
});
describe('ProcessLogWriter with EventBus', () => {
let testDir: string;
let manager: LogManager;
let eventBus: EventBus;
let writerWithBus: ProcessLogWriter;
const processId = 'event-test-process';
beforeEach(async () => {
testDir = join(tmpdir(), `cw-event-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
manager = new LogManager({ baseDir: testDir });
eventBus = createEventBus();
writerWithBus = new ProcessLogWriter(processId, manager, eventBus);
});
afterEach(async () => {
try {
await writerWithBus.close();
} catch {
// Ignore if already closed
}
try {
await rm(testDir, { recursive: true, force: true });
} catch {
// Ignore cleanup errors
}
});
describe('event emission', () => {
it('should emit log:entry event on writeStdout', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStdout('Hello stdout\n');
expect(handler).toHaveBeenCalledOnce();
const event = handler.mock.calls[0][0] as LogEntryEvent;
expect(event.type).toBe('log:entry');
expect(event.payload.processId).toBe(processId);
expect(event.payload.stream).toBe('stdout');
expect(event.payload.data).toBe('Hello stdout\n');
expect(event.timestamp).toBeInstanceOf(Date);
});
it('should emit log:entry event on writeStderr', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStderr('Error message\n');
expect(handler).toHaveBeenCalledOnce();
const event = handler.mock.calls[0][0] as LogEntryEvent;
expect(event.type).toBe('log:entry');
expect(event.payload.processId).toBe(processId);
expect(event.payload.stream).toBe('stderr');
expect(event.payload.data).toBe('Error message\n');
});
it('should emit events with Buffer data converted to string', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStdout(Buffer.from('Buffer data\n'));
const event = handler.mock.calls[0][0] as LogEntryEvent;
expect(event.payload.data).toBe('Buffer data\n');
});
it('should emit event for each write call', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStdout('Line 1\n');
await writerWithBus.writeStdout('Line 2\n');
await writerWithBus.writeStderr('Error\n');
expect(handler).toHaveBeenCalledTimes(3);
// Verify each call
expect(handler.mock.calls[0][0].payload.stream).toBe('stdout');
expect(handler.mock.calls[0][0].payload.data).toBe('Line 1\n');
expect(handler.mock.calls[1][0].payload.stream).toBe('stdout');
expect(handler.mock.calls[1][0].payload.data).toBe('Line 2\n');
expect(handler.mock.calls[2][0].payload.stream).toBe('stderr');
expect(handler.mock.calls[2][0].payload.data).toBe('Error\n');
});
});
describe('without eventBus', () => {
it('should NOT emit events when eventBus is not provided', async () => {
// Create a writer without eventBus
const writerNoBus = new ProcessLogWriter(processId + '-nobus', manager);
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerNoBus.open();
await writerNoBus.writeStdout('Hello\n');
await writerNoBus.writeStderr('Error\n');
await writerNoBus.close();
// Handler should not have been called because writerNoBus has no eventBus
expect(handler).not.toHaveBeenCalled();
});
});
describe('backwards compatibility', () => {
it('should work with two-argument constructor', async () => {
const writerCompat = new ProcessLogWriter(processId + '-compat', manager);
await writerCompat.open();
await writerCompat.writeStdout('Compat test\n');
await writerCompat.close();
const content = await readFile(
manager.getLogPath(processId + '-compat', 'stdout'),
'utf-8'
);
expect(content).toContain('Compat test');
});
});
});

View File

@@ -2,10 +2,12 @@
* Process Log Writer
*
* Handles per-process stdout/stderr capture to individual log files.
* Optionally emits log events to an EventBus for real-time streaming.
*/
import { createWriteStream, type WriteStream } from 'node:fs';
import type { LogManager } from './manager.js';
import type { EventBus, LogEntryEvent } from '../events/index.js';
/**
* Formats a timestamp for log output.
@@ -32,12 +34,20 @@ function formatTimestamp(date: Date): string {
export class ProcessLogWriter {
private readonly processId: string;
private readonly logManager: LogManager;
private readonly eventBus: EventBus | undefined;
private stdoutStream: WriteStream | null = null;
private stderrStream: WriteStream | null = null;
constructor(processId: string, logManager: LogManager) {
/**
* Creates a new ProcessLogWriter.
* @param processId - Unique identifier for the process
* @param logManager - LogManager instance for directory management
* @param eventBus - Optional EventBus for emitting log entry events
*/
constructor(processId: string, logManager: LogManager, eventBus?: EventBus) {
this.processId = processId;
this.logManager = logManager;
this.eventBus = eventBus;
}
/**
@@ -70,6 +80,7 @@ export class ProcessLogWriter {
/**
* Writes data to the stdout log file with timestamps.
* Also emits a LogEntry event if an EventBus was provided.
* @param data - String or Buffer to write
* @returns Promise that resolves when write is complete (including drain if needed)
*/
@@ -77,11 +88,27 @@ export class ProcessLogWriter {
if (!this.stdoutStream) {
throw new Error('Log writer not open. Call open() first.');
}
return this.writeWithTimestamp(this.stdoutStream, data);
await this.writeWithTimestamp(this.stdoutStream, data);
// Emit log entry event for real-time streaming
if (this.eventBus) {
const content = typeof data === 'string' ? data : data.toString('utf-8');
const event: LogEntryEvent = {
type: 'log:entry',
timestamp: new Date(),
payload: {
processId: this.processId,
stream: 'stdout',
data: content,
},
};
this.eventBus.emit(event);
}
}
/**
* Writes data to the stderr log file with timestamps.
* Also emits a LogEntry event if an EventBus was provided.
* @param data - String or Buffer to write
* @returns Promise that resolves when write is complete (including drain if needed)
*/
@@ -89,7 +116,22 @@ export class ProcessLogWriter {
if (!this.stderrStream) {
throw new Error('Log writer not open. Call open() first.');
}
return this.writeWithTimestamp(this.stderrStream, data);
await this.writeWithTimestamp(this.stderrStream, data);
// Emit log entry event for real-time streaming
if (this.eventBus) {
const content = typeof data === 'string' ? data : data.toString('utf-8');
const event: LogEntryEvent = {
type: 'log:entry',
timestamp: new Date(),
payload: {
processId: this.processId,
stream: 'stderr',
data: content,
},
};
this.eventBus.emit(event);
}
}
/**