feat(01.1-04): add event emission to ProcessLogWriter
- ProcessLogWriter accepts optional EventBus parameter - Emits LogEntry events on writeStdout and writeStderr - createLogger convenience function passes through eventBus - Backwards compatible (eventBus is optional) - 6 new tests verify event emission and optional behavior
This commit is contained in:
@@ -14,6 +14,7 @@ export { ProcessLogWriter } from './writer.js';
|
|||||||
// Convenience functions
|
// Convenience functions
|
||||||
import { LogManager } from './manager.js';
|
import { LogManager } from './manager.js';
|
||||||
import { ProcessLogWriter } from './writer.js';
|
import { ProcessLogWriter } from './writer.js';
|
||||||
|
import type { EventBus } from '../events/index.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new ProcessLogWriter with default configuration.
|
* Creates a new ProcessLogWriter with default configuration.
|
||||||
@@ -22,6 +23,7 @@ import { ProcessLogWriter } from './writer.js';
|
|||||||
* for a specific process using default log directory (~/.cw/logs).
|
* for a specific process using default log directory (~/.cw/logs).
|
||||||
*
|
*
|
||||||
* @param processId - Unique identifier for the process
|
* @param processId - Unique identifier for the process
|
||||||
|
* @param eventBus - Optional EventBus for emitting log entry events
|
||||||
* @returns A new ProcessLogWriter instance (call open() before writing)
|
* @returns A new ProcessLogWriter instance (call open() before writing)
|
||||||
*
|
*
|
||||||
* @example
|
* @example
|
||||||
@@ -31,8 +33,18 @@ import { ProcessLogWriter } from './writer.js';
|
|||||||
* await writer.writeStdout('Hello from agent\n');
|
* await writer.writeStdout('Hello from agent\n');
|
||||||
* await writer.close();
|
* await writer.close();
|
||||||
* ```
|
* ```
|
||||||
|
*
|
||||||
|
* @example
|
||||||
|
* ```typescript
|
||||||
|
* // With event bus for real-time streaming
|
||||||
|
* const bus = createEventBus();
|
||||||
|
* const writer = createLogger('agent-001', bus);
|
||||||
|
* bus.on('log:entry', (event) => console.log(event.payload));
|
||||||
|
* await writer.open();
|
||||||
|
* await writer.writeStdout('Hello from agent\n');
|
||||||
|
* ```
|
||||||
*/
|
*/
|
||||||
export function createLogger(processId: string): ProcessLogWriter {
|
export function createLogger(processId: string, eventBus?: EventBus): ProcessLogWriter {
|
||||||
const manager = new LogManager();
|
const manager = new LogManager();
|
||||||
return new ProcessLogWriter(processId, manager);
|
return new ProcessLogWriter(processId, manager, eventBus);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,13 +5,15 @@
|
|||||||
* Uses temporary directories to avoid polluting the real log directory.
|
* Uses temporary directories to avoid polluting the real log directory.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
|
||||||
import { rm, readFile } from 'node:fs/promises';
|
import { rm, readFile } from 'node:fs/promises';
|
||||||
import { existsSync } from 'node:fs';
|
import { existsSync } from 'node:fs';
|
||||||
import { tmpdir } from 'node:os';
|
import { tmpdir } from 'node:os';
|
||||||
import { join } from 'node:path';
|
import { join } from 'node:path';
|
||||||
import { LogManager } from './manager.js';
|
import { LogManager } from './manager.js';
|
||||||
import { ProcessLogWriter } from './writer.js';
|
import { ProcessLogWriter } from './writer.js';
|
||||||
|
import { createEventBus } from '../events/index.js';
|
||||||
|
import type { EventBus, LogEntryEvent } from '../events/index.js';
|
||||||
|
|
||||||
describe('ProcessLogWriter', () => {
|
describe('ProcessLogWriter', () => {
|
||||||
let testDir: string;
|
let testDir: string;
|
||||||
@@ -263,3 +265,131 @@ describe('ProcessLogWriter', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('ProcessLogWriter with EventBus', () => {
|
||||||
|
let testDir: string;
|
||||||
|
let manager: LogManager;
|
||||||
|
let eventBus: EventBus;
|
||||||
|
let writerWithBus: ProcessLogWriter;
|
||||||
|
const processId = 'event-test-process';
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
testDir = join(tmpdir(), `cw-event-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
|
||||||
|
manager = new LogManager({ baseDir: testDir });
|
||||||
|
eventBus = createEventBus();
|
||||||
|
writerWithBus = new ProcessLogWriter(processId, manager, eventBus);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
try {
|
||||||
|
await writerWithBus.close();
|
||||||
|
} catch {
|
||||||
|
// Ignore if already closed
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await rm(testDir, { recursive: true, force: true });
|
||||||
|
} catch {
|
||||||
|
// Ignore cleanup errors
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('event emission', () => {
|
||||||
|
it('should emit log:entry event on writeStdout', async () => {
|
||||||
|
const handler = vi.fn();
|
||||||
|
eventBus.on<LogEntryEvent>('log:entry', handler);
|
||||||
|
|
||||||
|
await writerWithBus.open();
|
||||||
|
await writerWithBus.writeStdout('Hello stdout\n');
|
||||||
|
|
||||||
|
expect(handler).toHaveBeenCalledOnce();
|
||||||
|
|
||||||
|
const event = handler.mock.calls[0][0] as LogEntryEvent;
|
||||||
|
expect(event.type).toBe('log:entry');
|
||||||
|
expect(event.payload.processId).toBe(processId);
|
||||||
|
expect(event.payload.stream).toBe('stdout');
|
||||||
|
expect(event.payload.data).toBe('Hello stdout\n');
|
||||||
|
expect(event.timestamp).toBeInstanceOf(Date);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should emit log:entry event on writeStderr', async () => {
|
||||||
|
const handler = vi.fn();
|
||||||
|
eventBus.on<LogEntryEvent>('log:entry', handler);
|
||||||
|
|
||||||
|
await writerWithBus.open();
|
||||||
|
await writerWithBus.writeStderr('Error message\n');
|
||||||
|
|
||||||
|
expect(handler).toHaveBeenCalledOnce();
|
||||||
|
|
||||||
|
const event = handler.mock.calls[0][0] as LogEntryEvent;
|
||||||
|
expect(event.type).toBe('log:entry');
|
||||||
|
expect(event.payload.processId).toBe(processId);
|
||||||
|
expect(event.payload.stream).toBe('stderr');
|
||||||
|
expect(event.payload.data).toBe('Error message\n');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should emit events with Buffer data converted to string', async () => {
|
||||||
|
const handler = vi.fn();
|
||||||
|
eventBus.on<LogEntryEvent>('log:entry', handler);
|
||||||
|
|
||||||
|
await writerWithBus.open();
|
||||||
|
await writerWithBus.writeStdout(Buffer.from('Buffer data\n'));
|
||||||
|
|
||||||
|
const event = handler.mock.calls[0][0] as LogEntryEvent;
|
||||||
|
expect(event.payload.data).toBe('Buffer data\n');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should emit event for each write call', async () => {
|
||||||
|
const handler = vi.fn();
|
||||||
|
eventBus.on<LogEntryEvent>('log:entry', handler);
|
||||||
|
|
||||||
|
await writerWithBus.open();
|
||||||
|
await writerWithBus.writeStdout('Line 1\n');
|
||||||
|
await writerWithBus.writeStdout('Line 2\n');
|
||||||
|
await writerWithBus.writeStderr('Error\n');
|
||||||
|
|
||||||
|
expect(handler).toHaveBeenCalledTimes(3);
|
||||||
|
|
||||||
|
// Verify each call
|
||||||
|
expect(handler.mock.calls[0][0].payload.stream).toBe('stdout');
|
||||||
|
expect(handler.mock.calls[0][0].payload.data).toBe('Line 1\n');
|
||||||
|
expect(handler.mock.calls[1][0].payload.stream).toBe('stdout');
|
||||||
|
expect(handler.mock.calls[1][0].payload.data).toBe('Line 2\n');
|
||||||
|
expect(handler.mock.calls[2][0].payload.stream).toBe('stderr');
|
||||||
|
expect(handler.mock.calls[2][0].payload.data).toBe('Error\n');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('without eventBus', () => {
|
||||||
|
it('should NOT emit events when eventBus is not provided', async () => {
|
||||||
|
// Create a writer without eventBus
|
||||||
|
const writerNoBus = new ProcessLogWriter(processId + '-nobus', manager);
|
||||||
|
const handler = vi.fn();
|
||||||
|
eventBus.on<LogEntryEvent>('log:entry', handler);
|
||||||
|
|
||||||
|
await writerNoBus.open();
|
||||||
|
await writerNoBus.writeStdout('Hello\n');
|
||||||
|
await writerNoBus.writeStderr('Error\n');
|
||||||
|
await writerNoBus.close();
|
||||||
|
|
||||||
|
// Handler should not have been called because writerNoBus has no eventBus
|
||||||
|
expect(handler).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('backwards compatibility', () => {
|
||||||
|
it('should work with two-argument constructor', async () => {
|
||||||
|
const writerCompat = new ProcessLogWriter(processId + '-compat', manager);
|
||||||
|
|
||||||
|
await writerCompat.open();
|
||||||
|
await writerCompat.writeStdout('Compat test\n');
|
||||||
|
await writerCompat.close();
|
||||||
|
|
||||||
|
const content = await readFile(
|
||||||
|
manager.getLogPath(processId + '-compat', 'stdout'),
|
||||||
|
'utf-8'
|
||||||
|
);
|
||||||
|
expect(content).toContain('Compat test');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -2,10 +2,12 @@
|
|||||||
* Process Log Writer
|
* Process Log Writer
|
||||||
*
|
*
|
||||||
* Handles per-process stdout/stderr capture to individual log files.
|
* Handles per-process stdout/stderr capture to individual log files.
|
||||||
|
* Optionally emits log events to an EventBus for real-time streaming.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { createWriteStream, type WriteStream } from 'node:fs';
|
import { createWriteStream, type WriteStream } from 'node:fs';
|
||||||
import type { LogManager } from './manager.js';
|
import type { LogManager } from './manager.js';
|
||||||
|
import type { EventBus, LogEntryEvent } from '../events/index.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Formats a timestamp for log output.
|
* Formats a timestamp for log output.
|
||||||
@@ -32,12 +34,20 @@ function formatTimestamp(date: Date): string {
|
|||||||
export class ProcessLogWriter {
|
export class ProcessLogWriter {
|
||||||
private readonly processId: string;
|
private readonly processId: string;
|
||||||
private readonly logManager: LogManager;
|
private readonly logManager: LogManager;
|
||||||
|
private readonly eventBus: EventBus | undefined;
|
||||||
private stdoutStream: WriteStream | null = null;
|
private stdoutStream: WriteStream | null = null;
|
||||||
private stderrStream: WriteStream | null = null;
|
private stderrStream: WriteStream | null = null;
|
||||||
|
|
||||||
constructor(processId: string, logManager: LogManager) {
|
/**
|
||||||
|
* Creates a new ProcessLogWriter.
|
||||||
|
* @param processId - Unique identifier for the process
|
||||||
|
* @param logManager - LogManager instance for directory management
|
||||||
|
* @param eventBus - Optional EventBus for emitting log entry events
|
||||||
|
*/
|
||||||
|
constructor(processId: string, logManager: LogManager, eventBus?: EventBus) {
|
||||||
this.processId = processId;
|
this.processId = processId;
|
||||||
this.logManager = logManager;
|
this.logManager = logManager;
|
||||||
|
this.eventBus = eventBus;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -70,6 +80,7 @@ export class ProcessLogWriter {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes data to the stdout log file with timestamps.
|
* Writes data to the stdout log file with timestamps.
|
||||||
|
* Also emits a LogEntry event if an EventBus was provided.
|
||||||
* @param data - String or Buffer to write
|
* @param data - String or Buffer to write
|
||||||
* @returns Promise that resolves when write is complete (including drain if needed)
|
* @returns Promise that resolves when write is complete (including drain if needed)
|
||||||
*/
|
*/
|
||||||
@@ -77,11 +88,27 @@ export class ProcessLogWriter {
|
|||||||
if (!this.stdoutStream) {
|
if (!this.stdoutStream) {
|
||||||
throw new Error('Log writer not open. Call open() first.');
|
throw new Error('Log writer not open. Call open() first.');
|
||||||
}
|
}
|
||||||
return this.writeWithTimestamp(this.stdoutStream, data);
|
await this.writeWithTimestamp(this.stdoutStream, data);
|
||||||
|
|
||||||
|
// Emit log entry event for real-time streaming
|
||||||
|
if (this.eventBus) {
|
||||||
|
const content = typeof data === 'string' ? data : data.toString('utf-8');
|
||||||
|
const event: LogEntryEvent = {
|
||||||
|
type: 'log:entry',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
processId: this.processId,
|
||||||
|
stream: 'stdout',
|
||||||
|
data: content,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes data to the stderr log file with timestamps.
|
* Writes data to the stderr log file with timestamps.
|
||||||
|
* Also emits a LogEntry event if an EventBus was provided.
|
||||||
* @param data - String or Buffer to write
|
* @param data - String or Buffer to write
|
||||||
* @returns Promise that resolves when write is complete (including drain if needed)
|
* @returns Promise that resolves when write is complete (including drain if needed)
|
||||||
*/
|
*/
|
||||||
@@ -89,7 +116,22 @@ export class ProcessLogWriter {
|
|||||||
if (!this.stderrStream) {
|
if (!this.stderrStream) {
|
||||||
throw new Error('Log writer not open. Call open() first.');
|
throw new Error('Log writer not open. Call open() first.');
|
||||||
}
|
}
|
||||||
return this.writeWithTimestamp(this.stderrStream, data);
|
await this.writeWithTimestamp(this.stderrStream, data);
|
||||||
|
|
||||||
|
// Emit log entry event for real-time streaming
|
||||||
|
if (this.eventBus) {
|
||||||
|
const content = typeof data === 'string' ? data : data.toString('utf-8');
|
||||||
|
const event: LogEntryEvent = {
|
||||||
|
type: 'log:entry',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
processId: this.processId,
|
||||||
|
stream: 'stderr',
|
||||||
|
data: content,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
this.eventBus.emit(event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user