Files
Codewalkers/src/trpc/router.ts
Lukas May 2c41e52029 fix(10-02): update downstream code for batched answers API
- Update resumeAgentInputSchema: prompt → answers (Record<string, string>)
- Update tRPC router to pass answers map
- Update CLI to accept JSON or single answer (fallback to q1 key)
- Update E2E tests for new resume signature
2026-01-31 18:03:00 +01:00

643 lines
19 KiB
TypeScript

/**
* tRPC Router
*
* Type-safe RPC router for CLI-server communication.
* Uses Zod for runtime validation of procedure inputs/outputs.
*/
import { initTRPC, TRPCError } from '@trpc/server';
import { z } from 'zod';
import type { TRPCContext } from './context.js';
import type { AgentInfo, AgentResult } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { DispatchManager } from '../dispatch/types.js';
import type { CoordinationManager } from '../coordination/types.js';
/**
* Initialize tRPC with our context type.
* This creates the tRPC instance that all procedures will use.
*/
const t = initTRPC.context<TRPCContext>().create();
/**
* Base router - used to create the app router.
*/
export const router = t.router;
/**
* Public procedure - no authentication required.
* All current procedures are public since this is a local-only server.
*/
export const publicProcedure = t.procedure;
/**
* Middleware builder for custom middleware.
*/
export const middleware = t.middleware;
/**
* Create caller factory for testing.
* Allows calling procedures directly without HTTP transport.
*/
export const createCallerFactory = t.createCallerFactory;
// =============================================================================
// Zod Schemas for procedure outputs
// =============================================================================
/**
* Schema for health check response.
*/
export const healthResponseSchema = z.object({
status: z.literal('ok'),
uptime: z.number().int().nonnegative(),
processCount: z.number().int().nonnegative(),
});
export type HealthResponse = z.infer<typeof healthResponseSchema>;
/**
* Schema for process info in status response.
*/
export const processInfoSchema = z.object({
id: z.string(),
pid: z.number().int().positive(),
command: z.string(),
status: z.string(),
startedAt: z.string(),
});
export type ProcessInfo = z.infer<typeof processInfoSchema>;
/**
* Schema for status response.
*/
export const statusResponseSchema = z.object({
server: z.object({
startedAt: z.string(),
uptime: z.number().int().nonnegative(),
pid: z.number().int().positive(),
}),
processes: z.array(processInfoSchema),
});
export type StatusResponse = z.infer<typeof statusResponseSchema>;
// =============================================================================
// Agent Input Schemas
// =============================================================================
/**
* Schema for spawning a new agent.
*/
export const spawnAgentInputSchema = z.object({
/** Human-readable name for the agent (required, must be unique) */
name: z.string().min(1),
/** Task ID to assign to agent */
taskId: z.string().min(1),
/** Initial prompt/instruction for the agent */
prompt: z.string().min(1),
/** Optional working directory (defaults to worktree path) */
cwd: z.string().optional(),
});
export type SpawnAgentInput = z.infer<typeof spawnAgentInputSchema>;
/**
* Schema for identifying an agent by name or ID.
*/
export const agentIdentifierSchema = z.object({
/** Lookup by human-readable name (preferred) */
name: z.string().optional(),
/** Or lookup by ID */
id: z.string().optional(),
}).refine(data => data.name || data.id, {
message: 'Either name or id must be provided',
});
export type AgentIdentifier = z.infer<typeof agentIdentifierSchema>;
/**
* Schema for resuming an agent with batched answers.
*/
export const resumeAgentInputSchema = z.object({
/** Lookup by human-readable name (preferred) */
name: z.string().optional(),
/** Or lookup by ID */
id: z.string().optional(),
/** Map of question ID to user's answer */
answers: z.record(z.string(), z.string()),
}).refine(data => data.name || data.id, {
message: 'Either name or id must be provided',
});
export type ResumeAgentInput = z.infer<typeof resumeAgentInputSchema>;
// =============================================================================
// Helper Functions
// =============================================================================
/**
* Helper to resolve agent by name or id.
* Throws TRPCError if agent not found.
*/
async function resolveAgent(
ctx: TRPCContext,
input: { name?: string; id?: string }
): Promise<AgentInfo> {
if (!ctx.agentManager) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Agent manager not available',
});
}
const agent = input.name
? await ctx.agentManager.getByName(input.name)
: await ctx.agentManager.get(input.id!);
if (!agent) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Agent '${input.name ?? input.id}' not found`,
});
}
return agent;
}
/**
* Helper to ensure agentManager is available in context.
*/
function requireAgentManager(ctx: TRPCContext) {
if (!ctx.agentManager) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Agent manager not available',
});
}
return ctx.agentManager;
}
/**
* Helper to ensure taskRepository is available in context.
*/
function requireTaskRepository(ctx: TRPCContext): TaskRepository {
if (!ctx.taskRepository) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Task repository not available',
});
}
return ctx.taskRepository;
}
/**
* Helper to ensure messageRepository is available in context.
*/
function requireMessageRepository(ctx: TRPCContext): MessageRepository {
if (!ctx.messageRepository) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Message repository not available',
});
}
return ctx.messageRepository;
}
/**
* Helper to ensure dispatchManager is available in context.
*/
function requireDispatchManager(ctx: TRPCContext): DispatchManager {
if (!ctx.dispatchManager) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Dispatch manager not available',
});
}
return ctx.dispatchManager;
}
/**
* Helper to ensure coordinationManager is available in context.
*/
function requireCoordinationManager(ctx: TRPCContext): CoordinationManager {
if (!ctx.coordinationManager) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Coordination manager not available',
});
}
return ctx.coordinationManager;
}
// =============================================================================
// Application Router with Procedures
// =============================================================================
/**
* Application router with all procedures.
*
* Procedures:
* - health: Quick health check with uptime and process count
* - status: Full server status with process list
* - spawnAgent: Spawn a new agent to work on a task
* - stopAgent: Stop a running agent
* - listAgents: List all agents with status
* - getAgent: Get agent details by name or ID
* - getAgentByName: Get agent by name
* - resumeAgent: Resume an agent waiting for input
* - getAgentResult: Get result of agent's work
* - listTasks: List tasks for a plan
* - getTask: Get task by ID
* - updateTaskStatus: Update task status
* - listMessages: List messages with optional filtering
* - getMessage: Get message by ID
* - respondToMessage: Respond to a message
* - queueTask: Queue a task for dispatch
* - dispatchNext: Dispatch next available task
* - getQueueState: Get dispatch queue state
* - completeTask: Mark a task as complete
* - queueMerge: Queue a completed task for merge
* - processMerges: Process all ready merges in dependency order
* - getMergeQueueStatus: Get merge queue status
* - getNextMergeable: Get next task ready to merge
*/
export const appRouter = router({
/**
* Health check procedure.
*
* Returns a lightweight response suitable for health monitoring.
* Calculates uptime from serverStartedAt in context.
*/
health: publicProcedure
.output(healthResponseSchema)
.query(({ ctx }): HealthResponse => {
const uptime = ctx.serverStartedAt
? Math.floor((Date.now() - ctx.serverStartedAt.getTime()) / 1000)
: 0;
return {
status: 'ok',
uptime,
processCount: ctx.processCount,
};
}),
/**
* Full status procedure.
*
* Returns detailed server state including process list.
* More comprehensive than health for admin/debugging purposes.
*/
status: publicProcedure
.output(statusResponseSchema)
.query(({ ctx }): StatusResponse => {
const uptime = ctx.serverStartedAt
? Math.floor((Date.now() - ctx.serverStartedAt.getTime()) / 1000)
: 0;
return {
server: {
startedAt: ctx.serverStartedAt?.toISOString() ?? '',
uptime,
pid: process.pid,
},
// Process list will be populated when ProcessManager integration is complete
processes: [],
};
}),
// ===========================================================================
// Agent Procedures
// ===========================================================================
/**
* Spawn a new agent to work on a task.
* Creates isolated worktree, starts Claude CLI, persists state.
*/
spawnAgent: publicProcedure
.input(spawnAgentInputSchema)
.mutation(async ({ ctx, input }) => {
const agentManager = requireAgentManager(ctx);
return agentManager.spawn({
name: input.name,
taskId: input.taskId,
prompt: input.prompt,
cwd: input.cwd,
});
}),
/**
* Stop a running agent by name or ID.
*/
stopAgent: publicProcedure
.input(agentIdentifierSchema)
.mutation(async ({ ctx, input }) => {
const agentManager = requireAgentManager(ctx);
const agent = await resolveAgent(ctx, input);
await agentManager.stop(agent.id);
return { success: true, name: agent.name };
}),
/**
* List all agents with their current status.
*/
listAgents: publicProcedure
.query(async ({ ctx }) => {
const agentManager = requireAgentManager(ctx);
return agentManager.list();
}),
/**
* Get agent details by name or ID.
*/
getAgent: publicProcedure
.input(agentIdentifierSchema)
.query(async ({ ctx, input }) => {
return resolveAgent(ctx, input);
}),
/**
* Get agent by name (convenience method).
*/
getAgentByName: publicProcedure
.input(z.object({ name: z.string().min(1) }))
.query(async ({ ctx, input }) => {
const agentManager = requireAgentManager(ctx);
return agentManager.getByName(input.name);
}),
/**
* Resume an agent that is waiting for input.
* Uses stored session ID to continue with full context.
*/
resumeAgent: publicProcedure
.input(resumeAgentInputSchema)
.mutation(async ({ ctx, input }) => {
const agentManager = requireAgentManager(ctx);
const agent = await resolveAgent(ctx, input);
await agentManager.resume(agent.id, input.answers);
return { success: true, name: agent.name };
}),
/**
* Get the result of an agent's work.
* Only available after agent completes or stops.
*/
getAgentResult: publicProcedure
.input(agentIdentifierSchema)
.query(async ({ ctx, input }): Promise<AgentResult | null> => {
const agentManager = requireAgentManager(ctx);
const agent = await resolveAgent(ctx, input);
return agentManager.getResult(agent.id);
}),
// ===========================================================================
// Task Procedures
// ===========================================================================
/**
* List tasks for a plan.
* Returns tasks ordered by order field.
*/
listTasks: publicProcedure
.input(z.object({ planId: z.string().min(1) }))
.query(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
return taskRepository.findByPlanId(input.planId);
}),
/**
* Get a task by ID.
* Throws NOT_FOUND if task doesn't exist.
*/
getTask: publicProcedure
.input(z.object({ id: z.string().min(1) }))
.query(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
const task = await taskRepository.findById(input.id);
if (!task) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Task '${input.id}' not found`,
});
}
return task;
}),
/**
* Update a task's status.
* Returns the updated task.
*/
updateTaskStatus: publicProcedure
.input(z.object({
id: z.string().min(1),
status: z.enum(['pending', 'in_progress', 'completed', 'blocked']),
}))
.mutation(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
// Check task exists first
const existing = await taskRepository.findById(input.id);
if (!existing) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Task '${input.id}' not found`,
});
}
return taskRepository.update(input.id, { status: input.status });
}),
// ===========================================================================
// Message Procedures
// ===========================================================================
/**
* List messages with optional filtering.
* Filter by agent ID or status.
*/
listMessages: publicProcedure
.input(z.object({
agentId: z.string().optional(),
status: z.enum(['pending', 'read', 'responded']).optional(),
}))
.query(async ({ ctx, input }) => {
const messageRepository = requireMessageRepository(ctx);
// Get all messages for user (recipientType='user')
let messages = await messageRepository.findByRecipient('user');
// Filter by agentId if provided (sender is the agent)
if (input.agentId) {
messages = messages.filter(m => m.senderId === input.agentId);
}
// Filter by status if provided
if (input.status) {
messages = messages.filter(m => m.status === input.status);
}
return messages;
}),
/**
* Get a single message by ID.
* Throws NOT_FOUND if message doesn't exist.
*/
getMessage: publicProcedure
.input(z.object({ id: z.string().min(1) }))
.query(async ({ ctx, input }) => {
const messageRepository = requireMessageRepository(ctx);
const message = await messageRepository.findById(input.id);
if (!message) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Message '${input.id}' not found`,
});
}
return message;
}),
/**
* Respond to a message.
* Updates message with response and sets status to 'responded'.
*/
respondToMessage: publicProcedure
.input(z.object({
id: z.string().min(1),
response: z.string().min(1),
}))
.mutation(async ({ ctx, input }) => {
const messageRepository = requireMessageRepository(ctx);
// Check message exists
const existing = await messageRepository.findById(input.id);
if (!existing) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Message '${input.id}' not found`,
});
}
// Create a response message linked to the original
const responseMessage = await messageRepository.create({
senderType: 'user',
recipientType: 'agent',
recipientId: existing.senderId,
type: 'response',
content: input.response,
parentMessageId: input.id,
});
// Update original message status to 'responded'
await messageRepository.update(input.id, { status: 'responded' });
return responseMessage;
}),
// ===========================================================================
// Dispatch Procedures
// ===========================================================================
/**
* Queue a task for dispatch.
* Task will be dispatched when all dependencies complete.
*/
queueTask: publicProcedure
.input(z.object({ taskId: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
const dispatchManager = requireDispatchManager(ctx);
await dispatchManager.queue(input.taskId);
return { success: true };
}),
/**
* Dispatch next available task to an agent.
* Returns dispatch result with task and agent info.
*/
dispatchNext: publicProcedure
.mutation(async ({ ctx }) => {
const dispatchManager = requireDispatchManager(ctx);
return dispatchManager.dispatchNext();
}),
/**
* Get current queue state.
* Returns queued, ready, and blocked task counts.
*/
getQueueState: publicProcedure
.query(async ({ ctx }) => {
const dispatchManager = requireDispatchManager(ctx);
return dispatchManager.getQueueState();
}),
/**
* Mark a task as complete.
* Removes from queue and triggers dependent task re-evaluation.
*/
completeTask: publicProcedure
.input(z.object({ taskId: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
const dispatchManager = requireDispatchManager(ctx);
await dispatchManager.completeTask(input.taskId);
return { success: true };
}),
// ===========================================================================
// Coordination Procedures
// ===========================================================================
/**
* Queue a completed task for merge.
* Task will be merged when all dependencies complete.
*/
queueMerge: publicProcedure
.input(z.object({ taskId: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
const coordinationManager = requireCoordinationManager(ctx);
await coordinationManager.queueMerge(input.taskId);
return { success: true };
}),
/**
* Process all ready merges in dependency order.
* Returns results of all merge operations.
*/
processMerges: publicProcedure
.input(z.object({
targetBranch: z.string().default('main'),
}))
.mutation(async ({ ctx, input }) => {
const coordinationManager = requireCoordinationManager(ctx);
const results = await coordinationManager.processMerges(input.targetBranch);
return { results };
}),
/**
* Get merge queue status.
* Shows queued, in-progress, merged, and conflicted tasks.
*/
getMergeQueueStatus: publicProcedure
.query(async ({ ctx }) => {
const coordinationManager = requireCoordinationManager(ctx);
return coordinationManager.getQueueState();
}),
/**
* Get next task ready to merge.
* Returns task with all dependencies resolved.
*/
getNextMergeable: publicProcedure
.query(async ({ ctx }) => {
const coordinationManager = requireCoordinationManager(ctx);
return coordinationManager.getNextMergeable();
}),
});
/**
* Type of the application router.
* Used by clients for type-safe procedure calls.
*/
export type AppRouter = typeof appRouter;