Files
Lukas May 34578d39c6 refactor: Restructure monorepo to apps/server/ and apps/web/ layout
Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt
standard monorepo conventions (apps/ for runnable apps, packages/
for reusable libraries). Update all config files, shared package
imports, test fixtures, and documentation to reflect new paths.

Key fixes:
- Update workspace config to ["apps/*", "packages/*"]
- Update tsconfig.json rootDir/include for apps/server/
- Add apps/web/** to vitest exclude list
- Update drizzle.config.ts schema path
- Fix ensure-schema.ts migration path detection (3 levels up in dev,
  2 levels up in dist)
- Fix tests/integration/cli-server.test.ts import paths
- Update packages/shared imports to apps/server/ paths
- Update all docs/ files with new paths
2026-03-03 11:22:53 +01:00

225 lines
6.7 KiB
TypeScript

/**
* Process Log Writer
*
* Handles per-process stdout/stderr capture to individual log files.
* Optionally emits log events to an EventBus for real-time streaming.
*/
import { createWriteStream, type WriteStream } from 'node:fs';
import type { LogManager } from './manager.js';
import type { EventBus, LogEntryEvent } from '../events/index.js';
/**
 * Renders a date as a bracketed timestamp for log lines.
 *
 * Every field is read through the local-time getters of `Date`, so the
 * output reflects the time zone of the running process.
 *
 * @param date - The instant to render
 * @returns The timestamp in the form `[YYYY-MM-DD HH:mm:ss.SSS]`
 */
function formatTimestamp(date: Date): string {
  // Left-pad a numeric field with zeros up to the requested width.
  const zeroPad = (value: number, width: number): string =>
    String(value).padStart(width, '0');

  // The year is emitted as-is; only the remaining fields are zero-padded.
  const datePart = [
    String(date.getFullYear()),
    zeroPad(date.getMonth() + 1, 2), // getMonth() is zero-based
    zeroPad(date.getDate(), 2),
  ].join('-');

  const timePart = [date.getHours(), date.getMinutes(), date.getSeconds()]
    .map((field) => zeroPad(field, 2))
    .join(':');

  const millis = zeroPad(date.getMilliseconds(), 3);

  return `[${datePart} ${timePart}.${millis}]`;
}
/**
 * Writes a process's stdout/stderr output to per-process log files.
 *
 * Each chunk handed to {@link ProcessLogWriter.writeStdout} or
 * {@link ProcessLogWriter.writeStderr} is decoded as UTF-8, every line in it
 * is prefixed with a timestamp, and the result is appended to the matching
 * log file. When an EventBus was supplied, a `log:entry` event carrying the
 * raw (un-timestamped) chunk is emitted after the write.
 *
 * Backpressure: when the underlying stream reports a full buffer, the write
 * promise stays pending until the stream drains, finishes, errors or closes.
 *
 * Limitations: chunks are processed independently. A line that is split
 * across two chunks gets a timestamp at the start of each fragment, and a
 * multi-byte UTF-8 character split across two Buffers is not reassembled.
 */
export class ProcessLogWriter {
  private readonly processId: string;
  private readonly logManager: LogManager;
  private readonly eventBus: EventBus | undefined;
  private stdoutStream: WriteStream | null = null;
  private stderrStream: WriteStream | null = null;

  /**
   * Creates a new ProcessLogWriter. No files are touched until `open()`.
   * @param processId - Unique identifier for the process
   * @param logManager - LogManager used to create the log directory and resolve log paths
   * @param eventBus - Optional EventBus for emitting log entry events
   */
  constructor(processId: string, logManager: LogManager, eventBus?: EventBus) {
    this.processId = processId;
    this.logManager = logManager;
    this.eventBus = eventBus;
  }

  /**
   * Opens the stdout and stderr log files in append mode.
   *
   * Asks the LogManager to ensure the process directory exists, then waits
   * until both write streams have emitted `'open'`.
   *
   * @throws Rejects with the underlying error if either file cannot be
   *   opened. In that case both streams are destroyed and the writer is left
   *   in the "not open" state, so no file descriptor is leaked.
   */
  async open(): Promise<void> {
    await this.logManager.ensureProcessDir(this.processId);

    const stdout = createWriteStream(
      this.logManager.getLogPath(this.processId, 'stdout'),
      { flags: 'a' }
    );
    const stderr = createWriteStream(
      this.logManager.getLogPath(this.processId, 'stderr'),
      { flags: 'a' }
    );

    // An 'error' event without a listener is thrown as an uncaught exception.
    // Keep one listener attached for the stream's whole lifetime; failures are
    // reported to callers through the rejected open()/write/close() promises.
    const absorbError = (): void => {};
    stdout.on('error', absorbError);
    stderr.on('error', absorbError);

    this.stdoutStream = stdout;
    this.stderrStream = stderr;

    try {
      await Promise.all([this.waitForOpen(stdout), this.waitForOpen(stderr)]);
    } catch (error) {
      // Do not leave the writer half-open: release whatever did open.
      stdout.destroy();
      stderr.destroy();
      if (this.stdoutStream === stdout) {
        this.stdoutStream = null;
      }
      if (this.stderrStream === stderr) {
        this.stderrStream = null;
      }
      throw error;
    }
  }

  /**
   * Writes data to the stdout log file with timestamps.
   * Also emits a `log:entry` event if an EventBus was provided.
   * @param data - String or Buffer to write (Buffers are decoded as UTF-8)
   * @returns Promise that resolves once the data is queued (after drain if the buffer was full)
   * @throws If the writer is not open or the stream is no longer writable
   */
  async writeStdout(data: string | Buffer): Promise<void> {
    await this.writeEntry(this.stdoutStream, 'stdout', data);
  }

  /**
   * Writes data to the stderr log file with timestamps.
   * Also emits a `log:entry` event if an EventBus was provided.
   * @param data - String or Buffer to write (Buffers are decoded as UTF-8)
   * @returns Promise that resolves once the data is queued (after drain if the buffer was full)
   * @throws If the writer is not open or the stream is no longer writable
   */
  async writeStderr(data: string | Buffer): Promise<void> {
    await this.writeEntry(this.stderrStream, 'stderr', data);
  }

  /**
   * Flushes and closes both file handles.
   *
   * Safe to call when the writer was never opened or is already closed.
   * After the returned promise settles, the closed streams are cleared so
   * further writes fail with the "not open" error.
   *
   * @throws Rejects with the stream error if the final flush fails.
   */
  async close(): Promise<void> {
    const pending: Promise<void>[] = [];

    const stdout = this.stdoutStream;
    if (stdout) {
      pending.push(
        this.endStream(stdout).finally(() => {
          // Only clear the field if it still refers to the stream we ended.
          if (this.stdoutStream === stdout) {
            this.stdoutStream = null;
          }
        })
      );
    }

    const stderr = this.stderrStream;
    if (stderr) {
      pending.push(
        this.endStream(stderr).finally(() => {
          if (this.stderrStream === stderr) {
            this.stderrStream = null;
          }
        })
      );
    }

    await Promise.all(pending);
  }

  /**
   * Gets the stdout write stream for direct piping.
   * @returns The stdout WriteStream or null if not open
   */
  getStdoutStream(): WriteStream | null {
    return this.stdoutStream;
  }

  /**
   * Gets the stderr write stream for direct piping.
   * @returns The stderr WriteStream or null if not open
   */
  getStderrStream(): WriteStream | null {
    return this.stderrStream;
  }

  /**
   * Gets the process ID for this writer.
   */
  getProcessId(): string {
    return this.processId;
  }

  /**
   * Shared implementation of writeStdout/writeStderr: writes the chunk to
   * the given stream, then emits the matching `log:entry` event.
   */
  private async writeEntry(
    stream: WriteStream | null,
    streamName: 'stdout' | 'stderr',
    data: string | Buffer
  ): Promise<void> {
    if (!stream) {
      throw new Error('Log writer not open. Call open() first.');
    }

    // Decode once and reuse for both the file and the event payload.
    const content = typeof data === 'string' ? data : data.toString('utf-8');
    await this.writeWithTimestamp(stream, content);

    // Emit log entry event for real-time streaming
    if (this.eventBus) {
      const event: LogEntryEvent = {
        type: 'log:entry',
        timestamp: new Date(),
        payload: {
          processId: this.processId,
          stream: streamName,
          data: content,
        },
      };
      this.eventBus.emit(event);
    }
  }

  /**
   * Writes data with a timestamp prefix on every line, handling backpressure.
   *
   * @throws If the stream has been ended/destroyed, or if it errors or is
   *   closed while this write is waiting for the buffer to drain.
   */
  private async writeWithTimestamp(
    stream: WriteStream,
    data: string | Buffer
  ): Promise<void> {
    // write() on an ended or destroyed stream returns false and never emits
    // 'drain', so waiting below would hang forever. Fail fast instead.
    if (!stream.writable) {
      throw new Error('Log stream is no longer writable.');
    }

    const content = typeof data === 'string' ? data : data.toString('utf-8');
    const timestamp = formatTimestamp(new Date());

    const lines = content.split('\n');
    const lastIndex = lines.length - 1;
    const timestampedLines = lines
      .map((line, index) =>
        // A trailing '\n' yields an empty last element; keep it empty so the
        // output ends with the newline and not with a dangling timestamp.
        index === lastIndex && line === '' ? '' : `${timestamp} ${line}`
      )
      .join('\n');

    if (stream.write(timestampedLines)) {
      return;
    }

    // Buffer is full: wait until it is safe to write again, but also settle
    // if the stream goes away so callers are never left pending.
    await new Promise<void>((resolve, reject) => {
      const detach = (): void => {
        stream.removeListener('drain', onFlushed);
        stream.removeListener('finish', onFlushed);
        stream.removeListener('error', onError);
        stream.removeListener('close', onClose);
      };
      // 'finish' covers close() being called while we wait: the data has
      // been flushed but 'drain' is not emitted for an ending stream.
      const onFlushed = (): void => {
        detach();
        resolve();
      };
      const onError = (error: Error): void => {
        detach();
        reject(error);
      };
      const onClose = (): void => {
        detach();
        reject(new Error('Log stream closed before pending data was flushed.'));
      };
      stream.once('drain', onFlushed);
      stream.once('finish', onFlushed);
      stream.once('error', onError);
      stream.once('close', onClose);
    });
  }

  /**
   * Resolves when the stream emits 'open', rejects on 'error'.
   * Whichever listener did not fire is removed so nothing lingers.
   */
  private waitForOpen(stream: WriteStream): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const onOpen = (): void => {
        stream.removeListener('error', onError);
        resolve();
      };
      const onError = (error: Error): void => {
        stream.removeListener('open', onOpen);
        reject(error);
      };
      stream.once('open', onOpen);
      stream.once('error', onError);
    });
  }

  /**
   * Ends a stream and resolves once buffered data has been flushed.
   * Rejects if the stream reports an error while finishing.
   */
  private endStream(stream: WriteStream): Promise<void> {
    // Already torn down (e.g. after an earlier error): nothing left to flush.
    if (stream.destroyed) {
      return Promise.resolve();
    }

    return new Promise<void>((resolve, reject) => {
      let settled = false;
      const settle = (error?: Error | null): void => {
        if (settled) {
          return;
        }
        settled = true;
        stream.removeListener('error', onError);
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      };
      const onError = (error: Error): void => settle(error);

      stream.once('error', onError);
      // The end() callback may be invoked with an error instead of on
      // 'finish'; honor it rather than reporting a failed flush as success.
      stream.end((error?: Error | null) => settle(error));
    });
  }
}