Add five new tRPC query procedures powering the Radar page's per-agent behavioral metrics (questions asked, subagent spawns, compaction events, inter-agent messages) plus the batch repository methods they require. Repository changes: - LogChunkRepository: add findByAgentIds() for batch fetching without N+1 - ConversationRepository: add countByFromAgentIds() and findByFromAgentId() - Drizzle adapters: implement all three new methods using inArray() - InMemoryConversationRepository (integration test): implement new methods tRPC procedures added: - agent.listForRadar: filtered agent list with per-agent metrics computed from log chunks (questionsCount, subagentsCount, compactionsCount) and conversation counts (messagesCount); supports timeRange/status/mode/initiative filters - agent.getCompactionEvents: compact system init chunks for one agent (cap 200) - agent.getSubagentSpawns: Agent tool_use entries with prompt preview (cap 200) - agent.getQuestionsAsked: AskUserQuestion tool calls with questions array (cap 200) - conversation.getByFromAgent: conversations by fromAgentId with toAgentName resolved All 13 new unit tests pass; existing test suite unaffected. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
333 lines
11 KiB
TypeScript
333 lines
11 KiB
TypeScript
/**
|
|
* Conversation Router — inter-agent communication procedures
|
|
*/
|
|
|
|
import { TRPCError } from '@trpc/server';
|
|
import { tracked, type TrackedEnvelope } from '@trpc/server';
|
|
import { z } from 'zod';
|
|
import type { ProcedureBuilder } from '../trpc.js';
|
|
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
|
import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js';
|
|
import { createModuleLogger } from '../../logger/index.js';
|
|
|
|
const log = createModuleLogger('conversation-router');
|
|
|
|
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
|
return {
|
|
createConversation: publicProcedure
|
|
.input(z.object({
|
|
fromAgentId: z.string().min(1),
|
|
toAgentId: z.string().min(1).optional(),
|
|
phaseId: z.string().min(1).optional(),
|
|
taskId: z.string().min(1).optional(),
|
|
question: z.string().min(1),
|
|
}))
|
|
.mutation(async ({ ctx, input }) => {
|
|
const repo = requireConversationRepository(ctx);
|
|
const agentManager = requireAgentManager(ctx);
|
|
|
|
let toAgentId = input.toAgentId;
|
|
|
|
// Resolve target agent from taskId — prefer running, fall back to idle
|
|
if (!toAgentId && input.taskId) {
|
|
const agents = await agentManager.list();
|
|
const running = agents.find(a => a.taskId === input.taskId && a.status === 'running');
|
|
const idle = agents.find(a => a.taskId === input.taskId && a.status === 'idle');
|
|
const match = running ?? idle;
|
|
if (!match) {
|
|
throw new TRPCError({
|
|
code: 'NOT_FOUND',
|
|
message: `No running or idle agent found for task '${input.taskId}'`,
|
|
});
|
|
}
|
|
toAgentId = match.id;
|
|
}
|
|
|
|
// Resolve target agent from phaseId — prefer running, fall back to idle
|
|
if (!toAgentId && input.phaseId) {
|
|
const taskRepo = requireTaskRepository(ctx);
|
|
const tasks = await taskRepo.findByPhaseId(input.phaseId);
|
|
const taskIds = new Set(tasks.map(t => t.id));
|
|
const agents = await agentManager.list();
|
|
const running = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'running');
|
|
const idle = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'idle');
|
|
const match = running ?? idle;
|
|
if (!match) {
|
|
throw new TRPCError({
|
|
code: 'NOT_FOUND',
|
|
message: `No running or idle agent found for phase '${input.phaseId}'`,
|
|
});
|
|
}
|
|
toAgentId = match.id;
|
|
}
|
|
|
|
if (!toAgentId) {
|
|
throw new TRPCError({
|
|
code: 'BAD_REQUEST',
|
|
message: 'Must provide toAgentId, taskId, or phaseId to identify target agent',
|
|
});
|
|
}
|
|
|
|
const conversation = await repo.create({
|
|
fromAgentId: input.fromAgentId,
|
|
toAgentId,
|
|
initiativeId: null,
|
|
phaseId: input.phaseId ?? null,
|
|
taskId: input.taskId ?? null,
|
|
question: input.question,
|
|
});
|
|
|
|
ctx.eventBus.emit({
|
|
type: 'conversation:created' as const,
|
|
timestamp: new Date(),
|
|
payload: {
|
|
conversationId: conversation.id,
|
|
fromAgentId: input.fromAgentId,
|
|
toAgentId,
|
|
},
|
|
});
|
|
|
|
// Auto-resume idle target agent so it can answer the conversation
|
|
const targetAgent = await agentManager.get(toAgentId);
|
|
if (targetAgent && targetAgent.status === 'idle') {
|
|
try {
|
|
const resumed = await agentManager.resumeForConversation(
|
|
toAgentId, conversation.id, input.question, input.fromAgentId,
|
|
);
|
|
if (resumed) {
|
|
log.info({ conversationId: conversation.id, toAgentId }, 'auto-resumed idle agent for conversation');
|
|
}
|
|
} catch (err) {
|
|
log.warn(
|
|
{ conversationId: conversation.id, toAgentId, err: err instanceof Error ? err.message : String(err) },
|
|
'failed to auto-resume agent for conversation',
|
|
);
|
|
}
|
|
}
|
|
|
|
return conversation;
|
|
}),
|
|
|
|
getPendingConversations: publicProcedure
|
|
.input(z.object({
|
|
agentId: z.string().min(1),
|
|
}))
|
|
.query(async ({ ctx, input }) => {
|
|
const repo = requireConversationRepository(ctx);
|
|
return repo.findPendingForAgent(input.agentId);
|
|
}),
|
|
|
|
getConversation: publicProcedure
|
|
.input(z.object({
|
|
id: z.string().min(1),
|
|
}))
|
|
.query(async ({ ctx, input }) => {
|
|
const repo = requireConversationRepository(ctx);
|
|
return repo.findById(input.id);
|
|
}),
|
|
|
|
answerConversation: publicProcedure
|
|
.input(z.object({
|
|
id: z.string().min(1),
|
|
answer: z.string().min(1),
|
|
}))
|
|
.mutation(async ({ ctx, input }) => {
|
|
const repo = requireConversationRepository(ctx);
|
|
const existing = await repo.findById(input.id);
|
|
if (!existing) {
|
|
throw new TRPCError({
|
|
code: 'NOT_FOUND',
|
|
message: `Conversation '${input.id}' not found`,
|
|
});
|
|
}
|
|
if (existing.status === 'answered') {
|
|
throw new TRPCError({
|
|
code: 'BAD_REQUEST',
|
|
message: `Conversation '${input.id}' is already answered`,
|
|
});
|
|
}
|
|
|
|
const updated = await repo.answer(input.id, input.answer);
|
|
|
|
ctx.eventBus.emit({
|
|
type: 'conversation:answered' as const,
|
|
timestamp: new Date(),
|
|
payload: {
|
|
conversationId: input.id,
|
|
fromAgentId: existing.fromAgentId,
|
|
toAgentId: existing.toAgentId,
|
|
},
|
|
});
|
|
|
|
return updated;
|
|
}),
|
|
|
|
onPendingConversation: publicProcedure
|
|
.input(z.object({ agentId: z.string().min(1) }))
|
|
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{
|
|
conversationId: string;
|
|
fromAgentId: string;
|
|
question: string;
|
|
phaseId: string | null;
|
|
taskId: string | null;
|
|
}>> {
|
|
const { agentId } = opts.input;
|
|
const signal = opts.signal ?? new AbortController().signal;
|
|
const eventBus = opts.ctx.eventBus;
|
|
const repo = requireConversationRepository(opts.ctx);
|
|
|
|
// First yield any already-pending conversations
|
|
const existing = await repo.findPendingForAgent(agentId);
|
|
let eventCounter = 0;
|
|
for (const conv of existing) {
|
|
yield tracked(`conv-${eventCounter++}`, {
|
|
conversationId: conv.id,
|
|
fromAgentId: conv.fromAgentId,
|
|
question: conv.question,
|
|
phaseId: conv.phaseId,
|
|
taskId: conv.taskId,
|
|
});
|
|
}
|
|
|
|
// Then listen for new conversation:created events
|
|
const queue: string[] = []; // conversation IDs
|
|
let resolve: (() => void) | null = null;
|
|
|
|
const handler = (event: ConversationCreatedEvent) => {
|
|
if (event.payload.toAgentId !== agentId) return;
|
|
queue.push(event.payload.conversationId);
|
|
if (resolve) {
|
|
const r = resolve;
|
|
resolve = null;
|
|
r();
|
|
}
|
|
};
|
|
|
|
eventBus.on('conversation:created', handler);
|
|
|
|
const cleanup = () => {
|
|
eventBus.off('conversation:created', handler);
|
|
if (resolve) {
|
|
const r = resolve;
|
|
resolve = null;
|
|
r();
|
|
}
|
|
};
|
|
|
|
signal.addEventListener('abort', cleanup, { once: true });
|
|
|
|
try {
|
|
while (!signal.aborted) {
|
|
while (queue.length > 0) {
|
|
const convId = queue.shift()!;
|
|
const conv = await repo.findById(convId);
|
|
if (conv && conv.status === 'pending') {
|
|
yield tracked(`conv-${eventCounter++}`, {
|
|
conversationId: conv.id,
|
|
fromAgentId: conv.fromAgentId,
|
|
question: conv.question,
|
|
phaseId: conv.phaseId,
|
|
taskId: conv.taskId,
|
|
});
|
|
}
|
|
}
|
|
|
|
if (!signal.aborted) {
|
|
await new Promise<void>((r) => {
|
|
resolve = r;
|
|
});
|
|
}
|
|
}
|
|
} finally {
|
|
cleanup();
|
|
}
|
|
}),
|
|
|
|
onConversationAnswer: publicProcedure
|
|
.input(z.object({ conversationId: z.string().min(1) }))
|
|
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ answer: string }>> {
|
|
const { conversationId } = opts.input;
|
|
const signal = opts.signal ?? new AbortController().signal;
|
|
const eventBus = opts.ctx.eventBus;
|
|
const repo = requireConversationRepository(opts.ctx);
|
|
|
|
// Check if already answered
|
|
const existing = await repo.findById(conversationId);
|
|
if (existing && existing.status === 'answered' && existing.answer) {
|
|
yield tracked('answer-0', { answer: existing.answer });
|
|
return;
|
|
}
|
|
|
|
// Listen for conversation:answered events matching this ID
|
|
let answered = false;
|
|
let resolve: (() => void) | null = null;
|
|
|
|
const handler = (event: ConversationAnsweredEvent) => {
|
|
if (event.payload.conversationId !== conversationId) return;
|
|
answered = true;
|
|
if (resolve) {
|
|
const r = resolve;
|
|
resolve = null;
|
|
r();
|
|
}
|
|
};
|
|
|
|
eventBus.on('conversation:answered', handler);
|
|
|
|
const cleanup = () => {
|
|
eventBus.off('conversation:answered', handler);
|
|
if (resolve) {
|
|
const r = resolve;
|
|
resolve = null;
|
|
r();
|
|
}
|
|
};
|
|
|
|
signal.addEventListener('abort', cleanup, { once: true });
|
|
|
|
try {
|
|
while (!signal.aborted && !answered) {
|
|
await new Promise<void>((r) => {
|
|
resolve = r;
|
|
});
|
|
}
|
|
|
|
if (answered) {
|
|
const conv = await repo.findById(conversationId);
|
|
if (conv && conv.answer) {
|
|
yield tracked('answer-0', { answer: conv.answer });
|
|
}
|
|
}
|
|
} finally {
|
|
cleanup();
|
|
}
|
|
}),
|
|
|
|
getByFromAgent: publicProcedure
|
|
.input(z.object({ agentId: z.string().min(1) }))
|
|
.query(async ({ ctx, input }) => {
|
|
const repo = requireConversationRepository(ctx);
|
|
const agentManager = requireAgentManager(ctx);
|
|
|
|
const convs = await repo.findByFromAgentId(input.agentId);
|
|
|
|
// Build toAgent name map without N+1
|
|
const toAgentIds = [...new Set(convs.map(c => c.toAgentId))];
|
|
const allAgents = toAgentIds.length > 0 ? await agentManager.list() : [];
|
|
const agentNameMap = new Map(allAgents.map(a => [a.id, a.name]));
|
|
|
|
return convs.map(c => ({
|
|
id: c.id,
|
|
timestamp: c.createdAt.toISOString(),
|
|
toAgentName: agentNameMap.get(c.toAgentId) ?? c.toAgentId,
|
|
toAgentId: c.toAgentId,
|
|
question: c.question,
|
|
answer: c.answer ?? null,
|
|
status: c.status as 'pending' | 'answered',
|
|
taskId: c.taskId ?? null,
|
|
phaseId: c.phaseId ?? null,
|
|
}));
|
|
}),
|
|
};
|
|
}
|