feat: Switch cw ask from polling to SSE via onConversationAnswer subscription
- New onConversationAnswer subscription: listens for conversation:answered events matching a specific conversation ID, yields the answer text - cw ask now subscribes via SSE instead of polling getConversation - Removed --poll-interval and --timeout flags from cw ask - Updated prompt to reflect SSE-based cw ask (no polling options)
This commit is contained in:
@@ -159,10 +159,9 @@ Agents can communicate with each other via the `conversations` table, coordinate
|
||||
- First yields any existing pending conversations from DB, then listens for `conversation:created` events
|
||||
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
|
||||
|
||||
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
|
||||
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout
|
||||
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target>`**
|
||||
- Creates conversation, subscribes to `onConversationAnswer` SSE, prints answer text to stdout when answered
|
||||
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
|
||||
- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000)
|
||||
|
||||
**`cw answer <answer> --conversation-id <id>`**
|
||||
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`
|
||||
|
||||
@@ -60,14 +60,12 @@ Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "p
|
||||
Exits with code 1 on connection error.
|
||||
|
||||
**\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
|
||||
Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0.
|
||||
Ask another agent a question. Blocks via SSE until the answer arrives, then prints the answer text to stdout and exits with code 0.
|
||||
Target (exactly one required):
|
||||
- \`--agent-id <id>\`: Ask a specific agent directly by agent ID
|
||||
- \`--task-id <id>\`: Ask whichever agent is currently running that task
|
||||
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
|
||||
Options:
|
||||
- \`--poll-interval <ms>\`: Answer polling frequency in ms. Default: 2000
|
||||
- \`--timeout <ms>\`: Max wait for answer in ms. Default: 0 (wait forever)
|
||||
Exits with code 1 on connection error.
|
||||
|
||||
**\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
|
||||
Answer a pending question. The conversation ID comes from the \`cw listen\` output.
|
||||
|
||||
@@ -1403,28 +1403,22 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
|
||||
}
|
||||
});
|
||||
|
||||
// cw ask <question> --from <agentId> [--agent-id|--phase-id|--task-id <target>] [--poll-interval <ms>] [--timeout <ms>]
|
||||
// cw ask <question> --from <agentId> --agent-id|--phase-id|--task-id <target>
|
||||
program
|
||||
.command('ask <question>')
|
||||
.description('Ask a question to another agent and wait for the answer')
|
||||
.description('Ask a question to another agent and wait for the answer via SSE')
|
||||
.requiredOption('--from <agentId>', 'Your agent ID (the asker)')
|
||||
.option('--agent-id <id>', 'Target agent ID')
|
||||
.option('--phase-id <id>', 'Target phase ID (find running agent in phase)')
|
||||
.option('--task-id <id>', 'Target task ID (find running agent for task)')
|
||||
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
|
||||
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
|
||||
.action(async (question: string, options: {
|
||||
from: string;
|
||||
agentId?: string;
|
||||
phaseId?: string;
|
||||
taskId?: string;
|
||||
pollInterval: string;
|
||||
timeout: string;
|
||||
}) => {
|
||||
try {
|
||||
const client = createDefaultTrpcClient();
|
||||
const pollInterval = parseInt(options.pollInterval, 10);
|
||||
const timeout = parseInt(options.timeout, 10);
|
||||
|
||||
// Create the conversation
|
||||
const conversation = await client.createConversation.mutate({
|
||||
@@ -1435,23 +1429,22 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
|
||||
question,
|
||||
});
|
||||
|
||||
// Poll for answer
|
||||
const startTime = Date.now();
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const conv = await client.getConversation.query({ id: conversation.id });
|
||||
if (conv && conv.status === 'answered') {
|
||||
console.log(conv.answer);
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (timeout > 0 && Date.now() - startTime >= timeout) {
|
||||
console.error('Timed out waiting for answer');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, pollInterval));
|
||||
}
|
||||
// Subscribe for answer via SSE
|
||||
const subscription = client.onConversationAnswer.subscribe(
|
||||
{ conversationId: conversation.id },
|
||||
{
|
||||
onData(envelope) {
|
||||
console.log(envelope.data.answer);
|
||||
subscription.unsubscribe();
|
||||
process.exit(0);
|
||||
},
|
||||
onError(err) {
|
||||
console.error('Failed to ask:', err.message);
|
||||
subscription.unsubscribe();
|
||||
process.exit(1);
|
||||
},
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('Failed to ask:', (error as Error).message);
|
||||
process.exit(1);
|
||||
|
||||
@@ -7,7 +7,7 @@ import { tracked, type TrackedEnvelope } from '@trpc/server';
|
||||
import { z } from 'zod';
|
||||
import type { ProcedureBuilder } from '../trpc.js';
|
||||
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
||||
import type { ConversationCreatedEvent } from '../../events/types.js';
|
||||
import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js';
|
||||
|
||||
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||
return {
|
||||
@@ -217,5 +217,65 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||
cleanup();
|
||||
}
|
||||
}),
|
||||
|
||||
onConversationAnswer: publicProcedure
|
||||
.input(z.object({ conversationId: z.string().min(1) }))
|
||||
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ answer: string }>> {
|
||||
const { conversationId } = opts.input;
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
const eventBus = opts.ctx.eventBus;
|
||||
const repo = requireConversationRepository(opts.ctx);
|
||||
|
||||
// Check if already answered
|
||||
const existing = await repo.findById(conversationId);
|
||||
if (existing && existing.status === 'answered' && existing.answer) {
|
||||
yield tracked('answer-0', { answer: existing.answer });
|
||||
return;
|
||||
}
|
||||
|
||||
// Listen for conversation:answered events matching this ID
|
||||
let answered = false;
|
||||
let resolve: (() => void) | null = null;
|
||||
|
||||
const handler = (event: ConversationAnsweredEvent) => {
|
||||
if (event.payload.conversationId !== conversationId) return;
|
||||
answered = true;
|
||||
if (resolve) {
|
||||
const r = resolve;
|
||||
resolve = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
eventBus.on('conversation:answered', handler);
|
||||
|
||||
const cleanup = () => {
|
||||
eventBus.off('conversation:answered', handler);
|
||||
if (resolve) {
|
||||
const r = resolve;
|
||||
resolve = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
signal.addEventListener('abort', cleanup, { once: true });
|
||||
|
||||
try {
|
||||
while (!signal.aborted && !answered) {
|
||||
await new Promise<void>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
}
|
||||
|
||||
if (answered) {
|
||||
const conv = await repo.findById(conversationId);
|
||||
if (conv && conv.answer) {
|
||||
yield tracked('answer-0', { answer: conv.answer });
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
cleanup();
|
||||
}
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user