Files
Codewalkers/src/server/index.ts
Lukas May 9da12a890d feat(01.1-06): add tRPC HTTP adapter and CLI client
- Create src/server/trpc-adapter.ts with fetch adapter for node:http
- Create src/cli/trpc-client.ts with typed client factory functions
- Update CoordinationServer to route /trpc/* to tRPC handler
- Move @trpc/client from devDeps to regular deps
- Keep /health and /status HTTP endpoints for backwards compatibility
2026-01-30 14:07:31 +01:00

342 lines
9.0 KiB
TypeScript

/**
* Coordination Server
*
* HTTP server with health endpoint for agent coordination.
* Uses native node:http for minimal dependencies.
* Supports both traditional HTTP endpoints and tRPC for type-safe client communication.
*/
import { createServer, type Server, type IncomingMessage, type ServerResponse } from 'node:http';
import { writeFile, unlink, readFile, mkdir } from 'node:fs/promises';
import { dirname } from 'node:path';
import { homedir } from 'node:os';
import { join } from 'node:path';
import type { ServerConfig, ServerState, HealthResponse, StatusResponse } from './types.js';
import type { ProcessManager } from '../process/index.js';
import type { LogManager } from '../logging/index.js';
import type { EventBus, ServerStartedEvent, ServerStoppedEvent } from '../events/index.js';
import { createTrpcHandler } from './trpc-adapter.js';
/** Port the coordination server listens on when the config does not specify one. */
const DEFAULT_PORT = 3847;

/** Host the coordination server binds to when the config does not specify one. */
const DEFAULT_HOST = '127.0.0.1';

/** PID file path used when the config does not specify one: `<home>/.cw/server.pid`. */
const DEFAULT_PID_FILE = join(
  homedir(),
  '.cw',
  'server.pid',
);
/**
 * HTTP server for agent coordination.
 *
 * Routes:
 * - GET /health - Health check with uptime and process count
 * - GET /status - Server info (start time, uptime, PID) plus a process list
 *   (the list is currently always empty)
 * - /trpc and /trpc/* - Forwarded to the tRPC handler, for any HTTP method
 *
 * A PID file is written on start and removed on stop so that a second
 * instance can detect an already-running server.
 */
export class CoordinationServer {
  private readonly config: ServerConfig;
  private readonly processManager: ProcessManager;
  private readonly logManager: LogManager;
  private readonly eventBus: EventBus | undefined;
  private server: Server | null = null;
  private state: ServerState | null = null;

  /**
   * @param config - Partial configuration; missing fields fall back to the module defaults
   * @param processManager - Process manager instance (stored; not yet queried by any route)
   * @param logManager - Log manager instance (stored; not yet queried by any route)
   * @param eventBus - Optional event bus; required for tRPC routes and lifecycle events
   */
  constructor(
    config: Partial<ServerConfig>,
    processManager: ProcessManager,
    logManager: LogManager,
    eventBus?: EventBus
  ) {
    this.config = {
      port: config.port ?? DEFAULT_PORT,
      host: config.host ?? DEFAULT_HOST,
      pidFile: config.pidFile ?? DEFAULT_PID_FILE,
    };
    this.processManager = processManager;
    this.logManager = logManager;
    this.eventBus = eventBus;
  }

  /**
   * Starts the HTTP server and writes the PID file.
   *
   * If listening or writing the PID file fails, the instance is rolled back
   * to its stopped state so that `start()` can be retried.
   *
   * @throws If the server is already running, if the PID file points at a
   *   live process, or if binding the port / writing the PID file fails
   */
  async start(): Promise<void> {
    if (this.server) {
      throw new Error('Server is already running');
    }

    // Check for existing PID file (another server might be running)
    const existingPid = await this.checkExistingServer();
    if (existingPid !== null) {
      throw new Error(
        `Another server appears to be running (PID: ${existingPid}). ` +
          `If this is incorrect, remove ${this.config.pidFile} and try again.`
      );
    }

    const server = createServer((req, res) => this.handleRequest(req, res));
    this.server = server;

    try {
      await new Promise<void>((resolve, reject) => {
        // Listen errors (e.g. port already in use) arrive via the 'error' event.
        server.once('error', reject);
        server.listen(this.config.port, this.config.host, () => {
          server.removeListener('error', reject);
          resolve();
        });
      });

      this.state = {
        startedAt: new Date(),
        processCount: 0,
      };

      await this.writePidFile();
    } catch (error) {
      // Roll back: without this, a failed start would leave `this.server` set
      // and every retry would fail with "Server is already running".
      this.server = null;
      this.state = null;
      if (server.listening) {
        server.close();
      }
      throw error;
    }

    if (this.eventBus) {
      const event: ServerStartedEvent = {
        type: 'server:started',
        timestamp: new Date(),
        payload: {
          port: this.config.port,
          host: this.config.host,
          pid: process.pid,
        },
      };
      this.eventBus.emit(event);
    }

    console.log(`Coordination server listening on http://${this.config.host}:${this.config.port}`);
  }

  /**
   * Stops the HTTP server and removes the PID file. No-op when not running.
   *
   * The `server:stopped` event is emitted before the socket is closed.
   * Internal state and the PID file are cleaned up even when closing fails.
   *
   * @throws If closing the server or removing the PID file fails
   */
  async stop(): Promise<void> {
    const server = this.server;
    if (!server) {
      return;
    }

    // Calculate uptime before clearing state
    const uptime = this.state ? this.uptimeSeconds(this.state) : 0;

    if (this.eventBus) {
      const event: ServerStoppedEvent = {
        type: 'server:stopped',
        timestamp: new Date(),
        payload: {
          uptime,
        },
      };
      this.eventBus.emit(event);
    }

    try {
      await new Promise<void>((resolve, reject) => {
        server.close((err) => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        });
      });
    } finally {
      // Always reset, otherwise a close error would leave the instance stuck
      // in a "running" state with a stale PID file on disk.
      this.server = null;
      this.state = null;
      await this.removePidFile();
    }
  }

  /**
   * Returns whether the server is currently running (created and listening).
   */
  isRunning(): boolean {
    return this.server !== null && this.server.listening;
  }

  /**
   * Gets the configured server port.
   */
  getPort(): number {
    return this.config.port;
  }

  /**
   * Gets the PID file path.
   */
  getPidFile(): string {
    return this.config.pidFile;
  }

  /**
   * Handles incoming HTTP requests with simple path matching.
   *
   * Routing is done on the path component only, so a query string
   * (e.g. `/health?verbose=1`) does not prevent a route from matching.
   */
  private handleRequest(req: IncomingMessage, res: ServerResponse): void {
    const rawUrl = req.url ?? '/';
    const queryStart = rawUrl.indexOf('?');
    const pathname = queryStart === -1 ? rawUrl : rawUrl.slice(0, queryStart);

    // Route tRPC requests to the tRPC handler. Match on a path-segment
    // boundary so unrelated paths such as `/trpcfoo` are not captured.
    if (pathname === '/trpc' || pathname.startsWith('/trpc/')) {
      this.handleTrpc(req, res);
      return;
    }

    // Only accept GET requests for non-tRPC routes
    if (req.method !== 'GET') {
      this.sendJson(res, 405, { error: 'Method not allowed' });
      return;
    }

    // Simple path routing for backwards-compatible HTTP endpoints
    switch (pathname) {
      case '/health':
        this.handleHealth(res);
        break;
      case '/status':
        this.handleStatus(res);
        break;
      default:
        this.sendJson(res, 404, { error: 'Not found' });
    }
  }

  /**
   * Handles tRPC requests via the fetch adapter.
   *
   * Responds with 500 when the server has no state yet or was constructed
   * without an event bus, since both are passed to the handler.
   */
  private handleTrpc(req: IncomingMessage, res: ServerResponse): void {
    if (!this.state || !this.eventBus) {
      this.sendJson(res, 500, { error: 'Server not initialized or missing eventBus' });
      return;
    }

    // The handler is created per request with the current state values.
    const trpcHandler = createTrpcHandler({
      eventBus: this.eventBus,
      serverStartedAt: this.state.startedAt,
      processCount: this.state.processCount,
    });

    trpcHandler(req, res).catch((error: unknown) => {
      console.error('tRPC handler error:', error);
      if (res.headersSent) {
        // The response has already started; writing headers again would throw
        // inside this rejection handler. Just terminate the response.
        if (!res.writableEnded) {
          res.end();
        }
        return;
      }
      this.sendJson(res, 500, { error: 'Internal server error' });
    });
  }

  /**
   * Handles GET /health endpoint.
   */
  private handleHealth(res: ServerResponse): void {
    if (!this.state) {
      this.sendJson(res, 500, { error: 'Server not initialized' });
      return;
    }

    const response: HealthResponse = {
      status: 'ok',
      uptime: this.uptimeSeconds(this.state),
      processCount: this.state.processCount,
    };
    this.sendJson(res, 200, response);
  }

  /**
   * Handles GET /status endpoint.
   */
  private handleStatus(res: ServerResponse): void {
    if (!this.state) {
      this.sendJson(res, 500, { error: 'Server not initialized' });
      return;
    }

    const response: StatusResponse = {
      server: {
        startedAt: this.state.startedAt.toISOString(),
        uptime: this.uptimeSeconds(this.state),
        pid: process.pid,
      },
      // NOTE(review): the process list is not populated yet; it is always
      // empty. Presumably it should come from the process manager - confirm.
      processes: [],
    };
    this.sendJson(res, 200, response);
  }

  /**
   * Returns whole seconds elapsed since the server state was created.
   */
  private uptimeSeconds(state: ServerState): number {
    return Math.floor((Date.now() - state.startedAt.getTime()) / 1000);
  }

  /**
   * Sends a JSON response with the given status code.
   */
  private sendJson(res: ServerResponse, statusCode: number, data: unknown): void {
    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(data));
  }

  /**
   * Checks if another server is already running by inspecting the PID file.
   *
   * A PID file whose process no longer exists is treated as stale and removed.
   *
   * @returns The PID if a server process appears to be alive, null otherwise
   * @throws If the PID file cannot be read or removed for a reason other
   *   than it not existing
   */
  private async checkExistingServer(): Promise<number | null> {
    let content: string;
    try {
      content = await readFile(this.config.pidFile, 'utf-8');
    } catch (error) {
      // PID file doesn't exist
      if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
        return null;
      }
      throw error;
    }

    const pid = Number.parseInt(content.trim(), 10);
    // Reject non-numeric content and PIDs <= 0: signalling 0 or a negative
    // PID targets a process group rather than a single process.
    if (!Number.isInteger(pid) || pid <= 0) {
      return null;
    }

    try {
      // Signal 0 performs an existence check without delivering a signal.
      process.kill(pid, 0);
      return pid; // Process is alive
    } catch (error) {
      // EPERM: the process exists but we may not signal it - still alive.
      if ((error as NodeJS.ErrnoException).code === 'EPERM') {
        return pid;
      }
      // Process is dead, PID file is stale
      await this.removePidFile();
      return null;
    }
  }

  /**
   * Writes the current process ID to the PID file, creating the parent
   * directory if needed.
   */
  private async writePidFile(): Promise<void> {
    await mkdir(dirname(this.config.pidFile), { recursive: true });
    await writeFile(this.config.pidFile, process.pid.toString(), 'utf-8');
  }

  /**
   * Removes the PID file. A missing file is not an error.
   */
  private async removePidFile(): Promise<void> {
    try {
      await unlink(this.config.pidFile);
    } catch (error) {
      // Ignore if file doesn't exist
      if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
        throw error;
      }
    }
  }
}
// Re-export the server types from ./types.js through this module
export type {
  ServerConfig,
  ServerState,
  HealthResponse,
  StatusResponse,
} from './types.js';