feat: Add inter-agent conversation system (listen, ask, answer)

Enables parallel agents to communicate through a CLI-based conversation
mechanism coordinated via tRPC. Agents can ask questions to peers and
receive answers, with target resolution by agent ID, task ID, or phase ID.
This commit is contained in:
Lukas May
2026-02-10 13:43:30 +01:00
parent 270a5cb21d
commit a6371e156a
29 changed files with 632 additions and 46 deletions

View File

@@ -24,7 +24,7 @@
| `accounts/` | Account discovery, config dir setup, credential management, usage API |
| `credentials/` | `AccountCredentialManager` — credential injection per account |
| `lifecycle/` | `LifecycleController` — retry policy, signal recovery, missing signal instructions |
| `prompts/` | Mode-specific prompt builders (execute, discuss, plan, detail, refine) |
| `prompts/` | Mode-specific prompt builders (execute, discuss, plan, detail, refine) + shared inter-agent communication instructions |
## Key Flows
@@ -124,3 +124,22 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream
- Read path (`getAgentOutput` tRPC): concatenates all DB chunks (no file fallback)
- Live path (`onAgentOutput` subscription): listens for `agent:output` events
- Frontend: initial query loads from DB, subscription accumulates raw JSONL, both parsed via `parseAgentOutput()`
## Inter-Agent Communication
Agents can communicate with each other via the `conversations` table, coordinated through CLI commands.
### Prompt Integration
`INTER_AGENT_COMMUNICATION` constant in `prompts/shared.ts` is appended to all 5 agent mode prompts. It instructs agents to:
1. Start `cw listen --agent-id <ID> &` as a background process at session start
2. Handle incoming questions by answering via `cw answer` and restarting the listener
3. Ask questions to peers via `cw ask --from <ID> --agent-id|--phase-id|--task-id`
4. Kill the listener before writing `signal.json`
### Agent Identity
`manifest.json` now includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation.
### CLI Commands
- `cw listen`: Polls `getPendingConversations`, prints first pending as JSON, exits
- `cw ask`: Creates conversation, polls `getConversation` until answered, prints answer
- `cw answer`: Calls `answerConversation`, prints confirmation JSON

View File

@@ -107,6 +107,15 @@ Uses **Commander.js** for command parsing.
| `list [--initiative <id>]` | List active previews |
| `status <previewId>` | Get preview status with service details |
### Inter-Agent Conversation (`cw listen`, `cw ask`, `cw answer`)
| Command | Description |
|---------|-------------|
| `listen --agent-id <id> [--poll-interval <ms>] [--timeout <ms>]` | Poll for pending questions, print JSON, exit |
| `ask <question> --from <agentId> [--agent-id\|--phase-id\|--task-id <target>] [--poll-interval <ms>] [--timeout <ms>]` | Ask question, block until answered, print answer |
| `answer <answer> --conversation-id <id>` | Answer a pending conversation |
All three commands output JSON for programmatic agent consumption.
### Accounts (`cw account`)
| Command | Description |
|---------|-------------|

View File

@@ -143,9 +143,29 @@ Self-referencing (parentMessageId) for threading. Sender/recipient types: 'agent
Index on `agentId` for fast queries.
### conversations
Inter-agent communication records.
| Column | Type | Notes |
|--------|------|-------|
| id | text PK | |
| fromAgentId | text NOT NULL | FK→agents ON DELETE CASCADE |
| toAgentId | text NOT NULL | FK→agents ON DELETE CASCADE |
| initiativeId | text | FK→initiatives ON DELETE SET NULL |
| phaseId | text | FK→phases ON DELETE SET NULL |
| taskId | text | FK→tasks ON DELETE SET NULL |
| question | text NOT NULL | |
| answer | text | nullable until answered |
| status | text enum | pending, answered |
| createdAt | integer/timestamp | |
| updatedAt | integer/timestamp | |
Indexes: `(toAgentId, status)` for listen polling, `(fromAgentId)`.
## Repository Interfaces
10 repositories, each with standard CRUD plus domain-specific methods:
11 repositories, each with standard CRUD plus domain-specific methods:
| Repository | Key Methods |
|-----------|-------------|
@@ -159,6 +179,7 @@ Index on `agentId` for fast queries.
| AccountRepository | + findNextAvailable (round-robin), markExhausted, clearExpiredExhaustion |
| ProposalRepository | + findByAgentIdAndStatus, updateManyByAgentId, countByAgentIdAndStatus |
| LogChunkRepository | insertChunk, findByAgentId, deleteByAgentId, getSessionCount |
| ConversationRepository | create, findById, findPendingForAgent, answer |
## Migrations
@@ -170,4 +191,4 @@ Key rules:
- See [database-migrations.md](database-migrations.md) for full workflow
- Snapshots stale after 0008; migrations 0008+ are hand-written
Current migrations: 0000 through 0018 (19 total).
Current migrations: 0000 through 0024 (25 total).

View File

@@ -25,6 +25,7 @@
| **Worktree** | `worktree:created`, `worktree:removed`, `worktree:merged`, `worktree:conflict` | 4 |
| **Account** | `account:credentials_refreshed`, `account:credentials_expired`, `account:credentials_validated` | 3 |
| **Preview** | `preview:building`, `preview:ready`, `preview:stopped`, `preview:failed` | 4 |
| **Conversation** | `conversation:created`, `conversation:answered` | 2 |
| **Log** | `log:entry` | 1 |
### Key Event Payloads

View File

@@ -212,3 +212,18 @@ Docker-based preview deployments. No database table — Docker is the source of
| `getPreviewStatus` | query | Get preview status: `{previewId}` → PreviewStatus |
Context dependency: `requirePreviewManager(ctx)` — requires `PreviewManager` from container.
## Conversation Procedures
Inter-agent communication for parallel agents.
| Procedure | Type | Description |
|-----------|------|-------------|
| `createConversation` | mutation | Ask a question: `{fromAgentId, toAgentId?, phaseId?, taskId?, question}` → Conversation |
| `getPendingConversations` | query | Poll for incoming questions: `{agentId}` → Conversation[] |
| `getConversation` | query | Get conversation by ID: `{id}` → Conversation |
| `answerConversation` | mutation | Answer a conversation: `{id, answer}` → Conversation |
Target resolution: `toAgentId` → direct; `taskId` → find running agent by task; `phaseId` → find running agent by any task in phase.
Context dependency: `requireConversationRepository(ctx)`, `requireAgentManager(ctx)`.

View File

@@ -0,0 +1,18 @@
-- Inter-agent conversations
CREATE TABLE `conversations` (
`id` text PRIMARY KEY NOT NULL,
`from_agent_id` text NOT NULL REFERENCES `agents`(`id`) ON DELETE CASCADE,
`to_agent_id` text NOT NULL REFERENCES `agents`(`id`) ON DELETE CASCADE,
`initiative_id` text REFERENCES `initiatives`(`id`) ON DELETE SET NULL,
`phase_id` text REFERENCES `phases`(`id`) ON DELETE SET NULL,
`task_id` text REFERENCES `tasks`(`id`) ON DELETE SET NULL,
`question` text NOT NULL,
`answer` text,
`status` text DEFAULT 'pending' NOT NULL,
`created_at` integer NOT NULL,
`updated_at` integer NOT NULL
);
--> statement-breakpoint
CREATE INDEX `conversations_to_agent_status_idx` ON `conversations` (`to_agent_id`, `status`);
--> statement-breakpoint
CREATE INDEX `conversations_from_agent_idx` ON `conversations` (`from_agent_id`);

View File

@@ -206,43 +206,50 @@ export class CleanupManager {
}
/**
* Check if all project worktrees for an agent are clean (no uncommitted/untracked files).
* Get the relative subdirectory names of dirty worktrees for an agent.
* Returns an empty array if all worktrees are clean or the workdir doesn't exist.
*/
async isWorkdirClean(alias: string, initiativeId: string | null): Promise<boolean> {
async getDirtyWorktreePaths(alias: string, initiativeId: string | null): Promise<string[]> {
const agentWorkdir = this.getAgentWorkdir(alias);
try {
await readdir(agentWorkdir);
} catch {
// Workdir doesn't exist — treat as clean
return true;
return [];
}
const worktreePaths: string[] = [];
const worktreePaths: { absPath: string; name: string }[] = [];
if (initiativeId) {
const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
for (const project of projects) {
worktreePaths.push(join(agentWorkdir, project.name));
worktreePaths.push({ absPath: join(agentWorkdir, project.name), name: project.name });
}
} else {
worktreePaths.push(join(agentWorkdir, 'workspace'));
worktreePaths.push({ absPath: join(agentWorkdir, 'workspace'), name: 'workspace' });
}
for (const wtPath of worktreePaths) {
const dirty: string[] = [];
for (const { absPath, name } of worktreePaths) {
try {
const { stdout } = await execFileAsync('git', ['status', '--porcelain'], { cwd: wtPath });
if (stdout.trim().length > 0) {
log.info({ alias, worktree: wtPath }, 'workdir has uncommitted changes');
return false;
}
} catch (err) {
log.warn({ alias, worktree: wtPath, err: err instanceof Error ? err.message : String(err) }, 'git status failed, treating as dirty');
return false;
const { stdout } = await execFileAsync('git', ['status', '--porcelain'], { cwd: absPath });
if (stdout.trim().length > 0) dirty.push(name);
} catch {
dirty.push(name);
}
}
return dirty;
}
return true;
/**
* Check if all project worktrees for an agent are clean (no uncommitted/untracked files).
*/
async isWorkdirClean(alias: string, initiativeId: string | null): Promise<boolean> {
const dirty = await this.getDirtyWorktreePaths(alias, initiativeId);
if (dirty.length > 0) {
log.info({ alias, dirtyWorktrees: dirty }, 'workdir has uncommitted changes');
}
return dirty.length === 0;
}
/**

View File

@@ -259,7 +259,12 @@ export function writeInputFiles(options: WriteInputFilesOptions): void {
// Write manifest listing exactly which files were created
writeFileSync(
join(inputDir, 'manifest.json'),
JSON.stringify({ files: manifestFiles, contextFiles }) + '\n',
JSON.stringify({
files: manifestFiles,
contextFiles,
agentId: options.agentId ?? null,
agentName: options.agentName ?? null,
}) + '\n',
'utf-8',
);
}

View File

@@ -255,13 +255,7 @@ export class MultiProviderAgentManager implements AgentManager {
initiativeBasedAgent: !!initiativeId
}, 'agent workdir setup completed');
// 2b. Write input files
if (options.inputContext) {
writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext });
log.debug({ alias }, 'input files written');
}
// 2c. Append workspace layout to prompt now that worktrees exist
// 2b. Append workspace layout to prompt now that worktrees exist
const workspaceSection = buildWorkspaceLayout(agentCwd);
if (workspaceSection) {
prompt = prompt + workspaceSection;
@@ -281,6 +275,12 @@ export class MultiProviderAgentManager implements AgentManager {
});
const agentId = agent.id;
// 3b. Write input files (after agent creation so we can include agentId/agentName)
if (options.inputContext) {
writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });
log.debug({ alias }, 'input files written');
}
// 4. Build spawn command
const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
const finalCwd = cwd ?? agentCwd;
@@ -418,7 +418,6 @@ export class MultiProviderAgentManager implements AgentManager {
}
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'auto-cleanup failed');
} finally {
this.commitRetryCount.delete(agentId);
}
}
@@ -434,10 +433,16 @@ export class MultiProviderAgentManager implements AgentManager {
const provider = getProvider(agent.provider);
if (!provider || provider.resumeStyle === 'none') return false;
// Check which specific worktrees are dirty — skip resume if all clean
const dirtyPaths = await this.cleanupManager.getDirtyWorktreePaths(agent.name, agent.initiativeId);
if (dirtyPaths.length === 0) return false;
const dirtyList = dirtyPaths.map(p => `- \`${p}/\``).join('\n');
const commitPrompt =
'You have uncommitted changes in your working directory. ' +
'Stage everything with `git add -A` and commit with an appropriate message describing your work. ' +
'Do not make any other changes.';
'You have uncommitted changes in the following project directories:\n' +
dirtyList + '\n\n' +
'For each directory listed above, `cd` into it, then run `git add -A && git commit -m "<message>"` ' +
'with an appropriate commit message describing the work. Do not make any other changes.';
await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

View File

@@ -2,7 +2,7 @@
* Detail mode prompt — break a phase into executable tasks.
*/
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
export function buildDetailPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in DETAIL mode.
@@ -41,5 +41,6 @@ ${ID_GENERATION}
- Break work into 3-8 tasks per phase
- Order tasks logically (foundational work first)
- Make each task self-contained with enough context
- Include test/verify tasks where appropriate`;
- Include test/verify tasks where appropriate
${INTER_AGENT_COMMUNICATION}`;
}

View File

@@ -2,7 +2,7 @@
* Discuss mode prompt — clarifying questions and decision capture.
*/
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
export function buildDiscussPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in DISCUSS mode.
@@ -30,5 +30,6 @@ ${ID_GENERATION}
- Ask 2-4 questions at a time, not more
- Provide options when choices are clear
- Capture every decision with rationale
- Don't proceed until ambiguities are resolved`;
- Don't proceed until ambiguities are resolved
${INTER_AGENT_COMMUNICATION}`;
}

View File

@@ -2,7 +2,7 @@
* Execute mode prompt — standard worker agent.
*/
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
export function buildExecutePrompt(): string {
return `You are a Worker agent in the Codewalk multi-agent system.
@@ -16,5 +16,6 @@ ${SIGNAL_FORMAT}
- Complete the task as specified in .cw/input/task.md
- Ask questions if requirements are unclear
- Report errors honestly — don't guess
- Focus on writing clean, tested code`;
- Focus on writing clean, tested code
${INTER_AGENT_COMMUNICATION}`;
}

View File

@@ -5,7 +5,7 @@
* input files, ID generation) are in shared.ts.
*/
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION } from './shared.js';
export { SIGNAL_FORMAT, INPUT_FILES, ID_GENERATION, INTER_AGENT_COMMUNICATION } from './shared.js';
export { buildExecutePrompt } from './execute.js';
export { buildDiscussPrompt } from './discuss.js';
export { buildPlanPrompt } from './plan.js';

View File

@@ -2,7 +2,7 @@
* Plan mode prompt — plan initiative into phases.
*/
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
import { ID_GENERATION, INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
export function buildPlanPrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in PLAN mode.
@@ -36,5 +36,6 @@ ${ID_GENERATION}
- Start with foundation/infrastructure phases
- Group related work together
- Make dependencies explicit using phase IDs
- Each task should be completable in one session`;
- Each task should be completable in one session
${INTER_AGENT_COMMUNICATION}`;
}

View File

@@ -2,7 +2,7 @@
* Refine mode prompt — review and propose edits to initiative pages.
*/
import { INPUT_FILES, SIGNAL_FORMAT } from './shared.js';
import { INPUT_FILES, SIGNAL_FORMAT, INTER_AGENT_COMMUNICATION } from './shared.js';
export function buildRefinePrompt(): string {
return `You are an Architect agent in the Codewalk multi-agent system operating in REFINE mode.
@@ -24,5 +24,6 @@ Write one file per modified page to \`.cw/output/pages/{pageId}.md\`:
- Each output page's body is the FULL new content (not a diff)
- Preserve [[page:\$id|title]] cross-references in your output
- Focus on clarity, completeness, and consistency
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/`;
- Do not invent new page IDs — only reference existing ones from .cw/input/pages/
${INTER_AGENT_COMMUNICATION}`;
}

View File

@@ -43,3 +43,57 @@ When creating new entities (phases, tasks, decisions), generate a unique ID by r
cw id
\`\`\`
Use the output as the filename (e.g., \`{id}.md\`).`;
export const INTER_AGENT_COMMUNICATION = `
## Inter-Agent Communication
You are working in a multi-agent parallel environment. Other agents may be working on related tasks simultaneously.
### Your Identity
Read \`.cw/input/manifest.json\` — it contains \`agentId\` and \`agentName\` fields identifying you.
### Listening for Questions
At the START of your session, start a background listener:
\`\`\`bash
cw listen --agent-id <YOUR_AGENT_ID> &
LISTEN_PID=$!
\`\`\`
When the listener prints JSON to stdout, another agent is asking you a question:
\`\`\`json
{ "conversationId": "...", "fromAgentId": "...", "question": "..." }
\`\`\`
Answer it:
\`\`\`bash
cw answer "<your answer>" --conversation-id <conversationId>
\`\`\`
Then restart the listener:
\`\`\`bash
cw listen --agent-id <YOUR_AGENT_ID> &
LISTEN_PID=$!
\`\`\`
### Asking Questions
To ask another agent a question (blocks until answered):
\`\`\`bash
cw ask "What interface does the user service expose?" --from <YOUR_AGENT_ID> --agent-id <TARGET_AGENT_ID>
\`\`\`
You can also target by task or phase:
\`\`\`bash
cw ask "What port does the API run on?" --from <YOUR_AGENT_ID> --task-id <TASK_ID>
cw ask "What schema are you using?" --from <YOUR_AGENT_ID> --phase-id <PHASE_ID>
\`\`\`
### When to Communicate
- You need interface/schema/contract info from another agent's work
- You're about to modify a shared resource and want to coordinate
- You have a dependency on work another agent is doing
### Cleanup
Before writing \`.cw/output/signal.json\`, kill your listener:
\`\`\`bash
kill $LISTEN_PID 2>/dev/null
\`\`\``;

View File

@@ -29,6 +29,10 @@ export interface AgentInputContext {
phases?: Array<import('../db/schema.js').Phase & { dependsOn?: string[] }>;
/** All tasks for the initiative (read-only context for agents) */
tasks?: import('../db/schema.js').Task[];
/** Agent ID for inter-agent communication */
agentId?: string;
/** Agent name for inter-agent communication */
agentName?: string;
}
/**

View File

@@ -1362,6 +1362,125 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
}
});
// =========================================================================
// Inter-agent conversation commands
// =========================================================================
// cw listen --agent-id <id> [--poll-interval <ms>] [--timeout <ms>]
program
.command('listen')
.description('Poll for pending conversations addressed to an agent')
.requiredOption('--agent-id <id>', 'Agent ID to listen for')
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
.action(async (options: { agentId: string; pollInterval: string; timeout: string }) => {
try {
const client = createDefaultTrpcClient();
const pollInterval = parseInt(options.pollInterval, 10);
const timeout = parseInt(options.timeout, 10);
const startTime = Date.now();
// eslint-disable-next-line no-constant-condition
while (true) {
const pending = await client.getPendingConversations.query({ agentId: options.agentId });
if (pending.length > 0) {
const conv = pending[0];
console.log(JSON.stringify({
conversationId: conv.id,
fromAgentId: conv.fromAgentId,
question: conv.question,
phaseId: conv.phaseId,
taskId: conv.taskId,
}));
process.exit(0);
}
if (timeout > 0 && Date.now() - startTime >= timeout) {
process.exit(1);
}
await new Promise(resolve => setTimeout(resolve, pollInterval));
}
} catch (error) {
console.error('Failed to listen:', (error as Error).message);
process.exit(1);
}
});
// cw ask <question> --from <agentId> [--agent-id|--phase-id|--task-id <target>] [--poll-interval <ms>] [--timeout <ms>]
program
.command('ask <question>')
.description('Ask a question to another agent and wait for the answer')
.requiredOption('--from <agentId>', 'Your agent ID (the asker)')
.option('--agent-id <id>', 'Target agent ID')
.option('--phase-id <id>', 'Target phase ID (find running agent in phase)')
.option('--task-id <id>', 'Target task ID (find running agent for task)')
.option('--poll-interval <ms>', 'Poll interval in milliseconds', '2000')
.option('--timeout <ms>', 'Timeout in milliseconds (0=forever)', '0')
.action(async (question: string, options: {
from: string;
agentId?: string;
phaseId?: string;
taskId?: string;
pollInterval: string;
timeout: string;
}) => {
try {
const client = createDefaultTrpcClient();
const pollInterval = parseInt(options.pollInterval, 10);
const timeout = parseInt(options.timeout, 10);
// Create the conversation
const conversation = await client.createConversation.mutate({
fromAgentId: options.from,
toAgentId: options.agentId,
phaseId: options.phaseId,
taskId: options.taskId,
question,
});
// Poll for answer
const startTime = Date.now();
// eslint-disable-next-line no-constant-condition
while (true) {
const conv = await client.getConversation.query({ id: conversation.id });
if (conv && conv.status === 'answered') {
console.log(conv.answer);
process.exit(0);
}
if (timeout > 0 && Date.now() - startTime >= timeout) {
console.error('Timed out waiting for answer');
process.exit(1);
}
await new Promise(resolve => setTimeout(resolve, pollInterval));
}
} catch (error) {
console.error('Failed to ask:', (error as Error).message);
process.exit(1);
}
});
// cw answer <answer> --conversation-id <id>
program
.command('answer <answer>')
.description('Answer a pending conversation')
.requiredOption('--conversation-id <id>', 'Conversation ID to answer')
.action(async (answer: string, options: { conversationId: string }) => {
try {
const client = createDefaultTrpcClient();
await client.answerConversation.mutate({
id: options.conversationId,
answer,
});
console.log(JSON.stringify({ conversationId: options.conversationId, status: 'answered' }));
} catch (error) {
console.error('Failed to answer:', (error as Error).message);
process.exit(1);
}
});
return program;
}

View File

@@ -19,6 +19,7 @@ import {
DrizzleAccountRepository,
DrizzleChangeSetRepository,
DrizzleLogChunkRepository,
DrizzleConversationRepository,
} from './db/index.js';
import type { InitiativeRepository } from './db/repositories/initiative-repository.js';
import type { PhaseRepository } from './db/repositories/phase-repository.js';
@@ -30,6 +31,7 @@ import type { ProjectRepository } from './db/repositories/project-repository.js'
import type { AccountRepository } from './db/repositories/account-repository.js';
import type { ChangeSetRepository } from './db/repositories/change-set-repository.js';
import type { LogChunkRepository } from './db/repositories/log-chunk-repository.js';
import type { ConversationRepository } from './db/repositories/conversation-repository.js';
import type { EventBus } from './events/index.js';
import { createEventBus } from './events/index.js';
import { ProcessManager, ProcessRegistry } from './process/index.js';
@@ -54,7 +56,7 @@ import type { ServerContextDeps } from './server/index.js';
// =============================================================================
/**
* All 10 repository ports.
* All 11 repository ports.
*/
export interface Repositories {
initiativeRepository: InitiativeRepository;
@@ -67,10 +69,11 @@ export interface Repositories {
accountRepository: AccountRepository;
changeSetRepository: ChangeSetRepository;
logChunkRepository: LogChunkRepository;
conversationRepository: ConversationRepository;
}
/**
* Create all 10 Drizzle repository adapters from a database instance.
* Create all 11 Drizzle repository adapters from a database instance.
* Reusable by both the production server and the test harness.
*/
export function createRepositories(db: DrizzleDatabase): Repositories {
@@ -85,6 +88,7 @@ export function createRepositories(db: DrizzleDatabase): Repositories {
accountRepository: new DrizzleAccountRepository(db),
changeSetRepository: new DrizzleChangeSetRepository(db),
logChunkRepository: new DrizzleLogChunkRepository(db),
conversationRepository: new DrizzleConversationRepository(db),
};
}

View File

@@ -0,0 +1,23 @@
/**
* Conversation Repository Port Interface
*
* Port for inter-agent conversation persistence operations.
*/
import type { Conversation } from '../schema.js';
export interface CreateConversationData {
fromAgentId: string;
toAgentId: string;
initiativeId?: string | null;
phaseId?: string | null;
taskId?: string | null;
question: string;
}
export interface ConversationRepository {
create(data: CreateConversationData): Promise<Conversation>;
findById(id: string): Promise<Conversation | null>;
findPendingForAgent(toAgentId: string): Promise<Conversation[]>;
answer(id: string, answer: string): Promise<Conversation | null>;
}

View File

@@ -0,0 +1,67 @@
/**
* Drizzle Conversation Repository Adapter
*
* Implements ConversationRepository interface using Drizzle ORM.
*/
import { eq, and, asc } from 'drizzle-orm';
import { nanoid } from 'nanoid';
import type { DrizzleDatabase } from '../../index.js';
import { conversations, type Conversation } from '../../schema.js';
import type { ConversationRepository, CreateConversationData } from '../conversation-repository.js';
export class DrizzleConversationRepository implements ConversationRepository {
constructor(private db: DrizzleDatabase) {}
async create(data: CreateConversationData): Promise<Conversation> {
const now = new Date();
const id = nanoid();
await this.db.insert(conversations).values({
id,
fromAgentId: data.fromAgentId,
toAgentId: data.toAgentId,
initiativeId: data.initiativeId ?? null,
phaseId: data.phaseId ?? null,
taskId: data.taskId ?? null,
question: data.question,
status: 'pending',
createdAt: now,
updatedAt: now,
});
return this.findById(id) as Promise<Conversation>;
}
async findById(id: string): Promise<Conversation | null> {
const rows = await this.db
.select()
.from(conversations)
.where(eq(conversations.id, id))
.limit(1);
return rows[0] ?? null;
}
async findPendingForAgent(toAgentId: string): Promise<Conversation[]> {
return this.db
.select()
.from(conversations)
.where(
and(
eq(conversations.toAgentId, toAgentId),
eq(conversations.status, 'pending' as 'pending' | 'answered'),
),
)
.orderBy(asc(conversations.createdAt));
}
async answer(id: string, answer: string): Promise<Conversation | null> {
await this.db
.update(conversations)
.set({
answer,
status: 'answered' as 'pending' | 'answered',
updatedAt: new Date(),
})
.where(eq(conversations.id, id));
return this.findById(id);
}
}

View File

@@ -15,3 +15,4 @@ export { DrizzleProjectRepository } from './project.js';
export { DrizzleAccountRepository } from './account.js';
export { DrizzleChangeSetRepository } from './change-set.js';
export { DrizzleLogChunkRepository } from './log-chunk.js';
export { DrizzleConversationRepository } from './conversation.js';

View File

@@ -67,3 +67,8 @@ export type {
export type {
LogChunkRepository,
} from './log-chunk-repository.js';
export type {
ConversationRepository,
CreateConversationData,
} from './conversation-repository.js';

View File

@@ -511,3 +511,27 @@ export const agentLogChunks = sqliteTable('agent_log_chunks', {
export type AgentLogChunk = InferSelectModel<typeof agentLogChunks>;
export type NewAgentLogChunk = InferInsertModel<typeof agentLogChunks>;
// ============================================================================
// CONVERSATIONS (inter-agent communication)
// ============================================================================
export const conversations = sqliteTable('conversations', {
id: text('id').primaryKey(),
fromAgentId: text('from_agent_id').notNull().references(() => agents.id, { onDelete: 'cascade' }),
toAgentId: text('to_agent_id').notNull().references(() => agents.id, { onDelete: 'cascade' }),
initiativeId: text('initiative_id').references(() => initiatives.id, { onDelete: 'set null' }),
phaseId: text('phase_id').references(() => phases.id, { onDelete: 'set null' }),
taskId: text('task_id').references(() => tasks.id, { onDelete: 'set null' }),
question: text('question').notNull(),
answer: text('answer'),
status: text('status', { enum: ['pending', 'answered'] }).notNull().default('pending'),
createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
}, (table) => [
index('conversations_to_agent_status_idx').on(table.toAgentId, table.status),
index('conversations_from_agent_idx').on(table.fromAgentId),
]);
export type Conversation = InferSelectModel<typeof conversations>;
export type NewConversation = InferInsertModel<typeof conversations>;

View File

@@ -520,6 +520,28 @@ export interface AccountCredentialsValidatedEvent extends DomainEvent {
};
}
/**
* Inter-Agent Conversation Events
*/
export interface ConversationCreatedEvent extends DomainEvent {
type: 'conversation:created';
payload: {
conversationId: string;
fromAgentId: string;
toAgentId: string;
};
}
export interface ConversationAnsweredEvent extends DomainEvent {
type: 'conversation:answered';
payload: {
conversationId: string;
fromAgentId: string;
toAgentId: string;
};
}
/**
* Union of all domain events - enables type-safe event handling
*/
@@ -569,7 +591,9 @@ export type DomainEventMap =
| PreviewBuildingEvent
| PreviewReadyEvent
| PreviewStoppedEvent
| PreviewFailedEvent;
| PreviewFailedEvent
| ConversationCreatedEvent
| ConversationAnsweredEvent;
/**
* Event type literal union for type checking

View File

@@ -16,6 +16,7 @@ import type { ProjectRepository } from '../db/repositories/project-repository.js
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js';
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
import type { ConversationRepository } from '../db/repositories/conversation-repository.js';
import type { AccountCredentialManager } from '../agent/credentials/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { CoordinationManager } from '../coordination/types.js';
@@ -70,6 +71,8 @@ export interface TRPCContext {
executionOrchestrator?: ExecutionOrchestrator;
/** Preview manager for Docker-based preview deployments */
previewManager?: PreviewManager;
/** Conversation repository for inter-agent communication */
conversationRepository?: ConversationRepository;
/** Absolute path to the workspace root (.cwrc directory) */
workspaceRoot?: string;
}
@@ -98,6 +101,7 @@ export interface CreateContextOptions {
branchManager?: BranchManager;
executionOrchestrator?: ExecutionOrchestrator;
previewManager?: PreviewManager;
conversationRepository?: ConversationRepository;
workspaceRoot?: string;
}
@@ -129,6 +133,7 @@ export function createContext(options: CreateContextOptions): TRPCContext {
branchManager: options.branchManager,
executionOrchestrator: options.executionOrchestrator,
previewManager: options.previewManager,
conversationRepository: options.conversationRepository,
workspaceRoot: options.workspaceRoot,
};
}

View File

@@ -22,6 +22,7 @@ import { accountProcedures } from './routers/account.js';
import { changeSetProcedures } from './routers/change-set.js';
import { subscriptionProcedures } from './routers/subscription.js';
import { previewProcedures } from './routers/preview.js';
import { conversationProcedures } from './routers/conversation.js';
// Re-export tRPC primitives (preserves existing import paths)
export { router, publicProcedure, middleware, createCallerFactory } from './trpc.js';
@@ -59,6 +60,7 @@ export const appRouter = router({
...changeSetProcedures(publicProcedure),
...subscriptionProcedures(publicProcedure),
...previewProcedures(publicProcedure),
...conversationProcedures(publicProcedure),
});
export type AppRouter = typeof appRouter;

View File

@@ -16,6 +16,7 @@ import type { ProjectRepository } from '../../db/repositories/project-repository
import type { AccountRepository } from '../../db/repositories/account-repository.js';
import type { ChangeSetRepository } from '../../db/repositories/change-set-repository.js';
import type { LogChunkRepository } from '../../db/repositories/log-chunk-repository.js';
import type { ConversationRepository } from '../../db/repositories/conversation-repository.js';
import type { DispatchManager, PhaseDispatchManager } from '../../dispatch/types.js';
import type { CoordinationManager } from '../../coordination/types.js';
import type { BranchManager } from '../../git/branch-manager.js';
@@ -181,3 +182,13 @@ export function requirePreviewManager(ctx: TRPCContext): PreviewManager {
}
return ctx.previewManager;
}
export function requireConversationRepository(ctx: TRPCContext): ConversationRepository {
if (!ctx.conversationRepository) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Conversation repository not available',
});
}
return ctx.conversationRepository;
}

View File

@@ -0,0 +1,138 @@
/**
* Conversation Router — inter-agent communication procedures
*/
import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import { requireConversationRepository, requireAgentManager, requireTaskRepository } from './_helpers.js';
export function conversationProcedures(publicProcedure: ProcedureBuilder) {
return {
createConversation: publicProcedure
.input(z.object({
fromAgentId: z.string().min(1),
toAgentId: z.string().min(1).optional(),
phaseId: z.string().min(1).optional(),
taskId: z.string().min(1).optional(),
question: z.string().min(1),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
const agentManager = requireAgentManager(ctx);
let toAgentId = input.toAgentId;
// Resolve target agent from taskId
if (!toAgentId && input.taskId) {
const agents = await agentManager.list();
const match = agents.find(a => a.taskId === input.taskId && a.status === 'running');
if (!match) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `No running agent found for task '${input.taskId}'`,
});
}
toAgentId = match.id;
}
// Resolve target agent from phaseId
if (!toAgentId && input.phaseId) {
const taskRepo = requireTaskRepository(ctx);
const tasks = await taskRepo.findByPhaseId(input.phaseId);
const taskIds = new Set(tasks.map(t => t.id));
const agents = await agentManager.list();
const match = agents.find(a => a.taskId && taskIds.has(a.taskId) && a.status === 'running');
if (!match) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `No running agent found for phase '${input.phaseId}'`,
});
}
toAgentId = match.id;
}
if (!toAgentId) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: 'Must provide toAgentId, taskId, or phaseId to identify target agent',
});
}
const conversation = await repo.create({
fromAgentId: input.fromAgentId,
toAgentId,
initiativeId: null,
phaseId: input.phaseId ?? null,
taskId: input.taskId ?? null,
question: input.question,
});
ctx.eventBus.emit({
type: 'conversation:created' as const,
timestamp: new Date(),
payload: {
conversationId: conversation.id,
fromAgentId: input.fromAgentId,
toAgentId,
},
});
return conversation;
}),
getPendingConversations: publicProcedure
.input(z.object({
agentId: z.string().min(1),
}))
.query(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
return repo.findPendingForAgent(input.agentId);
}),
getConversation: publicProcedure
.input(z.object({
id: z.string().min(1),
}))
.query(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
return repo.findById(input.id);
}),
answerConversation: publicProcedure
.input(z.object({
id: z.string().min(1),
answer: z.string().min(1),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireConversationRepository(ctx);
const existing = await repo.findById(input.id);
if (!existing) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Conversation '${input.id}' not found`,
});
}
if (existing.status === 'answered') {
throw new TRPCError({
code: 'BAD_REQUEST',
message: `Conversation '${input.id}' is already answered`,
});
}
const updated = await repo.answer(input.id, input.answer);
ctx.eventBus.emit({
type: 'conversation:answered' as const,
timestamp: new Date(),
payload: {
conversationId: input.id,
fromAgentId: existing.fromAgentId,
toAgentId: existing.toAgentId,
},
});
return updated;
}),
};
}