refactor: Restructure monorepo to apps/server/ and apps/web/ layout

Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt
standard monorepo conventions (apps/ for runnable apps, packages/
for reusable libraries). Update all config files, shared package
imports, test fixtures, and documentation to reflect new paths.

Key fixes:
- Update workspace config to ["apps/*", "packages/*"]
- Update tsconfig.json rootDir/include for apps/server/
- Add apps/web/** to vitest exclude list
- Update drizzle.config.ts schema path
- Fix ensure-schema.ts migration path detection (3 levels up in dev,
  2 levels up in dist)
- Fix tests/integration/cli-server.test.ts import paths
- Update packages/shared imports to apps/server/ paths
- Update all docs/ files with new paths
This commit is contained in:
Lukas May
2026-03-03 11:22:53 +01:00
parent 8c38d958ce
commit 34578d39c6
535 changed files with 75452 additions and 687 deletions

View File

@@ -0,0 +1,50 @@
/**
* Logging Module
*
* File-based logging infrastructure for per-process stdout/stderr capture.
*/
// Types
export type { LogLevel, LogEntry, LogConfig, LogStream } from './types.js';
// Classes
export { LogManager } from './manager.js';
export { ProcessLogWriter } from './writer.js';
// Convenience functions
import { LogManager } from './manager.js';
import { ProcessLogWriter } from './writer.js';
import type { EventBus } from '../events/index.js';
/**
* Creates a new ProcessLogWriter with default configuration.
*
* Convenience function for common use case of creating a log writer
* for a specific process using default log directory (~/.cw/logs).
*
* @param processId - Unique identifier for the process
* @param eventBus - Optional EventBus for emitting log entry events
* @returns A new ProcessLogWriter instance (call open() before writing)
*
* @example
* ```typescript
* const writer = createLogger('agent-001');
* await writer.open();
* await writer.writeStdout('Hello from agent\n');
* await writer.close();
* ```
*
* @example
* ```typescript
* // With event bus for real-time streaming
* const bus = createEventBus();
* const writer = createLogger('agent-001', bus);
* bus.on('log:entry', (event) => console.log(event.payload));
* await writer.open();
* await writer.writeStdout('Hello from agent\n');
* ```
*/
export function createLogger(processId: string, eventBus?: EventBus): ProcessLogWriter {
const manager = new LogManager();
return new ProcessLogWriter(processId, manager, eventBus);
}

View File

@@ -0,0 +1,215 @@
/**
* LogManager Tests
*
* Tests for the log directory and file path management.
* Uses temporary directories to avoid polluting the real log directory.
*/
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { mkdir, rm, writeFile, utimes } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { LogManager } from './manager.js';
describe('LogManager', () => {
let testDir: string;
let manager: LogManager;
beforeEach(async () => {
// Create a unique temp directory for each test
testDir = join(tmpdir(), `cw-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
manager = new LogManager({ baseDir: testDir });
});
afterEach(async () => {
// Clean up temp directory after each test
try {
await rm(testDir, { recursive: true, force: true });
} catch {
// Ignore cleanup errors
}
});
describe('getBaseDir', () => {
it('should return the configured base directory', () => {
expect(manager.getBaseDir()).toBe(testDir);
});
it('should use default directory when not configured', () => {
const defaultManager = new LogManager();
expect(defaultManager.getBaseDir()).toContain('.cw');
expect(defaultManager.getBaseDir()).toContain('logs');
});
});
describe('ensureLogDir', () => {
it('should create the base log directory', async () => {
expect(existsSync(testDir)).toBe(false);
await manager.ensureLogDir();
expect(existsSync(testDir)).toBe(true);
});
it('should not error if directory already exists', async () => {
await mkdir(testDir, { recursive: true });
expect(existsSync(testDir)).toBe(true);
// Should not throw
await manager.ensureLogDir();
expect(existsSync(testDir)).toBe(true);
});
});
describe('ensureProcessDir', () => {
it('should create the process-specific log directory', async () => {
const processId = 'test-process-123';
const expectedDir = join(testDir, processId);
expect(existsSync(expectedDir)).toBe(false);
await manager.ensureProcessDir(processId);
expect(existsSync(expectedDir)).toBe(true);
});
it('should create nested directories if base does not exist', async () => {
const processId = 'nested-process';
const expectedDir = join(testDir, processId);
expect(existsSync(testDir)).toBe(false);
await manager.ensureProcessDir(processId);
expect(existsSync(testDir)).toBe(true);
expect(existsSync(expectedDir)).toBe(true);
});
});
describe('getProcessDir', () => {
it('should return the correct path for a process', () => {
const processId = 'my-process';
const expected = join(testDir, processId);
expect(manager.getProcessDir(processId)).toBe(expected);
});
});
describe('getLogPath', () => {
it('should return correct path for stdout log', () => {
const processId = 'proc-1';
const expected = join(testDir, processId, 'stdout.log');
expect(manager.getLogPath(processId, 'stdout')).toBe(expected);
});
it('should return correct path for stderr log', () => {
const processId = 'proc-2';
const expected = join(testDir, processId, 'stderr.log');
expect(manager.getLogPath(processId, 'stderr')).toBe(expected);
});
});
describe('listLogs', () => {
it('should return empty array if base directory does not exist', async () => {
expect(existsSync(testDir)).toBe(false);
const logs = await manager.listLogs();
expect(logs).toEqual([]);
});
it('should return empty array if no log directories exist', async () => {
await mkdir(testDir, { recursive: true });
const logs = await manager.listLogs();
expect(logs).toEqual([]);
});
it('should return process IDs for existing log directories', async () => {
// Create some process directories
await mkdir(join(testDir, 'process-a'), { recursive: true });
await mkdir(join(testDir, 'process-b'), { recursive: true });
await mkdir(join(testDir, 'process-c'), { recursive: true });
const logs = await manager.listLogs();
expect(logs).toHaveLength(3);
expect(logs).toContain('process-a');
expect(logs).toContain('process-b');
expect(logs).toContain('process-c');
});
it('should only return directories, not files', async () => {
await mkdir(testDir, { recursive: true });
await mkdir(join(testDir, 'valid-process'), { recursive: true });
await writeFile(join(testDir, 'some-file.txt'), 'not a directory');
const logs = await manager.listLogs();
expect(logs).toEqual(['valid-process']);
});
});
describe('cleanOldLogs', () => {
it('should return 0 when retainDays is not configured', async () => {
const managerNoRetain = new LogManager({ baseDir: testDir });
const removed = await managerNoRetain.cleanOldLogs();
expect(removed).toBe(0);
});
it('should return 0 when no directories exist', async () => {
const managerWithRetain = new LogManager({ baseDir: testDir, retainDays: 7 });
const removed = await managerWithRetain.cleanOldLogs();
expect(removed).toBe(0);
});
it('should remove directories older than retainDays', async () => {
const managerWithRetain = new LogManager({ baseDir: testDir, retainDays: 7 });
// Create an "old" directory
const oldDir = join(testDir, 'old-process');
await mkdir(oldDir, { recursive: true });
// Set mtime to 10 days ago
const tenDaysAgo = new Date(Date.now() - 10 * 24 * 60 * 60 * 1000);
await utimes(oldDir, tenDaysAgo, tenDaysAgo);
// Create a "new" directory
const newDir = join(testDir, 'new-process');
await mkdir(newDir, { recursive: true });
const removed = await managerWithRetain.cleanOldLogs();
expect(removed).toBe(1);
expect(existsSync(oldDir)).toBe(false);
expect(existsSync(newDir)).toBe(true);
});
it('should use provided retainDays over config value', async () => {
const managerWithRetain = new LogManager({ baseDir: testDir, retainDays: 30 });
// Create directory that is 10 days old
const oldDir = join(testDir, 'process');
await mkdir(oldDir, { recursive: true });
const tenDaysAgo = new Date(Date.now() - 10 * 24 * 60 * 60 * 1000);
await utimes(oldDir, tenDaysAgo, tenDaysAgo);
// With config (30 days), should NOT remove
expect(await managerWithRetain.cleanOldLogs()).toBe(0);
expect(existsSync(oldDir)).toBe(true);
// With explicit 5 days, SHOULD remove
expect(await managerWithRetain.cleanOldLogs(5)).toBe(1);
expect(existsSync(oldDir)).toBe(false);
});
});
});

View File

@@ -0,0 +1,123 @@
/**
* Log Manager
*
* Manages log directories and file paths for per-process logging.
*/
import { mkdir, readdir, rm, stat } from 'node:fs/promises';
import { homedir } from 'node:os';
import { join } from 'node:path';
import type { LogConfig, LogStream } from './types.js';
/**
* Default base directory for logs: ~/.cw/logs
*/
const DEFAULT_LOG_DIR = join(homedir(), '.cw', 'logs');
/**
* Manages log directory structure and file paths.
*
* Log directory structure:
* ~/.cw/logs/{processId}/stdout.log
* ~/.cw/logs/{processId}/stderr.log
*/
export class LogManager {
private readonly baseDir: string;
private readonly retainDays?: number;
constructor(config?: Partial<LogConfig>) {
this.baseDir = config?.baseDir ?? DEFAULT_LOG_DIR;
this.retainDays = config?.retainDays;
}
/**
* Ensures the base log directory exists.
* Creates it recursively if it doesn't exist.
*/
async ensureLogDir(): Promise<void> {
await mkdir(this.baseDir, { recursive: true });
}
/**
* Ensures the process-specific log directory exists.
* @param processId - The process identifier
*/
async ensureProcessDir(processId: string): Promise<void> {
const processDir = this.getProcessDir(processId);
await mkdir(processDir, { recursive: true });
}
/**
* Gets the directory path for a specific process's logs.
* @param processId - The process identifier
*/
getProcessDir(processId: string): string {
return join(this.baseDir, processId);
}
/**
* Gets the full path to a log file for a process and stream.
* @param processId - The process identifier
* @param stream - Either 'stdout' or 'stderr'
*/
getLogPath(processId: string, stream: LogStream): string {
return join(this.baseDir, processId, `${stream}.log`);
}
/**
* Lists all log directories (one per process).
* @returns Array of process IDs that have log directories
*/
async listLogs(): Promise<string[]> {
try {
const entries = await readdir(this.baseDir, { withFileTypes: true });
return entries
.filter((entry) => entry.isDirectory())
.map((entry) => entry.name);
} catch (error) {
// If directory doesn't exist, return empty list
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return [];
}
throw error;
}
}
/**
* Removes log directories older than the specified number of days.
* @param retainDays - Number of days to retain logs (uses config value if not provided)
* @returns Number of directories removed
*/
async cleanOldLogs(retainDays?: number): Promise<number> {
const days = retainDays ?? this.retainDays;
if (days === undefined) {
return 0;
}
const cutoffTime = Date.now() - days * 24 * 60 * 60 * 1000;
const processIds = await this.listLogs();
let removedCount = 0;
for (const processId of processIds) {
const processDir = this.getProcessDir(processId);
try {
const stats = await stat(processDir);
if (stats.mtime.getTime() < cutoffTime) {
await rm(processDir, { recursive: true, force: true });
removedCount++;
}
} catch {
// Skip if we can't stat or remove the directory
}
}
return removedCount;
}
/**
* Gets the base directory path.
*/
getBaseDir(): string {
return this.baseDir;
}
}

View File

@@ -0,0 +1,41 @@
/**
* Logging Types
*
* Type definitions for the file-based logging infrastructure.
*/
/**
* Log severity levels.
*/
export type LogLevel = 'debug' | 'info' | 'warn' | 'error';
/**
* A single log entry with metadata.
*/
export interface LogEntry {
/** When this entry was logged */
timestamp: Date;
/** Severity level */
level: LogLevel;
/** ID of the process that produced this log */
processId: string;
/** The log message content */
message: string;
}
/**
* Configuration for the logging system.
*/
export interface LogConfig {
/** Base directory for log files. Defaults to ~/.cw/logs */
baseDir: string;
/** Maximum size per log file in bytes before rotation. Optional. */
maxFileSize?: number;
/** Number of days to retain logs. Optional. */
retainDays?: number;
}
/**
* Stream type for log output.
*/
export type LogStream = 'stdout' | 'stderr';

View File

@@ -0,0 +1,395 @@
/**
* ProcessLogWriter Tests
*
* Tests for the per-process log writing functionality.
* Uses temporary directories to avoid polluting the real log directory.
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { rm, readFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { LogManager } from './manager.js';
import { ProcessLogWriter } from './writer.js';
import { createEventBus } from '../events/index.js';
import type { EventBus, LogEntryEvent } from '../events/index.js';
describe('ProcessLogWriter', () => {
let testDir: string;
let manager: LogManager;
let writer: ProcessLogWriter;
const processId = 'test-process';
beforeEach(async () => {
// Create a unique temp directory for each test
testDir = join(tmpdir(), `cw-writer-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
manager = new LogManager({ baseDir: testDir });
writer = new ProcessLogWriter(processId, manager);
});
afterEach(async () => {
// Ensure writer is closed before cleanup
try {
await writer.close();
} catch {
// Ignore if already closed
}
// Clean up temp directory
try {
await rm(testDir, { recursive: true, force: true });
} catch {
// Ignore cleanup errors
}
});
describe('getProcessId', () => {
it('should return the process ID', () => {
expect(writer.getProcessId()).toBe(processId);
});
});
describe('open', () => {
it('should create the process log directory', async () => {
const processDir = manager.getProcessDir(processId);
expect(existsSync(processDir)).toBe(false);
await writer.open();
expect(existsSync(processDir)).toBe(true);
});
it('should create stdout.log and stderr.log files', async () => {
await writer.open();
const stdoutPath = manager.getLogPath(processId, 'stdout');
const stderrPath = manager.getLogPath(processId, 'stderr');
expect(existsSync(stdoutPath)).toBe(true);
expect(existsSync(stderrPath)).toBe(true);
});
it('should make streams available after open', async () => {
expect(writer.getStdoutStream()).toBeNull();
expect(writer.getStderrStream()).toBeNull();
await writer.open();
expect(writer.getStdoutStream()).not.toBeNull();
expect(writer.getStderrStream()).not.toBeNull();
});
});
describe('writeStdout', () => {
it('should throw if writer is not open', async () => {
await expect(writer.writeStdout('test')).rejects.toThrow(
'Log writer not open'
);
});
it('should write string data to stdout.log', async () => {
await writer.open();
await writer.writeStdout('Hello stdout\n');
await writer.close();
const content = await readFile(
manager.getLogPath(processId, 'stdout'),
'utf-8'
);
expect(content).toContain('Hello stdout');
});
it('should write Buffer data to stdout.log', async () => {
await writer.open();
await writer.writeStdout(Buffer.from('Buffer content\n'));
await writer.close();
const content = await readFile(
manager.getLogPath(processId, 'stdout'),
'utf-8'
);
expect(content).toContain('Buffer content');
});
it('should prefix lines with timestamps', async () => {
await writer.open();
await writer.writeStdout('Line one\n');
await writer.close();
const content = await readFile(
manager.getLogPath(processId, 'stdout'),
'utf-8'
);
// Timestamp format: [YYYY-MM-DD HH:mm:ss.SSS]
expect(content).toMatch(/\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\]/);
});
it('should handle multiple lines with individual timestamps', async () => {
await writer.open();
await writer.writeStdout('Line one\nLine two\nLine three\n');
await writer.close();
const content = await readFile(
manager.getLogPath(processId, 'stdout'),
'utf-8'
);
const lines = content.split('\n').filter((l) => l.length > 0);
expect(lines.length).toBe(3);
// Each line should have a timestamp
for (const line of lines) {
expect(line).toMatch(/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\]/);
}
});
});
describe('writeStderr', () => {
it('should throw if writer is not open', async () => {
await expect(writer.writeStderr('test')).rejects.toThrow(
'Log writer not open'
);
});
it('should write string data to stderr.log', async () => {
await writer.open();
await writer.writeStderr('Error message\n');
await writer.close();
const content = await readFile(
manager.getLogPath(processId, 'stderr'),
'utf-8'
);
expect(content).toContain('Error message');
});
it('should write Buffer data to stderr.log', async () => {
await writer.open();
await writer.writeStderr(Buffer.from('Error buffer\n'));
await writer.close();
const content = await readFile(
manager.getLogPath(processId, 'stderr'),
'utf-8'
);
expect(content).toContain('Error buffer');
});
it('should prefix lines with timestamps', async () => {
await writer.open();
await writer.writeStderr('Error line\n');
await writer.close();
const content = await readFile(
manager.getLogPath(processId, 'stderr'),
'utf-8'
);
expect(content).toMatch(/\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\]/);
});
});
describe('close', () => {
it('should flush and close file handles', async () => {
await writer.open();
await writer.writeStdout('Before close\n');
await writer.close();
// Streams should be null after close
expect(writer.getStdoutStream()).toBeNull();
expect(writer.getStderrStream()).toBeNull();
// Data should be flushed to disk
const content = await readFile(
manager.getLogPath(processId, 'stdout'),
'utf-8'
);
expect(content).toContain('Before close');
});
it('should not error if called multiple times', async () => {
await writer.open();
await writer.close();
// Should not throw
await writer.close();
});
});
describe('write after close', () => {
it('should throw when writing to stdout after close', async () => {
await writer.open();
await writer.close();
await expect(writer.writeStdout('test')).rejects.toThrow(
'Log writer not open'
);
});
it('should throw when writing to stderr after close', async () => {
await writer.open();
await writer.close();
await expect(writer.writeStderr('test')).rejects.toThrow(
'Log writer not open'
);
});
});
describe('append mode', () => {
it('should append to existing log files', async () => {
// First write session
await writer.open();
await writer.writeStdout('First session\n');
await writer.close();
// Second write session with new writer
const writer2 = new ProcessLogWriter(processId, manager);
await writer2.open();
await writer2.writeStdout('Second session\n');
await writer2.close();
const content = await readFile(
manager.getLogPath(processId, 'stdout'),
'utf-8'
);
expect(content).toContain('First session');
expect(content).toContain('Second session');
});
});
});
describe('ProcessLogWriter with EventBus', () => {
let testDir: string;
let manager: LogManager;
let eventBus: EventBus;
let writerWithBus: ProcessLogWriter;
const processId = 'event-test-process';
beforeEach(async () => {
testDir = join(tmpdir(), `cw-event-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
manager = new LogManager({ baseDir: testDir });
eventBus = createEventBus();
writerWithBus = new ProcessLogWriter(processId, manager, eventBus);
});
afterEach(async () => {
try {
await writerWithBus.close();
} catch {
// Ignore if already closed
}
try {
await rm(testDir, { recursive: true, force: true });
} catch {
// Ignore cleanup errors
}
});
describe('event emission', () => {
it('should emit log:entry event on writeStdout', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStdout('Hello stdout\n');
expect(handler).toHaveBeenCalledOnce();
const event = handler.mock.calls[0][0] as LogEntryEvent;
expect(event.type).toBe('log:entry');
expect(event.payload.processId).toBe(processId);
expect(event.payload.stream).toBe('stdout');
expect(event.payload.data).toBe('Hello stdout\n');
expect(event.timestamp).toBeInstanceOf(Date);
});
it('should emit log:entry event on writeStderr', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStderr('Error message\n');
expect(handler).toHaveBeenCalledOnce();
const event = handler.mock.calls[0][0] as LogEntryEvent;
expect(event.type).toBe('log:entry');
expect(event.payload.processId).toBe(processId);
expect(event.payload.stream).toBe('stderr');
expect(event.payload.data).toBe('Error message\n');
});
it('should emit events with Buffer data converted to string', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStdout(Buffer.from('Buffer data\n'));
const event = handler.mock.calls[0][0] as LogEntryEvent;
expect(event.payload.data).toBe('Buffer data\n');
});
it('should emit event for each write call', async () => {
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerWithBus.open();
await writerWithBus.writeStdout('Line 1\n');
await writerWithBus.writeStdout('Line 2\n');
await writerWithBus.writeStderr('Error\n');
expect(handler).toHaveBeenCalledTimes(3);
// Verify each call
expect(handler.mock.calls[0][0].payload.stream).toBe('stdout');
expect(handler.mock.calls[0][0].payload.data).toBe('Line 1\n');
expect(handler.mock.calls[1][0].payload.stream).toBe('stdout');
expect(handler.mock.calls[1][0].payload.data).toBe('Line 2\n');
expect(handler.mock.calls[2][0].payload.stream).toBe('stderr');
expect(handler.mock.calls[2][0].payload.data).toBe('Error\n');
});
});
describe('without eventBus', () => {
it('should NOT emit events when eventBus is not provided', async () => {
// Create a writer without eventBus
const writerNoBus = new ProcessLogWriter(processId + '-nobus', manager);
const handler = vi.fn();
eventBus.on<LogEntryEvent>('log:entry', handler);
await writerNoBus.open();
await writerNoBus.writeStdout('Hello\n');
await writerNoBus.writeStderr('Error\n');
await writerNoBus.close();
// Handler should not have been called because writerNoBus has no eventBus
expect(handler).not.toHaveBeenCalled();
});
});
describe('backwards compatibility', () => {
it('should work with two-argument constructor', async () => {
const writerCompat = new ProcessLogWriter(processId + '-compat', manager);
await writerCompat.open();
await writerCompat.writeStdout('Compat test\n');
await writerCompat.close();
const content = await readFile(
manager.getLogPath(processId + '-compat', 'stdout'),
'utf-8'
);
expect(content).toContain('Compat test');
});
});
});

View File

@@ -0,0 +1,224 @@
/**
* Process Log Writer
*
* Handles per-process stdout/stderr capture to individual log files.
* Optionally emits log events to an EventBus for real-time streaming.
*/
import { createWriteStream, type WriteStream } from 'node:fs';
import type { LogManager } from './manager.js';
import type { EventBus, LogEntryEvent } from '../events/index.js';
/**
* Formats a timestamp for log output.
* Format: [YYYY-MM-DD HH:mm:ss.SSS]
*/
function formatTimestamp(date: Date): string {
const pad = (n: number, w = 2) => n.toString().padStart(w, '0');
const year = date.getFullYear();
const month = pad(date.getMonth() + 1);
const day = pad(date.getDate());
const hours = pad(date.getHours());
const minutes = pad(date.getMinutes());
const seconds = pad(date.getSeconds());
const ms = pad(date.getMilliseconds(), 3);
return `[${year}-${month}-${day} ${hours}:${minutes}:${seconds}.${ms}]`;
}
/**
* Writes stdout/stderr output to per-process log files.
*
* Each line of output is prefixed with a timestamp.
* Handles backpressure by exposing drain events on the underlying streams.
*/
export class ProcessLogWriter {
private readonly processId: string;
private readonly logManager: LogManager;
private readonly eventBus: EventBus | undefined;
private stdoutStream: WriteStream | null = null;
private stderrStream: WriteStream | null = null;
/**
* Creates a new ProcessLogWriter.
* @param processId - Unique identifier for the process
* @param logManager - LogManager instance for directory management
* @param eventBus - Optional EventBus for emitting log entry events
*/
constructor(processId: string, logManager: LogManager, eventBus?: EventBus) {
this.processId = processId;
this.logManager = logManager;
this.eventBus = eventBus;
}
/**
* Opens file handles for stdout and stderr log files.
* Creates the process log directory if it doesn't exist.
*/
async open(): Promise<void> {
// Ensure the process directory exists
await this.logManager.ensureProcessDir(this.processId);
// Open write streams in append mode
const stdoutPath = this.logManager.getLogPath(this.processId, 'stdout');
const stderrPath = this.logManager.getLogPath(this.processId, 'stderr');
this.stdoutStream = createWriteStream(stdoutPath, { flags: 'a' });
this.stderrStream = createWriteStream(stderrPath, { flags: 'a' });
// Wait for both streams to be ready
await Promise.all([
new Promise<void>((resolve, reject) => {
this.stdoutStream!.once('open', () => resolve());
this.stdoutStream!.once('error', reject);
}),
new Promise<void>((resolve, reject) => {
this.stderrStream!.once('open', () => resolve());
this.stderrStream!.once('error', reject);
}),
]);
}
/**
* Writes data to the stdout log file with timestamps.
* Also emits a LogEntry event if an EventBus was provided.
* @param data - String or Buffer to write
* @returns Promise that resolves when write is complete (including drain if needed)
*/
async writeStdout(data: string | Buffer): Promise<void> {
if (!this.stdoutStream) {
throw new Error('Log writer not open. Call open() first.');
}
await this.writeWithTimestamp(this.stdoutStream, data);
// Emit log entry event for real-time streaming
if (this.eventBus) {
const content = typeof data === 'string' ? data : data.toString('utf-8');
const event: LogEntryEvent = {
type: 'log:entry',
timestamp: new Date(),
payload: {
processId: this.processId,
stream: 'stdout',
data: content,
},
};
this.eventBus.emit(event);
}
}
/**
* Writes data to the stderr log file with timestamps.
* Also emits a LogEntry event if an EventBus was provided.
* @param data - String or Buffer to write
* @returns Promise that resolves when write is complete (including drain if needed)
*/
async writeStderr(data: string | Buffer): Promise<void> {
if (!this.stderrStream) {
throw new Error('Log writer not open. Call open() first.');
}
await this.writeWithTimestamp(this.stderrStream, data);
// Emit log entry event for real-time streaming
if (this.eventBus) {
const content = typeof data === 'string' ? data : data.toString('utf-8');
const event: LogEntryEvent = {
type: 'log:entry',
timestamp: new Date(),
payload: {
processId: this.processId,
stream: 'stderr',
data: content,
},
};
this.eventBus.emit(event);
}
}
/**
* Writes data with timestamp prefix, handling backpressure.
*/
private async writeWithTimestamp(
stream: WriteStream,
data: string | Buffer
): Promise<void> {
const content = typeof data === 'string' ? data : data.toString('utf-8');
const timestamp = formatTimestamp(new Date());
// Prefix each line with timestamp
const lines = content.split('\n');
const timestampedLines = lines
.map((line, index) => {
// Don't add timestamp to empty trailing line from split
if (index === lines.length - 1 && line === '') {
return '';
}
return `${timestamp} ${line}`;
})
.join('\n');
// Write with backpressure handling
const canWrite = stream.write(timestampedLines);
if (!canWrite) {
// Wait for drain event before continuing
await new Promise<void>((resolve) => {
stream.once('drain', resolve);
});
}
}
/**
* Flushes and closes both file handles.
*/
async close(): Promise<void> {
const closePromises: Promise<void>[] = [];
if (this.stdoutStream) {
closePromises.push(
new Promise<void>((resolve, reject) => {
this.stdoutStream!.end(() => {
this.stdoutStream = null;
resolve();
});
this.stdoutStream!.once('error', reject);
})
);
}
if (this.stderrStream) {
closePromises.push(
new Promise<void>((resolve, reject) => {
this.stderrStream!.end(() => {
this.stderrStream = null;
resolve();
});
this.stderrStream!.once('error', reject);
})
);
}
await Promise.all(closePromises);
}
/**
* Gets the stdout write stream for direct piping.
* @returns The stdout WriteStream or null if not open
*/
getStdoutStream(): WriteStream | null {
return this.stdoutStream;
}
/**
* Gets the stderr write stream for direct piping.
* @returns The stderr WriteStream or null if not open
*/
getStderrStream(): WriteStream | null {
return this.stderrStream;
}
/**
* Gets the process ID for this writer.
*/
getProcessId(): string {
return this.processId;
}
}