feat: Inject agent ID into prompts, SSE-based cw listen, all flags documented
- INTER_AGENT_COMMUNICATION constant → buildInterAgentCommunication(agentId) function - Manager injects actual agent ID into prompt after DB record creation - Agent ID hardcoded in cw listen/ask commands — no manifest.json indirection - cw listen now uses onPendingConversation SSE subscription instead of polling - CLI trpc-client upgraded with splitLink for subscription support - All CLI flags (--agent-id, --from, --timeout, --poll-interval) documented in prompt - conversation:created/answered added to ALL_EVENT_TYPES
This commit is contained in:
@@ -142,28 +142,27 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream
|
||||
Agents can communicate with each other via the `conversations` table, coordinated through CLI commands.
|
||||
|
||||
### Prompt Integration
|
||||
`INTER_AGENT_COMMUNICATION` constant in `prompts/shared.ts` is appended to all 5 agent mode prompts. It instructs agents to:
|
||||
`buildInterAgentCommunication(agentId)` function in `prompts/shared.ts` generates per-agent communication instructions. Called in `manager.ts` after agent record creation — the actual agent ID is injected directly into the prompt (no manifest.json indirection). Appended to the prompt regardless of mode. Instructions include:
|
||||
1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &`
|
||||
2. Periodically check the temp file for incoming questions between work steps
|
||||
3. Answer via `cw answer`, clear the file, restart the listener
|
||||
4. Ask questions to peers via `cw ask --from <ID> --agent-id|--task-id|--phase-id`
|
||||
4. Ask questions to peers via `cw ask --from <agentId> --agent-id|--task-id|--phase-id`
|
||||
5. Kill the listener and clean up the temp file before writing `signal.json`
|
||||
|
||||
### Agent Identity
|
||||
`manifest.json` now includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation.
|
||||
`manifest.json` includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. The agent ID is also injected directly into the prompt's communication instructions.
|
||||
|
||||
### CLI Commands
|
||||
|
||||
**`cw listen --agent-id <id> [--timeout <ms>] [--poll-interval <ms>]`**
|
||||
- Polls `getPendingConversations`, prints first pending as JSON, exits with code 0
|
||||
- `--timeout`: max wait in ms (default 0=forever)
|
||||
- `--poll-interval`: polling frequency in ms (default 2000)
|
||||
**`cw listen --agent-id <id>`**
|
||||
- Subscribes to `onPendingConversation` SSE subscription, prints first pending as JSON, exits with code 0
|
||||
- First yields any existing pending conversations from DB, then listens for `conversation:created` events
|
||||
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
|
||||
|
||||
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
|
||||
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout
|
||||
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
|
||||
- `--timeout` / `--poll-interval`: same defaults as listen
|
||||
- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000)
|
||||
|
||||
**`cw answer <answer> --conversation-id <id>`**
|
||||
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`
|
||||
|
||||
@@ -278,7 +278,7 @@ describe('MultiProviderAgentManager', () => {
|
||||
// Verify spawn was called with custom cwd
|
||||
expect(mockSpawn).toHaveBeenCalledWith(
|
||||
'claude',
|
||||
expect.arrayContaining(['-p', 'Test task', '--output-format', 'stream-json']),
|
||||
expect.arrayContaining(['-p', expect.stringContaining('Test task'), '--output-format', 'stream-json']),
|
||||
expect.objectContaining({ cwd: '/custom/path' })
|
||||
);
|
||||
});
|
||||
|
||||
@@ -36,7 +36,7 @@ import type {
|
||||
ProcessCrashedEvent,
|
||||
} from '../events/index.js';
|
||||
import { writeInputFiles } from './file-io.js';
|
||||
import { buildWorkspaceLayout } from './prompts/index.js';
|
||||
import { buildWorkspaceLayout, buildInterAgentCommunication } from './prompts/index.js';
|
||||
import { getProvider } from './providers/registry.js';
|
||||
import { createModuleLogger } from '../logger/index.js';
|
||||
import { join, dirname } from 'node:path';
|
||||
@@ -275,6 +275,9 @@ export class MultiProviderAgentManager implements AgentManager {
|
||||
});
|
||||
const agentId = agent.id;
|
||||
|
||||
// 3a. Append inter-agent communication instructions with actual agent ID
|
||||
prompt = prompt + buildInterAgentCommunication(agentId);
|
||||
|
||||
// 3b. Write input files (after agent creation so we can include agentId/agentName)
|
||||
if (options.inputContext) {
|
||||
writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
* Detail mode prompt — break a phase into executable tasks.
|
||||
*/
|
||||
|
||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||
|
||||
export function buildDetailPrompt(): string {
|
||||
return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode.
|
||||
@@ -26,21 +26,15 @@ ${ID_GENERATION}
|
||||
|
||||
## Task Design Rules
|
||||
- Each task: specific, actionable, completable by one agent
|
||||
- Ideally tasks shall be executable in parallel - if they depend on each other, use dependencies to indicate order
|
||||
- Include verification steps where appropriate
|
||||
- Use \`checkpoint:*\` types for tasks requiring human review
|
||||
- Dependencies should be minimal and explicit
|
||||
|
||||
## Existing Context
|
||||
- FIRST: Read ALL files in \`context/tasks/\` before generating any output
|
||||
- Your target phase is \`phase.md\` — only create tasks for THIS phase
|
||||
- If a task in context/tasks/ already covers the same work (even under a different name), do NOT create a duplicate
|
||||
- Pages contain requirements — reference them for detailed task descriptions
|
||||
- Pages contain requirements — use them to create detailed task descriptions
|
||||
- DO NOT create tasks that overlap with existing tasks in other phases
|
||||
|
||||
## Rules
|
||||
- Break work into 3-8 tasks per phase
|
||||
- Order tasks logically (foundational work first)
|
||||
- Make each task self-contained with enough context
|
||||
- Include test/verify tasks where appropriate
|
||||
${INTER_AGENT_COMMUNICATION}`;
|
||||
`;
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
* Discuss mode prompt — clarifying questions and decision capture.
|
||||
*/
|
||||
|
||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||
|
||||
export function buildDiscussPrompt(): string {
|
||||
return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode.
|
||||
@@ -30,6 +30,5 @@ ${ID_GENERATION}
|
||||
- Ask 2-4 questions at a time, not more
|
||||
- Provide options when choices are clear
|
||||
- Capture every decision with rationale
|
||||
- Don't proceed until ambiguities are resolved
|
||||
${INTER_AGENT_COMMUNICATION}`;
|
||||
- Don't proceed until ambiguities are resolved`;
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
* Execute mode prompt — standard worker agent.
|
||||
*/
|
||||
|
||||
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
||||
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||
|
||||
export function buildExecutePrompt(): string {
|
||||
return `You are a Worker agent in the Codewalk multi-agent system.
|
||||
@@ -16,6 +16,5 @@ ${SIGNAL_FORMAT}
|
||||
- Complete the task as specified in .cw/input/task.md
|
||||
- Ask questions if requirements are unclear
|
||||
- Report errors honestly — don't guess
|
||||
- Focus on writing clean, tested code
|
||||
${INTER_AGENT_COMMUNICATION}`;
|
||||
- Focus on writing clean, tested code`;
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* input files, ID generation) are in shared.ts.
|
||||
*/
|
||||
|
||||
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, INTER_AGENT_COMMUNICATION } from './shared.js';
|
||||
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, buildInterAgentCommunication } from './shared.js';
|
||||
export { buildExecutePrompt } from './execute.js';
|
||||
export { buildDiscussPrompt } from './discuss.js';
|
||||
export { buildPlanPrompt } from './plan.js';
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
* Plan mode prompt — plan initiative into phases.
|
||||
*/
|
||||
|
||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||
|
||||
export function buildPlanPrompt(): string {
|
||||
return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode.
|
||||
@@ -36,6 +36,5 @@ ${ID_GENERATION}
|
||||
- Start with foundation/infrastructure phases
|
||||
- Group related work together
|
||||
- Make dependencies explicit using phase IDs
|
||||
- Each task should be completable in one session
|
||||
${INTER_AGENT_COMMUNICATION}`;
|
||||
- Each task should be completable in one session`;
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
* Refine mode prompt — review and propose edits to initiative pages.
|
||||
*/
|
||||
|
||||
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
||||
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||
|
||||
export function buildRefinePrompt(): string {
|
||||
return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode.
|
||||
@@ -24,6 +24,5 @@ Write one file per modified page to \`.cw/output/pages/{pageId}.md\`:
|
||||
- Each output page's body is the FULL new content (not a diff)
|
||||
- Preserve [[page:\$id|title]] cross-references in your output
|
||||
- Focus on clarity, completeness, and consistency
|
||||
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/
|
||||
${INTER_AGENT_COMMUNICATION}`;
|
||||
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/`;
|
||||
}
|
||||
|
||||
@@ -44,53 +44,41 @@ cw id
|
||||
\`\`\`
|
||||
Use the output as the filename (e.g., \`{id}.md\`).`;
|
||||
|
||||
export const INTER_AGENT_COMMUNICATION = `
|
||||
export function buildInterAgentCommunication(agentId: string): string {
|
||||
return `
|
||||
## Inter-Agent Communication
|
||||
|
||||
You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. You can exchange questions and answers with peer agents via CLI commands.
|
||||
|
||||
### Your Identity
|
||||
Read \`.cw/input/manifest.json\` — it contains \`agentId\` and \`agentName\` fields identifying you. You'll need your \`agentId\` for all communication commands.
|
||||
Your agent ID is: **${agentId}**
|
||||
|
||||
### CLI Commands
|
||||
|
||||
**\`cw listen\`** — Poll for incoming questions. Prints the first pending question as JSON and exits.
|
||||
\`\`\`
|
||||
cw listen --agent-id <YOUR_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>]
|
||||
\`\`\`
|
||||
- \`--agent-id\` (required): Your agent ID
|
||||
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use a value like 120000 (2 min) to avoid hanging.
|
||||
- \`--poll-interval\`: Polling frequency in ms. Default: 2000
|
||||
- Output (JSON): \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
|
||||
- Exit code 0 if a question was found, 1 on timeout or error.
|
||||
**\`cw listen --agent-id ${agentId}\`**
|
||||
Waits for an incoming question via server-sent events (SSE). Blocks until a question arrives, then prints one JSON line to stdout and exits with code 0.
|
||||
Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
|
||||
Exits with code 1 on connection error.
|
||||
|
||||
**\`cw ask\`** — Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout.
|
||||
\`\`\`
|
||||
cw ask "<question>" --from <YOUR_AGENT_ID> --agent-id <TARGET_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>]
|
||||
\`\`\`
|
||||
- \`--from\` (required): Your agent ID (the asker)
|
||||
- Target (exactly one required):
|
||||
- \`--agent-id <id>\`: Ask a specific agent directly
|
||||
- \`--task-id <id>\`: Ask whichever agent is running that task
|
||||
**\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
|
||||
Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0.
|
||||
Target (exactly one required):
|
||||
- \`--agent-id <id>\`: Ask a specific agent directly by agent ID
|
||||
- \`--task-id <id>\`: Ask whichever agent is currently running that task
|
||||
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
|
||||
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use 120000+ for safety.
|
||||
- \`--poll-interval\`: Polling frequency in ms. Default: 2000
|
||||
- Output: The answer text (plain text, not JSON).
|
||||
- Exit code 0 if answered, 1 on timeout or error.
|
||||
Options:
|
||||
- \`--poll-interval <ms>\`: Answer polling frequency in ms. Default: 2000
|
||||
- \`--timeout <ms>\`: Max wait for answer in ms. Default: 0 (wait forever)
|
||||
|
||||
**\`cw answer\`** — Answer a pending question.
|
||||
\`\`\`
|
||||
cw answer "<answer>" --conversation-id <CONVERSATION_ID>
|
||||
\`\`\`
|
||||
- \`--conversation-id\` (required): The conversation ID from the listen output
|
||||
- Output (JSON): \`{ "conversationId": "...", "status": "answered" }\`
|
||||
**\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
|
||||
Answer a pending question. The conversation ID comes from the \`cw listen\` output.
|
||||
Output: \`{ "conversationId": "...", "status": "answered" }\`
|
||||
|
||||
### Background Listener Pattern
|
||||
|
||||
At the START of your session, set up a background listener that writes to a temp file:
|
||||
At the START of your session, run a background listener that writes to a temp file:
|
||||
\`\`\`bash
|
||||
CW_LISTEN_FILE=$(mktemp /tmp/cw-listen-XXXXXX.txt)
|
||||
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 &
|
||||
cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
|
||||
LISTEN_PID=$!
|
||||
\`\`\`
|
||||
|
||||
@@ -102,13 +90,11 @@ if [ -n "$LISTEN_CONTENT" ]; then
|
||||
fi
|
||||
\`\`\`
|
||||
|
||||
When a question arrives, parse the JSON, answer, then restart the listener:
|
||||
When a question arrives, parse the JSON, answer it, then restart the listener:
|
||||
\`\`\`bash
|
||||
# Parse conversationId and question from the JSON
|
||||
cw answer "<your answer>" --conversation-id <conversationId>
|
||||
# Clear and restart
|
||||
> "$CW_LISTEN_FILE"
|
||||
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 &
|
||||
cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
|
||||
LISTEN_PID=$!
|
||||
\`\`\`
|
||||
|
||||
@@ -119,8 +105,9 @@ LISTEN_PID=$!
|
||||
- Do NOT ask questions that you can answer by reading the codebase yourself
|
||||
|
||||
### Cleanup
|
||||
Before writing \`.cw/output/signal.json\`, kill your listener:
|
||||
Before writing \`.cw/output/signal.json\`, kill your listener and clean up:
|
||||
\`\`\`bash
|
||||
kill $LISTEN_PID 2>/dev/null
|
||||
rm -f "$CW_LISTEN_FILE"
|
||||
\`\`\``;
|
||||
}
|
||||
|
||||
@@ -1366,41 +1366,37 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
|
||||
// Inter-agent conversation commands
|
||||
// =========================================================================
|
||||
|
||||
// cw listen --agent-id <id> [--poll-interval <ms>] [--timeout <ms>]
|
||||
// cw listen --agent-id <id>
|
||||
program
|
||||
.command('listen')
|
||||
.description('Poll for pending conversations addressed to an agent')
|
||||
.description('Listen for pending conversations via SSE subscription')
|
||||
.requiredOption('--agent-id <id>', 'Agent ID to listen for')
|
||||
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
|
||||
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
|
||||
.action(async (options: { agentId: string; pollInterval: string; timeout: string }) => {
|
||||
.action(async (options: { agentId: string }) => {
|
||||
try {
|
||||
const client = createDefaultTrpcClient();
|
||||
const pollInterval = parseInt(options.pollInterval, 10);
|
||||
const timeout = parseInt(options.timeout, 10);
|
||||
const startTime = Date.now();
|
||||
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const pending = await client.getPendingConversations.query({ agentId: options.agentId });
|
||||
if (pending.length > 0) {
|
||||
const conv = pending[0];
|
||||
console.log(JSON.stringify({
|
||||
conversationId: conv.id,
|
||||
fromAgentId: conv.fromAgentId,
|
||||
question: conv.question,
|
||||
phaseId: conv.phaseId,
|
||||
taskId: conv.taskId,
|
||||
}));
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (timeout > 0 && Date.now() - startTime >= timeout) {
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, pollInterval));
|
||||
}
|
||||
const subscription = client.onPendingConversation.subscribe(
|
||||
{ agentId: options.agentId },
|
||||
{
|
||||
onData(envelope) {
|
||||
const conv = envelope.data;
|
||||
console.log(JSON.stringify({
|
||||
conversationId: conv.conversationId,
|
||||
fromAgentId: conv.fromAgentId,
|
||||
question: conv.question,
|
||||
phaseId: conv.phaseId,
|
||||
taskId: conv.taskId,
|
||||
}));
|
||||
subscription.unsubscribe();
|
||||
process.exit(0);
|
||||
},
|
||||
onError(err) {
|
||||
console.error('Failed to listen:', err.message);
|
||||
subscription.unsubscribe();
|
||||
process.exit(1);
|
||||
},
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('Failed to listen:', (error as Error).message);
|
||||
process.exit(1);
|
||||
|
||||
@@ -2,10 +2,10 @@
|
||||
* tRPC Client for CLI
|
||||
*
|
||||
* Type-safe client for communicating with the coordination server.
|
||||
* Uses httpBatchLink for efficient request batching.
|
||||
* Uses splitLink to route subscriptions to SSE and queries/mutations to HTTP batch.
|
||||
*/
|
||||
|
||||
import { createTRPCClient, httpBatchLink } from '@trpc/client';
|
||||
import { createTRPCClient, httpBatchLink, splitLink, httpSubscriptionLink } from '@trpc/client';
|
||||
import type { AppRouter } from '../trpc/index.js';
|
||||
|
||||
/** Default server port */
|
||||
@@ -30,10 +30,13 @@ export function createTrpcClient(
|
||||
port: number = DEFAULT_PORT,
|
||||
host: string = DEFAULT_HOST
|
||||
): TrpcClient {
|
||||
const url = `http://${host}:${port}/trpc`;
|
||||
return createTRPCClient<AppRouter>({
|
||||
links: [
|
||||
httpBatchLink({
|
||||
url: `http://${host}:${port}/trpc`,
|
||||
splitLink({
|
||||
condition: (op) => op.type === 'subscription',
|
||||
true: httpSubscriptionLink({ url }),
|
||||
false: httpBatchLink({ url }),
|
||||
}),
|
||||
],
|
||||
});
|
||||
|
||||
@@ -3,9 +3,11 @@
|
||||
*/
|
||||
|
||||
import { TRPCError } from '@trpc/server';
|
||||
import { tracked, type TrackedEnvelope } from '@trpc/server';
|
||||
import { z } from 'zod';
|
||||
import type { ProcedureBuilder } from '../trpc.js';
|
||||
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
||||
import type { ConversationCreatedEvent } from '../../events/types.js';
|
||||
|
||||
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||
return {
|
||||
@@ -134,5 +136,86 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||
|
||||
return updated;
|
||||
}),
|
||||
|
||||
onPendingConversation: publicProcedure
|
||||
.input(z.object({ agentId: z.string().min(1) }))
|
||||
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{
|
||||
conversationId: string;
|
||||
fromAgentId: string;
|
||||
question: string;
|
||||
phaseId: string | null;
|
||||
taskId: string | null;
|
||||
}>> {
|
||||
const { agentId } = opts.input;
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
const eventBus = opts.ctx.eventBus;
|
||||
const repo = requireConversationRepository(opts.ctx);
|
||||
|
||||
// First yield any already-pending conversations
|
||||
const existing = await repo.findPendingForAgent(agentId);
|
||||
let eventCounter = 0;
|
||||
for (const conv of existing) {
|
||||
yield tracked(`conv-${eventCounter++}`, {
|
||||
conversationId: conv.id,
|
||||
fromAgentId: conv.fromAgentId,
|
||||
question: conv.question,
|
||||
phaseId: conv.phaseId,
|
||||
taskId: conv.taskId,
|
||||
});
|
||||
}
|
||||
|
||||
// Then listen for new conversation:created events
|
||||
const queue: string[] = []; // conversation IDs
|
||||
let resolve: (() => void) | null = null;
|
||||
|
||||
const handler = (event: ConversationCreatedEvent) => {
|
||||
if (event.payload.toAgentId !== agentId) return;
|
||||
queue.push(event.payload.conversationId);
|
||||
if (resolve) {
|
||||
const r = resolve;
|
||||
resolve = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
eventBus.on('conversation:created', handler);
|
||||
|
||||
const cleanup = () => {
|
||||
eventBus.off('conversation:created', handler);
|
||||
if (resolve) {
|
||||
const r = resolve;
|
||||
resolve = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
signal.addEventListener('abort', cleanup, { once: true });
|
||||
|
||||
try {
|
||||
while (!signal.aborted) {
|
||||
while (queue.length > 0) {
|
||||
const convId = queue.shift()!;
|
||||
const conv = await repo.findById(convId);
|
||||
if (conv && conv.status === 'pending') {
|
||||
yield tracked(`conv-${eventCounter++}`, {
|
||||
conversationId: conv.id,
|
||||
fromAgentId: conv.fromAgentId,
|
||||
question: conv.question,
|
||||
phaseId: conv.phaseId,
|
||||
taskId: conv.taskId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (!signal.aborted) {
|
||||
await new Promise<void>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
cleanup();
|
||||
}
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -61,6 +61,8 @@ export const ALL_EVENT_TYPES: DomainEventType[] = [
|
||||
'page:deleted',
|
||||
'changeset:created',
|
||||
'changeset:reverted',
|
||||
'conversation:created',
|
||||
'conversation:answered',
|
||||
];
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user