feat: Auto-resume idle agents for inter-agent conversations
When an agent asks a question via `cw ask` targeting an idle agent, the conversation router now auto-resumes the idle agent's session so it can answer. Previously, questions to idle agents sat unanswered forever because target resolution only matched running agents. Changes: - Add `resumeForConversation()` to AgentManager interface and implement on MultiProviderAgentManager (mirrors resumeForCommit pattern) - Relax createConversation target resolution: prefer running, fall back to idle (was running-only) - Trigger auto-resume after conversation creation for idle targets - Add concurrency lock (conversationResumeLocks Set) to prevent double-resume race conditions
This commit is contained in:
@@ -471,6 +471,42 @@ export class MockAgentManager implements AgentManager {
|
||||
record.info.updatedAt = now;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume an idle agent to answer an inter-agent conversation.
|
||||
* Mock implementation: marks agent as running and schedules immediate completion.
|
||||
*/
|
||||
async resumeForConversation(
|
||||
agentId: string,
|
||||
conversationId: string,
|
||||
question: string,
|
||||
fromAgentId: string,
|
||||
): Promise<boolean> {
|
||||
const record = this.agents.get(agentId);
|
||||
if (!record || record.info.status !== 'idle' || !record.info.sessionId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
record.info.status = 'running';
|
||||
record.info.updatedAt = new Date();
|
||||
|
||||
if (this.eventBus) {
|
||||
const event: AgentResumedEvent = {
|
||||
type: 'agent:resumed',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
agentId,
|
||||
name: record.info.name,
|
||||
taskId: record.info.taskId,
|
||||
sessionId: record.info.sessionId,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
this.scheduleCompletion(agentId, { status: 'done', delay: 0, result: 'Answered conversation' });
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all agents and pending timers.
|
||||
* Useful for test cleanup.
|
||||
|
||||
@@ -237,4 +237,24 @@ export interface AgentManager {
|
||||
* @param agentId - Agent to dismiss
|
||||
*/
|
||||
dismiss(agentId: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* Resume an idle agent to answer an inter-agent conversation.
|
||||
*
|
||||
* When Agent A asks Agent B a question via `cw ask` and Agent B is idle,
|
||||
* this resumes B's session with a prompt to answer via `cw answer` and
|
||||
* drain any remaining pending conversations via `cw listen`.
|
||||
*
|
||||
* @param agentId - The idle agent to resume
|
||||
* @param conversationId - The conversation that triggered the resume
|
||||
* @param question - The question being asked
|
||||
* @param fromAgentId - The agent asking the question
|
||||
* @returns true if resume was initiated, false if not possible
|
||||
*/
|
||||
resumeForConversation(
|
||||
agentId: string,
|
||||
conversationId: string,
|
||||
question: string,
|
||||
fromAgentId: string,
|
||||
): Promise<boolean>;
|
||||
}
|
||||
|
||||
@@ -80,6 +80,7 @@ function createMockAgentManager(
|
||||
resume: vi.fn().mockResolvedValue(undefined),
|
||||
getResult: vi.fn().mockResolvedValue(null),
|
||||
getPendingQuestions: vi.fn().mockResolvedValue(null),
|
||||
resumeForConversation: vi.fn().mockResolvedValue(false),
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -254,6 +254,11 @@ export interface TestHarness {
|
||||
*/
|
||||
getPendingQuestions(agentId: string): Promise<PendingQuestions | null>;
|
||||
|
||||
/**
|
||||
* Resume an idle agent to answer a conversation (mock: always returns false).
|
||||
*/
|
||||
resumeForConversation(agentId: string, conversationId: string, question: string, fromAgentId: string): Promise<boolean>;
|
||||
|
||||
/**
|
||||
* Get events by type.
|
||||
*/
|
||||
@@ -505,6 +510,9 @@ export function createTestHarness(): TestHarness {
|
||||
|
||||
getPendingQuestions: (agentId: string) => agentManager.getPendingQuestions(agentId),
|
||||
|
||||
resumeForConversation: (agentId: string, conversationId: string, question: string, fromAgentId: string) =>
|
||||
agentManager.resumeForConversation(agentId, conversationId, question, fromAgentId),
|
||||
|
||||
getEventsByType: (type: string) => eventBus.getEventsByType(type),
|
||||
|
||||
getEmittedEvents: (type: string) => eventBus.getEventsByType(type),
|
||||
|
||||
@@ -8,6 +8,9 @@ import { z } from 'zod';
|
||||
import type { ProcedureBuilder } from '../trpc.js';
|
||||
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
||||
import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js';
|
||||
import { createModuleLogger } from '../../logger/index.js';
|
||||
|
||||
const log = createModuleLogger('conversation-router');
|
||||
|
||||
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||
return {
|
||||
@@ -25,30 +28,34 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||
|
||||
let toAgentId = input.toAgentId;
|
||||
|
||||
// Resolve target agent from taskId
|
||||
// Resolve target agent from taskId — prefer running, fall back to idle
|
||||
if (!toAgentId && input.taskId) {
|
||||
const agents = await agentManager.list();
|
||||
const match = agents.find(a => a.taskId === input.taskId && a.status === 'running');
|
||||
const running = agents.find(a => a.taskId === input.taskId && a.status === 'running');
|
||||
const idle = agents.find(a => a.taskId === input.taskId && a.status === 'idle');
|
||||
const match = running ?? idle;
|
||||
if (!match) {
|
||||
throw new TRPCError({
|
||||
code: 'NOT_FOUND',
|
||||
message: `No running agent found for task '${input.taskId}'`,
|
||||
message: `No running or idle agent found for task '${input.taskId}'`,
|
||||
});
|
||||
}
|
||||
toAgentId = match.id;
|
||||
}
|
||||
|
||||
// Resolve target agent from phaseId
|
||||
// Resolve target agent from phaseId — prefer running, fall back to idle
|
||||
if (!toAgentId && input.phaseId) {
|
||||
const taskRepo = requireTaskRepository(ctx);
|
||||
const tasks = await taskRepo.findByPhaseId(input.phaseId);
|
||||
const taskIds = new Set(tasks.map(t => t.id));
|
||||
const agents = await agentManager.list();
|
||||
const match = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'running');
|
||||
const running = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'running');
|
||||
const idle = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'idle');
|
||||
const match = running ?? idle;
|
||||
if (!match) {
|
||||
throw new TRPCError({
|
||||
code: 'NOT_FOUND',
|
||||
message: `No running agent found for phase '${input.phaseId}'`,
|
||||
message: `No running or idle agent found for phase '${input.phaseId}'`,
|
||||
});
|
||||
}
|
||||
toAgentId = match.id;
|
||||
@@ -80,6 +87,24 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||
},
|
||||
});
|
||||
|
||||
// Auto-resume idle target agent so it can answer the conversation
|
||||
const targetAgent = await agentManager.get(toAgentId);
|
||||
if (targetAgent && targetAgent.status === 'idle') {
|
||||
try {
|
||||
const resumed = await agentManager.resumeForConversation(
|
||||
toAgentId, conversation.id, input.question, input.fromAgentId,
|
||||
);
|
||||
if (resumed) {
|
||||
log.info({ conversationId: conversation.id, toAgentId }, 'auto-resumed idle agent for conversation');
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn(
|
||||
{ conversationId: conversation.id, toAgentId, err: err instanceof Error ? err.message : String(err) },
|
||||
'failed to auto-resume agent for conversation',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return conversation;
|
||||
}),
|
||||
|
||||
|
||||
Reference in New Issue
Block a user