Files
Codewalkers/apps/server/process/manager.ts
Lukas May 34578d39c6 refactor: Restructure monorepo to apps/server/ and apps/web/ layout
Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt
standard monorepo conventions (apps/ for runnable apps, packages/
for reusable libraries). Update all config files, shared package
imports, test fixtures, and documentation to reflect new paths.

Key fixes:
- Update workspace config to ["apps/*", "packages/*"]
- Update tsconfig.json rootDir/include for apps/server/
- Add apps/web/** to vitest exclude list
- Update drizzle.config.ts schema path
- Fix ensure-schema.ts migration path detection (3 levels up in dev,
  2 levels up in dist)
- Fix tests/integration/cli-server.test.ts import paths
- Update packages/shared imports to apps/server/ paths
- Update all docs/ files with new paths
2026-03-03 11:22:53 +01:00

317 lines
8.4 KiB
TypeScript

/**
* Process Manager
*
* Manages spawning, stopping, and lifecycle of child processes.
* Uses execa for process spawning with detached mode support.
* Emits domain events via optional EventBus for coordination.
*/
import { execa, type ResultPromise } from 'execa';
import type { ProcessInfo, SpawnOptions } from './types.js';
import type { ProcessRegistry } from './registry.js';
import type { EventBus, ProcessSpawnedEvent, ProcessStoppedEvent, ProcessCrashedEvent } from '../events/index.js';
/** Stop timeout in milliseconds before sending SIGKILL */
const STOP_TIMEOUT_MS = 5000;
/**
* Internal tracking for spawned process handles.
* Maps process ID to execa subprocess for control operations.
*/
interface ProcessHandle {
subprocess: ResultPromise;
options: SpawnOptions;
}
/**
* Manager for spawning, tracking, and controlling child processes.
*/
export class ProcessManager {
private handles: Map<string, ProcessHandle> = new Map();
/**
* Create a new ProcessManager.
* @param registry - Registry for tracking process metadata
* @param eventBus - Optional event bus for emitting domain events
*/
constructor(
private registry: ProcessRegistry,
private eventBus?: EventBus
) {}
/**
* Spawn a new child process.
* @param options - Spawn configuration
* @returns Process info for the spawned process
* @throws If process fails to start
*/
async spawn(options: SpawnOptions): Promise<ProcessInfo> {
const { id, command, args = [], cwd, env } = options;
// Check if process with this ID already exists
const existing = this.registry.get(id);
if (existing && existing.status === 'running') {
throw new Error(`Process with id '${id}' is already running`);
}
// Spawn the process in detached mode
const subprocess = execa(command, args, {
cwd,
env: env ? { ...process.env, ...env } : undefined,
detached: true,
stdio: 'ignore', // Don't inherit stdio for background processes
});
// Ensure we have a PID
const pid = subprocess.pid;
if (pid === undefined) {
throw new Error(`Failed to get PID for process '${id}'`);
}
// Create process info
const info: ProcessInfo = {
id,
pid,
command,
args,
startedAt: new Date(),
status: 'running',
};
// Store handle for later control
this.handles.set(id, { subprocess, options });
// Register in registry
this.registry.register(info);
// Emit ProcessSpawned event
if (this.eventBus) {
const event: ProcessSpawnedEvent = {
type: 'process:spawned',
timestamp: new Date(),
payload: {
processId: id,
pid,
command,
},
};
this.eventBus.emit(event);
}
// Set up exit handler to update status and emit events
subprocess.on('exit', (code, signal) => {
const status = code === 0 ? 'stopped' : 'crashed';
this.registry.updateStatus(id, status);
this.handles.delete(id);
// Emit appropriate event based on exit status
if (this.eventBus) {
if (status === 'stopped') {
const event: ProcessStoppedEvent = {
type: 'process:stopped',
timestamp: new Date(),
payload: {
processId: id,
pid,
exitCode: code,
},
};
this.eventBus.emit(event);
} else {
const event: ProcessCrashedEvent = {
type: 'process:crashed',
timestamp: new Date(),
payload: {
processId: id,
pid,
exitCode: code,
signal,
},
};
this.eventBus.emit(event);
}
}
});
// Suppress unhandled rejection when process is killed
// This is expected behavior - we're intentionally killing processes
subprocess.catch(() => {
// Intentionally ignored - we handle exit via the 'exit' event
});
// Unref the subprocess so it doesn't keep the parent alive
subprocess.unref();
return info;
}
/**
* Stop a running process.
* Sends SIGTERM first, then SIGKILL after timeout if needed.
* @param id - Process ID to stop
* @throws If process not found
*/
async stop(id: string): Promise<void> {
const handle = this.handles.get(id);
const info = this.registry.get(id);
if (!info) {
throw new Error(`Process with id '${id}' not found`);
}
if (info.status !== 'running') {
// Already stopped, just clean up
this.handles.delete(id);
return;
}
if (!handle) {
// Process exists in registry but we don't have a handle
// Try to kill by PID directly
try {
process.kill(info.pid, 'SIGTERM');
await this.waitForExit(info.pid, STOP_TIMEOUT_MS);
} catch {
// Process might already be dead
}
this.registry.updateStatus(id, 'stopped');
return;
}
// Send SIGTERM
handle.subprocess.kill('SIGTERM');
// Wait for graceful shutdown
const exited = await this.waitForProcessExit(handle.subprocess, STOP_TIMEOUT_MS);
if (!exited) {
// Force kill with SIGKILL
handle.subprocess.kill('SIGKILL');
await this.waitForProcessExit(handle.subprocess, 1000).catch(() => {});
}
// Update status (exit handler should have done this, but ensure it)
this.registry.updateStatus(id, 'stopped');
this.handles.delete(id);
}
/**
* Stop all running processes.
*/
async stopAll(): Promise<void> {
const processes = this.registry.getAll().filter(p => p.status === 'running');
await Promise.all(processes.map(p => this.stop(p.id).catch(() => {})));
}
/**
* Restart a process with the same configuration.
* @param id - Process ID to restart
* @returns New process info
* @throws If process not found or original config unavailable
*/
async restart(id: string): Promise<ProcessInfo> {
const handle = this.handles.get(id);
const info = this.registry.get(id);
if (!info) {
throw new Error(`Process with id '${id}' not found`);
}
// Get original spawn options
let options: SpawnOptions;
if (handle) {
options = handle.options;
} else {
// Reconstruct options from process info
options = {
id,
command: info.command,
args: info.args,
};
}
// Stop if running
if (info.status === 'running') {
await this.stop(id);
}
// Unregister old process
this.registry.unregister(id);
// Spawn with same options
return this.spawn(options);
}
/**
* Check if a process is currently running.
* @param id - Process ID to check
* @returns true if process exists and is running
*/
isRunning(id: string): boolean {
const info = this.registry.get(id);
if (!info || info.status !== 'running') {
return false;
}
// Double-check by probing the actual process
try {
process.kill(info.pid, 0);
return true;
} catch {
// Process is dead, update registry
this.registry.updateStatus(id, 'crashed');
this.handles.delete(id);
return false;
}
}
/**
* Wait for a subprocess to exit within a timeout.
* @returns true if process exited, false if timeout
*/
private waitForProcessExit(subprocess: ResultPromise, timeoutMs: number): Promise<boolean> {
return new Promise(resolve => {
const timeout = setTimeout(() => {
resolve(false);
}, timeoutMs);
// Use both then and catch to handle success and kill scenarios
subprocess
.then(() => {
clearTimeout(timeout);
resolve(true);
})
.catch(() => {
// Process was killed (expected) - this is success for our purposes
clearTimeout(timeout);
resolve(true);
});
});
}
/**
* Wait for a PID to exit within a timeout.
* @returns true if process exited, false if timeout
*/
private waitForExit(pid: number, timeoutMs: number): Promise<boolean> {
return new Promise(resolve => {
const start = Date.now();
const check = () => {
try {
process.kill(pid, 0);
// Still alive
if (Date.now() - start >= timeoutMs) {
resolve(false);
} else {
setTimeout(check, 100);
}
} catch {
// Dead
resolve(true);
}
};
check();
});
}
}