The createContext call manually enumerates every field rather than spreading, so the new repo was silently dropped even though it existed on the options object.
193 lines
7.6 KiB
TypeScript
193 lines
7.6 KiB
TypeScript
/**
 * tRPC HTTP Adapter
 *
 * Handles tRPC requests over HTTP using node:http.
 * Routes /trpc/* requests to the tRPC router.
 */
|
|
|
|
import type { IncomingMessage, ServerResponse } from 'node:http';
|
|
import { fetchRequestHandler } from '@trpc/server/adapters/fetch';
|
|
import { appRouter, createContext } from '../trpc/index.js';
|
|
import type { EventBus } from '../events/index.js';
|
|
import type { AgentManager } from '../agent/types.js';
|
|
import type { TaskRepository } from '../db/repositories/task-repository.js';
|
|
import type { MessageRepository } from '../db/repositories/message-repository.js';
|
|
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
|
|
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
|
|
import type { PageRepository } from '../db/repositories/page-repository.js';
|
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
|
import type { AccountRepository } from '../db/repositories/account-repository.js';
|
|
import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js';
|
|
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
|
|
import type { ConversationRepository } from '../db/repositories/conversation-repository.js';
|
|
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
|
|
import type { AccountCredentialManager } from '../agent/credentials/types.js';
|
|
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
|
|
import type { CoordinationManager } from '../coordination/types.js';
|
|
import type { BranchManager } from '../git/branch-manager.js';
|
|
import type { ExecutionOrchestrator } from '../execution/orchestrator.js';
|
|
import type { PreviewManager } from '../preview/index.js';
|
|
|
|
/**
 * Configuration accepted by `createTrpcHandler`.
 *
 * Only the first three members are required; every manager and repository
 * is optional and is forwarded to the tRPC context as-is (possibly
 * `undefined`).
 */
export interface TrpcAdapterOptions {
  // ── Required server state ────────────────────────────────────────────

  /** Bus used for communication between modules. */
  eventBus: EventBus;
  /** Timestamp of server start-up. */
  serverStartedAt: Date;
  /** How many processes are currently managed. */
  processCount: number;

  // ── Managers ─────────────────────────────────────────────────────────

  /** Agent lifecycle operations (optional until full wiring). */
  agentManager?: AgentManager;
  /** Task queue operations. */
  dispatchManager?: DispatchManager;
  /** Phase queue operations. */
  phaseDispatchManager?: PhaseDispatchManager;
  /** Merge queue operations. */
  coordinationManager?: CoordinationManager;
  /** OAuth token management for accounts. */
  credentialManager?: AccountCredentialManager;
  /** Git branch operations. */
  branchManager?: BranchManager;
  /** Phase merge/review workflow. */
  executionOrchestrator?: ExecutionOrchestrator;
  /** Docker-based preview deployments. */
  previewManager?: PreviewManager;

  // ── Repositories ─────────────────────────────────────────────────────

  /** CRUD operations on tasks. */
  taskRepository?: TaskRepository;
  /** Agent-user communication messages. */
  messageRepository?: MessageRepository;
  /** CRUD operations on initiatives. */
  initiativeRepository?: InitiativeRepository;
  /** CRUD operations on phases. */
  phaseRepository?: PhaseRepository;
  /** CRUD operations on pages. */
  pageRepository?: PageRepository;
  /** Project CRUD plus initiative-project junction operations. */
  projectRepository?: ProjectRepository;
  /** Account CRUD and load balancing. */
  accountRepository?: AccountRepository;
  /** Agent change set operations. */
  changeSetRepository?: ChangeSetRepository;
  /** Persistence of agent output. */
  logChunkRepository?: LogChunkRepository;
  /** Inter-agent communication. */
  conversationRepository?: ConversationRepository;
  /** Iterative phase/task chat sessions. */
  chatSessionRepository?: ChatSessionRepository;

  // ── Miscellaneous ────────────────────────────────────────────────────

  /** Absolute path to the workspace root (.cwrc directory). */
  workspaceRoot?: string;
}
|
|
|
|
/**
 * Reads the complete request body and decodes it as UTF-8.
 *
 * Chunks are collected as Buffers and decoded once at the end, so a
 * multi-byte character split across two chunks is not corrupted.
 *
 * @param req - Incoming node:http request
 * @returns The decoded body (empty string when no data was sent)
 * @throws Rejects if the request errors or closes before the message is complete
 */
function readRequestBody(req: IncomingMessage): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const chunks: Buffer[] = [];
    req.on('data', (chunk: Buffer | string) => {
      chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
    });
    req.on('end', () => {
      resolve(Buffer.concat(chunks).toString('utf8'));
    });
    req.on('error', reject);
    req.on('close', () => {
      // 'close' also fires after a normal 'end'; rejecting an already-settled
      // promise is a no-op, so only a truncated upload is affected.
      if (!req.complete) {
        reject(new Error('Request closed before the body was fully received'));
      }
    });
  });
}

/**
 * Converts node:http request headers into a fetch `Headers` object.
 *
 * @param nodeHeaders - Headers as exposed by `IncomingMessage.headers`
 * @returns Equivalent fetch Headers; multi-valued headers are appended one by one
 */
function toFetchHeaders(nodeHeaders: IncomingMessage['headers']): Headers {
  const headers = new Headers();
  for (const [key, value] of Object.entries(nodeHeaders)) {
    if (!value) {
      continue;
    }
    if (Array.isArray(value)) {
      for (const item of value) {
        headers.append(key, item);
      }
    } else {
      headers.set(key, value);
    }
  }
  return headers;
}

/**
 * Resolves once the response can accept more data, or once it has closed
 * (so a disconnected client never leaves the caller waiting).
 */
function waitForDrain(res: ServerResponse): Promise<void> {
  return new Promise<void>((resolve) => {
    const settle = (): void => {
      res.off('drain', settle);
      res.off('close', settle);
      resolve();
    };
    res.once('drain', settle);
    res.once('close', settle);
  });
}

/**
 * Creates a tRPC request handler for node:http.
 *
 * Converts IncomingMessage/ServerResponse to fetch Request/Response
 * and delegates to the tRPC fetch adapter.
 *
 * The returned handler resolves as soon as the response headers are set and
 * body streaming has started; it does not wait for a streamed body (e.g. an
 * SSE subscription) to finish.
 *
 * @param options - Adapter options with context values
 * @returns Request handler function
 */
export function createTrpcHandler(
  options: TrpcAdapterOptions,
): (req: IncomingMessage, res: ServerResponse) => Promise<void> {
  return async (req: IncomingMessage, res: ServerResponse): Promise<void> => {
    // Build full URL from request
    const protocol = 'http';
    const host = req.headers.host ?? 'localhost';
    const url = new URL(req.url ?? '/', `${protocol}://${host}`);

    // Read request body if present (GET/HEAD requests carry none)
    const hasBody = req.method !== 'GET' && req.method !== 'HEAD';
    const body = hasBody ? await readRequestBody(req) : undefined;

    // Create fetch Request
    const fetchRequest = new Request(url.toString(), {
      method: req.method,
      headers: toFetchHeaders(req.headers),
      body,
    });

    // Handle with tRPC fetch adapter
    const fetchResponse = await fetchRequestHandler({
      endpoint: '/trpc',
      req: fetchRequest,
      router: appRouter,
      // Forward a shallow copy of all options rather than listing each field,
      // so a member added to TrpcAdapterOptions cannot be silently dropped.
      createContext: () => createContext({ ...options }),
    });

    // Send response
    res.statusCode = fetchResponse.status;

    // Set response headers BEFORE streaming body. Set-Cookie values are
    // collected into an array because setHeader() replaces earlier values.
    const setCookies: string[] = [];
    fetchResponse.headers.forEach((value, key) => {
      if (key === 'set-cookie') {
        setCookies.push(value);
      } else {
        res.setHeader(key, value);
      }
    });
    if (setCookies.length > 0) {
      res.setHeader('set-cookie', setCookies);
    }

    if (!fetchResponse.body) {
      res.end();
      return;
    }

    // Stream the body (needed for SSE subscriptions)
    const reader = fetchResponse.body.getReader();

    // If the client disconnects, cancel the stream so the producer side can
    // stop; after a normal finish the cancel is a harmless no-op.
    res.once('close', () => {
      void reader.cancel().catch(() => undefined);
    });

    const pump = async (): Promise<void> => {
      for (;;) {
        const { done, value } = await reader.read();
        if (done) {
          if (!res.writableEnded) {
            res.end();
          }
          return;
        }
        if (res.destroyed) {
          return;
        }
        // Respect backpressure: write() returning false means the internal
        // buffer is full, so wait before reading more.
        if (!res.write(value)) {
          await waitForDrain(res);
        }
      }
    };

    // Deliberately not awaited: streaming continues in the background.
    pump().catch(() => {
      if (!res.writableEnded) {
        res.end();
      }
    });
  };
}
|