/**
 * Agent Router — spawn, stop, delete, list, get, resume, result, questions, output
 */

import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import { tracked, type TrackedEnvelope } from '@trpc/server';
import path from 'path';
import fs from 'fs/promises';
import type { ProcedureBuilder } from '../trpc.js';
import type { TRPCContext } from '../context.js';
import type { AgentInfo, AgentResult, PendingQuestions } from '../../agent/types.js';
import type { AgentOutputEvent } from '../../events/types.js';
import { requireAgentManager, requireLogChunkRepository, requireTaskRepository, requireInitiativeRepository, requireConversationRepository } from './_helpers.js';

/** One row of the radar listing returned by `listForRadar`. */
export type AgentRadarRow = {
  id: string;
  name: string;
  mode: string;
  status: string;
  initiativeId: string | null;
  initiativeName: string | null;
  taskId: string | null;
  taskName: string | null;
  createdAt: string;
  questionsCount: number;
  messagesCount: number;
  subagentsCount: number;
  compactionsCount: number;
};

/** Input accepted by `spawnAgent`. */
export const spawnAgentInputSchema = z.object({
  name: z.string().min(1).optional(),
  taskId: z.string().min(1),
  prompt: z.string().min(1),
  cwd: z.string().optional(),
  mode: z.enum(['execute', 'discuss', 'plan', 'detail', 'refine']).optional(),
  provider: z.string().optional(),
  initiativeId: z.string().min(1).optional(),
});

export type SpawnAgentInput = z.infer<typeof spawnAgentInputSchema>;

/** Identifies an agent by name or by id; at least one must be provided. */
export const agentIdentifierSchema = z.object({
  name: z.string().optional(),
  id: z.string().optional(),
}).refine(data => data.name || data.id, {
  message: 'Either name or id must be provided',
});

export type AgentIdentifier = z.infer<typeof agentIdentifierSchema>;

/** Agent identifier plus the answers handed to `agentManager.resume`. */
export const resumeAgentInputSchema = z.object({
  name: z.string().optional(),
  id: z.string().optional(),
  answers: z.record(z.string(), z.string()),
}).refine(data => data.name || data.id, {
  message: 'Either name or id must be provided',
});

export type ResumeAgentInput = z.infer<typeof resumeAgentInputSchema>;

/** Statuses for which the "active refine/conflict agent" lookups still surface an agent. */
const SURFACEABLE_STATUSES: readonly string[] = ['running', 'waiting_for_input', 'idle', 'crashed'];

/** Upper bound on the number of events returned by the log-scanning queries. */
const MAX_LOG_EVENTS = 200;

/** Number of leading bytes inspected when deciding whether a file is binary. */
const BINARY_SNIFF_BYTES = 512;

/** Sort comparator: most recently created agent first. */
function byNewestFirst(a: Pick<AgentInfo, 'createdAt'>, b: Pick<AgentInfo, 'createdAt'>): number {
  return new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime();
}

/**
 * Returns the workspace root from the context.
 *
 * @throws TRPCError INTERNAL_SERVER_ERROR when no workspace root is configured.
 */
function requireWorkspaceRoot(ctx: TRPCContext): string {
  const root = ctx.workspaceRoot;
  if (root == null) {
    throw new TRPCError({
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Workspace root not available',
    });
  }
  return root;
}

/**
 * Decodes at most `maxBytes` bytes of `buf` as UTF-8 without cutting through
 * a multi-byte character (which would decode to a trailing U+FFFD).
 */
function decodeUtf8Prefix(buf: Buffer, maxBytes: number): string {
  let end = Math.min(maxBytes, buf.length);
  // If the first excluded byte is a continuation byte (10xxxxxx) the cut is
  // inside a character; back up to its lead byte. A UTF-8 character has at
  // most three continuation bytes, so the back-off is bounded.
  for (
    let backedOff = 0;
    backedOff < 3 && end > 0 && end < buf.length && ((buf[end] ?? 0) & 0xc0) === 0x80;
    backedOff++
  ) {
    end--;
  }
  return buf.subarray(0, end).toString('utf-8');
}

/** True when `err` is an error object carrying the `ENOENT` code. */
function isFileNotFound(err: unknown): boolean {
  return typeof err === 'object' && err !== null && (err as { code?: unknown }).code === 'ENOENT';
}

/**
 * Reports whether the first bytes of the file contain a NUL byte.
 * The file handle is always closed, even when the read fails.
 */
async function looksBinary(fullPath: string): Promise<boolean> {
  const handle = await fs.open(fullPath, 'r');
  try {
    const header = Buffer.alloc(BINARY_SNIFF_BYTES);
    const { bytesRead } = await handle.read(header, 0, BINARY_SNIFF_BYTES, 0);
    return header.subarray(0, bytesRead).includes(0);
  } finally {
    await handle.close();
  }
}

/**
 * Looks an agent up by name (preferred when given) or by id.
 *
 * @throws TRPCError INTERNAL_SERVER_ERROR when the context has no agent manager.
 * @throws TRPCError BAD_REQUEST when neither a name nor an id is supplied.
 * @throws TRPCError NOT_FOUND when no agent matches.
 */
async function resolveAgent(
  ctx: TRPCContext,
  input: { name?: string; id?: string },
): Promise<AgentInfo> {
  const manager = ctx.agentManager;
  if (!manager) {
    throw new TRPCError({
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Agent manager not available',
    });
  }

  const { name, id } = input;
  const lookup = name ? manager.getByName(name) : id ? manager.get(id) : undefined;
  if (!lookup) {
    // Only reachable when a caller bypasses the zod schemas above.
    throw new TRPCError({
      code: 'BAD_REQUEST',
      message: 'Either name or id must be provided',
    });
  }

  const agent = await lookup;
  if (!agent) {
    throw new TRPCError({
      code: 'NOT_FOUND',
      // `||` so an empty name falls through to the id that was actually used.
      message: `Agent '${name || id}' not found`,
    });
  }
  return agent;
}

/**
 * Builds the agent-related tRPC procedures.
 *
 * @param publicProcedure Procedure builder every endpoint is derived from.
 * @returns A record of procedures to merge into the application router.
 */
export function agentProcedures(publicProcedure: ProcedureBuilder) {
  return {
    /** Spawns a new agent through the agent manager. */
    spawnAgent: publicProcedure
      .input(spawnAgentInputSchema)
      .mutation(async ({ ctx, input }) => {
        const agentManager = requireAgentManager(ctx);
        return agentManager.spawn({
          name: input.name,
          taskId: input.taskId,
          prompt: input.prompt,
          cwd: input.cwd,
          mode: input.mode,
          provider: input.provider,
          initiativeId: input.initiativeId,
        });
      }),

    /** Stops the identified agent. */
    stopAgent: publicProcedure
      .input(agentIdentifierSchema)
      .mutation(async ({ ctx, input }) => {
        const agentManager = requireAgentManager(ctx);
        const agent = await resolveAgent(ctx, input);
        await agentManager.stop(agent.id);
        return { success: true, name: agent.name };
      }),

    /** Deletes the identified agent. */
    deleteAgent: publicProcedure
      .input(agentIdentifierSchema)
      .mutation(async ({ ctx, input }) => {
        const agentManager = requireAgentManager(ctx);
        const agent = await resolveAgent(ctx, input);
        await agentManager.delete(agent.id);
        return { success: true, name: agent.name };
      }),

    /** Dismisses the identified agent. */
    dismissAgent: publicProcedure
      .input(agentIdentifierSchema)
      .mutation(async ({ ctx, input }) => {
        const agentManager = requireAgentManager(ctx);
        const agent = await resolveAgent(ctx, input);
        await agentManager.dismiss(agent.id);
        return { success: true, name: agent.name };
      }),

    /** Lists every agent known to the agent manager. */
    listAgents: publicProcedure
      .query(async ({ ctx }) => {
        const agentManager = requireAgentManager(ctx);
        return agentManager.list();
      }),

    /** Returns the agent together with the names of its task and initiative. */
    getAgent: publicProcedure
      .input(agentIdentifierSchema)
      .query(async ({ ctx, input }) => {
        const agent = await resolveAgent(ctx, input);

        let taskName: string | null = null;
        let initiativeName: string | null = null;

        if (agent.taskId) {
          const task = await requireTaskRepository(ctx).findById(agent.taskId);
          taskName = task?.name ?? null;
        }
        if (agent.initiativeId) {
          const initiative = await requireInitiativeRepository(ctx).findById(agent.initiativeId);
          initiativeName = initiative?.name ?? null;
        }

        return { ...agent, taskName, initiativeName };
      }),

    /** Returns whatever `agentManager.getByName` yields for the given name. */
    getAgentByName: publicProcedure
      .input(z.object({ name: z.string().min(1) }))
      .query(async ({ ctx, input }) => {
        const agentManager = requireAgentManager(ctx);
        return agentManager.getByName(input.name);
      }),

    /** Resumes the identified agent with the supplied answers. */
    resumeAgent: publicProcedure
      .input(resumeAgentInputSchema)
      .mutation(async ({ ctx, input }) => {
        const agentManager = requireAgentManager(ctx);
        const agent = await resolveAgent(ctx, input);
        await agentManager.resume(agent.id, input.answers);
        return { success: true, name: agent.name };
      }),

    /** Returns the result recorded for the identified agent. */
    getAgentResult: publicProcedure
      .input(agentIdentifierSchema)
      // NOTE(review): `| null` reconstructed from the imported types — confirm against AgentManager.
      .query(async ({ ctx, input }): Promise<AgentResult | null> => {
        const agentManager = requireAgentManager(ctx);
        const agent = await resolveAgent(ctx, input);
        return agentManager.getResult(agent.id);
      }),

    /** Returns the questions the identified agent is waiting on. */
    getAgentQuestions: publicProcedure
      .input(agentIdentifierSchema)
      // NOTE(review): `| null` reconstructed from the imported types — confirm against AgentManager.
      .query(async ({ ctx, input }): Promise<PendingQuestions | null> => {
        const agentManager = requireAgentManager(ctx);
        const agent = await resolveAgent(ctx, input);
        return agentManager.getPendingQuestions(agent.id);
      }),

    /** Lists agents whose status is `waiting_for_input`. */
    listWaitingAgents: publicProcedure
      .query(async ({ ctx }) => {
        const agentManager = requireAgentManager(ctx);
        const allAgents = await agentManager.list();
        return allAgents.filter(agent => agent.status === 'waiting_for_input');
      }),

    /** Newest non-dismissed discuss/refine agent of an initiative, or null. */
    getActiveRefineAgent: publicProcedure
      .input(z.object({ initiativeId: z.string().min(1) }))
      .query(async ({ ctx, input }): Promise<AgentInfo | null> => {
        const agentManager = requireAgentManager(ctx);
        const allAgents = await agentManager.list();
        // Surface discuss and refine agents — both work on initiative content
        const CONTENT_MODES: readonly string[] = ['discuss', 'refine'];
        const candidates = allAgents
          .filter(
            (a) =>
              CONTENT_MODES.includes(a.mode) &&
              a.initiativeId === input.initiativeId &&
              SURFACEABLE_STATUSES.includes(a.status) &&
              !a.userDismissedAt,
          )
          .sort(byNewestFirst);
        return candidates[0] ?? null;
      }),

    /** Newest agent attached to the given task, or null. */
    getTaskAgent: publicProcedure
      .input(z.object({ taskId: z.string().min(1) }))
      .query(async ({ ctx, input }): Promise<AgentInfo | null> => {
        const agentManager = requireAgentManager(ctx);
        const all = await agentManager.list();
        const matches = all
          .filter(a => a.taskId === input.taskId)
          .sort(byNewestFirst);
        return matches[0] ?? null;
      }),

    /** Newest non-dismissed `conflict-*` execute agent of an initiative, or null. */
    getActiveConflictAgent: publicProcedure
      .input(z.object({ initiativeId: z.string().min(1) }))
      .query(async ({ ctx, input }): Promise<AgentInfo | null> => {
        const agentManager = requireAgentManager(ctx);
        const allAgents = await agentManager.list();
        const candidates = allAgents
          .filter(
            (a) =>
              a.mode === 'execute' &&
              a.initiativeId === input.initiativeId &&
              a.name?.startsWith('conflict-') &&
              SURFACEABLE_STATUSES.includes(a.status) &&
              !a.userDismissedAt,
          )
          .sort(byNewestFirst);
        return candidates[0] ?? null;
      }),

    /** Returns the stored log chunks (content + timestamp) of an agent. */
    getAgentOutput: publicProcedure
      .input(agentIdentifierSchema)
      .query(async ({ ctx, input }) => {
        const agent = await resolveAgent(ctx, input);
        const logChunkRepo = requireLogChunkRepository(ctx);
        const chunks = await logChunkRepo.findByAgentId(agent.id);
        return chunks.map(c => ({
          content: c.content,
          createdAt: c.createdAt.toISOString(),
        }));
      }),

    /**
     * Streams `agent:output` events for one agent until the subscription's
     * abort signal fires. Events are buffered in a queue so none are lost
     * while the consumer is busy.
     */
    onAgentOutput: publicProcedure
      .input(z.object({ agentId: z.string().min(1) }))
      .subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ agentId: string; data: string }>> {
        const { agentId } = opts.input;
        const signal = opts.signal ?? new AbortController().signal;
        const eventBus = opts.ctx.eventBus;

        let eventCounter = 0;
        const queue: string[] = [];
        // Resolver of the promise the loop below is parked on, if any.
        let wake: (() => void) | null = null;

        const wakeConsumer = () => {
          const pending = wake;
          wake = null;
          pending?.();
        };

        const handler = (event: AgentOutputEvent) => {
          if (event.payload.agentId !== agentId) return;
          queue.push(event.payload.data);
          wakeConsumer();
        };

        eventBus.on('agent:output', handler);

        const cleanup = () => {
          eventBus.off('agent:output', handler);
          // Unblock the loop so it can observe the aborted signal and exit.
          wakeConsumer();
        };

        signal.addEventListener('abort', cleanup, { once: true });

        try {
          while (!signal.aborted) {
            while (queue.length > 0) {
              const data = queue.shift()!;
              const id = `${agentId}-live-${eventCounter++}`;
              yield tracked(id, { agentId, data });
            }

            if (!signal.aborted) {
              await new Promise<void>((r) => {
                wake = r;
              });
            }
          }
        } finally {
          cleanup();
        }
      }),

    /**
     * Returns the text files under the agent worktree's `.cw/input` directory.
     * Files containing a NUL byte in their first 512 bytes and files that
     * cannot be read are skipped; content beyond 500 KB is truncated.
     */
    getAgentInputFiles: publicProcedure
      .input(z.object({ id: z.string().min(1) }))
      .output(z.object({
        files: z.array(z.object({
          name: z.string(),
          content: z.string(),
          sizeBytes: z.number(),
        })),
        reason: z.enum(['worktree_missing', 'input_dir_missing']).optional(),
      }))
      .query(async ({ ctx, input }) => {
        const agent = await resolveAgent(ctx, { id: input.id });

        const worktreeRoot = path.join(requireWorkspaceRoot(ctx), 'agent-workdirs', agent.worktreeId);
        const inputDir = path.join(worktreeRoot, '.cw', 'input');

        // Check worktree root exists
        try {
          await fs.stat(worktreeRoot);
        } catch {
          return { files: [], reason: 'worktree_missing' as const };
        }

        // Check input dir exists
        try {
          await fs.stat(inputDir);
        } catch {
          return { files: [], reason: 'input_dir_missing' as const };
        }

        // Walk inputDir recursively
        const entries = await fs.readdir(inputDir, { recursive: true, withFileTypes: true });
        const MAX_SIZE = 500 * 1024;
        const results: Array<{ name: string; content: string; sizeBytes: number }> = [];

        for (const entry of entries) {
          if (!entry.isFile()) continue;

          try {
            // entry.parentPath is available in Node 20+
            // eslint-disable-next-line @typescript-eslint/no-explicit-any -- field depends on the Node typings in use
            const dir = (entry as any).parentPath ?? (entry as any).path;
            const fullPath = path.join(dir, entry.name);
            const relativeName = path.relative(inputDir, fullPath);

            if (await looksBinary(fullPath)) continue; // skip binary

            const raw = await fs.readFile(fullPath);
            const sizeBytes = raw.length;
            const content =
              sizeBytes > MAX_SIZE
                ? decodeUtf8Prefix(raw, MAX_SIZE) + '\n\n[truncated — file exceeds 500 KB]'
                : raw.toString('utf-8');

            results.push({ name: relativeName, content, sizeBytes });
          } catch {
            continue; // skip unreadable files (deliberate best-effort listing)
          }
        }

        results.sort((a, b) => a.name.localeCompare(b.name));
        return { files: results };
      }),

    /**
     * Returns the prompt an agent was started with: the stored prompt when
     * present, otherwise the `PROMPT.md` file from the agent's log directory.
     * Content beyond 1 MB is truncated.
     */
    getAgentPrompt: publicProcedure
      .input(z.object({ id: z.string().min(1) }))
      .output(z.object({
        content: z.string().nullable(),
        reason: z.enum(['prompt_not_written']).optional(),
      }))
      .query(async ({ ctx, input }) => {
        const agent = await resolveAgent(ctx, { id: input.id });

        const MAX_BYTES = 1024 * 1024; // 1 MB

        const truncateIfNeeded = (text: string): string => {
          if (Buffer.byteLength(text, 'utf-8') <= MAX_BYTES) return text;
          return decodeUtf8Prefix(Buffer.from(text, 'utf-8'), MAX_BYTES) + '\n\n[truncated — prompt exceeds 1 MB]';
        };

        // Prefer DB-persisted prompt (durable even after log file cleanup).
        // `!= null` also sends a missing (undefined) prompt to the fallback.
        if (agent.prompt != null) {
          return { content: truncateIfNeeded(agent.prompt) };
        }

        // Fall back to filesystem for agents spawned before DB persistence was added
        const promptPath = path.join(requireWorkspaceRoot(ctx), '.cw', 'agent-logs', agent.name, 'PROMPT.md');

        let raw: string;
        try {
          raw = await fs.readFile(promptPath, 'utf-8');
        } catch (err: unknown) {
          if (isFileNotFound(err)) {
            return { content: null, reason: 'prompt_not_written' as const };
          }
          throw new TRPCError({
            code: 'INTERNAL_SERVER_ERROR',
            message: `Failed to read prompt file: ${String(err)}`,
          });
        }

        return { content: truncateIfNeeded(raw) };
      }),

    /**
     * Lists agents for the radar view, filtered by creation time, status,
     * initiative and mode, and enriched with task/initiative names and
     * per-agent counters.
     */
    listForRadar: publicProcedure
      .input(z.object({
        timeRange: z.enum(['1h', '6h', '24h', '7d', 'all']).default('24h'),
        status: z.enum(['running', 'completed', 'crashed']).optional(),
        initiativeId: z.string().optional(),
        mode: z.enum(['execute', 'discuss', 'plan', 'detail', 'refine', 'chat', 'errand']).optional(),
      }))
      .query(async ({ ctx, input }): Promise<AgentRadarRow[]> => {
        const agentManager = requireAgentManager(ctx);
        const allAgents = await agentManager.list();

        // Compute cutoff
        const windowMs = {
          '1h': 3_600_000,
          '6h': 21_600_000,
          '24h': 86_400_000,
          '7d': 604_800_000,
        } as const;
        const cutoff = input.timeRange === 'all' ? null : Date.now() - windowMs[input.timeRange];

        // Filter agents
        let filteredAgents = allAgents;
        if (cutoff !== null) {
          filteredAgents = filteredAgents.filter(a => a.createdAt.getTime() >= cutoff);
        }
        if (input.status !== undefined) {
          // The API says 'completed' where the stored status is 'stopped'.
          const dbStatus = input.status === 'completed' ? 'stopped' : input.status;
          filteredAgents = filteredAgents.filter(a => a.status === dbStatus);
        }
        if (input.initiativeId !== undefined) {
          filteredAgents = filteredAgents.filter(a => a.initiativeId === input.initiativeId);
        }
        if (input.mode !== undefined) {
          filteredAgents = filteredAgents.filter(a => a.mode === input.mode);
        }

        const matchingIds = filteredAgents.map(a => a.id);

        const logChunkRepo = requireLogChunkRepository(ctx);
        const conversationRepo = requireConversationRepository(ctx);
        const initiativeRepo = requireInitiativeRepository(ctx);
        const taskRepo = requireTaskRepository(ctx);

        // Collect unique taskIds and initiativeIds for batch lookup
        const isPresent = (value: string | null | undefined): value is string => Boolean(value);
        const uniqueTaskIds = [...new Set(filteredAgents.map(a => a.taskId).filter(isPresent))];
        const uniqueInitiativeIds = [...new Set(filteredAgents.map(a => a.initiativeId).filter(isPresent))];

        // Batch fetch in parallel
        const [metrics, messageCounts, taskResults, initiativeResults] = await Promise.all([
          logChunkRepo.findMetricsByAgentIds(matchingIds),
          conversationRepo.countByFromAgentIds(matchingIds),
          Promise.all(uniqueTaskIds.map(id => taskRepo.findById(id))),
          Promise.all(uniqueInitiativeIds.map(id => initiativeRepo.findById(id))),
        ]);

        // Build lookup maps
        const taskMap = new Map<string, string>();
        for (const task of taskResults) {
          if (task) taskMap.set(task.id, task.name);
        }
        const initiativeMap = new Map<string, string>();
        for (const initiative of initiativeResults) {
          if (initiative) initiativeMap.set(initiative.id, initiative.name);
        }
        const messagesMap = new Map(messageCounts.map(m => [m.agentId, m.count] as const));
        const metricsMap = new Map(metrics.map(m => [m.agentId, m] as const));

        // Build result rows
        return filteredAgents.map(agent => {
          const agentMetrics = metricsMap.get(agent.id);
          return {
            id: agent.id,
            name: agent.name,
            mode: agent.mode,
            status: agent.status,
            initiativeId: agent.initiativeId,
            initiativeName: agent.initiativeId ? (initiativeMap.get(agent.initiativeId) ?? null) : null,
            taskId: agent.taskId,
            taskName: agent.taskId ? (taskMap.get(agent.taskId) ?? null) : null,
            createdAt: agent.createdAt.toISOString(),
            questionsCount: agentMetrics?.questionsCount ?? 0,
            messagesCount: messagesMap.get(agent.id) ?? 0,
            subagentsCount: agentMetrics?.subagentsCount ?? 0,
            compactionsCount: agentMetrics?.compactionsCount ?? 0,
          };
        });
      }),

    /**
     * Scans an agent's log chunks for `system`/`init` entries whose source is
     * `compact`; returns at most 200 of them.
     */
    getCompactionEvents: publicProcedure
      .input(z.object({ agentId: z.string().min(1) }))
      .query(async ({ ctx, input }) => {
        const logChunkRepo = requireLogChunkRepository(ctx);
        const chunks = await logChunkRepo.findByAgentId(input.agentId);
        const results: { timestamp: string; sessionNumber: number }[] = [];
        for (const chunk of chunks) {
          try {
            const parsed = JSON.parse(chunk.content);
            if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.source === 'compact') {
              results.push({ timestamp: chunk.createdAt.toISOString(), sessionNumber: chunk.sessionNumber });
            }
          } catch { /* skip malformed */ }
          if (results.length >= MAX_LOG_EVENTS) break;
        }
        return results;
      }),

    /**
     * Scans an agent's log chunks for `Agent` tool-use entries; returns at
     * most 200 of them with a 200-character prompt preview.
     */
    getSubagentSpawns: publicProcedure
      .input(z.object({ agentId: z.string().min(1) }))
      .query(async ({ ctx, input }) => {
        const logChunkRepo = requireLogChunkRepository(ctx);
        const chunks = await logChunkRepo.findByAgentId(input.agentId);
        const results: { timestamp: string; description: string; promptPreview: string; fullPrompt: string }[] = [];
        for (const chunk of chunks) {
          try {
            const parsed = JSON.parse(chunk.content);
            if (parsed.type === 'tool_use' && parsed.name === 'Agent') {
              const fullPrompt: string = parsed.input?.prompt ?? '';
              const description: string = parsed.input?.description ?? '';
              results.push({
                timestamp: chunk.createdAt.toISOString(),
                description,
                promptPreview: fullPrompt.slice(0, 200),
                fullPrompt,
              });
            }
          } catch { /* skip malformed */ }
          if (results.length >= MAX_LOG_EVENTS) break;
        }
        return results;
      }),

    /**
     * Scans an agent's log chunks for `AskUserQuestion` tool-use entries;
     * returns at most 200 of them.
     */
    getQuestionsAsked: publicProcedure
      .input(z.object({ agentId: z.string().min(1) }))
      .query(async ({ ctx, input }) => {
        const logChunkRepo = requireLogChunkRepository(ctx);
        const chunks = await logChunkRepo.findByAgentId(input.agentId);
        type QuestionItem = { question: string; header: string; options: { label: string; description: string }[] };
        const results: { timestamp: string; questions: QuestionItem[] }[] = [];
        for (const chunk of chunks) {
          try {
            const parsed = JSON.parse(chunk.content);
            if (parsed.type === 'tool_use' && parsed.name === 'AskUserQuestion') {
              const questions: QuestionItem[] = parsed.input?.questions ?? [];
              results.push({ timestamp: chunk.createdAt.toISOString(), questions });
            }
          } catch { /* skip malformed */ }
          if (results.length >= MAX_LOG_EVENTS) break;
        }
        return results;
      }),
  };
}