All files / src/server trpc-adapter.ts

70.27% Statements 26/37
61.11% Branches 11/18
50% Functions 5/10
74.28% Lines 26/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189                                                                                                                                                                                  7x   7x 7x 7x       7x                         7x 7x 49x 49x     49x           7x             7x       7x                                                       7x     7x 14x       7x 7x 7x 7x 14x 14x 7x 7x   7x     7x              
/**
 * tRPC HTTP Adapter
 *
 * Handles tRPC requests over HTTP using node:http.
 * Routes /trpc/* requests to the tRPC router.
 */
 
import type { IncomingMessage, ServerResponse } from 'node:http';
import { fetchRequestHandler } from '@trpc/server/adapters/fetch';
import { appRouter, createContext } from '../trpc/index.js';
import type { EventBus } from '../events/index.js';
import type { AgentManager } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js';
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
import type { ConversationRepository } from '../db/repositories/conversation-repository.js';
import type { AccountCredentialManager } from '../agent/credentials/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { CoordinationManager } from '../coordination/types.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { ExecutionOrchestrator } from '../execution/orchestrator.js';
import type { PreviewManager } from '../preview/index.js';
 
/**
 * Options for creating the tRPC request handler.
 */
export interface TrpcAdapterOptions {
  /** Event bus for inter-module communication */
  eventBus: EventBus;
  /** When the server started */
  serverStartedAt: Date;
  /** Number of managed processes */
  processCount: number;
  /** Agent manager for agent lifecycle operations (optional until full wiring) */
  agentManager?: AgentManager;
  /** Task repository for task CRUD operations */
  taskRepository?: TaskRepository;
  /** Message repository for agent-user communication */
  messageRepository?: MessageRepository;
  /** Initiative repository for initiative CRUD operations */
  initiativeRepository?: InitiativeRepository;
  /** Phase repository for phase CRUD operations */
  phaseRepository?: PhaseRepository;
  /** Dispatch manager for task queue operations */
  dispatchManager?: DispatchManager;
  /** Coordination manager for merge queue operations */
  coordinationManager?: CoordinationManager;
  /** Phase dispatch manager for phase queue operations */
  phaseDispatchManager?: PhaseDispatchManager;
  /** Page repository for page CRUD operations */
  pageRepository?: PageRepository;
  /** Project repository for project CRUD and initiative-project junction operations */
  projectRepository?: ProjectRepository;
  /** Account repository for account CRUD and load balancing */
  accountRepository?: AccountRepository;
  /** Change set repository for agent change set operations */
  changeSetRepository?: ChangeSetRepository;
  /** Log chunk repository for agent output persistence */
  logChunkRepository?: LogChunkRepository;
  /** Credential manager for account OAuth token management */
  credentialManager?: AccountCredentialManager;
  /** Branch manager for git branch operations */
  branchManager?: BranchManager;
  /** Execution orchestrator for phase merge/review workflow */
  executionOrchestrator?: ExecutionOrchestrator;
  /** Preview manager for Docker-based preview deployments */
  previewManager?: PreviewManager;
  /** Conversation repository for inter-agent communication */
  conversationRepository?: ConversationRepository;
  /** Absolute path to the workspace root (.cwrc directory) */
  workspaceRoot?: string;
}
 
/**
 * Creates a tRPC request handler for node:http.
 *
 * Converts IncomingMessage/ServerResponse to fetch Request/Response
 * and delegates to the tRPC fetch adapter.
 *
 * @param options - Adapter options with context values
 * @returns Request handler function
 */
export function createTrpcHandler(options: TrpcAdapterOptions) {
  return async (req: IncomingMessage, res: ServerResponse): Promise<void> => {
    // Build full URL from request
    const protocol = 'http';
    const host = req.headers.host ?? 'localhost';
    const url = new URL(req.url ?? '/', `${protocol}://${host}`);
 
    // Read request body if present
    let body: string | undefined;
    Iif (req.method !== 'GET' && req.method !== 'HEAD') {
      body = await new Promise<string>((resolve) => {
        let data = '';
        req.on('data', (chunk: Buffer) => {
          data += chunk.toString();
        });
        req.on('end', () => {
          resolve(data);
        });
      });
    }
 
    // Convert headers to fetch Headers
    const headers = new Headers();
    for (const [key, value] of Object.entries(req.headers)) {
      Eif (value) {
        Iif (Array.isArray(value)) {
          value.forEach((v) => headers.append(key, v));
        } else {
          headers.set(key, value);
        }
      }
    }
 
    // Create fetch Request
    const fetchRequest = new Request(url.toString(), {
      method: req.method,
      headers,
      body: body ?? undefined,
    });
 
    // Handle with tRPC fetch adapter
    const fetchResponse = await fetchRequestHandler({
      endpoint: '/trpc',
      req: fetchRequest,
      router: appRouter,
      createContext: () =>
        createContext({
          eventBus: options.eventBus,
          serverStartedAt: options.serverStartedAt,
          processCount: options.processCount,
          agentManager: options.agentManager,
          taskRepository: options.taskRepository,
          messageRepository: options.messageRepository,
          initiativeRepository: options.initiativeRepository,
          phaseRepository: options.phaseRepository,
          dispatchManager: options.dispatchManager,
          coordinationManager: options.coordinationManager,
          phaseDispatchManager: options.phaseDispatchManager,
          pageRepository: options.pageRepository,
          projectRepository: options.projectRepository,
          accountRepository: options.accountRepository,
          changeSetRepository: options.changeSetRepository,
          logChunkRepository: options.logChunkRepository,
          credentialManager: options.credentialManager,
          branchManager: options.branchManager,
          executionOrchestrator: options.executionOrchestrator,
          previewManager: options.previewManager,
          conversationRepository: options.conversationRepository,
          workspaceRoot: options.workspaceRoot,
        }),
    });
 
    // Send response
    res.statusCode = fetchResponse.status;
 
    // Set response headers BEFORE streaming body
    fetchResponse.headers.forEach((value, key) => {
      res.setHeader(key, value);
    });
 
    // Stream body if it's a ReadableStream (SSE subscriptions), otherwise buffer
    if (fetchResponse.body) {
      const reader = fetchResponse.body.getReader();
      const pump = async () => {
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            res.end();
            return;
          }
          res.write(value);
        }
      };
      pump().catch(() => res.end());
    } else E{
      const responseBody = await fetchResponse.text();
      res.end(responseBody);
    }
  };
}