From bfefbc85af2bfa966a64f479f2691b168f835e94 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Tue, 10 Feb 2026 15:56:54 +0100 Subject: [PATCH] feat: Switch cw ask from polling to SSE via onConversationAnswer subscription - New onConversationAnswer subscription: listens for conversation:answered events matching a specific conversation ID, yields the answer text - cw ask now subscribes via SSE instead of polling getConversation - Removed --poll-interval and --timeout flags from cw ask - Updated prompt to reflect SSE-based cw ask (no polling options) --- docs/agent.md | 5 ++- src/agent/prompts/shared.ts | 6 ++-- src/cli/index.ts | 43 ++++++++++------------ src/trpc/routers/conversation.ts | 62 +++++++++++++++++++++++++++++++- 4 files changed, 83 insertions(+), 33 deletions(-) diff --git a/docs/agent.md b/docs/agent.md index 0d17c06..5af4d57 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -159,10 +159,9 @@ Agents can communicate with each other via the `conversations` table, coordinate - First yields any existing pending conversations from DB, then listens for `conversation:created` events - Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }` -**`cw ask --from --agent-id|--task-id|--phase-id [--timeout ] [--poll-interval ]`** -- Creates conversation, polls `getConversation` until answered, prints answer text to stdout +**`cw ask --from --agent-id|--task-id|--phase-id `** +- Creates conversation, subscribes to `onConversationAnswer` SSE, prints answer text to stdout when answered - Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase) -- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000) **`cw answer --conversation-id `** - Calls `answerConversation`, prints `{ conversationId, status: "answered" }` diff --git a/src/agent/prompts/shared.ts b/src/agent/prompts/shared.ts index 6292d29..4d22ba6 100644 --- a/src/agent/prompts/shared.ts +++ b/src/agent/prompts/shared.ts @@ -60,14 +60,12 @@ Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "p Exits with code 1 on connection error. **\`cw ask "" --from ${agentId} --agent-id \`** -Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0. +Ask another agent a question. Blocks via SSE until the answer arrives, then prints the answer text to stdout and exits with code 0. Target (exactly one required): - \`--agent-id \`: Ask a specific agent directly by agent ID - \`--task-id \`: Ask whichever agent is currently running that task - \`--phase-id \`: Ask whichever agent is running a task in that phase -Options: - - \`--poll-interval \`: Answer polling frequency in ms. Default: 2000 - - \`--timeout \`: Max wait for answer in ms. Default: 0 (wait forever) +Exits with code 1 on connection error. **\`cw answer "" --conversation-id \`** Answer a pending question. The conversation ID comes from the \`cw listen\` output. diff --git a/src/cli/index.ts b/src/cli/index.ts index f1582c2..6c51494 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -1403,28 +1403,22 @@ export function createCli(serverHandler?: (port?: number) => Promise): Com } }); - // cw ask --from [--agent-id|--phase-id|--task-id ] [--poll-interval ] [--timeout ] + // cw ask --from --agent-id|--phase-id|--task-id program .command('ask ') - .description('Ask a question to another agent and wait for the answer') + .description('Ask a question to another agent and wait for the answer via SSE') .requiredOption('--from ', 'Your agent ID (the asker)') .option('--agent-id ', 'Target agent ID') .option('--phase-id ', 'Target phase ID (find running agent in phase)') .option('--task-id ', 'Target task ID (find running agent for task)') - .option('--poll-interval ', 'Poll interval in milliseconds', '2000') - .option('--timeout ', 'Timeout in milliseconds (0=forever)', '0') .action(async (question: string, options: { from: string; agentId?: string; phaseId?: string; taskId?: string; - pollInterval: string; - timeout: string; }) => { try { const client = createDefaultTrpcClient(); - const pollInterval = parseInt(options.pollInterval, 10); - const timeout = parseInt(options.timeout, 10); // Create the conversation const conversation = await client.createConversation.mutate({ @@ -1435,23 +1429,22 @@ export function createCli(serverHandler?: (port?: number) => Promise): Com question, }); - // Poll for answer - const startTime = Date.now(); - // eslint-disable-next-line no-constant-condition - while (true) { - const conv = await client.getConversation.query({ id: conversation.id }); - if (conv && conv.status === 'answered') { - console.log(conv.answer); - process.exit(0); - } - - if (timeout > 0 && Date.now() - startTime >= timeout) { - console.error('Timed out waiting for answer'); - process.exit(1); - } - - await new Promise(resolve => setTimeout(resolve, pollInterval)); - } + // Subscribe for answer via SSE + const subscription = client.onConversationAnswer.subscribe( + { conversationId: conversation.id }, + { + onData(envelope) { + console.log(envelope.data.answer); + subscription.unsubscribe(); + process.exit(0); + }, + onError(err) { + console.error('Failed to ask:', err.message); + subscription.unsubscribe(); + process.exit(1); + }, + }, + ); } catch (error) { console.error('Failed to ask:', (error as Error).message); process.exit(1); diff --git a/src/trpc/routers/conversation.ts b/src/trpc/routers/conversation.ts index aa9f21f..793006e 100644 --- a/src/trpc/routers/conversation.ts +++ b/src/trpc/routers/conversation.ts @@ -7,7 +7,7 @@ import { tracked, type TrackedEnvelope } from '@trpc/server'; import { z } from 'zod'; import type { ProcedureBuilder } from '../trpc.js'; import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js'; -import type { ConversationCreatedEvent } from '../../events/types.js'; +import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js'; export function conversationProcedures(publicProcedure: ProcedureBuilder) { return { @@ -217,5 +217,65 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) { cleanup(); } }), + + onConversationAnswer: publicProcedure + .input(z.object({ conversationId: z.string().min(1) })) + .subscription(async function* (opts): AsyncGenerator> { + const { conversationId } = opts.input; + const signal = opts.signal ?? new AbortController().signal; + const eventBus = opts.ctx.eventBus; + const repo = requireConversationRepository(opts.ctx); + + // Check if already answered + const existing = await repo.findById(conversationId); + if (existing && existing.status === 'answered' && existing.answer) { + yield tracked('answer-0', { answer: existing.answer }); + return; + } + + // Listen for conversation:answered events matching this ID + let answered = false; + let resolve: (() => void) | null = null; + + const handler = (event: ConversationAnsweredEvent) => { + if (event.payload.conversationId !== conversationId) return; + answered = true; + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + eventBus.on('conversation:answered', handler); + + const cleanup = () => { + eventBus.off('conversation:answered', handler); + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + signal.addEventListener('abort', cleanup, { once: true }); + + try { + while (!signal.aborted && !answered) { + await new Promise((r) => { + resolve = r; + }); + } + + if (answered) { + const conv = await repo.findById(conversationId); + if (conv && conv.answer) { + yield tracked('answer-0', { answer: conv.answer }); + } + } + } finally { + cleanup(); + } + }), }; }