Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | 7x 7x 7x 7x 7x 7x 7x 49x 49x 49x 7x 7x 7x 7x 7x 14x 7x 7x 7x 7x 14x 14x 7x 7x 7x 7x | /**
* tRPC HTTP Adapter
*
* Handles tRPC requests over HTTP using node:http.
* Routes /trpc/* requests to the tRPC router.
*/
import type { IncomingMessage, ServerResponse } from 'node:http';
import { fetchRequestHandler } from '@trpc/server/adapters/fetch';
import { appRouter, createContext } from '../trpc/index.js';
import type { EventBus } from '../events/index.js';
import type { AgentManager } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js';
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
import type { ConversationRepository } from '../db/repositories/conversation-repository.js';
import type { AccountCredentialManager } from '../agent/credentials/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { CoordinationManager } from '../coordination/types.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { ExecutionOrchestrator } from '../execution/orchestrator.js';
import type { PreviewManager } from '../preview/index.js';
/**
* Options for creating the tRPC request handler.
*/
export interface TrpcAdapterOptions {
/** Event bus for inter-module communication */
eventBus: EventBus;
/** When the server started */
serverStartedAt: Date;
/** Number of managed processes */
processCount: number;
/** Agent manager for agent lifecycle operations (optional until full wiring) */
agentManager?: AgentManager;
/** Task repository for task CRUD operations */
taskRepository?: TaskRepository;
/** Message repository for agent-user communication */
messageRepository?: MessageRepository;
/** Initiative repository for initiative CRUD operations */
initiativeRepository?: InitiativeRepository;
/** Phase repository for phase CRUD operations */
phaseRepository?: PhaseRepository;
/** Dispatch manager for task queue operations */
dispatchManager?: DispatchManager;
/** Coordination manager for merge queue operations */
coordinationManager?: CoordinationManager;
/** Phase dispatch manager for phase queue operations */
phaseDispatchManager?: PhaseDispatchManager;
/** Page repository for page CRUD operations */
pageRepository?: PageRepository;
/** Project repository for project CRUD and initiative-project junction operations */
projectRepository?: ProjectRepository;
/** Account repository for account CRUD and load balancing */
accountRepository?: AccountRepository;
/** Change set repository for agent change set operations */
changeSetRepository?: ChangeSetRepository;
/** Log chunk repository for agent output persistence */
logChunkRepository?: LogChunkRepository;
/** Credential manager for account OAuth token management */
credentialManager?: AccountCredentialManager;
/** Branch manager for git branch operations */
branchManager?: BranchManager;
/** Execution orchestrator for phase merge/review workflow */
executionOrchestrator?: ExecutionOrchestrator;
/** Preview manager for Docker-based preview deployments */
previewManager?: PreviewManager;
/** Conversation repository for inter-agent communication */
conversationRepository?: ConversationRepository;
/** Absolute path to the workspace root (.cwrc directory) */
workspaceRoot?: string;
}
/**
* Creates a tRPC request handler for node:http.
*
* Converts IncomingMessage/ServerResponse to fetch Request/Response
* and delegates to the tRPC fetch adapter.
*
* @param options - Adapter options with context values
* @returns Request handler function
*/
export function createTrpcHandler(options: TrpcAdapterOptions) {
return async (req: IncomingMessage, res: ServerResponse): Promise<void> => {
// Build full URL from request
const protocol = 'http';
const host = req.headers.host ?? 'localhost';
const url = new URL(req.url ?? '/', `${protocol}://${host}`);
// Read request body if present
let body: string | undefined;
Iif (req.method !== 'GET' && req.method !== 'HEAD') {
body = await new Promise<string>((resolve) => {
let data = '';
req.on('data', (chunk: Buffer) => {
data += chunk.toString();
});
req.on('end', () => {
resolve(data);
});
});
}
// Convert headers to fetch Headers
const headers = new Headers();
for (const [key, value] of Object.entries(req.headers)) {
Eif (value) {
Iif (Array.isArray(value)) {
value.forEach((v) => headers.append(key, v));
} else {
headers.set(key, value);
}
}
}
// Create fetch Request
const fetchRequest = new Request(url.toString(), {
method: req.method,
headers,
body: body ?? undefined,
});
// Handle with tRPC fetch adapter
const fetchResponse = await fetchRequestHandler({
endpoint: '/trpc',
req: fetchRequest,
router: appRouter,
createContext: () =>
createContext({
eventBus: options.eventBus,
serverStartedAt: options.serverStartedAt,
processCount: options.processCount,
agentManager: options.agentManager,
taskRepository: options.taskRepository,
messageRepository: options.messageRepository,
initiativeRepository: options.initiativeRepository,
phaseRepository: options.phaseRepository,
dispatchManager: options.dispatchManager,
coordinationManager: options.coordinationManager,
phaseDispatchManager: options.phaseDispatchManager,
pageRepository: options.pageRepository,
projectRepository: options.projectRepository,
accountRepository: options.accountRepository,
changeSetRepository: options.changeSetRepository,
logChunkRepository: options.logChunkRepository,
credentialManager: options.credentialManager,
branchManager: options.branchManager,
executionOrchestrator: options.executionOrchestrator,
previewManager: options.previewManager,
conversationRepository: options.conversationRepository,
workspaceRoot: options.workspaceRoot,
}),
});
// Send response
res.statusCode = fetchResponse.status;
// Set response headers BEFORE streaming body
fetchResponse.headers.forEach((value, key) => {
res.setHeader(key, value);
});
// Stream body if it's a ReadableStream (SSE subscriptions), otherwise buffer
if (fetchResponse.body) {
const reader = fetchResponse.body.getReader();
const pump = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
res.end();
return;
}
res.write(value);
}
};
pump().catch(() => res.end());
} else E{
const responseBody = await fetchResponse.text();
res.end(responseBody);
}
};
}
|