feat: Inject agent ID into prompts, SSE-based cw listen, all flags documented

- INTER_AGENT_COMMUNICATION constant → buildInterAgentCommunication(agentId) function
- Manager injects actual agent ID into prompt after DB record creation
- Agent ID hardcoded in cw listen/ask commands — no manifest.json indirection
- cw listen now uses onPendingConversation SSE subscription instead of polling
- CLI trpc-client upgraded with splitLink for subscription support
- All CLI flags (--agent-id, --from, --timeout, --poll-interval) documented in prompt
- conversation:created/answered added to ALL_EVENT_TYPES
This commit is contained in:
Lukas May
2026-02-10 15:53:01 +01:00
parent c2d665c24f
commit bfc1b422f9
14 changed files with 166 additions and 103 deletions

View File

@@ -142,28 +142,27 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream
Agents can communicate with each other via the `conversations` table, coordinated through CLI commands. Agents can communicate with each other via the `conversations` table, coordinated through CLI commands.
### Prompt Integration ### Prompt Integration
`INTER_AGENT_COMMUNICATION` constant in `prompts/shared.ts` is appended to all 5 agent mode prompts. It instructs agents to: `buildInterAgentCommunication(agentId)` function in `prompts/shared.ts` generates per-agent communication instructions. Called in `manager.ts` after agent record creation — the actual agent ID is injected directly into the prompt (no manifest.json indirection). Appended to the prompt regardless of mode. Instructions include:
1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &` 1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &`
2. Periodically check the temp file for incoming questions between work steps 2. Periodically check the temp file for incoming questions between work steps
3. Answer via `cw answer`, clear the file, restart the listener 3. Answer via `cw answer`, clear the file, restart the listener
4. Ask questions to peers via `cw ask --from <ID> --agent-id|--task-id|--phase-id` 4. Ask questions to peers via `cw ask --from <agentId> --agent-id|--task-id|--phase-id`
5. Kill the listener and clean up the temp file before writing `signal.json` 5. Kill the listener and clean up the temp file before writing `signal.json`
### Agent Identity ### Agent Identity
`manifest.json` now includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. `manifest.json` includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. The agent ID is also injected directly into the prompt's communication instructions.
### CLI Commands ### CLI Commands
**`cw listen --agent-id <id> [--timeout <ms>] [--poll-interval <ms>]`** **`cw listen --agent-id <id>`**
- Polls `getPendingConversations`, prints first pending as JSON, exits with code 0 - Subscribes to `onPendingConversation` SSE subscription, prints first pending as JSON, exits with code 0
- `--timeout`: max wait in ms (default 0=forever) - First yields any existing pending conversations from DB, then listens for `conversation:created` events
- `--poll-interval`: polling frequency in ms (default 2000)
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }` - Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`** **`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout - Creates conversation, polls `getConversation` until answered, prints answer text to stdout
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase) - Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
- `--timeout` / `--poll-interval`: same defaults as listen - `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000)
**`cw answer <answer> --conversation-id <id>`** **`cw answer <answer> --conversation-id <id>`**
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }` - Calls `answerConversation`, prints `{ conversationId, status: "answered" }`

View File

@@ -278,7 +278,7 @@ describe('MultiProviderAgentManager', () => {
// Verify spawn was called with custom cwd // Verify spawn was called with custom cwd
expect(mockSpawn).toHaveBeenCalledWith( expect(mockSpawn).toHaveBeenCalledWith(
'claude', 'claude',
expect.arrayContaining(['-p', 'Test task', '--output-format', 'stream-json']), expect.arrayContaining(['-p', expect.stringContaining('Test task'), '--output-format', 'stream-json']),
expect.objectContaining({ cwd: '/custom/path' }) expect.objectContaining({ cwd: '/custom/path' })
); );
}); });

View File

@@ -36,7 +36,7 @@ import type {
ProcessCrashedEvent, ProcessCrashedEvent,
} from '../events/index.js'; } from '../events/index.js';
import { writeInputFiles } from './file-io.js'; import { writeInputFiles } from './file-io.js';
import { buildWorkspaceLayout } from './prompts/index.js'; import { buildWorkspaceLayout, buildInterAgentCommunication } from './prompts/index.js';
import { getProvider } from './providers/registry.js'; import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js'; import { createModuleLogger } from '../logger/index.js';
import { join, dirname } from 'node:path'; import { join, dirname } from 'node:path';
@@ -275,6 +275,9 @@ export class MultiProviderAgentManager implements AgentManager {
}); });
const agentId = agent.id; const agentId = agent.id;
// 3a. Append inter-agent communication instructions with actual agent ID
prompt = prompt + buildInterAgentCommunication(agentId);
// 3b. Write input files (after agent creation so we can include agentId/agentName) // 3b. Write input files (after agent creation so we can include agentId/agentName)
if (options.inputContext) { if (options.inputContext) {
writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias }); writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });

View File

@@ -2,7 +2,7 @@
* Detail mode prompt — break a phase into executable tasks. * Detail mode prompt — break a phase into executable tasks.
*/ */
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildDetailPrompt(): string { export function buildDetailPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode. return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode.
@@ -26,21 +26,15 @@ ${ID_GENERATION}
## Task Design Rules ## Task Design Rules
- Each task: specific, actionable, completable by one agent - Each task: specific, actionable, completable by one agent
- Ideally tasks shall be executable in parallel - if they depend on each other, use dependencies to indicate order
- Include verification steps where appropriate - Include verification steps where appropriate
- Use \`checkpoint:*\` types for tasks requiring human review
- Dependencies should be minimal and explicit - Dependencies should be minimal and explicit
## Existing Context ## Existing Context
- FIRST: Read ALL files in \`context/tasks/\` before generating any output - FIRST: Read ALL files in \`context/tasks/\` before generating any output
- Your target phase is \`phase.md\` — only create tasks for THIS phase - Your target phase is \`phase.md\` — only create tasks for THIS phase
- If a task in context/tasks/ already covers the same work (even under a different name), do NOT create a duplicate - If a task in context/tasks/ already covers the same work (even under a different name), do NOT create a duplicate
- Pages contain requirements — reference them for detailed task descriptions - Pages contain requirements — use them to create detailed task descriptions
- DO NOT create tasks that overlap with existing tasks in other phases - DO NOT create tasks that overlap with existing tasks in other phases
`;
## Rules
- Break work into 3-8 tasks per phase
- Order tasks logically (foundational work first)
- Make each task self-contained with enough context
- Include test/verify tasks where appropriate
${INTER_AGENT_COMMUNICATION}`;
} }

View File

@@ -2,7 +2,7 @@
* Discuss mode prompt — clarifying questions and decision capture. * Discuss mode prompt — clarifying questions and decision capture.
*/ */
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildDiscussPrompt(): string { export function buildDiscussPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode. return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode.
@@ -30,6 +30,5 @@ ${ID_GENERATION}
- Ask 2-4 questions at a time, not more - Ask 2-4 questions at a time, not more
- Provide options when choices are clear - Provide options when choices are clear
- Capture every decision with rationale - Capture every decision with rationale
- Don't proceed until ambiguities are resolved - Don't proceed until ambiguities are resolved`;
${INTER_AGENT_COMMUNICATION}`;
} }

View File

@@ -2,7 +2,7 @@
* Execute mode prompt — standard worker agent. * Execute mode prompt — standard worker agent.
*/ */
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildExecutePrompt(): string { export function buildExecutePrompt(): string {
return `You are a Worker agent in the Codewalk multi-agent system. return `You are a Worker agent in the Codewalk multi-agent system.
@@ -16,6 +16,5 @@ ${SIGNAL_FORMAT}
- Complete the task as specified in .cw/input/task.md - Complete the task as specified in .cw/input/task.md
- Ask questions if requirements are unclear - Ask questions if requirements are unclear
- Report errors honestly — don't guess - Report errors honestly — don't guess
- Focus on writing clean, tested code - Focus on writing clean, tested code`;
${INTER_AGENT_COMMUNICATION}`;
} }

View File

@@ -5,7 +5,7 @@
* input files, ID generation) are in shared.ts. * input files, ID generation) are in shared.ts.
*/ */
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, INTER_AGENT_COMMUNICATION } from './shared.js'; export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, buildInterAgentCommunication } from './shared.js';
export { buildExecutePrompt } from './execute.js'; export { buildExecutePrompt } from './execute.js';
export { buildDiscussPrompt } from './discuss.js'; export { buildDiscussPrompt } from './discuss.js';
export { buildPlanPrompt } from './plan.js'; export { buildPlanPrompt } from './plan.js';

View File

@@ -2,7 +2,7 @@
* Plan mode prompt — plan initiative into phases. * Plan mode prompt — plan initiative into phases.
*/ */
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildPlanPrompt(): string { export function buildPlanPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode. return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode.
@@ -36,6 +36,5 @@ ${ID_GENERATION}
- Start with foundation/infrastructure phases - Start with foundation/infrastructure phases
- Group related work together - Group related work together
- Make dependencies explicit using phase IDs - Make dependencies explicit using phase IDs
- Each task should be completable in one session - Each task should be completable in one session`;
${INTER_AGENT_COMMUNICATION}`;
} }

View File

@@ -2,7 +2,7 @@
* Refine mode prompt — review and propose edits to initiative pages. * Refine mode prompt — review and propose edits to initiative pages.
*/ */
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
export function buildRefinePrompt(): string { export function buildRefinePrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode. return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode.
@@ -24,6 +24,5 @@ Write one file per modified page to \`.cw/output/pages/{pageId}.md\`:
- Each output page's body is the FULL new content (not a diff) - Each output page's body is the FULL new content (not a diff)
- Preserve [[page:\$id|title]] cross-references in your output - Preserve [[page:\$id|title]] cross-references in your output
- Focus on clarity, completeness, and consistency - Focus on clarity, completeness, and consistency
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/ - Do not invent new page IDs — only reference existing ones from .cw/input/pages/`;
${INTER_AGENT_COMMUNICATION}`;
} }

View File

@@ -44,53 +44,41 @@ cw id
\`\`\` \`\`\`
Use the output as the filename (e.g., \`{id}.md\`).`; Use the output as the filename (e.g., \`{id}.md\`).`;
export const INTER_AGENT_COMMUNICATION = ` export function buildInterAgentCommunication(agentId: string): string {
return `
## Inter-Agent Communication ## Inter-Agent Communication
You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. You can exchange questions and answers with peer agents via CLI commands. You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. You can exchange questions and answers with peer agents via CLI commands.
### Your Identity Your agent ID is: **${agentId}**
Read \`.cw/input/manifest.json\` — it contains \`agentId\` and \`agentName\` fields identifying you. You'll need your \`agentId\` for all communication commands.
### CLI Commands ### CLI Commands
**\`cw listen\`** — Poll for incoming questions. Prints the first pending question as JSON and exits. **\`cw listen --agent-id ${agentId}\`**
\`\`\` Waits for an incoming question via server-sent events (SSE). Blocks until a question arrives, then prints one JSON line to stdout and exits with code 0.
cw listen --agent-id <YOUR_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>] Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
\`\`\` Exits with code 1 on connection error.
- \`--agent-id\` (required): Your agent ID
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use a value like 120000 (2 min) to avoid hanging.
- \`--poll-interval\`: Polling frequency in ms. Default: 2000
- Output (JSON): \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
- Exit code 0 if a question was found, 1 on timeout or error.
**\`cw ask\`** — Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout. **\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
\`\`\` Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0.
cw ask "<question>" --from <YOUR_AGENT_ID> --agent-id <TARGET_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>] Target (exactly one required):
\`\`\` - \`--agent-id <id>\`: Ask a specific agent directly by agent ID
- \`--from\` (required): Your agent ID (the asker) - \`--task-id <id>\`: Ask whichever agent is currently running that task
- Target (exactly one required):
- \`--agent-id <id>\`: Ask a specific agent directly
- \`--task-id <id>\`: Ask whichever agent is running that task
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase - \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use 120000+ for safety. Options:
- \`--poll-interval\`: Polling frequency in ms. Default: 2000 - \`--poll-interval <ms>\`: Answer polling frequency in ms. Default: 2000
- Output: The answer text (plain text, not JSON). - \`--timeout <ms>\`: Max wait for answer in ms. Default: 0 (wait forever)
- Exit code 0 if answered, 1 on timeout or error.
**\`cw answer\`** — Answer a pending question. **\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
\`\`\` Answer a pending question. The conversation ID comes from the \`cw listen\` output.
cw answer "<answer>" --conversation-id <CONVERSATION_ID> Output: \`{ "conversationId": "...", "status": "answered" }\`
\`\`\`
- \`--conversation-id\` (required): The conversation ID from the listen output
- Output (JSON): \`{ "conversationId": "...", "status": "answered" }\`
### Background Listener Pattern ### Background Listener Pattern
At the START of your session, set up a background listener that writes to a temp file: At the START of your session, run a background listener that writes to a temp file:
\`\`\`bash \`\`\`bash
CW_LISTEN_FILE=$(mktemp /tmp/cw-listen-XXXXXX.txt) CW_LISTEN_FILE=$(mktemp /tmp/cw-listen-XXXXXX.txt)
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 & cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
LISTEN_PID=$! LISTEN_PID=$!
\`\`\` \`\`\`
@@ -102,13 +90,11 @@ if [ -n "$LISTEN_CONTENT" ]; then
fi fi
\`\`\` \`\`\`
When a question arrives, parse the JSON, answer, then restart the listener: When a question arrives, parse the JSON, answer it, then restart the listener:
\`\`\`bash \`\`\`bash
# Parse conversationId and question from the JSON
cw answer "<your answer>" --conversation-id <conversationId> cw answer "<your answer>" --conversation-id <conversationId>
# Clear and restart
> "$CW_LISTEN_FILE" > "$CW_LISTEN_FILE"
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 & cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
LISTEN_PID=$! LISTEN_PID=$!
\`\`\` \`\`\`
@@ -119,8 +105,9 @@ LISTEN_PID=$!
- Do NOT ask questions that you can answer by reading the codebase yourself - Do NOT ask questions that you can answer by reading the codebase yourself
### Cleanup ### Cleanup
Before writing \`.cw/output/signal.json\`, kill your listener: Before writing \`.cw/output/signal.json\`, kill your listener and clean up:
\`\`\`bash \`\`\`bash
kill $LISTEN_PID 2>/dev/null kill $LISTEN_PID 2>/dev/null
rm -f "$CW_LISTEN_FILE" rm -f "$CW_LISTEN_FILE"
\`\`\``; \`\`\``;
}

View File

@@ -1366,41 +1366,37 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
// Inter-agent conversation commands // Inter-agent conversation commands
// ========================================================================= // =========================================================================
// cw listen --agent-id <id> [--poll-interval <ms>] [--timeout <ms>] // cw listen --agent-id <id>
program program
.command('listen') .command('listen')
.description('Poll for pending conversations addressed to an agent') .description('Listen for pending conversations via SSE subscription')
.requiredOption('--agent-id <id>', 'Agent ID to listen for') .requiredOption('--agent-id <id>', 'Agent ID to listen for')
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000') .action(async (options: { agentId: string }) => {
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
.action(async (options: { agentId: string; pollInterval: string; timeout: string }) => {
try { try {
const client = createDefaultTrpcClient(); const client = createDefaultTrpcClient();
const pollInterval = parseInt(options.pollInterval, 10);
const timeout = parseInt(options.timeout, 10);
const startTime = Date.now();
// eslint-disable-next-line no-constant-condition const subscription = client.onPendingConversation.subscribe(
while (true) { { agentId: options.agentId },
const pending = await client.getPendingConversations.query({ agentId: options.agentId }); {
if (pending.length > 0) { onData(envelope) {
const conv = pending[0]; const conv = envelope.data;
console.log(JSON.stringify({ console.log(JSON.stringify({
conversationId: conv.id, conversationId: conv.conversationId,
fromAgentId: conv.fromAgentId, fromAgentId: conv.fromAgentId,
question: conv.question, question: conv.question,
phaseId: conv.phaseId, phaseId: conv.phaseId,
taskId: conv.taskId, taskId: conv.taskId,
})); }));
process.exit(0); subscription.unsubscribe();
} process.exit(0);
},
if (timeout > 0 && Date.now() - startTime >= timeout) { onError(err) {
process.exit(1); console.error('Failed to listen:', err.message);
} subscription.unsubscribe();
process.exit(1);
await new Promise(resolve => setTimeout(resolve, pollInterval)); },
} },
);
} catch (error) { } catch (error) {
console.error('Failed to listen:', (error as Error).message); console.error('Failed to listen:', (error as Error).message);
process.exit(1); process.exit(1);

View File

@@ -2,10 +2,10 @@
* tRPC Client for CLI * tRPC Client for CLI
* *
* Type-safe client for communicating with the coordination server. * Type-safe client for communicating with the coordination server.
* Uses httpBatchLink for efficient request batching. * Uses splitLink to route subscriptions to SSE and queries/mutations to HTTP batch.
*/ */
import { createTRPCClient, httpBatchLink } from '@trpc/client'; import { createTRPCClient, httpBatchLink, splitLink, httpSubscriptionLink } from '@trpc/client';
import type { AppRouter } from '../trpc/index.js'; import type { AppRouter } from '../trpc/index.js';
/** Default server port */ /** Default server port */
@@ -30,10 +30,13 @@ export function createTrpcClient(
port: number = DEFAULT_PORT, port: number = DEFAULT_PORT,
host: string = DEFAULT_HOST host: string = DEFAULT_HOST
): TrpcClient { ): TrpcClient {
const url = `http://${host}:${port}/trpc`;
return createTRPCClient<AppRouter>({ return createTRPCClient<AppRouter>({
links: [ links: [
httpBatchLink({ splitLink({
url: `http://${host}:${port}/trpc`, condition: (op) => op.type === 'subscription',
true: httpSubscriptionLink({ url }),
false: httpBatchLink({ url }),
}), }),
], ],
}); });

View File

@@ -3,9 +3,11 @@
*/ */
import { TRPCError } from '@trpc/server'; import { TRPCError } from '@trpc/server';
import { tracked, type TrackedEnvelope } from '@trpc/server';
import { z } from 'zod'; import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js'; import type { ProcedureBuilder } from '../trpc.js';
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js'; import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
import type { ConversationCreatedEvent } from '../../events/types.js';
export function conversationProcedures(publicProcedure: ProcedureBuilder) { export function conversationProcedures(publicProcedure: ProcedureBuilder) {
return { return {
@@ -134,5 +136,86 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
return updated; return updated;
}), }),
onPendingConversation: publicProcedure
.input(z.object({ agentId: z.string().min(1) }))
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{
conversationId: string;
fromAgentId: string;
question: string;
phaseId: string | null;
taskId: string | null;
}>> {
const { agentId } = opts.input;
const signal = opts.signal ?? new AbortController().signal;
const eventBus = opts.ctx.eventBus;
const repo = requireConversationRepository(opts.ctx);
// First yield any already-pending conversations
const existing = await repo.findPendingForAgent(agentId);
let eventCounter = 0;
for (const conv of existing) {
yield tracked(`conv-${eventCounter++}`, {
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
});
}
// Then listen for new conversation:created events
const queue: string[] = []; // conversation IDs
let resolve: (() => void) | null = null;
const handler = (event: ConversationCreatedEvent) => {
if (event.payload.toAgentId !== agentId) return;
queue.push(event.payload.conversationId);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
eventBus.on('conversation:created', handler);
const cleanup = () => {
eventBus.off('conversation:created', handler);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted) {
while (queue.length > 0) {
const convId = queue.shift()!;
const conv = await repo.findById(convId);
if (conv && conv.status === 'pending') {
yield tracked(`conv-${eventCounter++}`, {
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
});
}
}
if (!signal.aborted) {
await new Promise<void>((r) => {
resolve = r;
});
}
}
} finally {
cleanup();
}
}),
}; };
} }

View File

@@ -61,6 +61,8 @@ export const ALL_EVENT_TYPES: DomainEventType[] = [
'page:deleted', 'page:deleted',
'changeset:created', 'changeset:created',
'changeset:reverted', 'changeset:reverted',
'conversation:created',
'conversation:answered',
]; ];
/** /**