/**
 * Conversation Router — inter-agent communication procedures
 */

import { TRPCError } from '@trpc/server';
import { tracked, type TrackedEnvelope } from '@trpc/server';
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js';
import { createModuleLogger } from '../../logger/index.js';

const log = createModuleLogger('conversation-router');

/**
 * Payload yielded by `onPendingConversation` for every pending conversation.
 *
 * NOTE(review): field nullability mirrors what `createConversation` passes to
 * `repo.create` (`phaseId`/`taskId` default to `null`) — confirm against the
 * repository's conversation type.
 */
interface PendingConversationPayload {
  conversationId: string;
  fromAgentId: string;
  question: string;
  phaseId: string | null;
  taskId: string | null;
}

/** Payload yielded by `onConversationAnswer` once the answer is available. */
interface ConversationAnswerPayload {
  answer: string;
}

/**
 * Returns the first agent accepted by `belongs` whose status is `'running'`,
 * or, when there is none, the first accepted agent whose status is `'idle'`.
 */
function pickRunningOrIdle<T extends { status: string }>(
  agents: readonly T[],
  belongs: (agent: T) => boolean,
): T | undefined {
  return (
    agents.find((agent) => belongs(agent) && agent.status === 'running') ??
    agents.find((agent) => belongs(agent) && agent.status === 'idle')
  );
}

/**
 * Single-waiter wake-up primitive used by the subscription loops.
 * `wait()` returns a promise that settles on the next `wake()`; calling
 * `wake()` while nobody is waiting is a no-op, so callers must keep their own
 * state (queue / flag) and re-check it before waiting.
 */
function createWaker(): { wait: () => Promise<void>; wake: () => void } {
  let pending: (() => void) | null = null;
  return {
    wait: () =>
      new Promise<void>((resolve) => {
        pending = resolve;
      }),
    wake: () => {
      const release = pending;
      pending = null;
      release?.();
    },
  };
}

/**
 * Builds the conversation-related tRPC procedures.
 *
 * @param publicProcedure - procedure builder every procedure is derived from.
 * @returns an object with the `createConversation`, `getPendingConversations`,
 *   `getConversation`, `answerConversation`, `onPendingConversation` and
 *   `onConversationAnswer` procedures.
 */
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
  return {
    /**
     * Creates a conversation addressed to an agent. The target is taken from
     * `toAgentId`, otherwise resolved from `taskId`, otherwise from `phaseId`.
     * Emits `conversation:created` and tries to resume the target if it is idle.
     *
     * Throws NOT_FOUND when no running/idle agent matches the task or phase,
     * and BAD_REQUEST when none of the three identifiers is supplied.
     */
    createConversation: publicProcedure
      .input(z.object({
        fromAgentId: z.string().min(1),
        toAgentId: z.string().min(1).optional(),
        phaseId: z.string().min(1).optional(),
        taskId: z.string().min(1).optional(),
        question: z.string().min(1),
      }))
      .mutation(async ({ ctx, input }) => {
        const repo = requireConversationRepository(ctx);
        const agentManager = requireAgentManager(ctx);

        let toAgentId = input.toAgentId;

        // Resolve target agent from taskId — prefer running, fall back to idle
        if (!toAgentId && input.taskId) {
          const agents = await agentManager.list();
          const match = pickRunningOrIdle(agents, (a) => a.taskId === input.taskId);
          if (!match) {
            throw new TRPCError({
              code: 'NOT_FOUND',
              message: `No running or idle agent found for task '${input.taskId}'`,
            });
          }
          toAgentId = match.id;
        }

        // Resolve target agent from phaseId — prefer running, fall back to idle
        if (!toAgentId && input.phaseId) {
          const taskRepo = requireTaskRepository(ctx);
          const tasks = await taskRepo.findByPhaseId(input.phaseId);
          const taskIds = new Set(tasks.map((t) => t.id));
          const agents = await agentManager.list();
          const match = pickRunningOrIdle(agents, (a) => !!a.taskId && taskIds.has(a.taskId));
          if (!match) {
            throw new TRPCError({
              code: 'NOT_FOUND',
              message: `No running or idle agent found for phase '${input.phaseId}'`,
            });
          }
          toAgentId = match.id;
        }

        if (!toAgentId) {
          throw new TRPCError({
            code: 'BAD_REQUEST',
            message: 'Must provide toAgentId, taskId, or phaseId to identify target agent',
          });
        }

        const conversation = await repo.create({
          fromAgentId: input.fromAgentId,
          toAgentId,
          initiativeId: null,
          phaseId: input.phaseId ?? null,
          taskId: input.taskId ?? null,
          question: input.question,
        });

        ctx.eventBus.emit({
          type: 'conversation:created' as const,
          timestamp: new Date(),
          payload: {
            conversationId: conversation.id,
            fromAgentId: input.fromAgentId,
            toAgentId,
          },
        });

        // Auto-resume idle target agent so it can answer the conversation.
        // This is best-effort: the conversation is already stored and announced,
        // so the agent lookup sits inside the try as well — a failure here is
        // logged instead of failing the mutation.
        try {
          const targetAgent = await agentManager.get(toAgentId);
          if (targetAgent && targetAgent.status === 'idle') {
            const resumed = await agentManager.resumeForConversation(
              toAgentId,
              conversation.id,
              input.question,
              input.fromAgentId,
            );
            if (resumed) {
              log.info({ conversationId: conversation.id, toAgentId }, 'auto-resumed idle agent for conversation');
            }
          }
        } catch (err) {
          log.warn(
            { conversationId: conversation.id, toAgentId, err: err instanceof Error ? err.message : String(err) },
            'failed to auto-resume agent for conversation',
          );
        }

        return conversation;
      }),

    /** Returns whatever the repository reports as pending for `agentId`. */
    getPendingConversations: publicProcedure
      .input(z.object({
        agentId: z.string().min(1),
      }))
      .query(async ({ ctx, input }) => {
        const repo = requireConversationRepository(ctx);
        return repo.findPendingForAgent(input.agentId);
      }),

    /** Returns the repository lookup result for the conversation `id`. */
    getConversation: publicProcedure
      .input(z.object({
        id: z.string().min(1),
      }))
      .query(async ({ ctx, input }) => {
        const repo = requireConversationRepository(ctx);
        return repo.findById(input.id);
      }),

    /**
     * Stores the answer for a conversation and emits `conversation:answered`.
     *
     * Throws NOT_FOUND when the conversation does not exist and BAD_REQUEST
     * when its status is already `'answered'`.
     */
    answerConversation: publicProcedure
      .input(z.object({
        id: z.string().min(1),
        answer: z.string().min(1),
      }))
      .mutation(async ({ ctx, input }) => {
        const repo = requireConversationRepository(ctx);
        const existing = await repo.findById(input.id);
        if (!existing) {
          throw new TRPCError({
            code: 'NOT_FOUND',
            message: `Conversation '${input.id}' not found`,
          });
        }
        if (existing.status === 'answered') {
          throw new TRPCError({
            code: 'BAD_REQUEST',
            message: `Conversation '${input.id}' is already answered`,
          });
        }

        const updated = await repo.answer(input.id, input.answer);

        ctx.eventBus.emit({
          type: 'conversation:answered' as const,
          timestamp: new Date(),
          payload: {
            conversationId: input.id,
            fromAgentId: existing.fromAgentId,
            toAgentId: existing.toAgentId,
          },
        });

        return updated;
      }),

    /**
     * Streams conversations addressed to `agentId`: first the ones already
     * pending, then each newly created one that is still pending when it is
     * looked up. Runs until the subscription's signal is aborted.
     *
     * NOTE(review): the generic arguments of the return type were missing in
     * the reviewed source and have been reconstructed from the yielded objects.
     */
    onPendingConversation: publicProcedure
      .input(z.object({ agentId: z.string().min(1) }))
      .subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<PendingConversationPayload>> {
        const { agentId } = opts.input;
        const signal = opts.signal ?? new AbortController().signal;
        const eventBus = opts.ctx.eventBus;
        const repo = requireConversationRepository(opts.ctx);

        const queue: string[] = []; // conversation IDs
        const waker = createWaker();

        const handler = (event: ConversationCreatedEvent) => {
          if (event.payload.toAgentId !== agentId) return;
          queue.push(event.payload.conversationId);
          waker.wake();
        };

        const cleanup = () => {
          eventBus.off('conversation:created', handler);
          signal.removeEventListener('abort', cleanup);
          // Release a loop that is parked in waker.wait() so it can observe the abort.
          waker.wake();
        };

        // Start listening BEFORE reading the snapshot: a conversation created
        // while the snapshot is being loaded or yielded would otherwise be lost.
        eventBus.on('conversation:created', handler);
        signal.addEventListener('abort', cleanup, { once: true });

        let eventCounter = 0;
        try {
          // First yield any already-pending conversations
          const existing = await repo.findPendingForAgent(agentId);
          const replayed = new Set<string>();
          for (const conv of existing) {
            replayed.add(conv.id);
            yield tracked(`conv-${eventCounter++}`, {
              conversationId: conv.id,
              fromAgentId: conv.fromAgentId,
              question: conv.question,
              phaseId: conv.phaseId,
              taskId: conv.taskId,
            });
          }

          // Then deliver conversations announced through conversation:created
          while (!signal.aborted) {
            while (queue.length > 0) {
              const convId = queue.shift();
              // Skip ids the snapshot already delivered (event raced the snapshot).
              if (convId === undefined || replayed.has(convId)) continue;
              const conv = await repo.findById(convId);
              if (conv && conv.status === 'pending') {
                yield tracked(`conv-${eventCounter++}`, {
                  conversationId: conv.id,
                  fromAgentId: conv.fromAgentId,
                  question: conv.question,
                  phaseId: conv.phaseId,
                  taskId: conv.taskId,
                });
              }
            }
            // No await between the queue check above and wait(), so a wake-up
            // cannot slip in unnoticed.
            if (!signal.aborted) {
              await waker.wait();
            }
          }
        } finally {
          cleanup();
        }
      }),

    /**
     * Yields the answer of `conversationId` exactly once — immediately when it
     * is already answered, otherwise after the matching `conversation:answered`
     * event — and then completes. Completes without yielding when aborted first.
     *
     * NOTE(review): the generic arguments of the return type were missing in
     * the reviewed source and have been reconstructed from the yielded objects.
     */
    onConversationAnswer: publicProcedure
      .input(z.object({ conversationId: z.string().min(1) }))
      .subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<ConversationAnswerPayload>> {
        const { conversationId } = opts.input;
        const signal = opts.signal ?? new AbortController().signal;
        const eventBus = opts.ctx.eventBus;
        const repo = requireConversationRepository(opts.ctx);

        let answered = false;
        const waker = createWaker();

        const handler = (event: ConversationAnsweredEvent) => {
          if (event.payload.conversationId !== conversationId) return;
          answered = true;
          waker.wake();
        };

        const cleanup = () => {
          eventBus.off('conversation:answered', handler);
          signal.removeEventListener('abort', cleanup);
          waker.wake();
        };

        // Listen BEFORE the initial lookup: an answer emitted while the lookup
        // is in flight would otherwise be missed and the loop would never wake.
        eventBus.on('conversation:answered', handler);
        signal.addEventListener('abort', cleanup, { once: true });

        try {
          // Check if already answered
          const existing = await repo.findById(conversationId);
          if (existing && existing.status === 'answered' && existing.answer) {
            yield tracked('answer-0', { answer: existing.answer });
            return;
          }

          while (!signal.aborted && !answered) {
            await waker.wait();
          }

          if (answered) {
            const conv = await repo.findById(conversationId);
            if (conv && conv.answer) {
              yield tracked('answer-0', { answer: conv.answer });
            }
          }
        } finally {
          cleanup();
        }
      }),
  };
}