feat: Switch cw ask from polling to SSE via onConversationAnswer subscription
- New onConversationAnswer subscription: listens for conversation:answered events matching a specific conversation ID, yields the answer text - cw ask now subscribes via SSE instead of polling getConversation - Removed --poll-interval and --timeout flags from cw ask - Updated prompt to reflect SSE-based cw ask (no polling options)
This commit is contained in:
@@ -159,10 +159,9 @@ Agents can communicate with each other via the `conversations` table, coordinate
|
|||||||
- First yields any existing pending conversations from DB, then listens for `conversation:created` events
|
- First yields any existing pending conversations from DB, then listens for `conversation:created` events
|
||||||
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
|
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
|
||||||
|
|
||||||
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
|
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target>`**
|
||||||
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout
|
- Creates conversation, subscribes to `onConversationAnswer` SSE, prints answer text to stdout when answered
|
||||||
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
|
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
|
||||||
- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000)
|
|
||||||
|
|
||||||
**`cw answer <answer> --conversation-id <id>`**
|
**`cw answer <answer> --conversation-id <id>`**
|
||||||
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`
|
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`
|
||||||
|
|||||||
@@ -60,14 +60,12 @@ Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "p
|
|||||||
Exits with code 1 on connection error.
|
Exits with code 1 on connection error.
|
||||||
|
|
||||||
**\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
|
**\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
|
||||||
Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0.
|
Ask another agent a question. Blocks via SSE until the answer arrives, then prints the answer text to stdout and exits with code 0.
|
||||||
Target (exactly one required):
|
Target (exactly one required):
|
||||||
- \`--agent-id <id>\`: Ask a specific agent directly by agent ID
|
- \`--agent-id <id>\`: Ask a specific agent directly by agent ID
|
||||||
- \`--task-id <id>\`: Ask whichever agent is currently running that task
|
- \`--task-id <id>\`: Ask whichever agent is currently running that task
|
||||||
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
|
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
|
||||||
Options:
|
Exits with code 1 on connection error.
|
||||||
- \`--poll-interval <ms>\`: Answer polling frequency in ms. Default: 2000
|
|
||||||
- \`--timeout <ms>\`: Max wait for answer in ms. Default: 0 (wait forever)
|
|
||||||
|
|
||||||
**\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
|
**\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
|
||||||
Answer a pending question. The conversation ID comes from the \`cw listen\` output.
|
Answer a pending question. The conversation ID comes from the \`cw listen\` output.
|
||||||
|
|||||||
@@ -1403,28 +1403,22 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// cw ask <question> --from <agentId> [--agent-id|--phase-id|--task-id <target>] [--poll-interval <ms>] [--timeout <ms>]
|
// cw ask <question> --from <agentId> --agent-id|--phase-id|--task-id <target>
|
||||||
program
|
program
|
||||||
.command('ask <question>')
|
.command('ask <question>')
|
||||||
.description('Ask a question to another agent and wait for the answer')
|
.description('Ask a question to another agent and wait for the answer via SSE')
|
||||||
.requiredOption('--from <agentId>', 'Your agent ID (the asker)')
|
.requiredOption('--from <agentId>', 'Your agent ID (the asker)')
|
||||||
.option('--agent-id <id>', 'Target agent ID')
|
.option('--agent-id <id>', 'Target agent ID')
|
||||||
.option('--phase-id <id>', 'Target phase ID (find running agent in phase)')
|
.option('--phase-id <id>', 'Target phase ID (find running agent in phase)')
|
||||||
.option('--task-id <id>', 'Target task ID (find running agent for task)')
|
.option('--task-id <id>', 'Target task ID (find running agent for task)')
|
||||||
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
|
|
||||||
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
|
|
||||||
.action(async (question: string, options: {
|
.action(async (question: string, options: {
|
||||||
from: string;
|
from: string;
|
||||||
agentId?: string;
|
agentId?: string;
|
||||||
phaseId?: string;
|
phaseId?: string;
|
||||||
taskId?: string;
|
taskId?: string;
|
||||||
pollInterval: string;
|
|
||||||
timeout: string;
|
|
||||||
}) => {
|
}) => {
|
||||||
try {
|
try {
|
||||||
const client = createDefaultTrpcClient();
|
const client = createDefaultTrpcClient();
|
||||||
const pollInterval = parseInt(options.pollInterval, 10);
|
|
||||||
const timeout = parseInt(options.timeout, 10);
|
|
||||||
|
|
||||||
// Create the conversation
|
// Create the conversation
|
||||||
const conversation = await client.createConversation.mutate({
|
const conversation = await client.createConversation.mutate({
|
||||||
@@ -1435,23 +1429,22 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
|
|||||||
question,
|
question,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Poll for answer
|
// Subscribe for answer via SSE
|
||||||
const startTime = Date.now();
|
const subscription = client.onConversationAnswer.subscribe(
|
||||||
// eslint-disable-next-line no-constant-condition
|
{ conversationId: conversation.id },
|
||||||
while (true) {
|
{
|
||||||
const conv = await client.getConversation.query({ id: conversation.id });
|
onData(envelope) {
|
||||||
if (conv && conv.status === 'answered') {
|
console.log(envelope.data.answer);
|
||||||
console.log(conv.answer);
|
subscription.unsubscribe();
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
}
|
},
|
||||||
|
onError(err) {
|
||||||
if (timeout > 0 && Date.now() - startTime >= timeout) {
|
console.error('Failed to ask:', err.message);
|
||||||
console.error('Timed out waiting for answer');
|
subscription.unsubscribe();
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
}
|
},
|
||||||
|
},
|
||||||
await new Promise(resolve => setTimeout(resolve, pollInterval));
|
);
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Failed to ask:', (error as Error).message);
|
console.error('Failed to ask:', (error as Error).message);
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import { tracked, type TrackedEnvelope } from '@trpc/server';
|
|||||||
import { z } from 'zod';
|
import { z } from 'zod';
|
||||||
import type { ProcedureBuilder } from '../trpc.js';
|
import type { ProcedureBuilder } from '../trpc.js';
|
||||||
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
||||||
import type { ConversationCreatedEvent } from '../../events/types.js';
|
import type { ConversationCreatedEvent, ConversationAnsweredEvent } from '../../events/types.js';
|
||||||
|
|
||||||
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||||
return {
|
return {
|
||||||
@@ -217,5 +217,65 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
|||||||
cleanup();
|
cleanup();
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
|
|
||||||
|
onConversationAnswer: publicProcedure
|
||||||
|
.input(z.object({ conversationId: z.string().min(1) }))
|
||||||
|
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ answer: string }>> {
|
||||||
|
const { conversationId } = opts.input;
|
||||||
|
const signal = opts.signal ?? new AbortController().signal;
|
||||||
|
const eventBus = opts.ctx.eventBus;
|
||||||
|
const repo = requireConversationRepository(opts.ctx);
|
||||||
|
|
||||||
|
// Check if already answered
|
||||||
|
const existing = await repo.findById(conversationId);
|
||||||
|
if (existing && existing.status === 'answered' && existing.answer) {
|
||||||
|
yield tracked('answer-0', { answer: existing.answer });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listen for conversation:answered events matching this ID
|
||||||
|
let answered = false;
|
||||||
|
let resolve: (() => void) | null = null;
|
||||||
|
|
||||||
|
const handler = (event: ConversationAnsweredEvent) => {
|
||||||
|
if (event.payload.conversationId !== conversationId) return;
|
||||||
|
answered = true;
|
||||||
|
if (resolve) {
|
||||||
|
const r = resolve;
|
||||||
|
resolve = null;
|
||||||
|
r();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
eventBus.on('conversation:answered', handler);
|
||||||
|
|
||||||
|
const cleanup = () => {
|
||||||
|
eventBus.off('conversation:answered', handler);
|
||||||
|
if (resolve) {
|
||||||
|
const r = resolve;
|
||||||
|
resolve = null;
|
||||||
|
r();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
signal.addEventListener('abort', cleanup, { once: true });
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (!signal.aborted && !answered) {
|
||||||
|
await new Promise<void>((r) => {
|
||||||
|
resolve = r;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (answered) {
|
||||||
|
const conv = await repo.findById(conversationId);
|
||||||
|
if (conv && conv.answer) {
|
||||||
|
yield tracked('answer-0', { answer: conv.answer });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
}),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user