diff --git a/apps/server/agent/manager.ts b/apps/server/agent/manager.ts index 12e6f8d..ac2bf70 100644 --- a/apps/server/agent/manager.ts +++ b/apps/server/agent/manager.ts @@ -277,7 +277,7 @@ export class MultiProviderAgentManager implements AgentManager { const agentId = agent.id; // 3a. Append inter-agent communication instructions with actual agent ID - prompt = prompt + buildInterAgentCommunication(agentId); + prompt = prompt + buildInterAgentCommunication(agentId, mode); // 3b. Write input files (after agent creation so we can include agentId/agentName) if (options.inputContext) { @@ -487,6 +487,117 @@ export class MultiProviderAgentManager implements AgentManager { return true; } + /** + * Resume an idle agent to answer an inter-agent conversation. + * Returns false if the agent can't be resumed (no session, provider doesn't support resume, etc.). + */ + private conversationResumeLocks = new Set(); + + async resumeForConversation( + agentId: string, + conversationId: string, + question: string, + fromAgentId: string, + ): Promise { + // Concurrency guard — prevent double-resume race + if (this.conversationResumeLocks.has(agentId)) { + log.info({ agentId, conversationId }, 'conversation resume already in progress, skipping'); + return false; + } + + const agent = await this.repository.findById(agentId); + if (!agent) return false; + if (agent.status !== 'idle') { + log.debug({ agentId, status: agent.status }, 'agent not idle, skipping conversation resume'); + return false; + } + if (!agent.sessionId) { + log.debug({ agentId }, 'no session ID, cannot resume for conversation'); + return false; + } + + const provider = getProvider(agent.provider); + if (!provider || provider.resumeStyle === 'none') { + log.debug({ agentId, provider: agent.provider }, 'provider does not support resume'); + return false; + } + + const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId); + if (!existsSync(agentCwd)) { + log.debug({ agentId, agentCwd }, 'worktree no longer exists, cannot resume'); + return false; + } + + this.conversationResumeLocks.add(agentId); + try { + const conversationPrompt = + `Another agent (ID: ${fromAgentId}) asked you a question via inter-agent communication.\n\n` + + `**Conversation ID**: ${conversationId}\n` + + `**Question**: ${question}\n\n` + + `Please answer this question using:\n` + + ` cw answer "" --conversation-id ${conversationId}\n\n` + + `After answering, check for any other pending conversations:\n` + + ` cw listen --agent-id ${agentId}\n\n` + + `Answer any additional pending conversations the same way, then complete your session.`; + + // Clear previous signal.json + const signalPath = join(agentCwd, '.cw/output/signal.json'); + try { + await unlink(signalPath); + } catch { + // File might not exist + } + + await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null }); + + const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, conversationPrompt); + const { processEnv } = await this.credentialHandler.prepareProcessEnv(providerEnv, provider, agent.accountId); + + // Stop previous tailer/poll + const prevActive = this.activeAgents.get(agentId); + prevActive?.cancelPoll?.(); + if (prevActive?.tailer) { + await prevActive.tailer.stop(); + } + + let sessionNumber = 1; + if (this.logChunkRepository) { + sessionNumber = (await this.logChunkRepository.getSessionCount(agentId)) + 1; + } + + const { pid, outputFilePath, tailer } = this.processManager.spawnDetached( + agentId, agent.name, command, args, agentCwd, processEnv, provider.name, conversationPrompt, + (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)), + this.createLogChunkCallback(agentId, agent.name, sessionNumber), + ); + + await this.repository.update(agentId, { pid, outputFilePath }); + const activeEntry: ActiveAgent = { agentId, pid, tailer, outputFilePath }; + this.activeAgents.set(agentId, activeEntry); + + if (this.eventBus) { + const event: AgentResumedEvent = { + type: 'agent:resumed', + timestamp: new Date(), + payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', sessionId: agent.sessionId }, + }; + this.eventBus.emit(event); + } + + const { cancel } = this.processManager.pollForCompletion( + agentId, pid, + () => this.handleDetachedAgentCompletion(agentId), + () => this.activeAgents.get(agentId)?.tailer, + ); + activeEntry.cancelPoll = cancel; + + log.info({ agentId, conversationId, pid }, 'resumed idle agent for conversation'); + return true; + } finally { + this.conversationResumeLocks.delete(agentId); + } + } + /** * Sync credentials from agent's config dir back to DB after completion. * The subprocess may have refreshed tokens mid-session; this ensures diff --git a/apps/server/agent/prompts/shared.ts b/apps/server/agent/prompts/shared.ts index 71ad7e3..a23896f 100644 --- a/apps/server/agent/prompts/shared.ts +++ b/apps/server/agent/prompts/shared.ts @@ -119,25 +119,95 @@ Update \`.cw/output/progress.md\` after each commit: Survives context compaction — read this first if your context is refreshed. `; -export function buildInterAgentCommunication(agentId: string): string { +const PLANNING_MODES = new Set(['plan', 'refine']); + +export function buildInterAgentCommunication(agentId: string, mode: string = 'execute'): string { + if (PLANNING_MODES.has(mode)) { + return ` + +Your agent ID: **${agentId}** + +You are in a planning mode (\`${mode}\`). You define high-level structure, not implementation details. Real-time coordination is almost never needed. + +If you are truly blocked on information only another running agent has: +\`\`\` +cw ask "" --from ${agentId} --agent-id +\`\`\` +This blocks until the target answers. Use it as a last resort — not for approach validation. +`; + } + return ` Your agent ID: **${agentId}** -**CLI Commands** +## Commands -- \`cw listen --agent-id ${agentId}\` — Waits for incoming question. Prints JSON (\`{ conversationId, fromAgentId, question, phaseId, taskId }\`) and exits. -- \`cw ask "" --from ${agentId} --agent-id \` — Blocks until answered. Target with one of: \`--agent-id \`, \`--task-id \`, \`--phase-id \`. -- \`cw answer "" --conversation-id \` — Answer a pending question. +| Command | Behavior | +|---------|----------| +| \`cw listen --agent-id ${agentId}\` | Blocks via SSE until one question arrives. Prints JSON and exits. | +| \`cw ask "" --from ${agentId} --agent-id \` | Creates a conversation and blocks until the target answers. Prints the answer to stdout. | +| \`cw answer "" --conversation-id \` | Answers a pending question. Prints confirmation JSON. | -**Usage Pattern** +## Listener Lifecycle -Run \`cw listen > "$file" &\` at session start. Check periodically. On question: answer, restart listener. Before signal.json: kill listener, clean up. +Set up a background listener so you can answer questions from other agents while working. -**When to Communicate** -- Need interface/schema/API contract info from another agent -- About to modify a shared resource -- Have a dependency on another agent's work -- Don't ask questions you can answer by reading the codebase +\`\`\`bash +# 1. Start listener, redirect to temp file +CW_LISTEN_FILE=$(mktemp) +cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" & +CW_LISTEN_PID=$! + +# 2. Between work steps, check for incoming questions +if [ -s "$CW_LISTEN_FILE" ]; then + # 3. Parse the JSON, answer, clear, restart + CONV_ID=$(cat "$CW_LISTEN_FILE" | jq -r '.conversationId') + QUESTION=$(cat "$CW_LISTEN_FILE" | jq -r '.question') + # Read code / think / answer with specifics + cw answer "" --conversation-id "$CONV_ID" + > "$CW_LISTEN_FILE" + cw listen --agent-id ${agentId} > "$CW_LISTEN_FILE" & + CW_LISTEN_PID=$! +fi + +# 4. Before writing signal.json — kill listener and clean up +kill $CW_LISTEN_PID 2>/dev/null +rm -f "$CW_LISTEN_FILE" +\`\`\` + +## Targeting + +- \`--agent-id \` — You know exactly which agent to ask (e.g., from manifest or a previous conversation). +- \`--task-id \` — Ask whichever agent is currently running that task. +- \`--phase-id \` — Ask whichever agent is working in that phase. Use when you need something from an adjacent phase but don't know the agent ID. + +## When to Ask + +- You need an **uncommitted interface contract** — an export path, method signature, type definition, or schema that another agent is actively creating and hasn't pushed yet. +- You are about to **modify a shared file** that another agent may also be editing, and you need to coordinate who changes what. + +## When NOT to Ask + +- The answer is in the **codebase** — search first (\`grep\`, \`find\`, read the code). +- The answer is in your **input files or context files** — read them again before asking. +- You are **not actually blocked** — if you can make a reasonable decision and move on, do that. +- You want to **confirm your approach** — that's not what inter-agent communication is for. Make the call. + + +"How should I structure the API response for the users endpoint?" +This is a design decision you should make based on existing codebase patterns. + + + +"What will the export path and method signature be for createUser() in packages/shared/src/api/users.ts? I need to import it." +This asks for a specific uncommitted artifact another agent is building. + + +## Answering Questions + +When you receive a question, be **specific**. Include the actual code snippet, file path, type signature, or schema. Vague answers force a follow-up round-trip. + +Check for incoming questions between commits — not after every line of code. `; } diff --git a/docs/agent.md b/docs/agent.md index 57fe6b6..00d46a2 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -143,12 +143,18 @@ Agent output is persisted to `agent_log_chunks` table and drives all live stream Agents can communicate with each other via the `conversations` table, coordinated through CLI commands. ### Prompt Integration -`buildInterAgentCommunication(agentId)` function in `prompts/shared.ts` generates per-agent communication instructions. Called in `manager.ts` after agent record creation — the actual agent ID is injected directly into the prompt (no manifest.json indirection). Appended to the prompt regardless of mode. Instructions include: -1. Set up a background listener via temp-file redirect: `cw listen > $CW_LISTEN_FILE &` -2. Periodically check the temp file for incoming questions between work steps -3. Answer via `cw answer`, clear the file, restart the listener -4. Ask questions to peers via `cw ask --from --agent-id|--task-id|--phase-id` -5. Kill the listener and clean up the temp file before writing `signal.json` +`buildInterAgentCommunication(agentId, mode)` function in `prompts/shared.ts` generates per-agent communication instructions. Called in `manager.ts` after agent record creation — the actual agent ID is injected directly into the prompt (no manifest.json indirection). + +**Mode-aware branching:** + +- **Planning modes** (`plan`, `refine`): Minimal block — just the agent ID and `cw ask` syntax for emergencies. These agents define high-level structure, not implementation details, so real-time coordination is almost never needed. +- **Execution + coordination modes** (`execute`, `detail`, `discuss`, `verify`, `merge`, `review`): Full protocol including: + 1. Commands table with accurate CLI behavior descriptions + 2. Numbered shell recipe for background listener lifecycle (start → check → answer → restart → cleanup) + 3. Targeting guidance (`--agent-id` vs `--task-id` vs `--phase-id`) + 4. Decision criteria: when to ask (uncommitted interfaces, shared file conflicts) and when NOT to ask (answer in codebase, answer in input files, not blocked, confirming approach) + 5. Good/bad examples using `` pattern + 6. Answering guidelines (be specific — include code snippets, file paths, type signatures) ### Agent Identity `manifest.json` includes `agentId` and `agentName` fields. The manager passes these from the DB record after agent creation. The agent ID is also injected directly into the prompt's communication instructions.