From bfc1b422f9451ec7588ccf0ffc9af4b140b6a4a7 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Tue, 10 Feb 2026 15:53:01 +0100 Subject: [PATCH] feat: Inject agent ID into prompts, SSE-based cw listen, all flags documented MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - INTER_AGENT_COMMUNICATION constant → buildInterAgentCommunication(agentId) function - Manager injects actual agent ID into prompt after DB record creation - Agent ID hardcoded in cw listen/ask commands — no manifest.json indirection - cw listen now uses onPendingConversation SSE subscription instead of polling - CLI trpc-client upgraded with splitLink for subscription support - All CLI flags (--agent-id, --from, --timeout, --poll-interval) documented in prompt - conversation:created/answered added to ALL_EVENT_TYPES --- docs/agent.md | 15 +++--- src/agent/manager.test.ts | 2 +- src/agent/manager.ts | 5 +- src/agent/prompts/detail.ts | 14 ++---- src/agent/prompts/discuss.ts | 5 +- src/agent/prompts/execute.ts | 5 +- src/agent/prompts/index.ts | 2 +- src/agent/prompts/plan.ts | 5 +- src/agent/prompts/refine.ts | 5 +- src/agent/prompts/shared.ts | 61 +++++++++-------------- src/cli/index.ts | 54 ++++++++++----------- src/cli/trpc-client.ts | 11 +++-- src/trpc/routers/conversation.ts | 83 ++++++++++++++++++++++++++++++++ src/trpc/subscriptions.ts | 2 + 14 files changed, 166 insertions(+), 103 deletions(-) diff --git a/docs/agent.md b/docs/agent.md index 3d6117d..0d17c06 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -142,28 +142,27 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream Agents can communicate with each other via the `conversations` table, coordinated through CLI commands. ### Prompt Integration -`INTER_AGENT_COMMUNICATION` constant in `prompts/shared.ts` is appended to all 5 agent mode prompts. It instructs agents to: +`buildInterAgentCommunication(agentId)` function in `prompts/shared.ts` generates per-agent communication instructions. Called in `manager.ts` after agent record creation — the actual agent ID is injected directly into the prompt (no manifest.json indirection). Appended to the prompt regardless of mode. Instructions include: 1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &` 2. Periodically check the temp file for incoming questions between work steps 3. Answer via `cw answer`, clear the file, restart the listener -4. Ask questions to peers via `cw ask --from --agent-id|--task-id|--phase-id` +4. Ask questions to peers via `cw ask --from --agent-id|--task-id|--phase-id` 5. Kill the listener and clean up the temp file before writing `signal.json` ### Agent Identity -`manifest.json` now includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. +`manifest.json` includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. The agent ID is also injected directly into the prompt's communication instructions. ### CLI Commands -**`cw listen --agent-id [--timeout ] [--poll-interval ]`** -- Polls `getPendingConversations`, prints first pending as JSON, exits with code 0 -- `--timeout`: max wait in ms (default 0=forever) -- `--poll-interval`: polling frequency in ms (default 2000) +**`cw listen --agent-id `** +- Subscribes to `onPendingConversation` SSE subscription, prints first pending as JSON, exits with code 0 +- First yields any existing pending conversations from DB, then listens for `conversation:created` events - Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }` **`cw ask --from --agent-id|--task-id|--phase-id [--timeout ] [--poll-interval ]`** - Creates conversation, polls `getConversation` until answered, prints answer text to stdout - Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase) -- `--timeout` / `--poll-interval`: same defaults as listen +- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000) **`cw answer --conversation-id `** - Calls `answerConversation`, prints `{ conversationId, status: "answered" }` diff --git a/src/agent/manager.test.ts b/src/agent/manager.test.ts index ca8353b..91f6360 100644 --- a/src/agent/manager.test.ts +++ b/src/agent/manager.test.ts @@ -278,7 +278,7 @@ describe('MultiProviderAgentManager', () => { // Verify spawn was called with custom cwd expect(mockSpawn).toHaveBeenCalledWith( 'claude', - expect.arrayContaining(['-p', 'Test task', '--output-format', 'stream-json']), + expect.arrayContaining(['-p', expect.stringContaining('Test task'), '--output-format', 'stream-json']), expect.objectContaining({ cwd: '/custom/path' }) ); }); diff --git a/src/agent/manager.ts b/src/agent/manager.ts index d6d5546..802914e 100644 --- a/src/agent/manager.ts +++ b/src/agent/manager.ts @@ -36,7 +36,7 @@ import type { ProcessCrashedEvent, } from '../events/index.js'; import { writeInputFiles } from './file-io.js'; -import { buildWorkspaceLayout } from './prompts/index.js'; +import { buildWorkspaceLayout, buildInterAgentCommunication } from './prompts/index.js'; import { getProvider } from './providers/registry.js'; import { createModuleLogger } from '../logger/index.js'; import { join, dirname } from 'node:path'; @@ -275,6 +275,9 @@ export class MultiProviderAgentManager implements AgentManager { }); const agentId = agent.id; + // 3a. Append inter-agent communication instructions with actual agent ID + prompt = prompt + buildInterAgentCommunication(agentId); + // 3b. Write input files (after agent creation so we can include agentId/agentName) if (options.inputContext) { writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias }); diff --git a/src/agent/prompts/detail.ts b/src/agent/prompts/detail.ts index 4a9c294..62713d1 100644 --- a/src/agent/prompts/detail.ts +++ b/src/agent/prompts/detail.ts @@ -2,7 +2,7 @@ * Detail mode prompt — break a phase into executable tasks. */ -import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; +import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; export function buildDetailPrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode. @@ -26,21 +26,15 @@ ${ID_GENERATION} ## Task Design Rules - Each task: specific, actionable, completable by one agent +- Ideally tasks shall be executable in parallel - if they depend on each other, use dependencies to indicate order - Include verification steps where appropriate -- Use \`checkpoint:*\` types for tasks requiring human review - Dependencies should be minimal and explicit ## Existing Context - FIRST: Read ALL files in \`context/tasks/\` before generating any output - Your target phase is \`phase.md\` — only create tasks for THIS phase - If a task in context/tasks/ already covers the same work (even under a different name), do NOT create a duplicate -- Pages contain requirements — reference them for detailed task descriptions +- Pages contain requirements — use them to create detailed task descriptions - DO NOT create tasks that overlap with existing tasks in other phases - -## Rules -- Break work into 3-8 tasks per phase -- Order tasks logically (foundational work first) -- Make each task self-contained with enough context -- Include test/verify tasks where appropriate -${INTER_AGENT_COMMUNICATION}`; +`; } diff --git a/src/agent/prompts/discuss.ts b/src/agent/prompts/discuss.ts index b54e93f..147ddf6 100644 --- a/src/agent/prompts/discuss.ts +++ b/src/agent/prompts/discuss.ts @@ -2,7 +2,7 @@ * Discuss mode prompt — clarifying questions and decision capture. */ -import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; +import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; export function buildDiscussPrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode. @@ -30,6 +30,5 @@ ${ID_GENERATION} - Ask 2-4 questions at a time, not more - Provide options when choices are clear - Capture every decision with rationale -- Don't proceed until ambiguities are resolved -${INTER_AGENT_COMMUNICATION}`; +- Don't proceed until ambiguities are resolved`; } diff --git a/src/agent/prompts/execute.ts b/src/agent/prompts/execute.ts index c6d0629..c3a7873 100644 --- a/src/agent/prompts/execute.ts +++ b/src/agent/prompts/execute.ts @@ -2,7 +2,7 @@ * Execute mode prompt — standard worker agent. */ -import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; +import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; export function buildExecutePrompt(): string { return `You are a Worker agent in the Codewalk multi-agent system. @@ -16,6 +16,5 @@ ${SIGNAL_FORMAT} - Complete the task as specified in .cw/input/task.md - Ask questions if requirements are unclear - Report errors honestly — don't guess -- Focus on writing clean, tested code -${INTER_AGENT_COMMUNICATION}`; +- Focus on writing clean, tested code`; } diff --git a/src/agent/prompts/index.ts b/src/agent/prompts/index.ts index b9cb923..fd18eef 100644 --- a/src/agent/prompts/index.ts +++ b/src/agent/prompts/index.ts @@ -5,7 +5,7 @@ * input files, ID generation) are in shared.ts. */ -export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, INTER_AGENT_COMMUNICATION } from './shared.js'; +export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, buildInterAgentCommunication } from './shared.js'; export { buildExecutePrompt } from './execute.js'; export { buildDiscussPrompt } from './discuss.js'; export { buildPlanPrompt } from './plan.js'; diff --git a/src/agent/prompts/plan.ts b/src/agent/prompts/plan.ts index 577018c..3e3907e 100644 --- a/src/agent/prompts/plan.ts +++ b/src/agent/prompts/plan.ts @@ -2,7 +2,7 @@ * Plan mode prompt — plan initiative into phases. */ -import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; +import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; export function buildPlanPrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode. @@ -36,6 +36,5 @@ ${ID_GENERATION} - Start with foundation/infrastructure phases - Group related work together - Make dependencies explicit using phase IDs -- Each task should be completable in one session -${INTER_AGENT_COMMUNICATION}`; +- Each task should be completable in one session`; } diff --git a/src/agent/prompts/refine.ts b/src/agent/prompts/refine.ts index 5fb0696..d69e684 100644 --- a/src/agent/prompts/refine.ts +++ b/src/agent/prompts/refine.ts @@ -2,7 +2,7 @@ * Refine mode prompt — review and propose edits to initiative pages. */ -import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; +import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; export function buildRefinePrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode. @@ -24,6 +24,5 @@ Write one file per modified page to \`.cw/output/pages/{pageId}.md\`: - Each output page's body is the FULL new content (not a diff) - Preserve [[page:\$id|title]] cross-references in your output - Focus on clarity, completeness, and consistency -- Do not invent new page IDs — only reference existing ones from .cw/input/pages/ -${INTER_AGENT_COMMUNICATION}`; +- Do not invent new page IDs — only reference existing ones from .cw/input/pages/`; } diff --git a/src/agent/prompts/shared.ts b/src/agent/prompts/shared.ts index 3da352e..6292d29 100644 --- a/src/agent/prompts/shared.ts +++ b/src/agent/prompts/shared.ts @@ -44,53 +44,41 @@ cw id \`\`\` Use the output as the filename (e.g., \`{id}.md\`).`; -export const INTER_AGENT_COMMUNICATION = ` +export function buildInterAgentCommunication(agentId: string): string { + return ` ## Inter-Agent Communication You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. You can exchange questions and answers with peer agents via CLI commands. -### Your Identity -Read \`.cw/input/manifest.json\` — it contains \`agentId\` and \`agentName\` fields identifying you. You'll need your \`agentId\` for all communication commands. +Your agent ID is: **${agentId}** ### CLI Commands -**\`cw listen\`** — Poll for incoming questions. Prints the first pending question as JSON and exits. -\`\`\` -cw listen --agent-id [--timeout ] [--poll-interval ] -\`\`\` -- \`--agent-id\` (required): Your agent ID -- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use a value like 120000 (2 min) to avoid hanging. -- \`--poll-interval\`: Polling frequency in ms. Default: 2000 -- Output (JSON): \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\` -- Exit code 0 if a question was found, 1 on timeout or error. +**\`cw listen --agent-id ${agentId}\`** +Waits for an incoming question via server-sent events (SSE). Blocks until a question arrives, then prints one JSON line to stdout and exits with code 0. +Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\` +Exits with code 1 on connection error. -**\`cw ask\`** — Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout. -\`\`\` -cw ask "" --from --agent-id [--timeout ] [--poll-interval ] -\`\`\` -- \`--from\` (required): Your agent ID (the asker) -- Target (exactly one required): - - \`--agent-id \`: Ask a specific agent directly - - \`--task-id \`: Ask whichever agent is running that task +**\`cw ask "" --from ${agentId} --agent-id \`** +Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0. +Target (exactly one required): + - \`--agent-id \`: Ask a specific agent directly by agent ID + - \`--task-id \`: Ask whichever agent is currently running that task - \`--phase-id \`: Ask whichever agent is running a task in that phase -- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use 120000+ for safety. -- \`--poll-interval\`: Polling frequency in ms. Default: 2000 -- Output: The answer text (plain text, not JSON). -- Exit code 0 if answered, 1 on timeout or error. +Options: + - \`--poll-interval \`: Answer polling frequency in ms. Default: 2000 + - \`--timeout \`: Max wait for answer in ms. Default: 0 (wait forever) -**\`cw answer\`** — Answer a pending question. -\`\`\` -cw answer "" --conversation-id -\`\`\` -- \`--conversation-id\` (required): The conversation ID from the listen output -- Output (JSON): \`{ "conversationId": "...", "status": "answered" }\` +**\`cw answer "" --conversation-id \`** +Answer a pending question. The conversation ID comes from the \`cw listen\` output. +Output: \`{ "conversationId": "...", "status": "answered" }\` ### Background Listener Pattern -At the START of your session, set up a background listener that writes to a temp file: +At the START of your session, run a background listener that writes to a temp file: \`\`\`bash CW_LISTEN_FILE=$(mktemp /tmp/cw-listen-XXXXXX.txt) -cw listen --agent-id --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 & +cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 & LISTEN_PID=$! \`\`\` @@ -102,13 +90,11 @@ if [ -n "$LISTEN_CONTENT" ]; then fi \`\`\` -When a question arrives, parse the JSON, answer, then restart the listener: +When a question arrives, parse the JSON, answer it, then restart the listener: \`\`\`bash -# Parse conversationId and question from the JSON cw answer "" --conversation-id -# Clear and restart > "$CW_LISTEN_FILE" -cw listen --agent-id --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 & +cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 & LISTEN_PID=$! \`\`\` @@ -119,8 +105,9 @@ LISTEN_PID=$! - Do NOT ask questions that you can answer by reading the codebase yourself ### Cleanup -Before writing \`.cw/output/signal.json\`, kill your listener: +Before writing \`.cw/output/signal.json\`, kill your listener and clean up: \`\`\`bash kill $LISTEN_PID 2>/dev/null rm -f "$CW_LISTEN_FILE" \`\`\``; +} diff --git a/src/cli/index.ts b/src/cli/index.ts index 177a0b5..f1582c2 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -1366,41 +1366,37 @@ export function createCli(serverHandler?: (port?: number) => Promise): Com // Inter-agent conversation commands // ========================================================================= - // cw listen --agent-id [--poll-interval ] [--timeout ] + // cw listen --agent-id program .command('listen') - .description('Poll for pending conversations addressed to an agent') + .description('Listen for pending conversations via SSE subscription') .requiredOption('--agent-id ', 'Agent ID to listen for') - .option('--poll-interval ', 'Poll interval in milliseconds', '2000') - .option('--timeout ', 'Timeout in milliseconds (0=forever)', '0') - .action(async (options: { agentId: string; pollInterval: string; timeout: string }) => { + .action(async (options: { agentId: string }) => { try { const client = createDefaultTrpcClient(); - const pollInterval = parseInt(options.pollInterval, 10); - const timeout = parseInt(options.timeout, 10); - const startTime = Date.now(); - // eslint-disable-next-line no-constant-condition - while (true) { - const pending = await client.getPendingConversations.query({ agentId: options.agentId }); - if (pending.length > 0) { - const conv = pending[0]; - console.log(JSON.stringify({ - conversationId: conv.id, - fromAgentId: conv.fromAgentId, - question: conv.question, - phaseId: conv.phaseId, - taskId: conv.taskId, - })); - process.exit(0); - } - - if (timeout > 0 && Date.now() - startTime >= timeout) { - process.exit(1); - } - - await new Promise(resolve => setTimeout(resolve, pollInterval)); - } + const subscription = client.onPendingConversation.subscribe( + { agentId: options.agentId }, + { + onData(envelope) { + const conv = envelope.data; + console.log(JSON.stringify({ + conversationId: conv.conversationId, + fromAgentId: conv.fromAgentId, + question: conv.question, + phaseId: conv.phaseId, + taskId: conv.taskId, + })); + subscription.unsubscribe(); + process.exit(0); + }, + onError(err) { + console.error('Failed to listen:', err.message); + subscription.unsubscribe(); + process.exit(1); + }, + }, + ); } catch (error) { console.error('Failed to listen:', (error as Error).message); process.exit(1); diff --git a/src/cli/trpc-client.ts b/src/cli/trpc-client.ts index ac3de79..c2d74f2 100644 --- a/src/cli/trpc-client.ts +++ b/src/cli/trpc-client.ts @@ -2,10 +2,10 @@ * tRPC Client for CLI * * Type-safe client for communicating with the coordination server. - * Uses httpBatchLink for efficient request batching. + * Uses splitLink to route subscriptions to SSE and queries/mutations to HTTP batch. */ -import { createTRPCClient, httpBatchLink } from '@trpc/client'; +import { createTRPCClient, httpBatchLink, splitLink, httpSubscriptionLink } from '@trpc/client'; import type { AppRouter } from '../trpc/index.js'; /** Default server port */ @@ -30,10 +30,13 @@ export function createTrpcClient( port: number = DEFAULT_PORT, host: string = DEFAULT_HOST ): TrpcClient { + const url = `http://${host}:${port}/trpc`; return createTRPCClient({ links: [ - httpBatchLink({ - url: `http://${host}:${port}/trpc`, + splitLink({ + condition: (op) => op.type === 'subscription', + true: httpSubscriptionLink({ url }), + false: httpBatchLink({ url }), }), ], }); diff --git a/src/trpc/routers/conversation.ts b/src/trpc/routers/conversation.ts index f8da953..aa9f21f 100644 --- a/src/trpc/routers/conversation.ts +++ b/src/trpc/routers/conversation.ts @@ -3,9 +3,11 @@ */ import { TRPCError } from '@trpc/server'; +import { tracked, type TrackedEnvelope } from '@trpc/server'; import { z } from 'zod'; import type { ProcedureBuilder } from '../trpc.js'; import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js'; +import type { ConversationCreatedEvent } from '../../events/types.js'; export function conversationProcedures(publicProcedure: ProcedureBuilder) { return { @@ -134,5 +136,86 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) { return updated; }), + + onPendingConversation: publicProcedure + .input(z.object({ agentId: z.string().min(1) })) + .subscription(async function* (opts): AsyncGenerator> { + const { agentId } = opts.input; + const signal = opts.signal ?? new AbortController().signal; + const eventBus = opts.ctx.eventBus; + const repo = requireConversationRepository(opts.ctx); + + // First yield any already-pending conversations + const existing = await repo.findPendingForAgent(agentId); + let eventCounter = 0; + for (const conv of existing) { + yield tracked(`conv-${eventCounter++}`, { + conversationId: conv.id, + fromAgentId: conv.fromAgentId, + question: conv.question, + phaseId: conv.phaseId, + taskId: conv.taskId, + }); + } + + // Then listen for new conversation:created events + const queue: string[] = []; // conversation IDs + let resolve: (() => void) | null = null; + + const handler = (event: ConversationCreatedEvent) => { + if (event.payload.toAgentId !== agentId) return; + queue.push(event.payload.conversationId); + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + eventBus.on('conversation:created', handler); + + const cleanup = () => { + eventBus.off('conversation:created', handler); + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + signal.addEventListener('abort', cleanup, { once: true }); + + try { + while (!signal.aborted) { + while (queue.length > 0) { + const convId = queue.shift()!; + const conv = await repo.findById(convId); + if (conv && conv.status === 'pending') { + yield tracked(`conv-${eventCounter++}`, { + conversationId: conv.id, + fromAgentId: conv.fromAgentId, + question: conv.question, + phaseId: conv.phaseId, + taskId: conv.taskId, + }); + } + } + + if (!signal.aborted) { + await new Promise((r) => { + resolve = r; + }); + } + } + } finally { + cleanup(); + } + }), }; } diff --git a/src/trpc/subscriptions.ts b/src/trpc/subscriptions.ts index 6d8b53b..4bf55a3 100644 --- a/src/trpc/subscriptions.ts +++ b/src/trpc/subscriptions.ts @@ -61,6 +61,8 @@ export const ALL_EVENT_TYPES: DomainEventType[] = [ 'page:deleted', 'changeset:created', 'changeset:reverted', + 'conversation:created', + 'conversation:answered', ]; /**