feat: Switch cw ask from polling to SSE via onConversationAnswer subscription

- New onConversationAnswer subscription: listens for conversation:answered
  events matching a specific conversation ID, yields the answer text
- cw ask now subscribes via SSE instead of polling getConversation
- Removed --poll-interval and --timeout flags from cw ask
- Updated prompt to reflect SSE-based cw ask (no polling options)
This commit is contained in:
Lukas May
2026-02-10 15:56:54 +01:00
parent bfc1b422f9
commit bfefbc85af
4 changed files with 83 additions and 33 deletions

View File

@@ -159,10 +159,9 @@ Agents can communicate with each other via the `conversations` table, coordinate
- First yields any existing pending conversations from DB, then listens for `conversation:created` events
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target>`**
- Creates conversation, subscribes to `onConversationAnswer` SSE, prints answer text to stdout when answered
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000)
**`cw answer <answer> --conversation-id <id>`**
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`

View File

@@ -60,14 +60,12 @@ Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "p
Exits with code 1 on connection error.
**\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0.
Ask another agent a question. Blocks via SSE until the answer arrives, then prints the answer text to stdout and exits with code 0.
Target (exactly one required):
- \`--agent-id <id>\`: Ask a specific agent directly by agent ID
- \`--task-id <id>\`: Ask whichever agent is currently running that task
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
Options:
- \`--poll-interval <ms>\`: Answer polling frequency in ms. Default: 2000
- \`--timeout <ms>\`: Max wait for answer in ms. Default: 0 (wait forever)
Exits with code 1 on connection error.
**\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
Answer a pending question. The conversation ID comes from the \`cw listen\` output.

View File

@@ -1403,28 +1403,22 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
}
});
// cw ask <question> --from <agentId> [--agent-id|--phase-id|--task-id <target>] [--poll-interval <ms>] [--timeout <ms>]
// cw ask <question> --from <agentId> --agent-id|--phase-id|--task-id <target>
program
.command('ask <question>')
.description('Ask a question to another agent and wait for the answer')
.description('Ask a question to another agent and wait for the answer via SSE')
.requiredOption('--from <agentId>', 'Your agent ID (the asker)')
.option('--agent-id <id>', 'Target agent ID')
.option('--phase-id <id>', 'Target phase ID (find running agent in phase)')
.option('--task-id <id>', 'Target task ID (find running agent for task)')
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
.action(async (question: string, options: {
from: string;
agentId?: string;
phaseId?: string;
taskId?: string;
pollInterval: string;
timeout: string;
}) => {
try {
const client = createDefaultTrpcClient();
const pollInterval = parseInt(options.pollInterval, 10);
const timeout = parseInt(options.timeout, 10);
// Create the conversation
const conversation = await client.createConversation.mutate({
@@ -1435,23 +1429,22 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
question,
});
// Poll for answer
const startTime = Date.now();
// eslint-disable-next-line no-constant-condition
while (true) {
const conv = await client.getConversation.query({ id: conversation.id });
if (conv && conv.status === 'answered') {
console.log(conv.answer);
process.exit(0);
}
if (timeout > 0 && Date.now() - startTime >= timeout) {
console.error('Timed out waiting for answer');
process.exit(1);
}
await new Promise(resolve => setTimeout(resolve, pollInterval));
}
// Subscribe for answer via SSE
const subscription = client.onConversationAnswer.subscribe(
{ conversationId: conversation.id },
{
onData(envelope) {
console.log(envelope.data.answer);
subscription.unsubscribe();
process.exit(0);
},
onError(err) {
console.error('Failed to ask:', err.message);
subscription.unsubscribe();
process.exit(1);
},
},
);
} catch (error) {
console.error('Failed to ask:', (error as Error).message);
process.exit(1);

View File

@@ -7,7 +7,7 @@ import { tracked, type TrackedEnvelope } from '@trpc/server';
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
import type { ConversationCreatedEvent } from '../../events/types.js';
import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js';
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
return {
@@ -217,5 +217,65 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
cleanup();
}
}),
onConversationAnswer: publicProcedure
.input(z.object({ conversationId: z.string().min(1) }))
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ answer: string }>> {
const { conversationId } = opts.input;
const signal = opts.signal ?? new AbortController().signal;
const eventBus = opts.ctx.eventBus;
const repo = requireConversationRepository(opts.ctx);
// Check if already answered
const existing = await repo.findById(conversationId);
if (existing && existing.status === 'answered' && existing.answer) {
yield tracked('answer-0', { answer: existing.answer });
return;
}
// Listen for conversation:answered events matching this ID
let answered = false;
let resolve: (() => void) | null = null;
const handler = (event: ConversationAnsweredEvent) => {
if (event.payload.conversationId !== conversationId) return;
answered = true;
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
eventBus.on('conversation:answered', handler);
const cleanup = () => {
eventBus.off('conversation:answered', handler);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted && !answered) {
await new Promise<void>((r) => {
resolve = r;
});
}
if (answered) {
const conv = await repo.findById(conversationId);
if (conv && conv.answer) {
yield tracked('answer-0', { answer: conv.answer });
}
}
} finally {
cleanup();
}
}),
};
}