/** * Agent Router — spawn, stop, delete, list, get, resume, result, questions, output */ import { TRPCError } from '@trpc/server'; import { z } from 'zod'; import { tracked, type TrackedEnvelope } from '@trpc/server'; import type { ProcedureBuilder } from '../trpc.js'; import type { TRPCContext } from '../context.js'; import type { AgentInfo, AgentResult, PendingQuestions } from '../../agent/types.js'; import type { AgentOutputEvent } from '../../events/types.js'; import { requireAgentManager, requireLogChunkRepository } from './_helpers.js'; export const spawnAgentInputSchema = z.object({ name: z.string().min(1).optional(), taskId: z.string().min(1), prompt: z.string().min(1), cwd: z.string().optional(), mode: z.enum(['execute', 'discuss', 'plan', 'detail', 'refine']).optional(), provider: z.string().optional(), initiativeId: z.string().min(1).optional(), }); export type SpawnAgentInput = z.infer; export const agentIdentifierSchema = z.object({ name: z.string().optional(), id: z.string().optional(), }).refine(data => data.name || data.id, { message: 'Either name or id must be provided', }); export type AgentIdentifier = z.infer; export const resumeAgentInputSchema = z.object({ name: z.string().optional(), id: z.string().optional(), answers: z.record(z.string(), z.string()), }).refine(data => data.name || data.id, { message: 'Either name or id must be provided', }); export type ResumeAgentInput = z.infer; async function resolveAgent( ctx: TRPCContext, input: { name?: string; id?: string } ): Promise { if (!ctx.agentManager) { throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: 'Agent manager not available', }); } const agent = input.name ? await ctx.agentManager.getByName(input.name) : await ctx.agentManager.get(input.id!); if (!agent) { throw new TRPCError({ code: 'NOT_FOUND', message: `Agent '${input.name ?? input.id}' not found`, }); } return agent; } export function agentProcedures(publicProcedure: ProcedureBuilder) { return { spawnAgent: publicProcedure .input(spawnAgentInputSchema) .mutation(async ({ ctx, input }) => { const agentManager = requireAgentManager(ctx); return agentManager.spawn({ name: input.name, taskId: input.taskId, prompt: input.prompt, cwd: input.cwd, mode: input.mode, provider: input.provider, initiativeId: input.initiativeId, }); }), stopAgent: publicProcedure .input(agentIdentifierSchema) .mutation(async ({ ctx, input }) => { const agentManager = requireAgentManager(ctx); const agent = await resolveAgent(ctx, input); await agentManager.stop(agent.id); return { success: true, name: agent.name }; }), deleteAgent: publicProcedure .input(agentIdentifierSchema) .mutation(async ({ ctx, input }) => { const agentManager = requireAgentManager(ctx); const agent = await resolveAgent(ctx, input); await agentManager.delete(agent.id); return { success: true, name: agent.name }; }), dismissAgent: publicProcedure .input(agentIdentifierSchema) .mutation(async ({ ctx, input }) => { const agentManager = requireAgentManager(ctx); const agent = await resolveAgent(ctx, input); await agentManager.dismiss(agent.id); return { success: true, name: agent.name }; }), listAgents: publicProcedure .query(async ({ ctx }) => { const agentManager = requireAgentManager(ctx); return agentManager.list(); }), getAgent: publicProcedure .input(agentIdentifierSchema) .query(async ({ ctx, input }) => { return resolveAgent(ctx, input); }), getAgentByName: publicProcedure .input(z.object({ name: z.string().min(1) })) .query(async ({ ctx, input }) => { const agentManager = requireAgentManager(ctx); return agentManager.getByName(input.name); }), resumeAgent: publicProcedure .input(resumeAgentInputSchema) .mutation(async ({ ctx, input }) => { const agentManager = requireAgentManager(ctx); const agent = await resolveAgent(ctx, input); await agentManager.resume(agent.id, input.answers); return { success: true, name: agent.name }; }), getAgentResult: publicProcedure .input(agentIdentifierSchema) .query(async ({ ctx, input }): Promise => { const agentManager = requireAgentManager(ctx); const agent = await resolveAgent(ctx, input); return agentManager.getResult(agent.id); }), getAgentQuestions: publicProcedure .input(agentIdentifierSchema) .query(async ({ ctx, input }): Promise => { const agentManager = requireAgentManager(ctx); const agent = await resolveAgent(ctx, input); return agentManager.getPendingQuestions(agent.id); }), listWaitingAgents: publicProcedure .query(async ({ ctx }) => { const agentManager = requireAgentManager(ctx); const allAgents = await agentManager.list(); return allAgents.filter(agent => agent.status === 'waiting_for_input'); }), getActiveRefineAgent: publicProcedure .input(z.object({ initiativeId: z.string().min(1) })) .query(async ({ ctx, input }): Promise => { const agentManager = requireAgentManager(ctx); const allAgents = await agentManager.list(); // Surface discuss and refine agents — both work on initiative content const CONTENT_MODES = ['discuss', 'refine']; const candidates = allAgents .filter( (a) => CONTENT_MODES.includes(a.mode) && a.initiativeId === input.initiativeId && ['running', 'waiting_for_input', 'idle', 'crashed'].includes(a.status) && !a.userDismissedAt, ) .sort( (a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), ); return candidates[0] ?? null; }), getActiveConflictAgent: publicProcedure .input(z.object({ initiativeId: z.string().min(1) })) .query(async ({ ctx, input }): Promise => { const agentManager = requireAgentManager(ctx); const allAgents = await agentManager.list(); const candidates = allAgents .filter( (a) => a.mode === 'execute' && a.initiativeId === input.initiativeId && a.name?.startsWith('conflict-') && ['running', 'waiting_for_input', 'idle', 'crashed'].includes(a.status) && !a.userDismissedAt, ) .sort( (a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), ); return candidates[0] ?? null; }), getAgentOutput: publicProcedure .input(agentIdentifierSchema) .query(async ({ ctx, input }): Promise => { const agent = await resolveAgent(ctx, input); const logChunkRepo = requireLogChunkRepository(ctx); const chunks = await logChunkRepo.findByAgentId(agent.id); return chunks.map(c => c.content).join(''); }), onAgentOutput: publicProcedure .input(z.object({ agentId: z.string().min(1) })) .subscription(async function* (opts): AsyncGenerator> { const { agentId } = opts.input; const signal = opts.signal ?? new AbortController().signal; const eventBus = opts.ctx.eventBus; let eventCounter = 0; const queue: string[] = []; let resolve: (() => void) | null = null; const handler = (event: AgentOutputEvent) => { if (event.payload.agentId !== agentId) return; queue.push(event.payload.data); if (resolve) { const r = resolve; resolve = null; r(); } }; eventBus.on('agent:output', handler); const cleanup = () => { eventBus.off('agent:output', handler); if (resolve) { const r = resolve; resolve = null; r(); } }; signal.addEventListener('abort', cleanup, { once: true }); try { while (!signal.aborted) { while (queue.length > 0) { const data = queue.shift()!; const id = `${agentId}-live-${eventCounter++}`; yield tracked(id, { agentId, data }); } if (!signal.aborted) { await new Promise((r) => { resolve = r; }); } } } finally { cleanup(); } }), }; }