feat: Inject agent ID into prompts, SSE-based cw listen, all flags documented

- INTER_AGENT_COMMUNICATION constant → buildInterAgentCommunication(agentId) function
- Manager injects actual agent ID into prompt after DB record creation
- Agent ID hardcoded in cw listen/ask commands — no manifest.json indirection
- cw listen now uses onPendingConversation SSE subscription instead of polling
- CLI trpc-client upgraded with splitLink for subscription support
- All CLI flags (--agent-id, --from, --timeout, --poll-interval) documented in prompt
- conversation:created/answered added to ALL_EVENT_TYPES
This commit is contained in:
Lukas May
2026-02-10 15:53:01 +01:00
parent c2d665c24f
commit bfc1b422f9
14 changed files with 166 additions and 103 deletions

View File

@@ -142,28 +142,27 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream
Agents can communicate with each other via the `conversations` table, coordinated through CLI commands.
### Prompt Integration
`INTER_AGENT_COMMUNICATION` constant in `prompts/shared.ts` is appended to all 5 agent mode prompts. It instructs agents to:
`buildInterAgentCommunication(agentId)` function in `prompts/shared.ts` generates per-agent communication instructions. Called in `manager.ts` after agent record creation — the actual agent ID is injected directly into the prompt (no manifest.json indirection). Appended to the prompt regardless of mode. Instructions include:
1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &`
2. Periodically check the temp file for incoming questions between work steps
3. Answer via `cw answer`, clear the file, restart the listener
4. Ask questions to peers via `cw ask --from <ID> --agent-id|--task-id|--phase-id`
4. Ask questions to peers via `cw ask --from <agentId> --agent-id|--task-id|--phase-id`
5. Kill the listener and clean up the temp file before writing `signal.json`
### Agent Identity
`manifest.json` now includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation.
`manifest.json` includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. The agent ID is also injected directly into the prompt's communication instructions.
### CLI Commands
**`cw listen --agent-id <id> [--timeout <ms>] [--poll-interval <ms>]`**
- Polls `getPendingConversations`, prints first pending as JSON, exits with code 0
- `--timeout`: max wait in ms (default 0=forever)
- `--poll-interval`: polling frequency in ms (default 2000)
**`cw listen --agent-id <id>`**
- Subscribes to `onPendingConversation` SSE subscription, prints first pending as JSON, exits with code 0
- First yields any existing pending conversations from DB, then listens for `conversation:created` events
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
- `--timeout` / `--poll-interval`: same defaults as listen
- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000)
**`cw answer <answer> --conversation-id <id>`**
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`

View File

@@ -278,7 +278,7 @@ describe('MultiProviderAgentManager', () => {
// Verify spawn was called with custom cwd
expect(mockSpawn).toHaveBeenCalledWith(
'claude',
expect.arrayContaining(['-p', 'Test task', '--output-format', 'stream-json']),
expect.arrayContaining(['-p', expect.stringContaining('Test task'), '--output-format', 'stream-json']),
expect.objectContaining({ cwd: '/custom/path' })
);
});

View File

@@ -36,7 +36,7 @@ import type {
ProcessCrashedEvent,
} from '../events/index.js';
import { writeInputFiles } from './file-io.js';
import { buildWorkspaceLayout } from './prompts/index.js';
import { buildWorkspaceLayout, buildInterAgentCommunication } from './prompts/index.js';
import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
import { join, dirname } from 'node:path';
@@ -275,6 +275,9 @@ export class MultiProviderAgentManager implements AgentManager {
});
const agentId = agent.id;
// 3a. Append inter-agent communication instructions with actual agent ID
prompt = prompt + buildInterAgentCommunication(agentId);
// 3b. Write input files (after agent creation so we can include agentId/agentName)
if (options.inputContext) {
writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });

View File

@@ -2,7 +2,7 @@
* Detail mode prompt — break a phase into executable tasks.
*/
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildDetailPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode.
@@ -26,21 +26,15 @@ ${ID_GENERATION}
## Task Design Rules
- Each task: specific, actionable, completable by one agent
- Ideally tasks shall be executable in parallel - if they depend on each other, use dependencies to indicate order
- Include verification steps where appropriate
- Use \`checkpoint:*\` types for tasks requiring human review
- Dependencies should be minimal and explicit
## Existing Context
- FIRST: Read ALL files in \`context/tasks/\` before generating any output
- Your target phase is \`phase.md\` — only create tasks for THIS phase
- If a task in context/tasks/ already covers the same work (even under a different name), do NOT create a duplicate
- Pages contain requirements — reference them for detailed task descriptions
- Pages contain requirements — use them to create detailed task descriptions
- DO NOT create tasks that overlap with existing tasks in other phases
## Rules
- Break work into 3-8 tasks per phase
- Order tasks logically (foundational work first)
- Make each task self-contained with enough context
- Include test/verify tasks where appropriate
${INTER_AGENT_COMMUNICATION}`;
`;
}

View File

@@ -2,7 +2,7 @@
* Discuss mode prompt — clarifying questions and decision capture.
*/
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildDiscussPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode.
@@ -30,6 +30,5 @@ ${ID_GENERATION}
- Ask 2-4 questions at a time, not more
- Provide options when choices are clear
- Capture every decision with rationale
- Don't proceed until ambiguities are resolved
${INTER_AGENT_COMMUNICATION}`;
- Don't proceed until ambiguities are resolved`;
}

View File

@@ -2,7 +2,7 @@
* Execute mode prompt — standard worker agent.
*/
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildExecutePrompt(): string {
return `You are a Worker agent in the Codewalk multi-agent system.
@@ -16,6 +16,5 @@ ${SIGNAL_FORMAT}
- Complete the task as specified in .cw/input/task.md
- Ask questions if requirements are unclear
- Report errors honestly — don't guess
- Focus on writing clean, tested code
${INTER_AGENT_COMMUNICATION}`;
- Focus on writing clean, tested code`;
}

View File

@@ -5,7 +5,7 @@
* input files, ID generation) are in shared.ts.
*/
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, INTER_AGENT_COMMUNICATION } from './shared.js';
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, buildInterAgentCommunication } from './shared.js';
export { buildExecutePrompt } from './execute.js';
export { buildDiscussPrompt } from './discuss.js';
export { buildPlanPrompt } from './plan.js';

View File

@@ -2,7 +2,7 @@
* Plan mode prompt — plan initiative into phases.
*/
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildPlanPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode.
@@ -36,6 +36,5 @@ ${ID_GENERATION}
- Start with foundation/infrastructure phases
- Group related work together
- Make dependencies explicit using phase IDs
- Each task should be completable in one session
${INTER_AGENT_COMMUNICATION}`;
- Each task should be completable in one session`;
}

View File

@@ -2,7 +2,7 @@
* Refine mode prompt — review and propose edits to initiative pages.
*/
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildRefinePrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode.
@@ -24,6 +24,5 @@ Write one file per modified page to \`.cw/output/pages/{pageId}.md\`:
- Each output page's body is the FULL new content (not a diff)
- Preserve [[page:\$id|title]] cross-references in your output
- Focus on clarity, completeness, and consistency
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/
${INTER_AGENT_COMMUNICATION}`;
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/`;
}

View File

@@ -44,53 +44,41 @@ cw id
\`\`\`
Use the output as the filename (e.g., \`{id}.md\`).`;
export const INTER_AGENT_COMMUNICATION = `
export function buildInterAgentCommunication(agentId: string): string {
return `
## Inter-Agent Communication
You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. You can exchange questions and answers with peer agents via CLI commands.
### Your Identity
Read \`.cw/input/manifest.json\` — it contains \`agentId\` and \`agentName\` fields identifying you. You'll need your \`agentId\` for all communication commands.
Your agent ID is: **${agentId}**
### CLI Commands
**\`cw listen\`** — Poll for incoming questions. Prints the first pending question as JSON and exits.
\`\`\`
cw listen --agent-id <YOUR_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>]
\`\`\`
- \`--agent-id\` (required): Your agent ID
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use a value like 120000 (2 min) to avoid hanging.
- \`--poll-interval\`: Polling frequency in ms. Default: 2000
- Output (JSON): \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
- Exit code 0 if a question was found, 1 on timeout or error.
**\`cw listen --agent-id ${agentId}\`**
Waits for an incoming question via server-sent events (SSE). Blocks until a question arrives, then prints one JSON line to stdout and exits with code 0.
Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
Exits with code 1 on connection error.
**\`cw ask\`** — Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout.
\`\`\`
cw ask "<question>" --from <YOUR_AGENT_ID> --agent-id <TARGET_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>]
\`\`\`
- \`--from\` (required): Your agent ID (the asker)
- Target (exactly one required):
- \`--agent-id <id>\`: Ask a specific agent directly
- \`--task-id <id>\`: Ask whichever agent is running that task
**\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0.
Target (exactly one required):
- \`--agent-id <id>\`: Ask a specific agent directly by agent ID
- \`--task-id <id>\`: Ask whichever agent is currently running that task
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use 120000+ for safety.
- \`--poll-interval\`: Polling frequency in ms. Default: 2000
- Output: The answer text (plain text, not JSON).
- Exit code 0 if answered, 1 on timeout or error.
Options:
- \`--poll-interval <ms>\`: Answer polling frequency in ms. Default: 2000
- \`--timeout <ms>\`: Max wait for answer in ms. Default: 0 (wait forever)
**\`cw answer\`** — Answer a pending question.
\`\`\`
cw answer "<answer>" --conversation-id <CONVERSATION_ID>
\`\`\`
- \`--conversation-id\` (required): The conversation ID from the listen output
- Output (JSON): \`{ "conversationId": "...", "status": "answered" }\`
**\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
Answer a pending question. The conversation ID comes from the \`cw listen\` output.
Output: \`{ "conversationId": "...", "status": "answered" }\`
### Background Listener Pattern
At the START of your session, set up a background listener that writes to a temp file:
At the START of your session, run a background listener that writes to a temp file:
\`\`\`bash
CW_LISTEN_FILE=$(mktemp /tmp/cw-listen-XXXXXX.txt)
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 &
cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
LISTEN_PID=$!
\`\`\`
@@ -102,13 +90,11 @@ if [ -n "$LISTEN_CONTENT" ]; then
fi
\`\`\`
When a question arrives, parse the JSON, answer, then restart the listener:
When a question arrives, parse the JSON, answer it, then restart the listener:
\`\`\`bash
# Parse conversationId and question from the JSON
cw answer "<your answer>" --conversation-id <conversationId>
# Clear and restart
> "$CW_LISTEN_FILE"
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 &
cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
LISTEN_PID=$!
\`\`\`
@@ -119,8 +105,9 @@ LISTEN_PID=$!
- Do NOT ask questions that you can answer by reading the codebase yourself
### Cleanup
Before writing \`.cw/output/signal.json\`, kill your listener:
Before writing \`.cw/output/signal.json\`, kill your listener and clean up:
\`\`\`bash
kill $LISTEN_PID 2>/dev/null
rm -f "$CW_LISTEN_FILE"
\`\`\``;
}

View File

@@ -1366,41 +1366,37 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
// Inter-agent conversation commands
// =========================================================================
// cw listen --agent-id <id> [--poll-interval <ms>] [--timeout <ms>]
// cw listen --agent-id <id>
program
.command('listen')
.description('Poll for pending conversations addressed to an agent')
.description('Listen for pending conversations via SSE subscription')
.requiredOption('--agent-id <id>', 'Agent ID to listen for')
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
.action(async (options: { agentId: string; pollInterval: string; timeout: string }) => {
.action(async (options: { agentId: string }) => {
try {
const client = createDefaultTrpcClient();
const pollInterval = parseInt(options.pollInterval, 10);
const timeout = parseInt(options.timeout, 10);
const startTime = Date.now();
// eslint-disable-next-line no-constant-condition
while (true) {
const pending = await client.getPendingConversations.query({ agentId: options.agentId });
if (pending.length > 0) {
const conv = pending[0];
console.log(JSON.stringify({
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
}));
process.exit(0);
}
if (timeout > 0 && Date.now() - startTime >= timeout) {
process.exit(1);
}
await new Promise(resolve => setTimeout(resolve, pollInterval));
}
const subscription = client.onPendingConversation.subscribe(
{ agentId: options.agentId },
{
onData(envelope) {
const conv = envelope.data;
console.log(JSON.stringify({
conversationId: conv.conversationId,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
}));
subscription.unsubscribe();
process.exit(0);
},
onError(err) {
console.error('Failed to listen:', err.message);
subscription.unsubscribe();
process.exit(1);
},
},
);
} catch (error) {
console.error('Failed to listen:', (error as Error).message);
process.exit(1);

View File

@@ -2,10 +2,10 @@
* tRPC Client for CLI
*
* Type-safe client for communicating with the coordination server.
* Uses httpBatchLink for efficient request batching.
* Uses splitLink to route subscriptions to SSE and queries/mutations to HTTP batch.
*/
import { createTRPCClient, httpBatchLink } from '@trpc/client';
import { createTRPCClient, httpBatchLink, splitLink, httpSubscriptionLink } from '@trpc/client';
import type { AppRouter } from '../trpc/index.js';
/** Default server port */
@@ -30,10 +30,13 @@ export function createTrpcClient(
port: number = DEFAULT_PORT,
host: string = DEFAULT_HOST
): TrpcClient {
const url = `http://${host}:${port}/trpc`;
return createTRPCClient<AppRouter>({
links: [
httpBatchLink({
url: `http://${host}:${port}/trpc`,
splitLink({
condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({ url }),
false: httpBatchLink({ url }),
}),
],
});

View File

@@ -3,9 +3,11 @@
*/
import { TRPCError } from '@trpc/server';
import { tracked, type TrackedEnvelope } from '@trpc/server';
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
import type { ConversationCreatedEvent } from '../../events/types.js';
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
return {
@@ -134,5 +136,86 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
return updated;
}),
onPendingConversation: publicProcedure
.input(z.object({ agentId: z.string().min(1) }))
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{
conversationId: string;
fromAgentId: string;
question: string;
phaseId: string | null;
taskId: string | null;
}>> {
const { agentId } = opts.input;
const signal = opts.signal ?? new AbortController().signal;
const eventBus = opts.ctx.eventBus;
const repo = requireConversationRepository(opts.ctx);
// First yield any already-pending conversations
const existing = await repo.findPendingForAgent(agentId);
let eventCounter = 0;
for (const conv of existing) {
yield tracked(`conv-${eventCounter++}`, {
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
});
}
// Then listen for new conversation:created events
const queue: string[] = []; // conversation IDs
let resolve: (() => void) | null = null;
const handler = (event: ConversationCreatedEvent) => {
if (event.payload.toAgentId !== agentId) return;
queue.push(event.payload.conversationId);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
eventBus.on('conversation:created', handler);
const cleanup = () => {
eventBus.off('conversation:created', handler);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted) {
while (queue.length > 0) {
const convId = queue.shift()!;
const conv = await repo.findById(convId);
if (conv && conv.status === 'pending') {
yield tracked(`conv-${eventCounter++}`, {
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
});
}
}
if (!signal.aborted) {
await new Promise<void>((r) => {
resolve = r;
});
}
}
} finally {
cleanup();
}
}),
};
}

View File

@@ -61,6 +61,8 @@ export const ALL_EVENT_TYPES: DomainEventType[] = [
'page:deleted',
'changeset:created',
'changeset:reverted',
'conversation:created',
'conversation:answered',
];
/**