Files
Codewalkers/apps/server/server/trpc-adapter.ts
Lukas May 5e77bf104c feat: Add remote sync for project clones
Fetch remote changes before agents start working so they build on
up-to-date code. Adds ProjectSyncManager with git fetch + ff-only
merge of defaultBranch, integrated into phase dispatch to sync
before branch creation.

- Schema: lastFetchedAt column on projects table (migration 0029)
- Events: project:synced, project:sync_failed
- Phase dispatch: sync all linked projects before creating branches
- tRPC: syncProject, syncAllProjects, getProjectSyncStatus
- CLI: cw project sync [name] --all, cw project status [name]
- Frontend: sync button + ahead/behind badge on projects settings
2026-03-05 11:45:09 +01:00

201 lines
8.2 KiB
TypeScript

/**
* tRPC HTTP Adapter
*
* Handles tRPC requests over HTTP using node:http.
* Routes /trpc/* requests to the tRPC router.
*/
import type { IncomingMessage, ServerResponse } from 'node:http';
import { fetchRequestHandler } from '@trpc/server/adapters/fetch';
import { appRouter, createContext } from '../trpc/index.js';
import type { EventBus } from '../events/index.js';
import type { AgentManager } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js';
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
import type { ConversationRepository } from '../db/repositories/conversation-repository.js';
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
import type { ReviewCommentRepository } from '../db/repositories/review-comment-repository.js';
import type { AccountCredentialManager } from '../agent/credentials/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { CoordinationManager } from '../coordination/types.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { ExecutionOrchestrator } from '../execution/orchestrator.js';
import type { PreviewManager } from '../preview/index.js';
import type { ProjectSyncManager } from '../git/remote-sync.js';
/**
 * Configuration accepted by `createTrpcHandler`.
 *
 * Only the server runtime values are mandatory. Every manager and repository
 * is optional, so the adapter can be constructed before the full dependency
 * graph has been wired up.
 */
export interface TrpcAdapterOptions {
  // ---- Server runtime (required) ----------------------------------------

  /** Bus used for communication between modules. */
  eventBus: EventBus;
  /** Timestamp at which the server was started. */
  serverStartedAt: Date;
  /** How many processes are currently managed. */
  processCount: number;

  // ---- Managers ----------------------------------------------------------

  /** Handles agent lifecycle operations (optional until full wiring). */
  agentManager?: AgentManager;
  /** Handles task queue operations. */
  dispatchManager?: DispatchManager;
  /** Handles phase queue operations. */
  phaseDispatchManager?: PhaseDispatchManager;
  /** Handles merge queue operations. */
  coordinationManager?: CoordinationManager;
  /** Manages OAuth tokens for accounts. */
  credentialManager?: AccountCredentialManager;
  /** Performs git branch operations. */
  branchManager?: BranchManager;
  /** Drives the phase merge/review workflow. */
  executionOrchestrator?: ExecutionOrchestrator;
  /** Manages Docker-based preview deployments. */
  previewManager?: PreviewManager;
  /** Performs remote fetch/sync operations for projects. */
  projectSyncManager?: ProjectSyncManager;

  // ---- Repositories ------------------------------------------------------

  /** CRUD access to tasks. */
  taskRepository?: TaskRepository;
  /** Storage for agent-user communication. */
  messageRepository?: MessageRepository;
  /** CRUD access to initiatives. */
  initiativeRepository?: InitiativeRepository;
  /** CRUD access to phases. */
  phaseRepository?: PhaseRepository;
  /** CRUD access to pages. */
  pageRepository?: PageRepository;
  /** CRUD access to projects and the initiative-project junction. */
  projectRepository?: ProjectRepository;
  /** CRUD access to accounts; also used for load balancing. */
  accountRepository?: AccountRepository;
  /** Storage for agent change sets. */
  changeSetRepository?: ChangeSetRepository;
  /** Persistence for agent output. */
  logChunkRepository?: LogChunkRepository;
  /** Storage for inter-agent communication. */
  conversationRepository?: ConversationRepository;
  /** Storage for iterative phase/task chat sessions. */
  chatSessionRepository?: ChatSessionRepository;
  /** Storage for inline review comments on phase diffs. */
  reviewCommentRepository?: ReviewCommentRepository;

  // ---- Workspace ---------------------------------------------------------

  /** Absolute path to the workspace root (.cwrc directory). */
  workspaceRoot?: string;
}
/**
 * Creates a tRPC request handler for node:http.
 *
 * Converts IncomingMessage/ServerResponse to fetch Request/Response
 * and delegates to the tRPC fetch adapter.
 *
 * Behaviour notes:
 * - A request whose URL/Host header cannot be parsed, or whose body does not
 *   arrive completely, is answered with `400` instead of throwing or hanging.
 * - The fetch Request carries an AbortSignal that fires when the node response
 *   closes, and the response stream reader is cancelled at the same time, so
 *   streamed responses (SSE subscriptions) stop when the client disconnects.
 * - Streamed bodies are pumped in the background; the returned promise
 *   resolves as soon as streaming has been started.
 *
 * @param options - Adapter options with context values
 * @returns Request handler function
 */
export function createTrpcHandler(options: TrpcAdapterOptions) {
  return async (req: IncomingMessage, res: ServerResponse): Promise<void> => {
    // Build full URL from request. The Host header is client-controlled, so a
    // value that does not form a valid URL yields a 400 rather than an exception.
    const host = req.headers.host ?? 'localhost';
    let url: URL;
    try {
      url = new URL(req.url ?? '/', `http://${host}`);
    } catch {
      sendBadRequest(res);
      return;
    }

    // Propagate client disconnects to the fetch handler via an AbortSignal.
    const disconnect = new AbortController();
    res.once('close', () => disconnect.abort());

    // Read request body if present
    let body: string | undefined;
    if (req.method !== 'GET' && req.method !== 'HEAD') {
      try {
        body = await readRequestBody(req);
      } catch {
        // The upload failed or was cut short; there is nothing valid to process.
        sendBadRequest(res);
        return;
      }
    }

    // Create fetch Request
    const fetchRequest = new Request(url.toString(), {
      method: req.method,
      headers: toFetchHeaders(req.headers),
      body,
      signal: disconnect.signal,
    });

    // Handle with tRPC fetch adapter
    const fetchResponse = await fetchRequestHandler({
      endpoint: '/trpc',
      req: fetchRequest,
      router: appRouter,
      createContext: () =>
        createContext({
          eventBus: options.eventBus,
          serverStartedAt: options.serverStartedAt,
          processCount: options.processCount,
          agentManager: options.agentManager,
          taskRepository: options.taskRepository,
          messageRepository: options.messageRepository,
          initiativeRepository: options.initiativeRepository,
          phaseRepository: options.phaseRepository,
          dispatchManager: options.dispatchManager,
          coordinationManager: options.coordinationManager,
          phaseDispatchManager: options.phaseDispatchManager,
          pageRepository: options.pageRepository,
          projectRepository: options.projectRepository,
          accountRepository: options.accountRepository,
          changeSetRepository: options.changeSetRepository,
          logChunkRepository: options.logChunkRepository,
          credentialManager: options.credentialManager,
          branchManager: options.branchManager,
          executionOrchestrator: options.executionOrchestrator,
          previewManager: options.previewManager,
          conversationRepository: options.conversationRepository,
          chatSessionRepository: options.chatSessionRepository,
          reviewCommentRepository: options.reviewCommentRepository,
          projectSyncManager: options.projectSyncManager,
          workspaceRoot: options.workspaceRoot,
        }),
    });

    // Send response: status and headers must be set BEFORE streaming the body
    res.statusCode = fetchResponse.status;
    writeResponseHeaders(fetchResponse.headers, res);

    // Stream body if it's a ReadableStream (SSE subscriptions), otherwise buffer
    if (fetchResponse.body) {
      // Deliberately not awaited: a subscription stream may stay open indefinitely.
      void streamResponseBody(fetchResponse.body, res);
    } else {
      res.end(await fetchResponse.text());
    }
  };
}

/** Ends the response with status 400 unless headers have already gone out. */
function sendBadRequest(res: ServerResponse): void {
  if (!res.headersSent) {
    res.statusCode = 400;
  }
  res.end();
}

/**
 * Collects the whole request body and decodes it as UTF-8.
 *
 * Chunks are concatenated before decoding so a multi-byte character split
 * across two chunks is not corrupted. Rejects when the request stream errors
 * or closes before the message is complete, so callers never wait forever.
 */
function readRequestBody(req: IncomingMessage): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const chunks: Buffer[] = [];
    req.on('data', (chunk: Buffer | string) => {
      chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
    });
    req.once('end', () => {
      resolve(Buffer.concat(chunks).toString('utf8'));
    });
    req.once('error', reject);
    req.once('close', () => {
      // After a successful 'end' the promise is already settled and this is a no-op.
      if (!req.complete) {
        reject(new Error('Request closed before the body was fully received'));
      }
    });
  });
}

/** Copies node's incoming header map into a fetch Headers object. */
function toFetchHeaders(source: IncomingMessage['headers']): Headers {
  const headers = new Headers();
  for (const [key, value] of Object.entries(source)) {
    if (!value) continue;
    if (Array.isArray(value)) {
      for (const item of value) headers.append(key, item);
    } else {
      headers.set(key, value);
    }
  }
  return headers;
}

/**
 * Copies fetch response headers onto the node response.
 *
 * Set-Cookie entries are gathered and written as an array, because calling
 * setHeader once per entry would keep only the last cookie.
 */
function writeResponseHeaders(source: Headers, res: ServerResponse): void {
  const cookies: string[] = [];
  source.forEach((value, key) => {
    if (key.toLowerCase() === 'set-cookie') {
      cookies.push(value);
    } else {
      res.setHeader(key, value);
    }
  });
  if (cookies.length > 0) {
    res.setHeader('set-cookie', cookies);
  }
}

/** Resolves on the next 'drain', or on 'close' so a vanished client cannot stall the pump. */
function waitForDrain(res: ServerResponse): Promise<void> {
  return new Promise<void>((resolve) => {
    const settle = (): void => {
      res.off('drain', settle);
      res.off('close', settle);
      resolve();
    };
    res.once('drain', settle);
    res.once('close', settle);
  });
}

/**
 * Pumps a fetch response stream into the node response.
 *
 * Honours backpressure from `res.write`, cancels the reader when the response
 * closes, and never rejects: a read failure just ends the response, matching
 * the best-effort handling this adapter has always had.
 */
async function streamResponseBody(
  body: NonNullable<Response['body']>,
  res: ServerResponse,
): Promise<void> {
  const reader = body.getReader();
  // Stop producing data as soon as nobody is listening any more.
  res.once('close', () => {
    reader.cancel().catch(() => undefined);
  });
  try {
    while (!res.destroyed) {
      const { done, value } = await reader.read();
      if (done) break;
      // Re-check `destroyed` so we never wait for a 'drain' that cannot come.
      if (!res.write(value) && !res.destroyed) {
        await waitForDrain(res);
      }
    }
  } catch {
    // Best effort: fall through and close the response.
  }
  if (!res.destroyed) {
    res.end();
  }
}