All files / src/server index.ts

86.13% Statements 87/101
74.46% Branches 35/47
90% Functions 18/20
86% Lines 86/100

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357                                                    2x     2x     2x     2x                             20x 20x                 20x         20x 20x 20x 20x                 17x 1x       16x 16x 1x             15x     15x 15x 15x 15x 15x         15x           15x     15x 3x                 3x     15x 15x             16x 1x       15x         16x 3x             3x       15x 15x 15x     15x         15x 15x     15x             28x             1x                           11x     11x 7x 7x       4x 1x 1x       3x   1x 1x   1x 1x   1x               7x         7x             7x                   1x         1x 1x           1x             1x         1x       1x                 1x             4x 4x               16x 16x 2x   2x         2x 2x 2x     1x 1x 1x       14x 14x                     15x 15x             16x 16x                        
/**
 * Coordination Server
 *
 * HTTP server with health endpoint for agent coordination.
 * Uses native node:http for minimal dependencies.
 * Supports both traditional HTTP endpoints and tRPC for type-safe client communication.
 */
 
import { createServer, type Server, type IncomingMessage, type ServerResponse } from 'node:http';
import { writeFile, unlink, readFile, mkdir } from 'node:fs/promises';
import { dirname } from 'node:path';
import { homedir } from 'node:os';
import { join } from 'node:path';
import type { ServerConfig, ServerState, HealthResponse, StatusResponse } from './types.js';
import type { ProcessManager } from '../process/index.js';
import type { LogManager } from '../logging/index.js';
import type { EventBus, ServerStartedEvent, ServerStoppedEvent } from '../events/index.js';
import { createTrpcHandler, type TrpcAdapterOptions } from './trpc-adapter.js';
import { createModuleLogger } from '../logger/index.js';
 
/**
 * Optional dependencies for tRPC context.
 * Passed through to the tRPC adapter for procedure access.
 */
export type ServerContextDeps = Omit<TrpcAdapterOptions, 'eventBus' | 'serverStartedAt' | 'processCount'>;
 
const log = createModuleLogger('http');
 
/** Default port for the coordination server */
const DEFAULT_PORT = 3847;
 
/** Default host to bind to */
const DEFAULT_HOST = '127.0.0.1';
 
/** Default PID file location */
const DEFAULT_PID_FILE = join(homedir(), '.cw', 'server.pid');
 
/**
 * HTTP server for agent coordination.
 *
 * Routes:
 * - GET /health - Health check with uptime and process count
 * - GET /status - Full status with process list
 */
export class CoordinationServer {
  private readonly config: ServerConfig;
  private readonly processManager: ProcessManager;
  private readonly logManager: LogManager;
  private readonly eventBus: EventBus | undefined;
  private readonly contextDeps: ServerContextDeps;
  private server: Server | null = null;
  private state: ServerState | null = null;
 
  constructor(
    config: Partial<ServerConfig>,
    processManager: ProcessManager,
    logManager: LogManager,
    eventBus?: EventBus,
    contextDeps?: ServerContextDeps
  ) {
    this.config = {
      port: config.port ?? DEFAULT_PORT,
      host: config.host ?? DEFAULT_HOST,
      pidFile: config.pidFile ?? DEFAULT_PID_FILE,
    };
    this.processManager = processManager;
    this.logManager = logManager;
    this.eventBus = eventBus;
    this.contextDeps = contextDeps ?? {};
  }
 
  /**
   * Starts the HTTP server and writes PID file.
   * @throws If server is already running or PID file exists (another server running)
   */
  async start(): Promise<void> {
    // Check if already running
    if (this.server) {
      throw new Error('Server is already running');
    }
 
    // Check for existing PID file (another server might be running)
    const existingPid = await this.checkExistingServer();
    if (existingPid !== null) {
      throw new Error(
        `Another server appears to be running (PID: ${existingPid}). ` +
        `If this is incorrect, remove ${this.config.pidFile} and try again.`
      );
    }
 
    // Create server
    this.server = createServer((req, res) => this.handleRequest(req, res));
 
    // Start listening
    await new Promise<void>((resolve, reject) => {
      this.server!.once('error', reject);
      this.server!.listen(this.config.port, this.config.host, () => {
        this.server!.removeListener('error', reject);
        resolve();
      });
    });
 
    // Set server state
    this.state = {
      startedAt: new Date(),
      processCount: 0,
    };
 
    // Write PID file
    await this.writePidFile();
 
    // Emit ServerStarted event
    if (this.eventBus) {
      const event: ServerStartedEvent = {
        type: 'server:started',
        timestamp: new Date(),
        payload: {
          port: this.config.port,
          host: this.config.host,
          pid: process.pid,
        },
      };
      this.eventBus.emit(event);
    }
 
    console.log(`Coordination server listening on http://${this.config.host}:${this.config.port}`);
    log.info({ port: this.config.port, host: this.config.host, pid: process.pid }, 'server listening');
  }
 
  /**
   * Stops the HTTP server and removes PID file.
   */
  async stop(): Promise<void> {
    if (!this.server) {
      return;
    }
 
    // Calculate uptime before clearing state
    const uptime = this.state
      ? Math.floor((Date.now() - this.state.startedAt.getTime()) / 1000)
      : 0;
 
    // Emit ServerStopped event before stopping
    if (this.eventBus) {
      const event: ServerStoppedEvent = {
        type: 'server:stopped',
        timestamp: new Date(),
        payload: {
          uptime,
        },
      };
      this.eventBus.emit(event);
    }
 
    // Close server
    await new Promise<void>((resolve, reject) => {
      this.server!.close((err) => {
        Iif (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
 
    this.server = null;
    this.state = null;
 
    // Remove PID file
    await this.removePidFile();
  }
 
  /**
   * Returns whether the server is currently running.
   */
  isRunning(): boolean {
    return this.server !== null && this.server.listening;
  }
 
  /**
   * Gets the server port.
   */
  getPort(): number {
    return this.config.port;
  }
 
  /**
   * Gets the PID file path.
   */
  getPidFile(): string {
    return this.config.pidFile;
  }
 
  /**
   * Handles incoming HTTP requests with simple path matching.
   */
  private handleRequest(req: IncomingMessage, res: ServerResponse): void {
    const url = req.url ?? '/';
 
    // Route tRPC requests to tRPC handler
    if (url.startsWith('/trpc')) {
      this.handleTrpc(req, res);
      return;
    }
 
    // Only accept GET requests for non-tRPC routes
    if (req.method !== 'GET') {
      this.sendJson(res, 405, { error: 'Method not allowed' });
      return;
    }
 
    // Simple path routing for backwards-compatible HTTP endpoints
    switch (url) {
      case '/health':
        this.handleHealth(res);
        break;
      case '/status':
        this.handleStatus(res);
        break;
      default:
        this.sendJson(res, 404, { error: 'Not found' });
    }
  }
 
  /**
   * Handles tRPC requests via the fetch adapter.
   */
  private handleTrpc(req: IncomingMessage, res: ServerResponse): void {
    Iif (!this.state || !this.eventBus) {
      this.sendJson(res, 500, { error: 'Server not initialized or missing eventBus' });
      return;
    }
 
    const trpcHandler = createTrpcHandler({
      eventBus: this.eventBus,
      serverStartedAt: this.state.startedAt,
      processCount: this.state.processCount,
      ...this.contextDeps,
    });
 
    trpcHandler(req, res).catch((error: Error) => {
      log.error({ err: error }, 'tRPC handler error');
      this.sendJson(res, 500, { error: 'Internal server error' });
    });
  }
 
  /**
   * Handles GET /health endpoint.
   */
  private handleHealth(res: ServerResponse): void {
    Iif (!this.state) {
      this.sendJson(res, 500, { error: 'Server not initialized' });
      return;
    }
 
    const uptime = Math.floor((Date.now() - this.state.startedAt.getTime()) / 1000);
    const response: HealthResponse = {
      status: 'ok',
      uptime,
      processCount: this.state.processCount,
    };
 
    this.sendJson(res, 200, response);
  }
 
  /**
   * Handles GET /status endpoint.
   */
  private handleStatus(res: ServerResponse): void {
    Iif (!this.state) {
      this.sendJson(res, 500, { error: 'Server not initialized' });
      return;
    }
 
    const uptime = Math.floor((Date.now() - this.state.startedAt.getTime()) / 1000);
 
    // Get process list from process manager registry
    // Note: We access processManager's registry indirectly through its public API
    const response: StatusResponse = {
      server: {
        startedAt: this.state.startedAt.toISOString(),
        uptime,
        pid: process.pid,
      },
      processes: [],
    };
 
    this.sendJson(res, 200, response);
  }
 
  /**
   * Sends a JSON response.
   */
  private sendJson(res: ServerResponse, statusCode: number, data: unknown): void {
    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(data));
  }
 
  /**
   * Checks if another server is already running by checking PID file.
   * @returns The PID if a server is running, null otherwise
   */
  private async checkExistingServer(): Promise<number | null> {
    try {
      const content = await readFile(this.config.pidFile, 'utf-8');
      const pid = parseInt(content.trim(), 10);
 
      Iif (isNaN(pid)) {
        return null;
      }
 
      // Check if process is actually running
      try {
        process.kill(pid, 0);
        return pid; // Process is alive
      } catch {
        // Process is dead, PID file is stale
        log.warn({ stalePid: pid }, 'stale PID file cleaned up');
        await this.removePidFile();
        return null;
      }
    } catch (error) {
      // PID file doesn't exist
      Eif ((error as NodeJS.ErrnoException).code === 'ENOENT') {
        return null;
      }
      throw error;
    }
  }
 
  /**
   * Writes the PID file.
   */
  private async writePidFile(): Promise<void> {
    // Ensure directory exists
    await mkdir(dirname(this.config.pidFile), { recursive: true });
    await writeFile(this.config.pidFile, process.pid.toString(), 'utf-8');
  }
 
  /**
   * Removes the PID file.
   */
  private async removePidFile(): Promise<void> {
    try {
      await unlink(this.config.pidFile);
    } catch (error) {
      // Ignore if file doesn't exist
      if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
        throw error;
      }
    }
  }
}
 
// Re-export types
export type { ServerConfig, ServerState, HealthResponse, StatusResponse } from './types.js';