From a6371e156a7cf4e8337336d43448cf7f38d59baa Mon Sep 17 00:00:00 2001 From: Lukas May Date: Tue, 10 Feb 2026 13:43:30 +0100 Subject: [PATCH] feat: Add inter-agent conversation system (listen, ask, answer) Enables parallel agents to communicate through a CLI-based conversation mechanism coordinated via tRPC. Agents can ask questions to peers and receive answers, with target resolution by agent ID, task ID, or phase ID. --- docs/agent.md | 21 ++- docs/cli-config.md | 9 ++ docs/database.md | 25 +++- docs/dispatch-events.md | 1 + docs/server-api.md | 15 ++ drizzle/0024_add_conversations.sql | 18 +++ src/agent/cleanup-manager.ts | 41 +++--- src/agent/file-io.ts | 7 +- src/agent/manager.ts | 27 ++-- src/agent/prompts/detail.ts | 5 +- src/agent/prompts/discuss.ts | 5 +- src/agent/prompts/execute.ts | 5 +- src/agent/prompts/index.ts | 2 +- src/agent/prompts/plan.ts | 5 +- src/agent/prompts/refine.ts | 5 +- src/agent/prompts/shared.ts | 54 +++++++ src/agent/types.ts | 4 + src/cli/index.ts | 119 +++++++++++++++ src/container.ts | 8 +- .../repositories/conversation-repository.ts | 23 +++ src/db/repositories/drizzle/conversation.ts | 67 +++++++++ src/db/repositories/drizzle/index.ts | 1 + src/db/repositories/index.ts | 5 + src/db/schema.ts | 24 +++ src/events/types.ts | 26 +++- src/trpc/context.ts | 5 + src/trpc/router.ts | 2 + src/trpc/routers/_helpers.ts | 11 ++ src/trpc/routers/conversation.ts | 138 ++++++++++++++++++ 29 files changed, 632 insertions(+), 46 deletions(-) create mode 100644 drizzle/0024_add_conversations.sql create mode 100644 src/db/repositories/conversation-repository.ts create mode 100644 src/db/repositories/drizzle/conversation.ts create mode 100644 src/trpc/routers/conversation.ts diff --git a/docs/agent.md b/docs/agent.md index b11544f..7d3b963 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -24,7 +24,7 @@ | `accounts/` | Account discovery, config dir setup, credential management, usage API | | `credentials/` | `AccountCredentialManager` — credential injection per account | | `lifecycle/` | `LifecycleController` — retry policy, signal recovery, missing signal instructions | -| `prompts/` | Mode-specific prompt builders (execute, discuss, plan, detail, refine) | +| `prompts/` | Mode-specific prompt builders (execute, discuss, plan, detail, refine) + shared inter-agent communication instructions | ## Key Flows @@ -124,3 +124,22 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream - Read path (`getAgentOutput` tRPC): concatenates all DB chunks (no file fallback) - Live path (`onAgentOutput` subscription): listens for `agent:output` events - Frontend: initial query loads from DB, subscription accumulates raw JSONL, both parsed via `parseAgentOutput()` + +## Inter-Agent Communication + +Agents can communicate with each other via the `conversations` table, coordinated through CLI commands. + +### Prompt Integration +`INTER_AGENT_COMMUNICATION` constant in `prompts/shared.ts` is appended to all 5 agent mode prompts. It instructs agents to: +1. Start `cw listen --agent-id &` as a background process at session start +2. Handle incoming questions by answering via `cw answer` and restarting the listener +3. Ask questions to peers via `cw ask --from --agent-id|--phase-id|--task-id` +4. Kill the listener before writing `signal.json` + +### Agent Identity +`manifest.json` now includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. + +### CLI Commands +- `cw listen`: Polls `getPendingConversations`, prints first pending as JSON, exits +- `cw ask`: Creates conversation, polls `getConversation` until answered, prints answer +- `cw answer`: Calls `answerConversation`, prints confirmation JSON diff --git a/docs/cli-config.md b/docs/cli-config.md index ec5f449..22fc68b 100644 --- a/docs/cli-config.md +++ b/docs/cli-config.md @@ -107,6 +107,15 @@ Uses **Commander.js** for command parsing. | `list [--initiative ]` | List active previews | | `status ` | Get preview status with service details | +### Inter-Agent Conversation (`cw listen`, `cw ask`, `cw answer`) +| Command | Description | +|---------|-------------| +| `listen --agent-id [--poll-interval ] [--timeout ]` | Poll for pending questions, print JSON, exit | +| `ask --from [--agent-id\|--phase-id\|--task-id ] [--poll-interval ] [--timeout ]` | Ask question, block until answered, print answer | +| `answer --conversation-id ` | Answer a pending conversation | + +All three commands output JSON for programmatic agent consumption. + ### Accounts (`cw account`) | Command | Description | |---------|-------------| diff --git a/docs/database.md b/docs/database.md index b6493d4..e3d7307 100644 --- a/docs/database.md +++ b/docs/database.md @@ -143,9 +143,29 @@ Self-referencing (parentMessageId) for threading. Sender/recipient types: 'agent Index on `agentId` for fast queries. +### conversations + +Inter-agent communication records. + +| Column | Type | Notes | +|--------|------|-------| +| id | text PK | | +| fromAgentId | text NOT NULL | FK→agents ON DELETE CASCADE | +| toAgentId | text NOT NULL | FK→agents ON DELETE CASCADE | +| initiativeId | text | FK→initiatives ON DELETE SET NULL | +| phaseId | text | FK→phases ON DELETE SET NULL | +| taskId | text | FK→tasks ON DELETE SET NULL | +| question | text NOT NULL | | +| answer | text | nullable until answered | +| status | text enum | pending, answered | +| createdAt | integer/timestamp | | +| updatedAt | integer/timestamp | | + +Indexes: `(toAgentId, status)` for listen polling, `(fromAgentId)`. + ## Repository Interfaces -10 repositories, each with standard CRUD plus domain-specific methods: +11 repositories, each with standard CRUD plus domain-specific methods: | Repository | Key Methods | |-----------|-------------| @@ -159,6 +179,7 @@ Index on `agentId` for fast queries. | AccountRepository | + findNextAvailable (round-robin), markExhausted, clearExpiredExhaustion | | ProposalRepository | + findByAgentIdAndStatus, updateManyByAgentId, countByAgentIdAndStatus | | LogChunkRepository | insertChunk, findByAgentId, deleteByAgentId, getSessionCount | +| ConversationRepository | create, findById, findPendingForAgent, answer | ## Migrations @@ -170,4 +191,4 @@ Key rules: - See [database-migrations.md](database-migrations.md) for full workflow - Snapshots stale after 0008; migrations 0008+ are hand-written -Current migrations: 0000 through 0018 (19 total). +Current migrations: 0000 through 0024 (25 total). diff --git a/docs/dispatch-events.md b/docs/dispatch-events.md index 20b6c61..19f8f4e 100644 --- a/docs/dispatch-events.md +++ b/docs/dispatch-events.md @@ -25,6 +25,7 @@ | **Worktree** | `worktree:created`, `worktree:removed`, `worktree:merged`, `worktree:conflict` | 4 | | **Account** | `account:credentials_refreshed`, `account:credentials_expired`, `account:credentials_validated` | 3 | | **Preview** | `preview:building`, `preview:ready`, `preview:stopped`, `preview:failed` | 4 | +| **Conversation** | `conversation:created`, `conversation:answered` | 2 | | **Log** | `log:entry` | 1 | ### Key Event Payloads diff --git a/docs/server-api.md b/docs/server-api.md index 7c11bab..b94ee83 100644 --- a/docs/server-api.md +++ b/docs/server-api.md @@ -212,3 +212,18 @@ Docker-based preview deployments. No database table — Docker is the source of | `getPreviewStatus` | query | Get preview status: `{previewId}` → PreviewStatus | Context dependency: `requirePreviewManager(ctx)` — requires `PreviewManager` from container. + +## Conversation Procedures + +Inter-agent communication for parallel agents. + +| Procedure | Type | Description | +|-----------|------|-------------| +| `createConversation` | mutation | Ask a question: `{fromAgentId, toAgentId?, phaseId?, taskId?, question}` → Conversation | +| `getPendingConversations` | query | Poll for incoming questions: `{agentId}` → Conversation[] | +| `getConversation` | query | Get conversation by ID: `{id}` → Conversation | +| `answerConversation` | mutation | Answer a conversation: `{id, answer}` → Conversation | + +Target resolution: `toAgentId` → direct; `taskId` → find running agent by task; `phaseId` → find running agent by any task in phase. + +Context dependency: `requireConversationRepository(ctx)`, `requireAgentManager(ctx)`. diff --git a/drizzle/0024_add_conversations.sql b/drizzle/0024_add_conversations.sql new file mode 100644 index 0000000..c5c3fbd --- /dev/null +++ b/drizzle/0024_add_conversations.sql @@ -0,0 +1,18 @@ +-- Inter-agent conversations +CREATE TABLE `conversations` ( + `id` text PRIMARY KEY NOT NULL, + `from_agent_id` text NOT NULL REFERENCES `agents`(`id`) ON DELETE CASCADE, + `to_agent_id` text NOT NULL REFERENCES `agents`(`id`) ON DELETE CASCADE, + `initiative_id` text REFERENCES `initiatives`(`id`) ON DELETE SET NULL, + `phase_id` text REFERENCES `phases`(`id`) ON DELETE SET NULL, + `task_id` text REFERENCES `tasks`(`id`) ON DELETE SET NULL, + `question` text NOT NULL, + `answer` text, + `status` text DEFAULT 'pending' NOT NULL, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE INDEX `conversations_to_agent_status_idx` ON `conversations` (`to_agent_id`, `status`); +--> statement-breakpoint +CREATE INDEX `conversations_from_agent_idx` ON `conversations` (`from_agent_id`); diff --git a/src/agent/cleanup-manager.ts b/src/agent/cleanup-manager.ts index eb0dea9..c37433c 100644 --- a/src/agent/cleanup-manager.ts +++ b/src/agent/cleanup-manager.ts @@ -206,43 +206,50 @@ export class CleanupManager { } /** - * Check if all project worktrees for an agent are clean (no uncommitted/untracked files). + * Get the relative subdirectory names of dirty worktrees for an agent. + * Returns an empty array if all worktrees are clean or the workdir doesn't exist. */ - async isWorkdirClean(alias: string, initiativeId: string | null): Promise { + async getDirtyWorktreePaths(alias: string, initiativeId: string | null): Promise { const agentWorkdir = this.getAgentWorkdir(alias); try { await readdir(agentWorkdir); } catch { - // Workdir doesn't exist — treat as clean - return true; + return []; } - const worktreePaths: string[] = []; + const worktreePaths: { absPath: string; name: string }[] = []; if (initiativeId) { const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId); for (const project of projects) { - worktreePaths.push(join(agentWorkdir, project.name)); + worktreePaths.push({ absPath: join(agentWorkdir, project.name), name: project.name }); } } else { - worktreePaths.push(join(agentWorkdir, 'workspace')); + worktreePaths.push({ absPath: join(agentWorkdir, 'workspace'), name: 'workspace' }); } - for (const wtPath of worktreePaths) { + const dirty: string[] = []; + for (const { absPath, name } of worktreePaths) { try { - const { stdout } = await execFileAsync('git', ['status', '--porcelain'], { cwd: wtPath }); - if (stdout.trim().length > 0) { - log.info({ alias, worktree: wtPath }, 'workdir has uncommitted changes'); - return false; - } - } catch (err) { - log.warn({ alias, worktree: wtPath, err: err instanceof Error ? err.message : String(err) }, 'git status failed, treating as dirty'); - return false; + const { stdout } = await execFileAsync('git', ['status', '--porcelain'], { cwd: absPath }); + if (stdout.trim().length > 0) dirty.push(name); + } catch { + dirty.push(name); } } + return dirty; + } - return true; + /** + * Check if all project worktrees for an agent are clean (no uncommitted/untracked files). + */ + async isWorkdirClean(alias: string, initiativeId: string | null): Promise { + const dirty = await this.getDirtyWorktreePaths(alias, initiativeId); + if (dirty.length > 0) { + log.info({ alias, dirtyWorktrees: dirty }, 'workdir has uncommitted changes'); + } + return dirty.length === 0; } /** diff --git a/src/agent/file-io.ts b/src/agent/file-io.ts index 56764fe..c7455e5 100644 --- a/src/agent/file-io.ts +++ b/src/agent/file-io.ts @@ -259,7 +259,12 @@ export function writeInputFiles(options: WriteInputFilesOptions): void { // Write manifest listing exactly which files were created writeFileSync( join(inputDir, 'manifest.json'), - JSON.stringify({ files: manifestFiles, contextFiles }) + '\n', + JSON.stringify({ + files: manifestFiles, + contextFiles, + agentId: options.agentId ?? null, + agentName: options.agentName ?? null, + }) + '\n', 'utf-8', ); } diff --git a/src/agent/manager.ts b/src/agent/manager.ts index cb2fad7..d6d5546 100644 --- a/src/agent/manager.ts +++ b/src/agent/manager.ts @@ -255,13 +255,7 @@ export class MultiProviderAgentManager implements AgentManager { initiativeBasedAgent: !!initiativeId }, 'agent workdir setup completed'); - // 2b. Write input files - if (options.inputContext) { - writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext }); - log.debug({ alias }, 'input files written'); - } - - // 2c. Append workspace layout to prompt now that worktrees exist + // 2b. Append workspace layout to prompt now that worktrees exist const workspaceSection = buildWorkspaceLayout(agentCwd); if (workspaceSection) { prompt = prompt + workspaceSection; @@ -281,6 +275,12 @@ export class MultiProviderAgentManager implements AgentManager { }); const agentId = agent.id; + // 3b. Write input files (after agent creation so we can include agentId/agentName) + if (options.inputContext) { + writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias }); + log.debug({ alias }, 'input files written'); + } + // 4. Build spawn command const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt); const finalCwd = cwd ?? agentCwd; @@ -418,7 +418,6 @@ export class MultiProviderAgentManager implements AgentManager { } } catch (err) { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup failed'); - } finally { this.commitRetryCount.delete(agentId); } } @@ -434,10 +433,16 @@ export class MultiProviderAgentManager implements AgentManager { const provider = getProvider(agent.provider); if (!provider || provider.resumeStyle === 'none') return false; + // Check which specific worktrees are dirty — skip resume if all clean + const dirtyPaths = await this.cleanupManager.getDirtyWorktreePaths(agent.name, agent.initiativeId); + if (dirtyPaths.length === 0) return false; + + const dirtyList = dirtyPaths.map(p => `- \`${p}/\``).join('\n'); const commitPrompt = - 'You have uncommitted changes in your working directory. ' + - 'Stage everything with `git add -A` and commit with an appropriate message describing your work. ' + - 'Do not make any other changes.'; + 'You have uncommitted changes in the following project directories:\n' + + dirtyList + '\n\n' + + 'For each directory listed above, `cd` into it, then run `git add -A && git commit -m ""` ' + + 'with an appropriate commit message describing the work. Do not make any other changes.'; await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null }); diff --git a/src/agent/prompts/detail.ts b/src/agent/prompts/detail.ts index b6a7e98..4a9c294 100644 --- a/src/agent/prompts/detail.ts +++ b/src/agent/prompts/detail.ts @@ -2,7 +2,7 @@ * Detail mode prompt — break a phase into executable tasks. */ -import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; +import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; export function buildDetailPrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode. @@ -41,5 +41,6 @@ ${ID_GENERATION} - Break work into 3-8 tasks per phase - Order tasks logically (foundational work first) - Make each task self-contained with enough context -- Include test/verify tasks where appropriate`; +- Include test/verify tasks where appropriate +${INTER_AGENT_COMMUNICATION}`; } diff --git a/src/agent/prompts/discuss.ts b/src/agent/prompts/discuss.ts index 147ddf6..b54e93f 100644 --- a/src/agent/prompts/discuss.ts +++ b/src/agent/prompts/discuss.ts @@ -2,7 +2,7 @@ * Discuss mode prompt — clarifying questions and decision capture. */ -import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; +import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; export function buildDiscussPrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode. @@ -30,5 +30,6 @@ ${ID_GENERATION} - Ask 2-4 questions at a time, not more - Provide options when choices are clear - Capture every decision with rationale -- Don't proceed until ambiguities are resolved`; +- Don't proceed until ambiguities are resolved +${INTER_AGENT_COMMUNICATION}`; } diff --git a/src/agent/prompts/execute.ts b/src/agent/prompts/execute.ts index c3a7873..c6d0629 100644 --- a/src/agent/prompts/execute.ts +++ b/src/agent/prompts/execute.ts @@ -2,7 +2,7 @@ * Execute mode prompt — standard worker agent. */ -import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; +import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; export function buildExecutePrompt(): string { return `You are a Worker agent in the Codewalk multi-agent system. @@ -16,5 +16,6 @@ ${SIGNAL_FORMAT} - Complete the task as specified in .cw/input/task.md - Ask questions if requirements are unclear - Report errors honestly — don't guess -- Focus on writing clean, tested code`; +- Focus on writing clean, tested code +${INTER_AGENT_COMMUNICATION}`; } diff --git a/src/agent/prompts/index.ts b/src/agent/prompts/index.ts index 455b319..b9cb923 100644 --- a/src/agent/prompts/index.ts +++ b/src/agent/prompts/index.ts @@ -5,7 +5,7 @@ * input files, ID generation) are in shared.ts. */ -export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION } from './shared.js'; +export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, INTER_AGENT_COMMUNICATION } from './shared.js'; export { buildExecutePrompt } from './execute.js'; export { buildDiscussPrompt } from './discuss.js'; export { buildPlanPrompt } from './plan.js'; diff --git a/src/agent/prompts/plan.ts b/src/agent/prompts/plan.ts index 3e3907e..577018c 100644 --- a/src/agent/prompts/plan.ts +++ b/src/agent/prompts/plan.ts @@ -2,7 +2,7 @@ * Plan mode prompt — plan initiative into phases. */ -import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; +import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; export function buildPlanPrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode. @@ -36,5 +36,6 @@ ${ID_GENERATION} - Start with foundation/infrastructure phases - Group related work together - Make dependencies explicit using phase IDs -- Each task should be completable in one session`; +- Each task should be completable in one session +${INTER_AGENT_COMMUNICATION}`; } diff --git a/src/agent/prompts/refine.ts b/src/agent/prompts/refine.ts index d69e684..5fb0696 100644 --- a/src/agent/prompts/refine.ts +++ b/src/agent/prompts/refine.ts @@ -2,7 +2,7 @@ * Refine mode prompt — review and propose edits to initiative pages. */ -import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js'; +import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js'; export function buildRefinePrompt(): string { return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode. @@ -24,5 +24,6 @@ Write one file per modified page to \`.cw/output/pages/{pageId}.md\`: - Each output page's body is the FULL new content (not a diff) - Preserve [[page:\$id|title]] cross-references in your output - Focus on clarity, completeness, and consistency -- Do not invent new page IDs — only reference existing ones from .cw/input/pages/`; +- Do not invent new page IDs — only reference existing ones from .cw/input/pages/ +${INTER_AGENT_COMMUNICATION}`; } diff --git a/src/agent/prompts/shared.ts b/src/agent/prompts/shared.ts index 0a85cc8..2c95223 100644 --- a/src/agent/prompts/shared.ts +++ b/src/agent/prompts/shared.ts @@ -43,3 +43,57 @@ When creating new entities (phases, tasks, decisions), generate a unique ID by r cw id \`\`\` Use the output as the filename (e.g., \`{id}.md\`).`; + +export const INTER_AGENT_COMMUNICATION = ` +## Inter-Agent Communication + +You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously. + +### Your Identity +Read \`.cw/input/manifest.json\` — it contains \`agentId\` and \`agentName\` fields identifying you. + +### Listening for Questions +At the START of your session, start a background listener: +\`\`\`bash +cw listen --agent-id & +LISTEN_PID=$! +\`\`\` + +When the listener prints JSON to stdout, another agent is asking you a question: +\`\`\`json +{ "conversationId": "...", "fromAgentId": "...", "question": "..." } +\`\`\` + +Answer it: +\`\`\`bash +cw answer "" --conversation-id +\`\`\` + +Then restart the listener: +\`\`\`bash +cw listen --agent-id & +LISTEN_PID=$! +\`\`\` + +### Asking Questions +To ask another agent a question (blocks until answered): +\`\`\`bash +cw ask "What interface does the user service expose?" --from --agent-id +\`\`\` + +You can also target by task or phase: +\`\`\`bash +cw ask "What port does the API run on?" --from --task-id +cw ask "What schema are you using?" --from --phase-id +\`\`\` + +### When to Communicate +- You need interface/schema/contract info from another agent's work +- You're about to modify a shared resource and want to coordinate +- You have a dependency on work another agent is doing + +### Cleanup +Before writing \`.cw/output/signal.json\`, kill your listener: +\`\`\`bash +kill $LISTEN_PID 2>/dev/null +\`\`\``; diff --git a/src/agent/types.ts b/src/agent/types.ts index eff15cc..3640072 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -29,6 +29,10 @@ export interface AgentInputContext { phases?: Array; /** All tasks for the initiative (read-only context for agents) */ tasks?: import('../db/schema.js').Task[]; + /** Agent ID for inter-agent communication */ + agentId?: string; + /** Agent name for inter-agent communication */ + agentName?: string; } /** diff --git a/src/cli/index.ts b/src/cli/index.ts index f5cb26b..177a0b5 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -1362,6 +1362,125 @@ export function createCli(serverHandler?: (port?: number) => Promise): Com } }); + // ========================================================================= + // Inter-agent conversation commands + // ========================================================================= + + // cw listen --agent-id [--poll-interval ] [--timeout ] + program + .command('listen') + .description('Poll for pending conversations addressed to an agent') + .requiredOption('--agent-id ', 'Agent ID to listen for') + .option('--poll-interval ', 'Poll interval in milliseconds', '2000') + .option('--timeout ', 'Timeout in milliseconds (0=forever)', '0') + .action(async (options: { agentId: string; pollInterval: string; timeout: string }) => { + try { + const client = createDefaultTrpcClient(); + const pollInterval = parseInt(options.pollInterval, 10); + const timeout = parseInt(options.timeout, 10); + const startTime = Date.now(); + + // eslint-disable-next-line no-constant-condition + while (true) { + const pending = await client.getPendingConversations.query({ agentId: options.agentId }); + if (pending.length > 0) { + const conv = pending[0]; + console.log(JSON.stringify({ + conversationId: conv.id, + fromAgentId: conv.fromAgentId, + question: conv.question, + phaseId: conv.phaseId, + taskId: conv.taskId, + })); + process.exit(0); + } + + if (timeout > 0 && Date.now() - startTime >= timeout) { + process.exit(1); + } + + await new Promise(resolve => setTimeout(resolve, pollInterval)); + } + } catch (error) { + console.error('Failed to listen:', (error as Error).message); + process.exit(1); + } + }); + + // cw ask --from [--agent-id|--phase-id|--task-id ] [--poll-interval ] [--timeout ] + program + .command('ask ') + .description('Ask a question to another agent and wait for the answer') + .requiredOption('--from ', 'Your agent ID (the asker)') + .option('--agent-id ', 'Target agent ID') + .option('--phase-id ', 'Target phase ID (find running agent in phase)') + .option('--task-id ', 'Target task ID (find running agent for task)') + .option('--poll-interval ', 'Poll interval in milliseconds', '2000') + .option('--timeout ', 'Timeout in milliseconds (0=forever)', '0') + .action(async (question: string, options: { + from: string; + agentId?: string; + phaseId?: string; + taskId?: string; + pollInterval: string; + timeout: string; + }) => { + try { + const client = createDefaultTrpcClient(); + const pollInterval = parseInt(options.pollInterval, 10); + const timeout = parseInt(options.timeout, 10); + + // Create the conversation + const conversation = await client.createConversation.mutate({ + fromAgentId: options.from, + toAgentId: options.agentId, + phaseId: options.phaseId, + taskId: options.taskId, + question, + }); + + // Poll for answer + const startTime = Date.now(); + // eslint-disable-next-line no-constant-condition + while (true) { + const conv = await client.getConversation.query({ id: conversation.id }); + if (conv && conv.status === 'answered') { + console.log(conv.answer); + process.exit(0); + } + + if (timeout > 0 && Date.now() - startTime >= timeout) { + console.error('Timed out waiting for answer'); + process.exit(1); + } + + await new Promise(resolve => setTimeout(resolve, pollInterval)); + } + } catch (error) { + console.error('Failed to ask:', (error as Error).message); + process.exit(1); + } + }); + + // cw answer --conversation-id + program + .command('answer ') + .description('Answer a pending conversation') + .requiredOption('--conversation-id ', 'Conversation ID to answer') + .action(async (answer: string, options: { conversationId: string }) => { + try { + const client = createDefaultTrpcClient(); + await client.answerConversation.mutate({ + id: options.conversationId, + answer, + }); + console.log(JSON.stringify({ conversationId: options.conversationId, status: 'answered' })); + } catch (error) { + console.error('Failed to answer:', (error as Error).message); + process.exit(1); + } + }); + return program; } diff --git a/src/container.ts b/src/container.ts index 5012b2a..1c8198d 100644 --- a/src/container.ts +++ b/src/container.ts @@ -19,6 +19,7 @@ import { DrizzleAccountRepository, DrizzleChangeSetRepository, DrizzleLogChunkRepository, + DrizzleConversationRepository, } from './db/index.js'; import type { InitiativeRepository } from './db/repositories/initiative-repository.js'; import type { PhaseRepository } from './db/repositories/phase-repository.js'; @@ -30,6 +31,7 @@ import type { ProjectRepository } from './db/repositories/project-repository.js' import type { AccountRepository } from './db/repositories/account-repository.js'; import type { ChangeSetRepository } from './db/repositories/change-set-repository.js'; import type { LogChunkRepository } from './db/repositories/log-chunk-repository.js'; +import type { ConversationRepository } from './db/repositories/conversation-repository.js'; import type { EventBus } from './events/index.js'; import { createEventBus } from './events/index.js'; import { ProcessManager, ProcessRegistry } from './process/index.js'; @@ -54,7 +56,7 @@ import type { ServerContextDeps } from './server/index.js'; // ============================================================================= /** - * All 10 repository ports. + * All 11 repository ports. */ export interface Repositories { initiativeRepository: InitiativeRepository; @@ -67,10 +69,11 @@ export interface Repositories { accountRepository: AccountRepository; changeSetRepository: ChangeSetRepository; logChunkRepository: LogChunkRepository; + conversationRepository: ConversationRepository; } /** - * Create all 10 Drizzle repository adapters from a database instance. + * Create all 11 Drizzle repository adapters from a database instance. * Reusable by both the production server and the test harness. */ export function createRepositories(db: DrizzleDatabase): Repositories { @@ -85,6 +88,7 @@ export function createRepositories(db: DrizzleDatabase): Repositories { accountRepository: new DrizzleAccountRepository(db), changeSetRepository: new DrizzleChangeSetRepository(db), logChunkRepository: new DrizzleLogChunkRepository(db), + conversationRepository: new DrizzleConversationRepository(db), }; } diff --git a/src/db/repositories/conversation-repository.ts b/src/db/repositories/conversation-repository.ts new file mode 100644 index 0000000..0545ed1 --- /dev/null +++ b/src/db/repositories/conversation-repository.ts @@ -0,0 +1,23 @@ +/** + * Conversation Repository Port Interface + * + * Port for inter-agent conversation persistence operations. + */ + +import type { Conversation } from '../schema.js'; + +export interface CreateConversationData { + fromAgentId: string; + toAgentId: string; + initiativeId?: string | null; + phaseId?: string | null; + taskId?: string | null; + question: string; +} + +export interface ConversationRepository { + create(data: CreateConversationData): Promise; + findById(id: string): Promise; + findPendingForAgent(toAgentId: string): Promise; + answer(id: string, answer: string): Promise; +} diff --git a/src/db/repositories/drizzle/conversation.ts b/src/db/repositories/drizzle/conversation.ts new file mode 100644 index 0000000..6ab11dd --- /dev/null +++ b/src/db/repositories/drizzle/conversation.ts @@ -0,0 +1,67 @@ +/** + * Drizzle Conversation Repository Adapter + * + * Implements ConversationRepository interface using Drizzle ORM. + */ + +import { eq, and, asc } from 'drizzle-orm'; +import { nanoid } from 'nanoid'; +import type { DrizzleDatabase } from '../../index.js'; +import { conversations, type Conversation } from '../../schema.js'; +import type { ConversationRepository, CreateConversationData } from '../conversation-repository.js'; + +export class DrizzleConversationRepository implements ConversationRepository { + constructor(private db: DrizzleDatabase) {} + + async create(data: CreateConversationData): Promise { + const now = new Date(); + const id = nanoid(); + await this.db.insert(conversations).values({ + id, + fromAgentId: data.fromAgentId, + toAgentId: data.toAgentId, + initiativeId: data.initiativeId ?? null, + phaseId: data.phaseId ?? null, + taskId: data.taskId ?? null, + question: data.question, + status: 'pending', + createdAt: now, + updatedAt: now, + }); + return this.findById(id) as Promise; + } + + async findById(id: string): Promise { + const rows = await this.db + .select() + .from(conversations) + .where(eq(conversations.id, id)) + .limit(1); + return rows[0] ?? null; + } + + async findPendingForAgent(toAgentId: string): Promise { + return this.db + .select() + .from(conversations) + .where( + and( + eq(conversations.toAgentId, toAgentId), + eq(conversations.status, 'pending' as 'pending' | 'answered'), + ), + ) + .orderBy(asc(conversations.createdAt)); + } + + async answer(id: string, answer: string): Promise { + await this.db + .update(conversations) + .set({ + answer, + status: 'answered' as 'pending' | 'answered', + updatedAt: new Date(), + }) + .where(eq(conversations.id, id)); + return this.findById(id); + } +} diff --git a/src/db/repositories/drizzle/index.ts b/src/db/repositories/drizzle/index.ts index 027c7cf..e58eb86 100644 --- a/src/db/repositories/drizzle/index.ts +++ b/src/db/repositories/drizzle/index.ts @@ -15,3 +15,4 @@ export { DrizzleProjectRepository } from './project.js'; export { DrizzleAccountRepository } from './account.js'; export { DrizzleChangeSetRepository } from './change-set.js'; export { DrizzleLogChunkRepository } from './log-chunk.js'; +export { DrizzleConversationRepository } from './conversation.js'; diff --git a/src/db/repositories/index.ts b/src/db/repositories/index.ts index 4ee9cc6..1dddc9f 100644 --- a/src/db/repositories/index.ts +++ b/src/db/repositories/index.ts @@ -67,3 +67,8 @@ export type { export type { LogChunkRepository, } from './log-chunk-repository.js'; + +export type { + ConversationRepository, + CreateConversationData, +} from './conversation-repository.js'; diff --git a/src/db/schema.ts b/src/db/schema.ts index bd9044f..afab311 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -511,3 +511,27 @@ export const agentLogChunks = sqliteTable('agent_log_chunks', { export type AgentLogChunk = InferSelectModel; export type NewAgentLogChunk = InferInsertModel; + +// ============================================================================ +// CONVERSATIONS (inter-agent communication) +// ============================================================================ + +export const conversations = sqliteTable('conversations', { + id: text('id').primaryKey(), + fromAgentId: text('from_agent_id').notNull().references(() => agents.id, { onDelete: 'cascade' }), + toAgentId: text('to_agent_id').notNull().references(() => agents.id, { onDelete: 'cascade' }), + initiativeId: text('initiative_id').references(() => initiatives.id, { onDelete: 'set null' }), + phaseId: text('phase_id').references(() => phases.id, { onDelete: 'set null' }), + taskId: text('task_id').references(() => tasks.id, { onDelete: 'set null' }), + question: text('question').notNull(), + answer: text('answer'), + status: text('status', { enum: ['pending', 'answered'] }).notNull().default('pending'), + createdAt: integer('created_at', { mode: 'timestamp' }).notNull(), + updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(), +}, (table) => [ + index('conversations_to_agent_status_idx').on(table.toAgentId, table.status), + index('conversations_from_agent_idx').on(table.fromAgentId), +]); + +export type Conversation = InferSelectModel; +export type NewConversation = InferInsertModel; diff --git a/src/events/types.ts b/src/events/types.ts index e3c3ace..a0d317f 100644 --- a/src/events/types.ts +++ b/src/events/types.ts @@ -520,6 +520,28 @@ export interface AccountCredentialsValidatedEvent extends DomainEvent { }; } +/** + * Inter-Agent Conversation Events + */ + +export interface ConversationCreatedEvent extends DomainEvent { + type: 'conversation:created'; + payload: { + conversationId: string; + fromAgentId: string; + toAgentId: string; + }; +} + +export interface ConversationAnsweredEvent extends DomainEvent { + type: 'conversation:answered'; + payload: { + conversationId: string; + fromAgentId: string; + toAgentId: string; + }; +} + /** * Union of all domain events - enables type-safe event handling */ @@ -569,7 +591,9 @@ export type DomainEventMap = | PreviewBuildingEvent | PreviewReadyEvent | PreviewStoppedEvent - | PreviewFailedEvent; + | PreviewFailedEvent + | ConversationCreatedEvent + | ConversationAnsweredEvent; /** * Event type literal union for type checking diff --git a/src/trpc/context.ts b/src/trpc/context.ts index fcd91f1..148a844 100644 --- a/src/trpc/context.ts +++ b/src/trpc/context.ts @@ -16,6 +16,7 @@ import type { ProjectRepository } from '../db/repositories/project-repository.js import type { AccountRepository } from '../db/repositories/account-repository.js'; import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js'; import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js'; +import type { ConversationRepository } from '../db/repositories/conversation-repository.js'; import type { AccountCredentialManager } from '../agent/credentials/types.js'; import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js'; import type { CoordinationManager } from '../coordination/types.js'; @@ -70,6 +71,8 @@ export interface TRPCContext { executionOrchestrator?: ExecutionOrchestrator; /** Preview manager for Docker-based preview deployments */ previewManager?: PreviewManager; + /** Conversation repository for inter-agent communication */ + conversationRepository?: ConversationRepository; /** Absolute path to the workspace root (.cwrc directory) */ workspaceRoot?: string; } @@ -98,6 +101,7 @@ export interface CreateContextOptions { branchManager?: BranchManager; executionOrchestrator?: ExecutionOrchestrator; previewManager?: PreviewManager; + conversationRepository?: ConversationRepository; workspaceRoot?: string; } @@ -129,6 +133,7 @@ export function createContext(options: CreateContextOptions): TRPCContext { branchManager: options.branchManager, executionOrchestrator: options.executionOrchestrator, previewManager: options.previewManager, + conversationRepository: options.conversationRepository, workspaceRoot: options.workspaceRoot, }; } diff --git a/src/trpc/router.ts b/src/trpc/router.ts index b7cb3ea..e83aaeb 100644 --- a/src/trpc/router.ts +++ b/src/trpc/router.ts @@ -22,6 +22,7 @@ import { accountProcedures } from './routers/account.js'; import { changeSetProcedures } from './routers/change-set.js'; import { subscriptionProcedures } from './routers/subscription.js'; import { previewProcedures } from './routers/preview.js'; +import { conversationProcedures } from './routers/conversation.js'; // Re-export tRPC primitives (preserves existing import paths) export { router, publicProcedure, middleware, createCallerFactory } from './trpc.js'; @@ -59,6 +60,7 @@ export const appRouter = router({ ...changeSetProcedures(publicProcedure), ...subscriptionProcedures(publicProcedure), ...previewProcedures(publicProcedure), + ...conversationProcedures(publicProcedure), }); export type AppRouter = typeof appRouter; diff --git a/src/trpc/routers/_helpers.ts b/src/trpc/routers/_helpers.ts index 5c39f59..5fed4a3 100644 --- a/src/trpc/routers/_helpers.ts +++ b/src/trpc/routers/_helpers.ts @@ -16,6 +16,7 @@ import type { ProjectRepository } from '../../db/repositories/project-repository import type { AccountRepository } from '../../db/repositories/account-repository.js'; import type { ChangeSetRepository } from '../../db/repositories/change-set-repository.js'; import type { LogChunkRepository } from '../../db/repositories/log-chunk-repository.js'; +import type { ConversationRepository } from '../../db/repositories/conversation-repository.js'; import type { DispatchManager, PhaseDispatchManager } from '../../dispatch/types.js'; import type { CoordinationManager } from '../../coordination/types.js'; import type { BranchManager } from '../../git/branch-manager.js'; @@ -181,3 +182,13 @@ export function requirePreviewManager(ctx: TRPCContext): PreviewManager { } return ctx.previewManager; } + +export function requireConversationRepository(ctx: TRPCContext): ConversationRepository { + if (!ctx.conversationRepository) { + throw new TRPCError({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Conversation repository not available', + }); + } + return ctx.conversationRepository; +} diff --git a/src/trpc/routers/conversation.ts b/src/trpc/routers/conversation.ts new file mode 100644 index 0000000..f8da953 --- /dev/null +++ b/src/trpc/routers/conversation.ts @@ -0,0 +1,138 @@ +/** + * Conversation Router — inter-agent communication procedures + */ + +import { TRPCError } from '@trpc/server'; +import { z } from 'zod'; +import type { ProcedureBuilder } from '../trpc.js'; +import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js'; + +export function conversationProcedures(publicProcedure: ProcedureBuilder) { + return { + createConversation: publicProcedure + .input(z.object({ + fromAgentId: z.string().min(1), + toAgentId: z.string().min(1).optional(), + phaseId: z.string().min(1).optional(), + taskId: z.string().min(1).optional(), + question: z.string().min(1), + })) + .mutation(async ({ ctx, input }) => { + const repo = requireConversationRepository(ctx); + const agentManager = requireAgentManager(ctx); + + let toAgentId = input.toAgentId; + + // Resolve target agent from taskId + if (!toAgentId && input.taskId) { + const agents = await agentManager.list(); + const match = agents.find(a => a.taskId === input.taskId && a.status === 'running'); + if (!match) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `No running agent found for task '${input.taskId}'`, + }); + } + toAgentId = match.id; + } + + // Resolve target agent from phaseId + if (!toAgentId && input.phaseId) { + const taskRepo = requireTaskRepository(ctx); + const tasks = await taskRepo.findByPhaseId(input.phaseId); + const taskIds = new Set(tasks.map(t => t.id)); + const agents = await agentManager.list(); + const match = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'running'); + if (!match) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `No running agent found for phase '${input.phaseId}'`, + }); + } + toAgentId = match.id; + } + + if (!toAgentId) { + throw new TRPCError({ + code: 'BAD_REQUEST', + message: 'Must provide toAgentId, taskId, or phaseId to identify target agent', + }); + } + + const conversation = await repo.create({ + fromAgentId: input.fromAgentId, + toAgentId, + initiativeId: null, + phaseId: input.phaseId ?? null, + taskId: input.taskId ?? null, + question: input.question, + }); + + ctx.eventBus.emit({ + type: 'conversation:created' as const, + timestamp: new Date(), + payload: { + conversationId: conversation.id, + fromAgentId: input.fromAgentId, + toAgentId, + }, + }); + + return conversation; + }), + + getPendingConversations: publicProcedure + .input(z.object({ + agentId: z.string().min(1), + })) + .query(async ({ ctx, input }) => { + const repo = requireConversationRepository(ctx); + return repo.findPendingForAgent(input.agentId); + }), + + getConversation: publicProcedure + .input(z.object({ + id: z.string().min(1), + })) + .query(async ({ ctx, input }) => { + const repo = requireConversationRepository(ctx); + return repo.findById(input.id); + }), + + answerConversation: publicProcedure + .input(z.object({ + id: z.string().min(1), + answer: z.string().min(1), + })) + .mutation(async ({ ctx, input }) => { + const repo = requireConversationRepository(ctx); + const existing = await repo.findById(input.id); + if (!existing) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Conversation '${input.id}' not found`, + }); + } + if (existing.status === 'answered') { + throw new TRPCError({ + code: 'BAD_REQUEST', + message: `Conversation '${input.id}' is already answered`, + }); + } + + const updated = await repo.answer(input.id, input.answer); + + ctx.eventBus.emit({ + type: 'conversation:answered' as const, + timestamp: new Date(), + payload: { + conversationId: input.id, + fromAgentId: existing.fromAgentId, + toAgentId: existing.toAgentId, + }, + }); + + return updated; + }), + }; +}