diff --git a/package.json b/package.json index e51b0cb..82ddc49 100644 --- a/package.json +++ b/package.json @@ -24,13 +24,13 @@ "author": "", "license": "ISC", "dependencies": { + "@trpc/client": "^11.9.0", "@trpc/server": "^11.9.0", "commander": "^12.1.0", "execa": "^9.5.2", "zod": "^4.3.6" }, "devDependencies": { - "@trpc/client": "^11.9.0", "@types/node": "^22.10.7", "rimraf": "^6.0.1", "tsx": "^4.19.2", diff --git a/src/cli/trpc-client.ts b/src/cli/trpc-client.ts new file mode 100644 index 0000000..ac3de79 --- /dev/null +++ b/src/cli/trpc-client.ts @@ -0,0 +1,54 @@ +/** + * tRPC Client for CLI + * + * Type-safe client for communicating with the coordination server. + * Uses httpBatchLink for efficient request batching. + */ + +import { createTRPCClient, httpBatchLink } from '@trpc/client'; +import type { AppRouter } from '../trpc/index.js'; + +/** Default server port */ +const DEFAULT_PORT = 3847; + +/** Default server host */ +const DEFAULT_HOST = '127.0.0.1'; + +/** + * Type-safe tRPC client for the coordination server. + */ +export type TrpcClient = ReturnType>; + +/** + * Creates a tRPC client for the coordination server. + * + * @param port - Server port (default: 3847) + * @param host - Server host (default: 127.0.0.1) + * @returns Type-safe tRPC client + */ +export function createTrpcClient( + port: number = DEFAULT_PORT, + host: string = DEFAULT_HOST +): TrpcClient { + return createTRPCClient({ + links: [ + httpBatchLink({ + url: `http://${host}:${port}/trpc`, + }), + ], + }); +} + +/** + * Creates a tRPC client using environment variables or defaults. + * + * Uses CW_PORT and CW_HOST environment variables if available, + * falling back to defaults (127.0.0.1:3847). + * + * @returns Type-safe tRPC client + */ +export function createDefaultTrpcClient(): TrpcClient { + const port = process.env.CW_PORT ? parseInt(process.env.CW_PORT, 10) : DEFAULT_PORT; + const host = process.env.CW_HOST ?? DEFAULT_HOST; + return createTrpcClient(port, host); +} diff --git a/src/server/index.ts b/src/server/index.ts index 9ab0938..9d9ae1d 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -3,6 +3,7 @@ * * HTTP server with health endpoint for agent coordination. * Uses native node:http for minimal dependencies. + * Supports both traditional HTTP endpoints and tRPC for type-safe client communication. */ import { createServer, type Server, type IncomingMessage, type ServerResponse } from 'node:http'; @@ -14,6 +15,7 @@ import type { ServerConfig, ServerState, HealthResponse, StatusResponse } from ' import type { ProcessManager } from '../process/index.js'; import type { LogManager } from '../logging/index.js'; import type { EventBus, ServerStartedEvent, ServerStoppedEvent } from '../events/index.js'; +import { createTrpcHandler } from './trpc-adapter.js'; /** Default port for the coordination server */ const DEFAULT_PORT = 3847; @@ -180,14 +182,22 @@ export class CoordinationServer { * Handles incoming HTTP requests with simple path matching. */ private handleRequest(req: IncomingMessage, res: ServerResponse): void { - // Only accept GET requests + const url = req.url ?? '/'; + + // Route tRPC requests to tRPC handler + if (url.startsWith('/trpc')) { + this.handleTrpc(req, res); + return; + } + + // Only accept GET requests for non-tRPC routes if (req.method !== 'GET') { this.sendJson(res, 405, { error: 'Method not allowed' }); return; } - // Simple path routing - switch (req.url) { + // Simple path routing for backwards-compatible HTTP endpoints + switch (url) { case '/health': this.handleHealth(res); break; @@ -199,6 +209,27 @@ export class CoordinationServer { } } + /** + * Handles tRPC requests via the fetch adapter. + */ + private handleTrpc(req: IncomingMessage, res: ServerResponse): void { + if (!this.state || !this.eventBus) { + this.sendJson(res, 500, { error: 'Server not initialized or missing eventBus' }); + return; + } + + const trpcHandler = createTrpcHandler({ + eventBus: this.eventBus, + serverStartedAt: this.state.startedAt, + processCount: this.state.processCount, + }); + + trpcHandler(req, res).catch((error: Error) => { + console.error('tRPC handler error:', error); + this.sendJson(res, 500, { error: 'Internal server error' }); + }); + } + /** * Handles GET /health endpoint. */ diff --git a/src/server/trpc-adapter.ts b/src/server/trpc-adapter.ts new file mode 100644 index 0000000..da01cf0 --- /dev/null +++ b/src/server/trpc-adapter.ts @@ -0,0 +1,99 @@ +/** + * tRPC HTTP Adapter + * + * Handles tRPC requests over HTTP using node:http. + * Routes /trpc/* requests to the tRPC router. + */ + +import type { IncomingMessage, ServerResponse } from 'node:http'; +import { fetchRequestHandler } from '@trpc/server/adapters/fetch'; +import { appRouter, createContext } from '../trpc/index.js'; +import type { EventBus } from '../events/index.js'; + +/** + * Options for creating the tRPC request handler. + */ +export interface TrpcAdapterOptions { + /** Event bus for inter-module communication */ + eventBus: EventBus; + /** When the server started */ + serverStartedAt: Date; + /** Number of managed processes */ + processCount: number; +} + +/** + * Creates a tRPC request handler for node:http. + * + * Converts IncomingMessage/ServerResponse to fetch Request/Response + * and delegates to the tRPC fetch adapter. + * + * @param options - Adapter options with context values + * @returns Request handler function + */ +export function createTrpcHandler(options: TrpcAdapterOptions) { + return async (req: IncomingMessage, res: ServerResponse): Promise => { + // Build full URL from request + const protocol = 'http'; + const host = req.headers.host ?? 'localhost'; + const url = new URL(req.url ?? '/', `${protocol}://${host}`); + + // Read request body if present + let body: string | undefined; + if (req.method !== 'GET' && req.method !== 'HEAD') { + body = await new Promise((resolve) => { + let data = ''; + req.on('data', (chunk: Buffer) => { + data += chunk.toString(); + }); + req.on('end', () => { + resolve(data); + }); + }); + } + + // Convert headers to fetch Headers + const headers = new Headers(); + for (const [key, value] of Object.entries(req.headers)) { + if (value) { + if (Array.isArray(value)) { + value.forEach((v) => headers.append(key, v)); + } else { + headers.set(key, value); + } + } + } + + // Create fetch Request + const fetchRequest = new Request(url.toString(), { + method: req.method, + headers, + body: body ?? undefined, + }); + + // Handle with tRPC fetch adapter + const fetchResponse = await fetchRequestHandler({ + endpoint: '/trpc', + req: fetchRequest, + router: appRouter, + createContext: () => + createContext({ + eventBus: options.eventBus, + serverStartedAt: options.serverStartedAt, + processCount: options.processCount, + }), + }); + + // Send response + res.statusCode = fetchResponse.status; + + // Set response headers + fetchResponse.headers.forEach((value, key) => { + res.setHeader(key, value); + }); + + // Send body + const responseBody = await fetchResponse.text(); + res.end(responseBody); + }; +}