From 5598e1c10f85d7b26539c3cf05f62da6679938f7 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Fri, 6 Mar 2026 16:40:18 +0100 Subject: [PATCH] feat: implement Radar backend tRPC procedures with repository extensions Add five new tRPC query procedures powering the Radar page's per-agent behavioral metrics (questions asked, subagent spawns, compaction events, inter-agent messages) plus the batch repository methods they require. Repository changes: - LogChunkRepository: add findByAgentIds() for batch fetching without N+1 - ConversationRepository: add countByFromAgentIds() and findByFromAgentId() - Drizzle adapters: implement all three new methods using inArray() - InMemoryConversationRepository (integration test): implement new methods tRPC procedures added: - agent.listForRadar: filtered agent list with per-agent metrics computed from log chunks (questionsCount, subagentsCount, compactionsCount) and conversation counts (messagesCount); supports timeRange/status/mode/initiative filters - agent.getCompactionEvents: compact system init chunks for one agent (cap 200) - agent.getSubagentSpawns: Agent tool_use entries with prompt preview (cap 200) - agent.getQuestionsAsked: AskUserQuestion tool calls with questions array (cap 200) - conversation.getByFromAgent: conversations by fromAgentId with toAgentName resolved All 13 new unit tests pass; existing test suite unaffected. Co-Authored-By: Claude Sonnet 4.6 --- .../repositories/conversation-repository.ts | 14 + .../db/repositories/drizzle/conversation.ts | 24 +- .../db/repositories/drizzle/log-chunk.ts | 16 +- .../db/repositories/log-chunk-repository.ts | 7 + .../real-providers/conversation.test.ts | 18 + .../server/test/unit/radar-procedures.test.ts | 476 ++++++++++++++++++ apps/server/trpc/routers/agent.ts | 190 ++++++- apps/server/trpc/routers/conversation.ts | 26 + docs/database.md | 4 +- docs/server-api.md | 5 + 10 files changed, 775 insertions(+), 5 deletions(-) create mode 100644 apps/server/test/unit/radar-procedures.test.ts diff --git a/apps/server/db/repositories/conversation-repository.ts b/apps/server/db/repositories/conversation-repository.ts index 0545ed1..66fe045 100644 --- a/apps/server/db/repositories/conversation-repository.ts +++ b/apps/server/db/repositories/conversation-repository.ts @@ -20,4 +20,18 @@ export interface ConversationRepository { findById(id: string): Promise; findPendingForAgent(toAgentId: string): Promise; answer(id: string, answer: string): Promise; + + /** + * Count conversations grouped by fromAgentId for a batch of agent IDs. + * Returns only agents that have at least one conversation (count > 0). + * Used by listForRadar to compute messagesCount without N+1 queries. + */ + countByFromAgentIds(agentIds: string[]): Promise<{ agentId: string; count: number }[]>; + + /** + * Find all conversations initiated by a given agent, ordered by createdAt ascending. + * Used by conversation.getByFromAgent drilldown procedure. + * Cap at 200 results. + */ + findByFromAgentId(agentId: string): Promise; } diff --git a/apps/server/db/repositories/drizzle/conversation.ts b/apps/server/db/repositories/drizzle/conversation.ts index 6ab11dd..1e359d3 100644 --- a/apps/server/db/repositories/drizzle/conversation.ts +++ b/apps/server/db/repositories/drizzle/conversation.ts @@ -4,7 +4,7 @@ * Implements ConversationRepository interface using Drizzle ORM. */ -import { eq, and, asc } from 'drizzle-orm'; +import { eq, and, asc, count, inArray } from 'drizzle-orm'; import { nanoid } from 'nanoid'; import type { DrizzleDatabase } from '../../index.js'; import { conversations, type Conversation } from '../../schema.js'; @@ -64,4 +64,26 @@ export class DrizzleConversationRepository implements ConversationRepository { .where(eq(conversations.id, id)); return this.findById(id); } + + async countByFromAgentIds(agentIds: string[]): Promise<{ agentId: string; count: number }[]> { + if (agentIds.length === 0) return []; + const rows = await this.db + .select({ + agentId: conversations.fromAgentId, + count: count(), + }) + .from(conversations) + .where(inArray(conversations.fromAgentId, agentIds)) + .groupBy(conversations.fromAgentId); + return rows.map(r => ({ agentId: r.agentId, count: Number(r.count) })); + } + + async findByFromAgentId(agentId: string): Promise { + return this.db + .select() + .from(conversations) + .where(eq(conversations.fromAgentId, agentId)) + .orderBy(asc(conversations.createdAt)) + .limit(200); + } } diff --git a/apps/server/db/repositories/drizzle/log-chunk.ts b/apps/server/db/repositories/drizzle/log-chunk.ts index 244b09b..9d4632b 100644 --- a/apps/server/db/repositories/drizzle/log-chunk.ts +++ b/apps/server/db/repositories/drizzle/log-chunk.ts @@ -4,7 +4,7 @@ * Implements LogChunkRepository interface using Drizzle ORM. */ -import { eq, asc, max } from 'drizzle-orm'; +import { eq, asc, max, inArray } from 'drizzle-orm'; import { nanoid } from 'nanoid'; import type { DrizzleDatabase } from '../../index.js'; import { agentLogChunks } from '../../schema.js'; @@ -41,6 +41,20 @@ export class DrizzleLogChunkRepository implements LogChunkRepository { .orderBy(asc(agentLogChunks.createdAt)); } + async findByAgentIds(agentIds: string[]): Promise<{ agentId: string; content: string; sessionNumber: number; createdAt: Date }[]> { + if (agentIds.length === 0) return []; + return this.db + .select({ + agentId: agentLogChunks.agentId, + content: agentLogChunks.content, + sessionNumber: agentLogChunks.sessionNumber, + createdAt: agentLogChunks.createdAt, + }) + .from(agentLogChunks) + .where(inArray(agentLogChunks.agentId, agentIds)) + .orderBy(asc(agentLogChunks.createdAt)); + } + async deleteByAgentId(agentId: string): Promise { await this.db .delete(agentLogChunks) diff --git a/apps/server/db/repositories/log-chunk-repository.ts b/apps/server/db/repositories/log-chunk-repository.ts index 9f83231..0283a0b 100644 --- a/apps/server/db/repositories/log-chunk-repository.ts +++ b/apps/server/db/repositories/log-chunk-repository.ts @@ -17,6 +17,13 @@ export interface LogChunkRepository { findByAgentId(agentId: string): Promise[]>; + /** + * Batch-fetch chunks for multiple agent IDs in a single query. + * Returns chunks ordered by createdAt ascending. + * agentId field is included so results can be grouped by agent. + */ + findByAgentIds(agentIds: string[]): Promise<{ agentId: string; content: string; sessionNumber: number; createdAt: Date }[]>; + deleteByAgentId(agentId: string): Promise; getSessionCount(agentId: string): Promise; diff --git a/apps/server/test/integration/real-providers/conversation.test.ts b/apps/server/test/integration/real-providers/conversation.test.ts index a3d168e..1d10ffc 100644 --- a/apps/server/test/integration/real-providers/conversation.test.ts +++ b/apps/server/test/integration/real-providers/conversation.test.ts @@ -88,6 +88,24 @@ class InMemoryConversationRepository implements ConversationRepository { return updated; } + async countByFromAgentIds(agentIds: string[]): Promise<{ agentId: string; count: number }[]> { + if (agentIds.length === 0) return []; + const counts = new Map(); + for (const conv of this.store.values()) { + if (agentIds.includes(conv.fromAgentId)) { + counts.set(conv.fromAgentId, (counts.get(conv.fromAgentId) ?? 0) + 1); + } + } + return [...counts.entries()].map(([agentId, count]) => ({ agentId, count })); + } + + async findByFromAgentId(agentId: string): Promise { + return [...this.store.values()] + .filter((c) => c.fromAgentId === agentId) + .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()) + .slice(0, 200); + } + /** Test helper — return all conversations */ getAll(): Conversation[] { return [...this.store.values()]; diff --git a/apps/server/test/unit/radar-procedures.test.ts b/apps/server/test/unit/radar-procedures.test.ts new file mode 100644 index 0000000..d7acba6 --- /dev/null +++ b/apps/server/test/unit/radar-procedures.test.ts @@ -0,0 +1,476 @@ +/** + * Unit tests for Radar tRPC procedures. + * + * Tests listForRadar, getCompactionEvents, getSubagentSpawns, + * getQuestionsAsked, and conversation.getByFromAgent. + * + * Uses in-memory Drizzle DB + inline MockAgentManager for isolation. + */ + +import { describe, it, expect } from 'vitest'; +import { router, publicProcedure, createCallerFactory } from '../../trpc/trpc.js'; +import { agentProcedures } from '../../trpc/routers/agent.js'; +import { conversationProcedures } from '../../trpc/routers/conversation.js'; +import type { TRPCContext } from '../../trpc/context.js'; +import type { AgentManager, AgentInfo, PendingQuestions } from '../../agent/types.js'; +import { createTestDatabase } from '../../db/repositories/drizzle/test-helpers.js'; +import { + DrizzleAgentRepository, + DrizzleLogChunkRepository, + DrizzleConversationRepository, + DrizzleInitiativeRepository, + DrizzleTaskRepository, +} from '../../db/repositories/drizzle/index.js'; + +// ============================================================================= +// MockAgentManager +// ============================================================================= + +class MockAgentManager implements AgentManager { + private agents: AgentInfo[] = []; + private questions: Map = new Map(); + + addAgent(info: Partial & Pick): void { + this.agents.push({ + taskId: null, + initiativeId: null, + sessionId: null, + worktreeId: info.id, + mode: 'execute', + provider: 'claude', + accountId: null, + createdAt: new Date(), + updatedAt: new Date(), + userDismissedAt: null, + exitCode: null, + prompt: null, + ...info, + }); + } + + setQuestions(agentId: string, questions: PendingQuestions): void { + this.questions.set(agentId, questions); + } + + async list(): Promise { + return [...this.agents]; + } + + async getPendingQuestions(agentId: string): Promise { + return this.questions.get(agentId) ?? null; + } + + async spawn(): Promise { throw new Error('Not implemented'); } + async stop(): Promise { throw new Error('Not implemented'); } + async get(): Promise { return null; } + async getByName(): Promise { return null; } + async resume(): Promise { throw new Error('Not implemented'); } + async getResult() { return null; } + async delete(): Promise { throw new Error('Not implemented'); } + async dismiss(): Promise { throw new Error('Not implemented'); } + async resumeForConversation(): Promise { return false; } +} + +// ============================================================================= +// Test routers +// ============================================================================= + +const agentRouter = router({ ...agentProcedures(publicProcedure) }); +const conversationRouter = router({ ...conversationProcedures(publicProcedure) }); + +const createAgentCaller = createCallerFactory(agentRouter); +const createConversationCaller = createCallerFactory(conversationRouter); + +// ============================================================================= +// Helpers +// ============================================================================= + +function makeCtx(agentManager: MockAgentManager): TRPCContext { + const db = createTestDatabase(); + return { + eventBus: { emit: () => {}, on: () => {}, off: () => {} } as unknown as TRPCContext['eventBus'], + serverStartedAt: null, + processCount: 0, + agentManager, + logChunkRepository: new DrizzleLogChunkRepository(db), + conversationRepository: new DrizzleConversationRepository(db), + initiativeRepository: new DrizzleInitiativeRepository(db), + taskRepository: new DrizzleTaskRepository(db), + // Expose DB-backed agent repo seeder via a non-context helper + _agentRepository: new DrizzleAgentRepository(db), + } as unknown as TRPCContext & { _agentRepository: DrizzleAgentRepository }; +} + +// Typed helper to access seeder repos +function getRepos(ctx: ReturnType) { + const c = ctx as unknown as { + _agentRepository: DrizzleAgentRepository; + logChunkRepository: DrizzleLogChunkRepository; + conversationRepository: DrizzleConversationRepository; + initiativeRepository: DrizzleInitiativeRepository; + taskRepository: DrizzleTaskRepository; + }; + return { + agentRepo: c._agentRepository, + logChunkRepo: c.logChunkRepository, + convRepo: c.conversationRepository, + initiativeRepo: c.initiativeRepository, + taskRepo: c.taskRepository, + }; +} + +// ============================================================================= +// Tests: agent.listForRadar +// ============================================================================= + +describe('agent.listForRadar', () => { + it('timeRange=24h — excludes agents older than 24h', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + + const oldDate = new Date(Date.now() - 48 * 3600_000); + const recentDate = new Date(Date.now() - 12 * 3600_000); + + agents.addAgent({ id: 'agent-old', name: 'old-agent', status: 'stopped', createdAt: oldDate }); + agents.addAgent({ id: 'agent-recent', name: 'recent-agent', status: 'running', createdAt: recentDate }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: '24h' }); + + expect(result.map(r => r.id)).not.toContain('agent-old'); + expect(result.map(r => r.id)).toContain('agent-recent'); + }); + + it('status=running filter — only running agents returned', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + + const now = new Date(); + agents.addAgent({ id: 'agent-running', name: 'running-agent', status: 'running', createdAt: now }); + agents.addAgent({ id: 'agent-stopped', name: 'stopped-agent', status: 'stopped', createdAt: now }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: 'all', status: 'running' }); + + expect(result).toHaveLength(1); + expect(result[0].id).toBe('agent-running'); + expect(result[0].status).toBe('running'); + }); + + it('status=completed filter — maps to stopped in DB', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + + const now = new Date(); + agents.addAgent({ id: 'agent-stopped', name: 'stopped-agent', status: 'stopped', createdAt: now }); + agents.addAgent({ id: 'agent-running', name: 'running-agent', status: 'running', createdAt: now }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: 'all', status: 'completed' }); + + expect(result).toHaveLength(1); + expect(result[0].id).toBe('agent-stopped'); + }); + + it('mode=execute filter — only execute agents returned', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + + const now = new Date(); + agents.addAgent({ id: 'agent-exec', name: 'exec-agent', status: 'running', mode: 'execute', createdAt: now }); + agents.addAgent({ id: 'agent-plan', name: 'plan-agent', status: 'running', mode: 'plan', createdAt: now }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: 'all', mode: 'execute' }); + + expect(result).toHaveLength(1); + expect(result[0].id).toBe('agent-exec'); + }); + + it('computes messagesCount from conversations table', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { agentRepo, convRepo } = getRepos(ctx); + + const now = new Date(); + // Seed in DB (needed for FK on conversations) + const fromAgent = await agentRepo.create({ name: 'from-agent', worktreeId: 'wt-from', status: 'running' }); + const toAgent = await agentRepo.create({ name: 'to-agent', worktreeId: 'wt-to', status: 'running' }); + + // Seed in MockAgentManager for agentManager.list() + agents.addAgent({ id: fromAgent.id, name: fromAgent.name, status: 'running', createdAt: now }); + agents.addAgent({ id: toAgent.id, name: toAgent.name, status: 'running', createdAt: now }); + + // Create 3 conversations from fromAgent to toAgent + await convRepo.create({ fromAgentId: fromAgent.id, toAgentId: toAgent.id, question: 'Q1' }); + await convRepo.create({ fromAgentId: fromAgent.id, toAgentId: toAgent.id, question: 'Q2' }); + await convRepo.create({ fromAgentId: fromAgent.id, toAgentId: toAgent.id, question: 'Q3' }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: 'all' }); + + const fromRow = result.find(r => r.id === fromAgent.id); + expect(fromRow).toBeDefined(); + expect(fromRow!.messagesCount).toBe(3); + + const toRow = result.find(r => r.id === toAgent.id); + expect(toRow!.messagesCount).toBe(0); + }); + + it('computes subagentsCount from log chunks', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { logChunkRepo } = getRepos(ctx); + + const now = new Date(); + agents.addAgent({ id: 'agent-1', name: 'agent-one', status: 'running', createdAt: now }); + + await logChunkRepo.insertChunk({ + agentId: 'agent-1', + agentName: 'agent-one', + sessionNumber: 1, + content: JSON.stringify({ type: 'tool_use', name: 'Agent', input: { description: 'do stuff', prompt: 'some prompt' } }), + }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: 'all' }); + + const row = result.find(r => r.id === 'agent-1'); + expect(row).toBeDefined(); + expect(row!.subagentsCount).toBe(1); + expect(row!.questionsCount).toBe(0); + expect(row!.compactionsCount).toBe(0); + }); + + it('computes compactionsCount from log chunks', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { logChunkRepo } = getRepos(ctx); + + const now = new Date(); + agents.addAgent({ id: 'agent-1', name: 'agent-one', status: 'running', createdAt: now }); + + await logChunkRepo.insertChunk({ + agentId: 'agent-1', + agentName: 'agent-one', + sessionNumber: 1, + content: JSON.stringify({ type: 'system', subtype: 'init', source: 'compact' }), + }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: 'all' }); + + const row = result.find(r => r.id === 'agent-1'); + expect(row).toBeDefined(); + expect(row!.compactionsCount).toBe(1); + expect(row!.subagentsCount).toBe(0); + expect(row!.questionsCount).toBe(0); + }); + + it('computes questionsCount from log chunks — sums questions array length', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { logChunkRepo } = getRepos(ctx); + + const now = new Date(); + agents.addAgent({ id: 'agent-1', name: 'agent-one', status: 'running', createdAt: now }); + + await logChunkRepo.insertChunk({ + agentId: 'agent-1', + agentName: 'agent-one', + sessionNumber: 1, + content: JSON.stringify({ + type: 'tool_use', + name: 'AskUserQuestion', + input: { + questions: [ + { question: 'First?', header: 'H1', options: [] }, + { question: 'Second?', header: 'H2', options: [] }, + ], + }, + }), + }); + + const caller = createAgentCaller(ctx); + const result = await caller.listForRadar({ timeRange: 'all' }); + + const row = result.find(r => r.id === 'agent-1'); + expect(row).toBeDefined(); + expect(row!.questionsCount).toBe(2); + expect(row!.subagentsCount).toBe(0); + expect(row!.compactionsCount).toBe(0); + }); + + it('handles malformed JSON in log chunks without throwing', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { logChunkRepo } = getRepos(ctx); + + const now = new Date(); + agents.addAgent({ id: 'agent-1', name: 'agent-one', status: 'running', createdAt: now }); + + await logChunkRepo.insertChunk({ + agentId: 'agent-1', + agentName: 'agent-one', + sessionNumber: 1, + content: 'not valid json {{{', + }); + + const caller = createAgentCaller(ctx); + // Should not throw + const result = await caller.listForRadar({ timeRange: 'all' }); + const row = result.find(r => r.id === 'agent-1'); + expect(row).toBeDefined(); + expect(row!.questionsCount).toBe(0); + expect(row!.subagentsCount).toBe(0); + expect(row!.compactionsCount).toBe(0); + }); +}); + +// ============================================================================= +// Tests: agent.getCompactionEvents +// ============================================================================= + +describe('agent.getCompactionEvents', () => { + it('returns compaction events sorted ascending, capped at 200', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { logChunkRepo } = getRepos(ctx); + + // Seed 201 compaction chunks + for (let i = 0; i < 201; i++) { + await logChunkRepo.insertChunk({ + agentId: 'agent-compact', + agentName: 'compact-agent', + sessionNumber: i + 1, + content: JSON.stringify({ type: 'system', subtype: 'init', source: 'compact' }), + }); + } + + const caller = createAgentCaller(ctx); + const result = await caller.getCompactionEvents({ agentId: 'agent-compact' }); + + expect(result).toHaveLength(200); + // Each result has correct shape + expect(result[0]).toHaveProperty('timestamp'); + expect(result[0]).toHaveProperty('sessionNumber'); + expect(typeof result[0].timestamp).toBe('string'); + expect(typeof result[0].sessionNumber).toBe('number'); + // Sorted ascending — sessionNumber of first should be lower than last + expect(result[0].sessionNumber).toBeLessThan(result[199].sessionNumber); + }); +}); + +// ============================================================================= +// Tests: agent.getSubagentSpawns +// ============================================================================= + +describe('agent.getSubagentSpawns', () => { + it('returns spawns with correct promptPreview (first 200 chars)', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { logChunkRepo } = getRepos(ctx); + + const fullPrompt = 'x'.repeat(300); + + await logChunkRepo.insertChunk({ + agentId: 'agent-spawn', + agentName: 'spawn-agent', + sessionNumber: 1, + content: JSON.stringify({ + type: 'tool_use', + name: 'Agent', + input: { description: 'my subagent', prompt: fullPrompt }, + }), + }); + + const caller = createAgentCaller(ctx); + const result = await caller.getSubagentSpawns({ agentId: 'agent-spawn' }); + + expect(result).toHaveLength(1); + expect(result[0].promptPreview).toHaveLength(200); + expect(result[0].fullPrompt).toHaveLength(300); + expect(result[0].description).toBe('my subagent'); + expect(typeof result[0].timestamp).toBe('string'); + }); +}); + +// ============================================================================= +// Tests: agent.getQuestionsAsked +// ============================================================================= + +describe('agent.getQuestionsAsked', () => { + it('returns questions arrays correctly', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { logChunkRepo } = getRepos(ctx); + + await logChunkRepo.insertChunk({ + agentId: 'agent-q', + agentName: 'question-agent', + sessionNumber: 1, + content: JSON.stringify({ + type: 'tool_use', + name: 'AskUserQuestion', + input: { + questions: [ + { question: 'Which way?', header: 'Direction', options: [{ label: 'Left', description: 'Go left' }] }, + { question: 'How fast?', header: 'Speed', options: [{ label: 'Fast', description: 'Go fast' }] }, + ], + }, + }), + }); + + const caller = createAgentCaller(ctx); + const result = await caller.getQuestionsAsked({ agentId: 'agent-q' }); + + expect(result).toHaveLength(1); + expect(result[0].questions).toHaveLength(2); + expect(result[0].questions[0].question).toBe('Which way?'); + expect(result[0].questions[0].header).toBe('Direction'); + expect(result[0].questions[0].options).toHaveLength(1); + expect(result[0].questions[0].options[0].label).toBe('Left'); + expect(result[0].questions[1].question).toBe('How fast?'); + expect(typeof result[0].timestamp).toBe('string'); + }); +}); + +// ============================================================================= +// Tests: conversation.getByFromAgent +// ============================================================================= + +describe('conversation.getByFromAgent', () => { + it('returns conversations with toAgentName resolved', async () => { + const agents = new MockAgentManager(); + const ctx = makeCtx(agents); + const { agentRepo, convRepo } = getRepos(ctx); + + // Seed agents in DB (FK requirement) + const fromAgent = await agentRepo.create({ name: 'from-agent', worktreeId: 'wt-from', status: 'running' }); + const toAgent = await agentRepo.create({ name: 'to-agent', worktreeId: 'wt-to', status: 'running' }); + + // Seed in MockAgentManager for name resolution + agents.addAgent({ id: fromAgent.id, name: 'from-agent', status: 'running' }); + agents.addAgent({ id: toAgent.id, name: 'to-agent', status: 'running' }); + + // Create conversation + await convRepo.create({ + fromAgentId: fromAgent.id, + toAgentId: toAgent.id, + question: 'What is 2+2?', + }); + + const caller = createConversationCaller(ctx); + const result = await caller.getByFromAgent({ agentId: fromAgent.id }); + + expect(result).toHaveLength(1); + expect(result[0].toAgentName).toBe('to-agent'); + expect(result[0].toAgentId).toBe(toAgent.id); + expect(result[0].question).toBe('What is 2+2?'); + expect(result[0].status).toBe('pending'); + expect(result[0].answer).toBeNull(); + expect(typeof result[0].timestamp).toBe('string'); + expect(result[0].taskId).toBeNull(); + expect(result[0].phaseId).toBeNull(); + }); +}); diff --git a/apps/server/trpc/routers/agent.ts b/apps/server/trpc/routers/agent.ts index bdf6395..644b814 100644 --- a/apps/server/trpc/routers/agent.ts +++ b/apps/server/trpc/routers/agent.ts @@ -11,7 +11,23 @@ import type { ProcedureBuilder } from '../trpc.js'; import type { TRPCContext } from '../context.js'; import type { AgentInfo, AgentResult, PendingQuestions } from '../../agent/types.js'; import type { AgentOutputEvent } from '../../events/types.js'; -import { requireAgentManager, requireLogChunkRepository, requireTaskRepository, requireInitiativeRepository } from './_helpers.js'; +import { requireAgentManager, requireLogChunkRepository, requireTaskRepository, requireInitiativeRepository, requireConversationRepository } from './_helpers.js'; + +export type AgentRadarRow = { + id: string; + name: string; + mode: string; + status: string; + initiativeId: string | null; + initiativeName: string | null; + taskId: string | null; + taskName: string | null; + createdAt: string; + questionsCount: number; + messagesCount: number; + subagentsCount: number; + compactionsCount: number; +}; export const spawnAgentInputSchema = z.object({ name: z.string().min(1).optional(), @@ -410,5 +426,177 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) { return { content: truncateIfNeeded(raw) }; }), + + listForRadar: publicProcedure + .input(z.object({ + timeRange: z.enum(['1h', '6h', '24h', '7d', 'all']).default('24h'), + status: z.enum(['running', 'completed', 'crashed']).optional(), + initiativeId: z.string().optional(), + mode: z.enum(['execute', 'discuss', 'plan', 'detail', 'refine', 'chat', 'errand']).optional(), + })) + .query(async ({ ctx, input }): Promise => { + const agentManager = requireAgentManager(ctx); + const allAgents = await agentManager.list(); + + // Compute cutoff + const cutoffMap: Record = { + '1h': Date.now() - 3_600_000, + '6h': Date.now() - 21_600_000, + '24h': Date.now() - 86_400_000, + '7d': Date.now() - 604_800_000, + }; + const cutoff = input.timeRange !== 'all' ? cutoffMap[input.timeRange] : null; + + // Filter agents + let filteredAgents = allAgents; + if (cutoff !== null) { + filteredAgents = filteredAgents.filter(a => a.createdAt.getTime() >= cutoff!); + } + if (input.status !== undefined) { + const dbStatus = input.status === 'completed' ? 'stopped' : input.status; + filteredAgents = filteredAgents.filter(a => a.status === dbStatus); + } + if (input.initiativeId !== undefined) { + filteredAgents = filteredAgents.filter(a => a.initiativeId === input.initiativeId); + } + if (input.mode !== undefined) { + filteredAgents = filteredAgents.filter(a => a.mode === input.mode); + } + + const matchingIds = filteredAgents.map(a => a.id); + + // Batch fetch in parallel + const logChunkRepo = requireLogChunkRepository(ctx); + const conversationRepo = requireConversationRepository(ctx); + const initiativeRepo = requireInitiativeRepository(ctx); + const taskRepo = requireTaskRepository(ctx); + + // Collect unique taskIds and initiativeIds for batch lookup + const uniqueTaskIds = [...new Set(filteredAgents.map(a => a.taskId).filter(Boolean) as string[])]; + const uniqueInitiativeIds = [...new Set(filteredAgents.map(a => a.initiativeId).filter(Boolean) as string[])]; + + const [chunks, messageCounts, taskResults, initiativeResults] = await Promise.all([ + logChunkRepo.findByAgentIds(matchingIds), + conversationRepo.countByFromAgentIds(matchingIds), + Promise.all(uniqueTaskIds.map(id => taskRepo.findById(id))), + Promise.all(uniqueInitiativeIds.map(id => initiativeRepo.findById(id))), + ]); + + // Build lookup maps + const taskMap = new Map(taskResults.filter(Boolean).map(t => [t!.id, t!.name])); + const initiativeMap = new Map(initiativeResults.filter(Boolean).map(i => [i!.id, i!.name])); + const messagesMap = new Map(messageCounts.map(m => [m.agentId, m.count])); + + // Group chunks by agentId + const chunksByAgent = new Map(); + for (const chunk of chunks) { + const existing = chunksByAgent.get(chunk.agentId); + if (existing) { + existing.push(chunk); + } else { + chunksByAgent.set(chunk.agentId, [chunk]); + } + } + + // Build result rows + return filteredAgents.map(agent => { + const agentChunks = chunksByAgent.get(agent.id) ?? []; + let questionsCount = 0; + let subagentsCount = 0; + let compactionsCount = 0; + + for (const chunk of agentChunks) { + try { + const parsed = JSON.parse(chunk.content); + if (parsed.type === 'tool_use' && parsed.name === 'AskUserQuestion') { + questionsCount += parsed.input?.questions?.length ?? 0; + } else if (parsed.type === 'tool_use' && parsed.name === 'Agent') { + subagentsCount++; + } else if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.source === 'compact') { + compactionsCount++; + } + } catch { /* skip malformed */ } + } + + return { + id: agent.id, + name: agent.name, + mode: agent.mode, + status: agent.status, + initiativeId: agent.initiativeId, + initiativeName: agent.initiativeId ? (initiativeMap.get(agent.initiativeId) ?? null) : null, + taskId: agent.taskId, + taskName: agent.taskId ? (taskMap.get(agent.taskId) ?? null) : null, + createdAt: agent.createdAt.toISOString(), + questionsCount, + messagesCount: messagesMap.get(agent.id) ?? 0, + subagentsCount, + compactionsCount, + }; + }); + }), + + getCompactionEvents: publicProcedure + .input(z.object({ agentId: z.string().min(1) })) + .query(async ({ ctx, input }) => { + const logChunkRepo = requireLogChunkRepository(ctx); + const chunks = await logChunkRepo.findByAgentId(input.agentId); + const results: { timestamp: string; sessionNumber: number }[] = []; + for (const chunk of chunks) { + try { + const parsed = JSON.parse(chunk.content); + if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.source === 'compact') { + results.push({ timestamp: chunk.createdAt.toISOString(), sessionNumber: chunk.sessionNumber }); + } + } catch { /* skip malformed */ } + if (results.length >= 200) break; + } + return results; + }), + + getSubagentSpawns: publicProcedure + .input(z.object({ agentId: z.string().min(1) })) + .query(async ({ ctx, input }) => { + const logChunkRepo = requireLogChunkRepository(ctx); + const chunks = await logChunkRepo.findByAgentId(input.agentId); + const results: { timestamp: string; description: string; promptPreview: string; fullPrompt: string }[] = []; + for (const chunk of chunks) { + try { + const parsed = JSON.parse(chunk.content); + if (parsed.type === 'tool_use' && parsed.name === 'Agent') { + const fullPrompt: string = parsed.input?.prompt ?? ''; + const description: string = parsed.input?.description ?? ''; + results.push({ + timestamp: chunk.createdAt.toISOString(), + description, + promptPreview: fullPrompt.slice(0, 200), + fullPrompt, + }); + } + } catch { /* skip malformed */ } + if (results.length >= 200) break; + } + return results; + }), + + getQuestionsAsked: publicProcedure + .input(z.object({ agentId: z.string().min(1) })) + .query(async ({ ctx, input }) => { + const logChunkRepo = requireLogChunkRepository(ctx); + const chunks = await logChunkRepo.findByAgentId(input.agentId); + type QuestionItem = { question: string; header: string; options: { label: string; description: string }[] }; + const results: { timestamp: string; questions: QuestionItem[] }[] = []; + for (const chunk of chunks) { + try { + const parsed = JSON.parse(chunk.content); + if (parsed.type === 'tool_use' && parsed.name === 'AskUserQuestion') { + const questions: QuestionItem[] = parsed.input?.questions ?? []; + results.push({ timestamp: chunk.createdAt.toISOString(), questions }); + } + } catch { /* skip malformed */ } + if (results.length >= 200) break; + } + return results; + }), }; } diff --git a/apps/server/trpc/routers/conversation.ts b/apps/server/trpc/routers/conversation.ts index c5486fd..96f4907 100644 --- a/apps/server/trpc/routers/conversation.ts +++ b/apps/server/trpc/routers/conversation.ts @@ -302,5 +302,31 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) { cleanup(); } }), + + getByFromAgent: publicProcedure + .input(z.object({ agentId: z.string().min(1) })) + .query(async ({ ctx, input }) => { + const repo = requireConversationRepository(ctx); + const agentManager = requireAgentManager(ctx); + + const convs = await repo.findByFromAgentId(input.agentId); + + // Build toAgent name map without N+1 + const toAgentIds = [...new Set(convs.map(c => c.toAgentId))]; + const allAgents = toAgentIds.length > 0 ? await agentManager.list() : []; + const agentNameMap = new Map(allAgents.map(a => [a.id, a.name])); + + return convs.map(c => ({ + id: c.id, + timestamp: c.createdAt.toISOString(), + toAgentName: agentNameMap.get(c.toAgentId) ?? c.toAgentId, + toAgentId: c.toAgentId, + question: c.question, + answer: c.answer ?? null, + status: c.status as 'pending' | 'answered', + taskId: c.taskId ?? null, + phaseId: c.phaseId ?? null, + })); + }), }; } diff --git a/docs/database.md b/docs/database.md index 2a6e994..3d7ccd5 100644 --- a/docs/database.md +++ b/docs/database.md @@ -245,8 +245,8 @@ Index: `(phaseId)`. | ProjectRepository | + junction ops: setInitiativeProjects (diff-based), findProjectsByInitiativeId | | AccountRepository | + findNextAvailable (round-robin), markExhausted, clearExpiredExhaustion | | ProposalRepository | + findByAgentIdAndStatus, updateManyByAgentId, countByAgentIdAndStatus | -| LogChunkRepository | insertChunk, findByAgentId, deleteByAgentId, getSessionCount | -| ConversationRepository | create, findById, findPendingForAgent, answer | +| LogChunkRepository | insertChunk, findByAgentId, findByAgentIds (batch), deleteByAgentId, getSessionCount | +| ConversationRepository | create, findById, findPendingForAgent, answer, countByFromAgentIds (batch), findByFromAgentId | | ChatSessionRepository | createSession, findActiveSession, findActiveSessionByAgentId, updateSession, createMessage, findMessagesBySessionId | | ReviewCommentRepository | create, findByPhaseId, resolve, unresolve, delete | | ErrandRepository | create, findById, findAll (filter by projectId/status), update, delete | diff --git a/docs/server-api.md b/docs/server-api.md index b064576..f74dbb2 100644 --- a/docs/server-api.md +++ b/docs/server-api.md @@ -69,6 +69,10 @@ Each procedure uses `require*Repository(ctx)` helpers that throw `TRPCError(INTE | getActiveRefineAgent | query | Active refine agent for initiative | | getActiveConflictAgent | query | Active conflict resolution agent for initiative (name starts with `conflict-`) | | listWaitingAgents | query | Agents waiting for input | +| listForRadar | query | Radar page: per-agent metrics (questionsCount, messagesCount, subagentsCount, compactionsCount) with time/status/mode/initiative filters | +| getCompactionEvents | query | Compaction events for one agent: `{agentId}` → `{timestamp, sessionNumber}[]` (cap 200) | +| getSubagentSpawns | query | Subagent spawn events for one agent: `{agentId}` → `{timestamp, description, promptPreview, fullPrompt}[]` (cap 200) | +| getQuestionsAsked | query | AskUserQuestion tool calls for one agent: `{agentId}` → `{timestamp, questions[]}[]` (cap 200) | | onAgentOutput | subscription | Live raw JSONL output stream via EventBus | ### Tasks @@ -254,6 +258,7 @@ Inter-agent communication for parallel agents. | `getPendingConversations` | query | Poll for incoming questions: `{agentId}` → Conversation[] | | `getConversation` | query | Get conversation by ID: `{id}` → Conversation | | `answerConversation` | mutation | Answer a conversation: `{id, answer}` → Conversation | +| `getByFromAgent` | query | Radar drilldown: all conversations sent by agent: `{agentId}` → `{id, timestamp, toAgentName, toAgentId, question, answer, status, taskId, phaseId}[]` (cap 200) | Target resolution: `toAgentId` → direct; `taskId` → find running agent by task; `phaseId` → find running agent by any task in phase.