feat: Inject agent ID into prompts, SSE-based cw listen, all flags documented
- INTER_AGENT_COMMUNICATION constant → buildInterAgentCommunication(agentId) function - Manager injects actual agent ID into prompt after DB record creation - Agent ID hardcoded in cw listen/ask commands — no manifest.json indirection - cw listen now uses onPendingConversation SSE subscription instead of polling - CLI trpc-client upgraded with splitLink for subscription support - All CLI flags (--agent-id, --from, --timeout, --poll-interval) documented in prompt - conversation:created/answered added to ALL_EVENT_TYPES
This commit is contained in:
@@ -142,28 +142,27 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream
|
|||||||
Agents can communicate with each other via the `conversations` table, coordinated through CLI commands.
|
Agents can communicate with each other via the `conversations` table, coordinated through CLI commands.
|
||||||
|
|
||||||
### Prompt Integration
|
### Prompt Integration
|
||||||
`INTER_AGENT_COMMUNICATION` constant in `prompts/shared.ts` is appended to all 5 agent mode prompts. It instructs agents to:
|
`buildInterAgentCommunication(agentId)` function in `prompts/shared.ts` generates per-agent communication instructions. Called in `manager.ts` after agent record creation — the actual agent ID is injected directly into the prompt (no manifest.json indirection). Appended to the prompt regardless of mode. Instructions include:
|
||||||
1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &`
|
1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &`
|
||||||
2. Periodically check the temp file for incoming questions between work steps
|
2. Periodically check the temp file for incoming questions between work steps
|
||||||
3. Answer via `cw answer`, clear the file, restart the listener
|
3. Answer via `cw answer`, clear the file, restart the listener
|
||||||
4. Ask questions to peers via `cw ask --from <ID> --agent-id|--task-id|--phase-id`
|
4. Ask questions to peers via `cw ask --from <agentId> --agent-id|--task-id|--phase-id`
|
||||||
5. Kill the listener and clean up the temp file before writing `signal.json`
|
5. Kill the listener and clean up the temp file before writing `signal.json`
|
||||||
|
|
||||||
### Agent Identity
|
### Agent Identity
|
||||||
`manifest.json` now includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation.
|
`manifest.json` includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. The agent ID is also injected directly into the prompt's communication instructions.
|
||||||
|
|
||||||
### CLI Commands
|
### CLI Commands
|
||||||
|
|
||||||
**`cw listen --agent-id <id> [--timeout <ms>] [--poll-interval <ms>]`**
|
**`cw listen --agent-id <id>`**
|
||||||
- Polls `getPendingConversations`, prints first pending as JSON, exits with code 0
|
- Subscribes to `onPendingConversation` SSE subscription, prints first pending as JSON, exits with code 0
|
||||||
- `--timeout`: max wait in ms (default 0=forever)
|
- First yields any existing pending conversations from DB, then listens for `conversation:created` events
|
||||||
- `--poll-interval`: polling frequency in ms (default 2000)
|
|
||||||
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
|
- Output: `{ conversationId, fromAgentId, question, phaseId?, taskId? }`
|
||||||
|
|
||||||
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
|
**`cw ask <question> --from <agentId> --agent-id|--task-id|--phase-id <target> [--timeout <ms>] [--poll-interval <ms>]`**
|
||||||
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout
|
- Creates conversation, polls `getConversation` until answered, prints answer text to stdout
|
||||||
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
|
- Target resolution: `--agent-id` (direct), `--task-id` (find agent running task), `--phase-id` (find agent in phase)
|
||||||
- `--timeout` / `--poll-interval`: same defaults as listen
|
- `--timeout`: max wait in ms (default 0=forever), `--poll-interval`: polling frequency in ms (default 2000)
|
||||||
|
|
||||||
**`cw answer <answer> --conversation-id <id>`**
|
**`cw answer <answer> --conversation-id <id>`**
|
||||||
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`
|
- Calls `answerConversation`, prints `{ conversationId, status: "answered" }`
|
||||||
|
|||||||
@@ -278,7 +278,7 @@ describe('MultiProviderAgentManager', () => {
|
|||||||
// Verify spawn was called with custom cwd
|
// Verify spawn was called with custom cwd
|
||||||
expect(mockSpawn).toHaveBeenCalledWith(
|
expect(mockSpawn).toHaveBeenCalledWith(
|
||||||
'claude',
|
'claude',
|
||||||
expect.arrayContaining(['-p', 'Test task', '--output-format', 'stream-json']),
|
expect.arrayContaining(['-p', expect.stringContaining('Test task'), '--output-format', 'stream-json']),
|
||||||
expect.objectContaining({ cwd: '/custom/path' })
|
expect.objectContaining({ cwd: '/custom/path' })
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ import type {
|
|||||||
ProcessCrashedEvent,
|
ProcessCrashedEvent,
|
||||||
} from '../events/index.js';
|
} from '../events/index.js';
|
||||||
import { writeInputFiles } from './file-io.js';
|
import { writeInputFiles } from './file-io.js';
|
||||||
import { buildWorkspaceLayout } from './prompts/index.js';
|
import { buildWorkspaceLayout, buildInterAgentCommunication } from './prompts/index.js';
|
||||||
import { getProvider } from './providers/registry.js';
|
import { getProvider } from './providers/registry.js';
|
||||||
import { createModuleLogger } from '../logger/index.js';
|
import { createModuleLogger } from '../logger/index.js';
|
||||||
import { join, dirname } from 'node:path';
|
import { join, dirname } from 'node:path';
|
||||||
@@ -275,6 +275,9 @@ export class MultiProviderAgentManager implements AgentManager {
|
|||||||
});
|
});
|
||||||
const agentId = agent.id;
|
const agentId = agent.id;
|
||||||
|
|
||||||
|
// 3a. Append inter-agent communication instructions with actual agent ID
|
||||||
|
prompt = prompt + buildInterAgentCommunication(agentId);
|
||||||
|
|
||||||
// 3b. Write input files (after agent creation so we can include agentId/agentName)
|
// 3b. Write input files (after agent creation so we can include agentId/agentName)
|
||||||
if (options.inputContext) {
|
if (options.inputContext) {
|
||||||
writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });
|
writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
* Detail mode prompt — break a phase into executable tasks.
|
* Detail mode prompt — break a phase into executable tasks.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||||
|
|
||||||
export function buildDetailPrompt(): string {
|
export function buildDetailPrompt(): string {
|
||||||
return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode.
|
return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode.
|
||||||
@@ -26,21 +26,15 @@ ${ID_GENERATION}
|
|||||||
|
|
||||||
## Task Design Rules
|
## Task Design Rules
|
||||||
- Each task: specific, actionable, completable by one agent
|
- Each task: specific, actionable, completable by one agent
|
||||||
|
- Ideally tasks shall be executable in parallel - if they depend on each other, use dependencies to indicate order
|
||||||
- Include verification steps where appropriate
|
- Include verification steps where appropriate
|
||||||
- Use \`checkpoint:*\` types for tasks requiring human review
|
|
||||||
- Dependencies should be minimal and explicit
|
- Dependencies should be minimal and explicit
|
||||||
|
|
||||||
## Existing Context
|
## Existing Context
|
||||||
- FIRST: Read ALL files in \`context/tasks/\` before generating any output
|
- FIRST: Read ALL files in \`context/tasks/\` before generating any output
|
||||||
- Your target phase is \`phase.md\` — only create tasks for THIS phase
|
- Your target phase is \`phase.md\` — only create tasks for THIS phase
|
||||||
- If a task in context/tasks/ already covers the same work (even under a different name), do NOT create a duplicate
|
- If a task in context/tasks/ already covers the same work (even under a different name), do NOT create a duplicate
|
||||||
- Pages contain requirements — reference them for detailed task descriptions
|
- Pages contain requirements — use them to create detailed task descriptions
|
||||||
- DO NOT create tasks that overlap with existing tasks in other phases
|
- DO NOT create tasks that overlap with existing tasks in other phases
|
||||||
|
`;
|
||||||
## Rules
|
|
||||||
- Break work into 3-8 tasks per phase
|
|
||||||
- Order tasks logically (foundational work first)
|
|
||||||
- Make each task self-contained with enough context
|
|
||||||
- Include test/verify tasks where appropriate
|
|
||||||
${INTER_AGENT_COMMUNICATION}`;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
* Discuss mode prompt — clarifying questions and decision capture.
|
* Discuss mode prompt — clarifying questions and decision capture.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||||
|
|
||||||
export function buildDiscussPrompt(): string {
|
export function buildDiscussPrompt(): string {
|
||||||
return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode.
|
return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode.
|
||||||
@@ -30,6 +30,5 @@ ${ID_GENERATION}
|
|||||||
- Ask 2-4 questions at a time, not more
|
- Ask 2-4 questions at a time, not more
|
||||||
- Provide options when choices are clear
|
- Provide options when choices are clear
|
||||||
- Capture every decision with rationale
|
- Capture every decision with rationale
|
||||||
- Don't proceed until ambiguities are resolved
|
- Don't proceed until ambiguities are resolved`;
|
||||||
${INTER_AGENT_COMMUNICATION}`;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
* Execute mode prompt — standard worker agent.
|
* Execute mode prompt — standard worker agent.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||||
|
|
||||||
export function buildExecutePrompt(): string {
|
export function buildExecutePrompt(): string {
|
||||||
return `You are a Worker agent in the Codewalk multi-agent system.
|
return `You are a Worker agent in the Codewalk multi-agent system.
|
||||||
@@ -16,6 +16,5 @@ ${SIGNAL_FORMAT}
|
|||||||
- Complete the task as specified in .cw/input/task.md
|
- Complete the task as specified in .cw/input/task.md
|
||||||
- Ask questions if requirements are unclear
|
- Ask questions if requirements are unclear
|
||||||
- Report errors honestly — don't guess
|
- Report errors honestly — don't guess
|
||||||
- Focus on writing clean, tested code
|
- Focus on writing clean, tested code`;
|
||||||
${INTER_AGENT_COMMUNICATION}`;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@
|
|||||||
* input files, ID generation) are in shared.ts.
|
* input files, ID generation) are in shared.ts.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, INTER_AGENT_COMMUNICATION } from './shared.js';
|
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, buildInterAgentCommunication } from './shared.js';
|
||||||
export { buildExecutePrompt } from './execute.js';
|
export { buildExecutePrompt } from './execute.js';
|
||||||
export { buildDiscussPrompt } from './discuss.js';
|
export { buildDiscussPrompt } from './discuss.js';
|
||||||
export { buildPlanPrompt } from './plan.js';
|
export { buildPlanPrompt } from './plan.js';
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
* Plan mode prompt — plan initiative into phases.
|
* Plan mode prompt — plan initiative into phases.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||||
|
|
||||||
export function buildPlanPrompt(): string {
|
export function buildPlanPrompt(): string {
|
||||||
return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode.
|
return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode.
|
||||||
@@ -36,6 +36,5 @@ ${ID_GENERATION}
|
|||||||
- Start with foundation/infrastructure phases
|
- Start with foundation/infrastructure phases
|
||||||
- Group related work together
|
- Group related work together
|
||||||
- Make dependencies explicit using phase IDs
|
- Make dependencies explicit using phase IDs
|
||||||
- Each task should be completable in one session
|
- Each task should be completable in one session`;
|
||||||
${INTER_AGENT_COMMUNICATION}`;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
* Refine mode prompt — review and propose edits to initiative pages.
|
* Refine mode prompt — review and propose edits to initiative pages.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
|
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
|
||||||
|
|
||||||
export function buildRefinePrompt(): string {
|
export function buildRefinePrompt(): string {
|
||||||
return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode.
|
return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode.
|
||||||
@@ -24,6 +24,5 @@ Write one file per modified page to \`.cw/output/pages/{pageId}.md\`:
|
|||||||
- Each output page's body is the FULL new content (not a diff)
|
- Each output page's body is the FULL new content (not a diff)
|
||||||
- Preserve [[page:\$id|title]] cross-references in your output
|
- Preserve [[page:\$id|title]] cross-references in your output
|
||||||
- Focus on clarity, completeness, and consistency
|
- Focus on clarity, completeness, and consistency
|
||||||
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/
|
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/`;
|
||||||
${INTER_AGENT_COMMUNICATION}`;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,53 +44,41 @@ cw id
|
|||||||
\`\`\`
|
\`\`\`
|
||||||
Use the output as the filename (e.g., \`{id}.md\`).`;
|
Use the output as the filename (e.g., \`{id}.md\`).`;
|
||||||
|
|
||||||
export const INTER_AGENT_COMMUNICATION = `
|
export function buildInterAgentCommunication(agentId: string): string {
|
||||||
|
return `
|
||||||
## Inter-Agent Communication
|
## Inter-Agent Communication
|
||||||
|
|
||||||
You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. You can exchange questions and answers with peer agents via CLI commands.
|
You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. You can exchange questions and answers with peer agents via CLI commands.
|
||||||
|
|
||||||
### Your Identity
|
Your agent ID is: **${agentId}**
|
||||||
Read \`.cw/input/manifest.json\` — it contains \`agentId\` and \`agentName\` fields identifying you. You'll need your \`agentId\` for all communication commands.
|
|
||||||
|
|
||||||
### CLI Commands
|
### CLI Commands
|
||||||
|
|
||||||
**\`cw listen\`** — Poll for incoming questions. Prints the first pending question as JSON and exits.
|
**\`cw listen --agent-id ${agentId}\`**
|
||||||
\`\`\`
|
Waits for an incoming question via server-sent events (SSE). Blocks until a question arrives, then prints one JSON line to stdout and exits with code 0.
|
||||||
cw listen --agent-id <YOUR_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>]
|
Output: \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
|
||||||
\`\`\`
|
Exits with code 1 on connection error.
|
||||||
- \`--agent-id\` (required): Your agent ID
|
|
||||||
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use a value like 120000 (2 min) to avoid hanging.
|
|
||||||
- \`--poll-interval\`: Polling frequency in ms. Default: 2000
|
|
||||||
- Output (JSON): \`{ "conversationId": "...", "fromAgentId": "...", "question": "...", "phaseId": "...", "taskId": "..." }\`
|
|
||||||
- Exit code 0 if a question was found, 1 on timeout or error.
|
|
||||||
|
|
||||||
**\`cw ask\`** — Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout.
|
**\`cw ask "<question>" --from ${agentId} --agent-id <TARGET_AGENT_ID>\`**
|
||||||
\`\`\`
|
Ask another agent a question. Blocks until the answer arrives, then prints the answer text to stdout and exits with code 0.
|
||||||
cw ask "<question>" --from <YOUR_AGENT_ID> --agent-id <TARGET_AGENT_ID> [--timeout <ms>] [--poll-interval <ms>]
|
Target (exactly one required):
|
||||||
\`\`\`
|
- \`--agent-id <id>\`: Ask a specific agent directly by agent ID
|
||||||
- \`--from\` (required): Your agent ID (the asker)
|
- \`--task-id <id>\`: Ask whichever agent is currently running that task
|
||||||
- Target (exactly one required):
|
|
||||||
- \`--agent-id <id>\`: Ask a specific agent directly
|
|
||||||
- \`--task-id <id>\`: Ask whichever agent is running that task
|
|
||||||
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
|
- \`--phase-id <id>\`: Ask whichever agent is running a task in that phase
|
||||||
- \`--timeout\`: Max wait in ms. Default: 0 (wait forever). Use 120000+ for safety.
|
Options:
|
||||||
- \`--poll-interval\`: Polling frequency in ms. Default: 2000
|
- \`--poll-interval <ms>\`: Answer polling frequency in ms. Default: 2000
|
||||||
- Output: The answer text (plain text, not JSON).
|
- \`--timeout <ms>\`: Max wait for answer in ms. Default: 0 (wait forever)
|
||||||
- Exit code 0 if answered, 1 on timeout or error.
|
|
||||||
|
|
||||||
**\`cw answer\`** — Answer a pending question.
|
**\`cw answer "<answer>" --conversation-id <CONVERSATION_ID>\`**
|
||||||
\`\`\`
|
Answer a pending question. The conversation ID comes from the \`cw listen\` output.
|
||||||
cw answer "<answer>" --conversation-id <CONVERSATION_ID>
|
Output: \`{ "conversationId": "...", "status": "answered" }\`
|
||||||
\`\`\`
|
|
||||||
- \`--conversation-id\` (required): The conversation ID from the listen output
|
|
||||||
- Output (JSON): \`{ "conversationId": "...", "status": "answered" }\`
|
|
||||||
|
|
||||||
### Background Listener Pattern
|
### Background Listener Pattern
|
||||||
|
|
||||||
At the START of your session, set up a background listener that writes to a temp file:
|
At the START of your session, run a background listener that writes to a temp file:
|
||||||
\`\`\`bash
|
\`\`\`bash
|
||||||
CW_LISTEN_FILE=$(mktemp /tmp/cw-listen-XXXXXX.txt)
|
CW_LISTEN_FILE=$(mktemp /tmp/cw-listen-XXXXXX.txt)
|
||||||
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 &
|
cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
|
||||||
LISTEN_PID=$!
|
LISTEN_PID=$!
|
||||||
\`\`\`
|
\`\`\`
|
||||||
|
|
||||||
@@ -102,13 +90,11 @@ if [ -n "$LISTEN_CONTENT" ]; then
|
|||||||
fi
|
fi
|
||||||
\`\`\`
|
\`\`\`
|
||||||
|
|
||||||
When a question arrives, parse the JSON, answer, then restart the listener:
|
When a question arrives, parse the JSON, answer it, then restart the listener:
|
||||||
\`\`\`bash
|
\`\`\`bash
|
||||||
# Parse conversationId and question from the JSON
|
|
||||||
cw answer "<your answer>" --conversation-id <conversationId>
|
cw answer "<your answer>" --conversation-id <conversationId>
|
||||||
# Clear and restart
|
|
||||||
> "$CW_LISTEN_FILE"
|
> "$CW_LISTEN_FILE"
|
||||||
cw listen --agent-id <YOUR_AGENT_ID> --timeout 120000 > "$CW_LISTEN_FILE" 2>&1 &
|
cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" 2>&1 &
|
||||||
LISTEN_PID=$!
|
LISTEN_PID=$!
|
||||||
\`\`\`
|
\`\`\`
|
||||||
|
|
||||||
@@ -119,8 +105,9 @@ LISTEN_PID=$!
|
|||||||
- Do NOT ask questions that you can answer by reading the codebase yourself
|
- Do NOT ask questions that you can answer by reading the codebase yourself
|
||||||
|
|
||||||
### Cleanup
|
### Cleanup
|
||||||
Before writing \`.cw/output/signal.json\`, kill your listener:
|
Before writing \`.cw/output/signal.json\`, kill your listener and clean up:
|
||||||
\`\`\`bash
|
\`\`\`bash
|
||||||
kill $LISTEN_PID 2>/dev/null
|
kill $LISTEN_PID 2>/dev/null
|
||||||
rm -f "$CW_LISTEN_FILE"
|
rm -f "$CW_LISTEN_FILE"
|
||||||
\`\`\``;
|
\`\`\``;
|
||||||
|
}
|
||||||
|
|||||||
@@ -1366,41 +1366,37 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
|
|||||||
// Inter-agent conversation commands
|
// Inter-agent conversation commands
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
|
|
||||||
// cw listen --agent-id <id> [--poll-interval <ms>] [--timeout <ms>]
|
// cw listen --agent-id <id>
|
||||||
program
|
program
|
||||||
.command('listen')
|
.command('listen')
|
||||||
.description('Poll for pending conversations addressed to an agent')
|
.description('Listen for pending conversations via SSE subscription')
|
||||||
.requiredOption('--agent-id <id>', 'Agent ID to listen for')
|
.requiredOption('--agent-id <id>', 'Agent ID to listen for')
|
||||||
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
|
.action(async (options: { agentId: string }) => {
|
||||||
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
|
|
||||||
.action(async (options: { agentId: string; pollInterval: string; timeout: string }) => {
|
|
||||||
try {
|
try {
|
||||||
const client = createDefaultTrpcClient();
|
const client = createDefaultTrpcClient();
|
||||||
const pollInterval = parseInt(options.pollInterval, 10);
|
|
||||||
const timeout = parseInt(options.timeout, 10);
|
|
||||||
const startTime = Date.now();
|
|
||||||
|
|
||||||
// eslint-disable-next-line no-constant-condition
|
const subscription = client.onPendingConversation.subscribe(
|
||||||
while (true) {
|
{ agentId: options.agentId },
|
||||||
const pending = await client.getPendingConversations.query({ agentId: options.agentId });
|
{
|
||||||
if (pending.length > 0) {
|
onData(envelope) {
|
||||||
const conv = pending[0];
|
const conv = envelope.data;
|
||||||
console.log(JSON.stringify({
|
console.log(JSON.stringify({
|
||||||
conversationId: conv.id,
|
conversationId: conv.conversationId,
|
||||||
fromAgentId: conv.fromAgentId,
|
fromAgentId: conv.fromAgentId,
|
||||||
question: conv.question,
|
question: conv.question,
|
||||||
phaseId: conv.phaseId,
|
phaseId: conv.phaseId,
|
||||||
taskId: conv.taskId,
|
taskId: conv.taskId,
|
||||||
}));
|
}));
|
||||||
process.exit(0);
|
subscription.unsubscribe();
|
||||||
}
|
process.exit(0);
|
||||||
|
},
|
||||||
if (timeout > 0 && Date.now() - startTime >= timeout) {
|
onError(err) {
|
||||||
process.exit(1);
|
console.error('Failed to listen:', err.message);
|
||||||
}
|
subscription.unsubscribe();
|
||||||
|
process.exit(1);
|
||||||
await new Promise(resolve => setTimeout(resolve, pollInterval));
|
},
|
||||||
}
|
},
|
||||||
|
);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Failed to listen:', (error as Error).message);
|
console.error('Failed to listen:', (error as Error).message);
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
|
|||||||
@@ -2,10 +2,10 @@
|
|||||||
* tRPC Client for CLI
|
* tRPC Client for CLI
|
||||||
*
|
*
|
||||||
* Type-safe client for communicating with the coordination server.
|
* Type-safe client for communicating with the coordination server.
|
||||||
* Uses httpBatchLink for efficient request batching.
|
* Uses splitLink to route subscriptions to SSE and queries/mutations to HTTP batch.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { createTRPCClient, httpBatchLink } from '@trpc/client';
|
import { createTRPCClient, httpBatchLink, splitLink, httpSubscriptionLink } from '@trpc/client';
|
||||||
import type { AppRouter } from '../trpc/index.js';
|
import type { AppRouter } from '../trpc/index.js';
|
||||||
|
|
||||||
/** Default server port */
|
/** Default server port */
|
||||||
@@ -30,10 +30,13 @@ export function createTrpcClient(
|
|||||||
port: number = DEFAULT_PORT,
|
port: number = DEFAULT_PORT,
|
||||||
host: string = DEFAULT_HOST
|
host: string = DEFAULT_HOST
|
||||||
): TrpcClient {
|
): TrpcClient {
|
||||||
|
const url = `http://${host}:${port}/trpc`;
|
||||||
return createTRPCClient<AppRouter>({
|
return createTRPCClient<AppRouter>({
|
||||||
links: [
|
links: [
|
||||||
httpBatchLink({
|
splitLink({
|
||||||
url: `http://${host}:${port}/trpc`,
|
condition: (op) => op.type === 'subscription',
|
||||||
|
true: httpSubscriptionLink({ url }),
|
||||||
|
false: httpBatchLink({ url }),
|
||||||
}),
|
}),
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -3,9 +3,11 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { TRPCError } from '@trpc/server';
|
import { TRPCError } from '@trpc/server';
|
||||||
|
import { tracked, type TrackedEnvelope } from '@trpc/server';
|
||||||
import { z } from 'zod';
|
import { z } from 'zod';
|
||||||
import type { ProcedureBuilder } from '../trpc.js';
|
import type { ProcedureBuilder } from '../trpc.js';
|
||||||
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
|
||||||
|
import type { ConversationCreatedEvent } from '../../events/types.js';
|
||||||
|
|
||||||
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
||||||
return {
|
return {
|
||||||
@@ -134,5 +136,86 @@ export function conversationProcedures(publicProcedure: ProcedureBuilder) {
|
|||||||
|
|
||||||
return updated;
|
return updated;
|
||||||
}),
|
}),
|
||||||
|
|
||||||
|
onPendingConversation: publicProcedure
|
||||||
|
.input(z.object({ agentId: z.string().min(1) }))
|
||||||
|
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{
|
||||||
|
conversationId: string;
|
||||||
|
fromAgentId: string;
|
||||||
|
question: string;
|
||||||
|
phaseId: string | null;
|
||||||
|
taskId: string | null;
|
||||||
|
}>> {
|
||||||
|
const { agentId } = opts.input;
|
||||||
|
const signal = opts.signal ?? new AbortController().signal;
|
||||||
|
const eventBus = opts.ctx.eventBus;
|
||||||
|
const repo = requireConversationRepository(opts.ctx);
|
||||||
|
|
||||||
|
// First yield any already-pending conversations
|
||||||
|
const existing = await repo.findPendingForAgent(agentId);
|
||||||
|
let eventCounter = 0;
|
||||||
|
for (const conv of existing) {
|
||||||
|
yield tracked(`conv-${eventCounter++}`, {
|
||||||
|
conversationId: conv.id,
|
||||||
|
fromAgentId: conv.fromAgentId,
|
||||||
|
question: conv.question,
|
||||||
|
phaseId: conv.phaseId,
|
||||||
|
taskId: conv.taskId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then listen for new conversation:created events
|
||||||
|
const queue: string[] = []; // conversation IDs
|
||||||
|
let resolve: (() => void) | null = null;
|
||||||
|
|
||||||
|
const handler = (event: ConversationCreatedEvent) => {
|
||||||
|
if (event.payload.toAgentId !== agentId) return;
|
||||||
|
queue.push(event.payload.conversationId);
|
||||||
|
if (resolve) {
|
||||||
|
const r = resolve;
|
||||||
|
resolve = null;
|
||||||
|
r();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
eventBus.on('conversation:created', handler);
|
||||||
|
|
||||||
|
const cleanup = () => {
|
||||||
|
eventBus.off('conversation:created', handler);
|
||||||
|
if (resolve) {
|
||||||
|
const r = resolve;
|
||||||
|
resolve = null;
|
||||||
|
r();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
signal.addEventListener('abort', cleanup, { once: true });
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (!signal.aborted) {
|
||||||
|
while (queue.length > 0) {
|
||||||
|
const convId = queue.shift()!;
|
||||||
|
const conv = await repo.findById(convId);
|
||||||
|
if (conv && conv.status === 'pending') {
|
||||||
|
yield tracked(`conv-${eventCounter++}`, {
|
||||||
|
conversationId: conv.id,
|
||||||
|
fromAgentId: conv.fromAgentId,
|
||||||
|
question: conv.question,
|
||||||
|
phaseId: conv.phaseId,
|
||||||
|
taskId: conv.taskId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!signal.aborted) {
|
||||||
|
await new Promise<void>((r) => {
|
||||||
|
resolve = r;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
}),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,6 +61,8 @@ export const ALL_EVENT_TYPES: DomainEventType[] = [
|
|||||||
'page:deleted',
|
'page:deleted',
|
||||||
'changeset:created',
|
'changeset:created',
|
||||||
'changeset:reverted',
|
'changeset:reverted',
|
||||||
|
'conversation:created',
|
||||||
|
'conversation:answered',
|
||||||
];
|
];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user