Files
Codewalkers/apps/server/trpc/routers/conversation.ts
Lukas May 9edc93a268 feat: Auto-resume idle agents for inter-agent conversations
When an agent asks a question via `cw ask` targeting an idle agent,
the conversation router now auto-resumes the idle agent's session so
it can answer. Previously, questions to idle agents sat unanswered
forever because target resolution only matched running agents.

Changes:
- Add `resumeForConversation()` to AgentManager interface and implement
  on MultiProviderAgentManager (mirrors resumeForCommit pattern)
- Relax createConversation target resolution: prefer running, fall back
  to idle (was running-only)
- Trigger auto-resume after conversation creation for idle targets
- Add concurrency lock (conversationResumeLocks Set) to prevent
  double-resume race conditions
2026-03-03 13:29:39 +01:00

307 lines
10 KiB
TypeScript

/**
* Conversation Router — inter-agent communication procedures
*/
import { TRPCError } from '@trpc/server';
import { tracked, type TrackedEnvelope } from '@trpc/server';
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js';
import { createModuleLogger } from '../../logger/index.js';
const log = createModuleLogger('conversation-router');
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
return {
createConversation: publicProcedure
.input(z.object({
fromAgentId: z.string().min(1),
toAgentId: z.string().min(1).optional(),
phaseId: z.string().min(1).optional(),
taskId: z.string().min(1).optional(),
question: z.string().min(1),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
const agentManager = requireAgentManager(ctx);
let toAgentId = input.toAgentId;
// Resolve target agent from taskId — prefer running, fall back to idle
if (!toAgentId && input.taskId) {
const agents = await agentManager.list();
const running = agents.find(a => a.taskId === input.taskId && a.status === 'running');
const idle = agents.find(a => a.taskId === input.taskId && a.status === 'idle');
const match = running ?? idle;
if (!match) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `No running or idle agent found for task '${input.taskId}'`,
});
}
toAgentId = match.id;
}
// Resolve target agent from phaseId — prefer running, fall back to idle
if (!toAgentId && input.phaseId) {
const taskRepo = requireTaskRepository(ctx);
const tasks = await taskRepo.findByPhaseId(input.phaseId);
const taskIds = new Set(tasks.map(t => t.id));
const agents = await agentManager.list();
const running = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'running');
const idle = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'idle');
const match = running ?? idle;
if (!match) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `No running or idle agent found for phase '${input.phaseId}'`,
});
}
toAgentId = match.id;
}
if (!toAgentId) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: 'Must provide toAgentId, taskId, or phaseId to identify target agent',
});
}
const conversation = await repo.create({
fromAgentId: input.fromAgentId,
toAgentId,
initiativeId: null,
phaseId: input.phaseId ?? null,
taskId: input.taskId ?? null,
question: input.question,
});
ctx.eventBus.emit({
type: 'conversation:created' as const,
timestamp: new Date(),
payload: {
conversationId: conversation.id,
fromAgentId: input.fromAgentId,
toAgentId,
},
});
// Auto-resume idle target agent so it can answer the conversation
const targetAgent = await agentManager.get(toAgentId);
if (targetAgent && targetAgent.status === 'idle') {
try {
const resumed = await agentManager.resumeForConversation(
toAgentId, conversation.id, input.question, input.fromAgentId,
);
if (resumed) {
log.info({ conversationId: conversation.id, toAgentId }, 'auto-resumed idle agent for conversation');
}
} catch (err) {
log.warn(
{ conversationId: conversation.id, toAgentId, err: err instanceof Error ? err.message : String(err) },
'failed to auto-resume agent for conversation',
);
}
}
return conversation;
}),
getPendingConversations: publicProcedure
.input(z.object({
agentId: z.string().min(1),
}))
.query(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
return repo.findPendingForAgent(input.agentId);
}),
getConversation: publicProcedure
.input(z.object({
id: z.string().min(1),
}))
.query(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
return repo.findById(input.id);
}),
answerConversation: publicProcedure
.input(z.object({
id: z.string().min(1),
answer: z.string().min(1),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
const existing = await repo.findById(input.id);
if (!existing) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Conversation '${input.id}' not found`,
});
}
if (existing.status === 'answered') {
throw new TRPCError({
code: 'BAD_REQUEST',
message: `Conversation '${input.id}' is already answered`,
});
}
const updated = await repo.answer(input.id, input.answer);
ctx.eventBus.emit({
type: 'conversation:answered' as const,
timestamp: new Date(),
payload: {
conversationId: input.id,
fromAgentId: existing.fromAgentId,
toAgentId: existing.toAgentId,
},
});
return updated;
}),
onPendingConversation: publicProcedure
.input(z.object({ agentId: z.string().min(1) }))
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{
conversationId: string;
fromAgentId: string;
question: string;
phaseId: string | null;
taskId: string | null;
}>> {
const { agentId } = opts.input;
const signal = opts.signal ?? new AbortController().signal;
const eventBus = opts.ctx.eventBus;
const repo = requireConversationRepository(opts.ctx);
// First yield any already-pending conversations
const existing = await repo.findPendingForAgent(agentId);
let eventCounter = 0;
for (const conv of existing) {
yield tracked(`conv-${eventCounter++}`, {
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
});
}
// Then listen for new conversation:created events
const queue: string[] = []; // conversation IDs
let resolve: (() => void) | null = null;
const handler = (event: ConversationCreatedEvent) => {
if (event.payload.toAgentId !== agentId) return;
queue.push(event.payload.conversationId);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
eventBus.on('conversation:created', handler);
const cleanup = () => {
eventBus.off('conversation:created', handler);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted) {
while (queue.length > 0) {
const convId = queue.shift()!;
const conv = await repo.findById(convId);
if (conv && conv.status === 'pending') {
yield tracked(`conv-${eventCounter++}`, {
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
});
}
}
if (!signal.aborted) {
await new Promise<void>((r) => {
resolve = r;
});
}
}
} finally {
cleanup();
}
}),
onConversationAnswer: publicProcedure
.input(z.object({ conversationId: z.string().min(1) }))
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ answer: string }>> {
const { conversationId } = opts.input;
const signal = opts.signal ?? new AbortController().signal;
const eventBus = opts.ctx.eventBus;
const repo = requireConversationRepository(opts.ctx);
// Check if already answered
const existing = await repo.findById(conversationId);
if (existing && existing.status === 'answered' && existing.answer) {
yield tracked('answer-0', { answer: existing.answer });
return;
}
// Listen for conversation:answered events matching this ID
let answered = false;
let resolve: (() => void) | null = null;
const handler = (event: ConversationAnsweredEvent) => {
if (event.payload.conversationId !== conversationId) return;
answered = true;
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
eventBus.on('conversation:answered', handler);
const cleanup = () => {
eventBus.off('conversation:answered', handler);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted && !answered) {
await new Promise<void>((r) => {
resolve = r;
});
}
if (answered) {
const conv = await repo.findById(conversationId);
if (conv && conv.answer) {
yield tracked('answer-0', { answer: conv.answer });
}
}
} finally {
cleanup();
}
}),
};
}