From 43e2c8b0ba66d7002b9991b2cdbd1d177adbdc9c Mon Sep 17 00:00:00 2001 From: Lukas May Date: Sun, 8 Feb 2026 15:51:32 +0100 Subject: [PATCH] fix(agent): Eliminate race condition in completion handling PROBLEM: - Agents completing with questions were incorrectly marked as "crashed" - Race condition: polling handler AND crash handler both called handleCompletion() - Caused database corruption and lost pending questions SOLUTION: - Add completion mutex in OutputHandler to prevent concurrent processing - Remove duplicate completion call from crash handler - Only one handler executes completion logic per agent TESTING: - Added mutex-completion.test.ts with 4 test cases - Verified mutex prevents concurrent access - Verified lock cleanup on exceptions - Verified different agents can process concurrently FIXES: residential-cuckoo and 12+ other agents stuck in crashed state --- drizzle/0013_add_proposals_table.sql | 15 + drizzle/0014_add_exit_code_to_agents.sql | 1 + drizzle/meta/_journal.json | 14 + packages/shared/src/index.ts | 2 +- packages/shared/src/types.ts | 2 +- .../web/src/components/AgentOutputViewer.tsx | 251 +++++++++-- .../web/src/components/InitiativeCard.tsx | 3 +- .../web/src/components/TaskDetailModal.tsx | 15 +- packages/web/src/components/TaskRow.tsx | 12 +- .../editor/ContentProposalReview.tsx | 101 +++-- .../components/editor/RefineAgentPanel.tsx | 21 +- .../components/execution/PhaseWithTasks.tsx | 78 ++-- .../components/execution/PlanTasksFetcher.tsx | 20 - .../web/src/components/execution/index.ts | 1 - packages/web/src/hooks/index.ts | 1 - packages/web/src/hooks/useRefineAgent.ts | 133 ++---- packages/web/src/routes/initiatives/$id.tsx | 1 - src/agent/cleanup-manager.ts | 55 ++- src/agent/completion-detection.test.ts | 146 ++++++ src/agent/credential-handler.ts | 19 +- src/agent/file-io.ts | 23 +- src/agent/manager.test.ts | 44 ++ src/agent/manager.ts | 131 ++++++ src/agent/markdown-to-tiptap.ts | 32 ++ src/agent/mutex-completion.test.ts | 152 
+++++++ src/agent/output-handler.test.ts | 280 ++++++++++++ src/agent/output-handler.ts | 200 ++++++--- src/agent/process-manager.test.ts | 423 ++++++++++++++++++ src/agent/prompts.ts | 19 +- src/agent/providers/parsers/claude.ts | 6 +- src/agent/providers/stream-types.ts | 2 + src/agent/types.ts | 2 + src/cli/index.ts | 39 +- src/container.ts | 19 +- src/db/repositories/agent-repository.ts | 1 + src/db/repositories/drizzle/index.ts | 1 + src/db/repositories/drizzle/proposal.ts | 133 ++++++ src/db/repositories/index.ts | 6 + src/db/repositories/proposal-repository.ts | 35 ++ src/db/schema.ts | 45 +- src/events/types.ts | 1 + src/process/manager.test.ts | 1 + src/process/manager.ts | 1 + src/server/trpc-adapter.ts | 4 + .../agent-workdir-verification.test.ts | 203 +++++++++ .../integration/real-providers/harness.ts | 4 +- src/trpc/context.ts | 5 + src/trpc/router.ts | 2 + src/trpc/routers/_helpers.ts | 11 + src/trpc/routers/architect.ts | 27 ++ src/trpc/routers/initiative.ts | 2 +- src/trpc/routers/proposal.ts | 170 +++++++ 52 files changed, 2545 insertions(+), 370 deletions(-) create mode 100644 drizzle/0013_add_proposals_table.sql create mode 100644 drizzle/0014_add_exit_code_to_agents.sql delete mode 100644 packages/web/src/components/execution/PlanTasksFetcher.tsx create mode 100644 src/agent/completion-detection.test.ts create mode 100644 src/agent/markdown-to-tiptap.ts create mode 100644 src/agent/mutex-completion.test.ts create mode 100644 src/agent/output-handler.test.ts create mode 100644 src/agent/process-manager.test.ts create mode 100644 src/db/repositories/drizzle/proposal.ts create mode 100644 src/db/repositories/proposal-repository.ts create mode 100644 src/test/integration/agent-workdir-verification.test.ts create mode 100644 src/trpc/routers/proposal.ts diff --git a/drizzle/0013_add_proposals_table.sql b/drizzle/0013_add_proposals_table.sql new file mode 100644 index 0000000..140fa12 --- /dev/null +++ b/drizzle/0013_add_proposals_table.sql @@ -0,0 
+1,15 @@ +CREATE TABLE `proposals` ( + `id` text PRIMARY KEY NOT NULL, + `agent_id` text NOT NULL REFERENCES `agents`(`id`) ON DELETE cascade, + `initiative_id` text NOT NULL REFERENCES `initiatives`(`id`) ON DELETE cascade, + `target_type` text NOT NULL, + `target_id` text, + `title` text NOT NULL, + `summary` text, + `content` text, + `metadata` text, + `status` text NOT NULL DEFAULT 'pending', + `sort_order` integer NOT NULL DEFAULT 0, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL +); diff --git a/drizzle/0014_add_exit_code_to_agents.sql b/drizzle/0014_add_exit_code_to_agents.sql new file mode 100644 index 0000000..f2f5f40 --- /dev/null +++ b/drizzle/0014_add_exit_code_to_agents.sql @@ -0,0 +1 @@ +ALTER TABLE `agents` ADD `exit_code` integer; \ No newline at end of file diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index 13ca87f..6d11a48 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -92,6 +92,20 @@ "when": 1770420629437, "tag": "0012_add_agent_user_dismissed_at", "breakpoints": true + }, + { + "idx": 13, + "version": "6", + "when": 1770681600000, + "tag": "0013_add_proposals_table", + "breakpoints": true + }, + { + "idx": 14, + "version": "6", + "when": 1770768000000, + "tag": "0014_add_exit_code_to_agents", + "breakpoints": true } ] } \ No newline at end of file diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 99b9db5..1d930e8 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -1,3 +1,3 @@ export type { AppRouter } from './trpc.js'; -export type { Initiative, Phase, Plan, Task, Agent, Message, PendingQuestions, QuestionItem, SubscriptionEvent, Project } from './types.js'; +export type { Initiative, Phase, Task, Agent, Message, PendingQuestions, QuestionItem, SubscriptionEvent, Project, Proposal } from './types.js'; export { sortByPriorityAndQueueTime, type SortableItem } from './utils.js'; diff --git a/packages/shared/src/types.ts 
b/packages/shared/src/types.ts index 9aadeb8..393659d 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -1,4 +1,4 @@ -export type { Initiative, Phase, Plan, Task, Agent, Message, Page, Project, Account } from '../../../src/db/schema.js'; +export type { Initiative, Phase, Task, Agent, Message, Page, Project, Account, Proposal } from '../../../src/db/schema.js'; export type { PendingQuestions, QuestionItem } from '../../../src/agent/types.js'; /** diff --git a/packages/web/src/components/AgentOutputViewer.tsx b/packages/web/src/components/AgentOutputViewer.tsx index 966fffe..7a6d661 100644 --- a/packages/web/src/components/AgentOutputViewer.tsx +++ b/packages/web/src/components/AgentOutputViewer.tsx @@ -1,5 +1,6 @@ import { useEffect, useRef, useState } from "react"; import { Button } from "@/components/ui/button"; +import { Badge } from "@/components/ui/badge"; import { ArrowDown, Pause, Play, AlertCircle } from "lucide-react"; import { trpc } from "@/lib/trpc"; import { useSubscriptionWithErrorHandling } from "@/hooks"; @@ -9,10 +10,67 @@ interface AgentOutputViewerProps { agentName?: string; } +function formatToolCall(toolUse: any): string { + const { name, input } = toolUse; + + if (name === 'Bash') { + return `$ ${input.command}${input.description ? '\n# ' + input.description : ''}`; + } + + if (name === 'Read') { + return `šŸ“„ Read: ${input.file_path}${input.offset ? ` (lines ${input.offset}-${input.offset + (input.limit || 10)})` : ''}`; + } + + if (name === 'Edit') { + return `āœļø Edit: ${input.file_path}\n${input.old_string.substring(0, 100)}${input.old_string.length > 100 ? '...' : ''}\n→ ${input.new_string.substring(0, 100)}${input.new_string.length > 100 ? '...' 
: ''}`; + } + + if (name === 'Write') { + return `šŸ“ Write: ${input.file_path} (${input.content.length} chars)`; + } + + if (name === 'Task') { + return `šŸ¤– ${input.subagent_type}: ${input.description}\n${input.prompt?.substring(0, 200)}${input.prompt && input.prompt.length > 200 ? '...' : ''}`; + } + + // Generic fallback + return `${name}: ${JSON.stringify(input, null, 2)}`; +} + +function getMessageStyling(type: ParsedMessage['type']): string { + switch (type) { + case 'system': + return 'mb-1'; + case 'text': + return 'mb-1'; + case 'tool_call': + return 'mb-2'; + case 'tool_result': + return 'mb-2'; + case 'error': + return 'mb-2'; + case 'session_end': + return 'mb-2'; + default: + return 'mb-1'; + } +} + +interface ParsedMessage { + type: 'text' | 'system' | 'tool_call' | 'tool_result' | 'session_end' | 'error'; + content: string; + meta?: { + toolName?: string; + isError?: boolean; + cost?: number; + duration?: number; + }; +} + export function AgentOutputViewer({ agentId, agentName }: AgentOutputViewerProps) { - const [output, setOutput] = useState([]); + const [messages, setMessages] = useState([]); const [follow, setFollow] = useState(true); - const containerRef = useRef(null); + const containerRef = useRef(null); // Load initial/historical output const outputQuery = trpc.getAgentOutput.useQuery( @@ -26,11 +84,11 @@ export function AgentOutputViewer({ agentId, agentName }: AgentOutputViewerProps const subscription = useSubscriptionWithErrorHandling( () => trpc.onAgentOutput.useSubscription({ agentId }), { - onData: (event) => { - // event is TrackedEnvelope<{ agentId: string; data: string }> - // event.data is the inner data object - const payload = event.data as { agentId: string; data: string }; - setOutput((prev) => [...prev, payload.data]); + onData: (event: any) => { + // TrackedEnvelope shape: { id, data: { agentId, data: string } } + const raw = event?.data?.data ?? event?.data; + const data = typeof raw === 'string' ? 
raw : JSON.stringify(raw); + setMessages((prev) => [...prev, { type: 'text', content: data }]); }, onError: (error) => { console.error('Agent output subscription error:', error); @@ -43,39 +101,106 @@ export function AgentOutputViewer({ agentId, agentName }: AgentOutputViewerProps // Set initial output when query loads useEffect(() => { if (outputQuery.data) { - // Split NDJSON content into chunks for display - // Each line might be a JSON event, so we just display raw for now const lines = outputQuery.data.split("\n").filter(Boolean); - // Extract text from JSONL events for display - const textChunks: string[] = []; + const parsedMessages: ParsedMessage[] = []; + for (const line of lines) { try { const event = JSON.parse(line); - if (event.type === "assistant" && Array.isArray(event.message?.content)) { - // Claude CLI stream-json: complete assistant messages with content blocks + + // System initialization + if (event.type === "system" && event.session_id) { + parsedMessages.push({ + type: 'system', + content: `Session started: ${event.session_id}` + }); + } + + // Assistant messages with text and tool calls + else if (event.type === "assistant" && Array.isArray(event.message?.content)) { for (const block of event.message.content) { if (block.type === "text" && block.text) { - textChunks.push(block.text); + parsedMessages.push({ + type: 'text', + content: block.text + }); + } else if (block.type === "tool_use") { + parsedMessages.push({ + type: 'tool_call', + content: formatToolCall(block), + meta: { toolName: block.name } + }); } } - } else if (event.type === "stream_event" && event.event?.delta?.text) { - // Legacy streaming format: granular text deltas - textChunks.push(event.event.delta.text); - } else if (event.type === "result" && event.result) { - // Don't add result text since it duplicates the content } + + // User messages with tool results + else if (event.type === "user" && Array.isArray(event.message?.content)) { + for (const block of 
event.message.content) { + if (block.type === "tool_result") { + const rawContent = block.content; + const output = typeof rawContent === 'string' + ? rawContent + : Array.isArray(rawContent) + ? rawContent.map((c: any) => c.text ?? JSON.stringify(c)).join('\n') + : event.tool_use_result?.stdout || ''; + const stderr = event.tool_use_result?.stderr; + + if (stderr) { + parsedMessages.push({ + type: 'error', + content: stderr, + meta: { isError: true } + }); + } else if (output) { + const displayOutput = output.length > 1000 ? + output.substring(0, 1000) + '\n... (truncated)' : output; + parsedMessages.push({ + type: 'tool_result', + content: displayOutput + }); + } + } + } + } + + // Legacy streaming format + else if (event.type === "stream_event" && event.event?.delta?.text) { + parsedMessages.push({ + type: 'text', + content: event.event.delta.text + }); + } + + // Session completion + else if (event.type === "result") { + parsedMessages.push({ + type: 'session_end', + content: event.is_error ? 
'Session failed' : 'Session completed', + meta: { + isError: event.is_error, + cost: event.total_cost_usd, + duration: event.duration_ms + } + }); + } + } catch { // Not JSON, display as-is - textChunks.push(line + "\n"); + parsedMessages.push({ + type: 'error', + content: line, + meta: { isError: true } + }); } } - setOutput(textChunks); + setMessages(parsedMessages); } }, [outputQuery.data]); // Reset output when agent changes useEffect(() => { - setOutput([]); + setMessages([]); setFollow(true); }, [agentId]); @@ -84,7 +209,7 @@ export function AgentOutputViewer({ agentId, agentName }: AgentOutputViewerProps if (follow && containerRef.current) { containerRef.current.scrollTop = containerRef.current.scrollHeight; } - }, [output, follow]); + }, [messages, follow]); // Handle scroll to detect user scrolling up function handleScroll() { @@ -105,7 +230,7 @@ export function AgentOutputViewer({ agentId, agentName }: AgentOutputViewerProps } const isLoading = outputQuery.isLoading; - const hasOutput = output.length > 0; + const hasOutput = messages.length > 0; return (
@@ -159,19 +284,85 @@ export function AgentOutputViewer({ agentId, agentName }: AgentOutputViewerProps
{/* Output content */} -
         {isLoading ? (
-          Loading output...
+          
Loading output...
) : !hasOutput ? ( - No output yet... +
No output yet...
) : ( - output.join("") +
+ {messages.map((message, index) => ( +
+ {message.type === 'system' && ( +
+ System + {message.content} +
+ )} + + {message.type === 'text' && ( +
+ {message.content} +
+ )} + + {message.type === 'tool_call' && ( +
+ + {message.meta?.toolName} + +
+ {message.content} +
+
+ )} + + {message.type === 'tool_result' && ( +
+ + Result + +
+ {message.content} +
+
+ )} + + {message.type === 'error' && ( +
+ + Error + +
+ {message.content} +
+
+ )} + + {message.type === 'session_end' && ( +
+
+ + {message.content} + + {message.meta?.cost && ( + ${message.meta.cost.toFixed(4)} + )} + {message.meta?.duration && ( + {(message.meta.duration / 1000).toFixed(1)}s + )} +
+
+ )} +
+ ))} +
)} -
+ ); } diff --git a/packages/web/src/components/InitiativeCard.tsx b/packages/web/src/components/InitiativeCard.tsx index f4db5c9..25e30b1 100644 --- a/packages/web/src/components/InitiativeCard.tsx +++ b/packages/web/src/components/InitiativeCard.tsx @@ -16,8 +16,9 @@ import { trpc } from "@/lib/trpc"; export interface SerializedInitiative { id: string; name: string; - description: string | null; status: "active" | "completed" | "archived"; + mergeRequiresApproval: boolean; + mergeTarget: string | null; createdAt: string; updatedAt: string; } diff --git a/packages/web/src/components/TaskDetailModal.tsx b/packages/web/src/components/TaskDetailModal.tsx index bfb231f..9922e07 100644 --- a/packages/web/src/components/TaskDetailModal.tsx +++ b/packages/web/src/components/TaskDetailModal.tsx @@ -9,20 +9,7 @@ import { import { Button } from "@/components/ui/button"; import { StatusBadge } from "@/components/StatusBadge"; import { StatusDot } from "@/components/StatusDot"; - -/** Serialized Task shape as returned by tRPC (Date serialized to string over JSON) */ -export interface SerializedTask { - id: string; - planId: string; - name: string; - description: string | null; - type: string; - priority: string; - status: string; - order: number; - createdAt: string; - updatedAt: string; -} +import type { SerializedTask } from "@/components/TaskRow"; interface DependencyInfo { name: string; diff --git a/packages/web/src/components/TaskRow.tsx b/packages/web/src/components/TaskRow.tsx index e3bf5c1..bbbbb37 100644 --- a/packages/web/src/components/TaskRow.tsx +++ b/packages/web/src/components/TaskRow.tsx @@ -6,12 +6,16 @@ import { cn } from "@/lib/utils"; /** Task shape as returned by tRPC (Date fields serialized to string over JSON) */ export interface SerializedTask { id: string; - planId: string; + phaseId: string | null; + initiativeId: string | null; + parentTaskId: string | null; name: string; description: string | null; - type: string; - priority: string; - status: 
string; + type: "auto" | "checkpoint:human-verify" | "checkpoint:decision" | "checkpoint:human-action"; + category: string; + priority: "low" | "medium" | "high"; + status: "pending_approval" | "pending" | "in_progress" | "completed" | "blocked"; + requiresApproval: boolean | null; order: number; createdAt: string; updatedAt: string; diff --git a/packages/web/src/components/editor/ContentProposalReview.tsx b/packages/web/src/components/editor/ContentProposalReview.tsx index 5b25ef7..cc9bf8a 100644 --- a/packages/web/src/components/editor/ContentProposalReview.tsx +++ b/packages/web/src/components/editor/ContentProposalReview.tsx @@ -2,17 +2,10 @@ import { useState, useCallback } from "react"; import { Check, ChevronDown, ChevronRight, AlertTriangle } from "lucide-react"; import { Button } from "@/components/ui/button"; import { trpc } from "@/lib/trpc"; -import { markdownToTiptapJson } from "@/lib/markdown-to-tiptap"; - -interface ContentProposal { - pageId: string; - pageTitle: string; - summary: string; - markdown: string; -} +import type { Proposal } from "@codewalk-district/shared"; interface ContentProposalReviewProps { - proposals: ContentProposal[]; + proposals: Proposal[]; agentCreatedAt: Date; agentId: string; onDismiss: () => void; @@ -26,46 +19,52 @@ export function ContentProposalReview({ }: ContentProposalReviewProps) { const [accepted, setAccepted] = useState>(new Set()); const utils = trpc.useUtils(); - const updatePageMutation = trpc.updatePage.useMutation({ + + const acceptMutation = trpc.acceptProposal.useMutation({ onSuccess: () => { + void utils.listProposals.invalidate(); void utils.listPages.invalidate(); void utils.getPage.invalidate(); + void utils.listAgents.invalidate(); }, }); - const dismissMutation = trpc.dismissAgent.useMutation({ + const acceptAllMutation = trpc.acceptAllProposals.useMutation({ onSuccess: () => { + void utils.listProposals.invalidate(); + void utils.listPages.invalidate(); + void utils.getPage.invalidate(); void 
utils.listAgents.invalidate(); onDismiss(); }, }); - const handleAccept = useCallback( - async (proposal: ContentProposal) => { - const tiptapJson = markdownToTiptapJson(proposal.markdown); - await updatePageMutation.mutateAsync({ - id: proposal.pageId, - content: JSON.stringify(tiptapJson), - }); - setAccepted((prev) => new Set(prev).add(proposal.pageId)); + const dismissAllMutation = trpc.dismissAllProposals.useMutation({ + onSuccess: () => { + void utils.listProposals.invalidate(); + void utils.listAgents.invalidate(); + // Note: onDismiss() is not called here because the backend auto-dismiss + // will set userDismissedAt when all proposals are resolved }, - [updatePageMutation], + }); + + const handleAccept = useCallback( + async (proposal: Proposal) => { + await acceptMutation.mutateAsync({ id: proposal.id }); + setAccepted((prev) => new Set(prev).add(proposal.id)); + }, + [acceptMutation], ); const handleAcceptAll = useCallback(async () => { - for (const proposal of proposals) { - if (!accepted.has(proposal.pageId)) { - const tiptapJson = markdownToTiptapJson(proposal.markdown); - await updatePageMutation.mutateAsync({ - id: proposal.pageId, - content: JSON.stringify(tiptapJson), - }); - setAccepted((prev) => new Set(prev).add(proposal.pageId)); - } - } - }, [proposals, accepted, updatePageMutation]); + await acceptAllMutation.mutateAsync({ agentId }); + }, [acceptAllMutation, agentId]); - const allAccepted = proposals.every((p) => accepted.has(p.pageId)); + const handleDismissAll = useCallback(() => { + dismissAllMutation.mutate({ agentId }); + }, [dismissAllMutation, agentId]); + + const allAccepted = proposals.every((p) => accepted.has(p.id) || p.status === 'accepted'); return (
@@ -79,7 +78,7 @@ export function ContentProposalReview({ variant="outline" size="sm" onClick={handleAcceptAll} - disabled={updatePageMutation.isPending} + disabled={acceptAllMutation.isPending} > Accept All @@ -87,10 +86,10 @@ export function ContentProposalReview({
@@ -98,12 +97,12 @@ export function ContentProposalReview({
{proposals.map((proposal) => ( handleAccept(proposal)} - isAccepting={updatePageMutation.isPending} + isAccepting={acceptMutation.isPending} /> ))}
@@ -112,7 +111,7 @@ export function ContentProposalReview({ } interface ProposalCardProps { - proposal: ContentProposal; + proposal: Proposal; isAccepted: boolean; agentCreatedAt: Date; onAccept: () => void; @@ -128,10 +127,14 @@ function ProposalCard({ }: ProposalCardProps) { const [expanded, setExpanded] = useState(false); - // Check if page was modified since agent started - const pageQuery = trpc.getPage.useQuery({ id: proposal.pageId }); + // Check if target page was modified since agent started (page proposals only) + const pageQuery = trpc.getPage.useQuery( + { id: proposal.targetId ?? '' }, + { enabled: proposal.targetType === 'page' && !!proposal.targetId }, + ); const pageUpdatedAt = pageQuery.data?.updatedAt; const isStale = + proposal.targetType === 'page' && pageUpdatedAt && new Date(pageUpdatedAt) > agentCreatedAt; return ( @@ -147,11 +150,13 @@ function ProposalCard({ ) : ( )} - {proposal.pageTitle} + {proposal.title} -

- {proposal.summary} -

+ {proposal.summary && ( +

+ {proposal.summary} +

+ )} {isAccepted ? ( @@ -179,10 +184,10 @@ function ProposalCard({ )} - {expanded && ( + {expanded && proposal.content && (
-
{proposal.markdown}
+
{proposal.content}
)} diff --git a/packages/web/src/components/editor/RefineAgentPanel.tsx b/packages/web/src/components/editor/RefineAgentPanel.tsx index 22f5aea..cd7e376 100644 --- a/packages/web/src/components/editor/RefineAgentPanel.tsx +++ b/packages/web/src/components/editor/RefineAgentPanel.tsx @@ -1,4 +1,4 @@ -import { useCallback } from "react"; +import { useCallback, useEffect } from "react"; import { Loader2, AlertCircle } from "lucide-react"; import { Button } from "@/components/ui/button"; import { QuestionForm } from "@/components/QuestionForm"; @@ -12,7 +12,7 @@ interface RefineAgentPanelProps { export function RefineAgentPanel({ initiativeId }: RefineAgentPanelProps) { // All agent logic is now encapsulated in the hook - const { state, agent, questions, proposals, spawn, resume, refresh } = useRefineAgent(initiativeId); + const { state, agent, questions, proposals, spawn, resume, dismiss, refresh } = useRefineAgent(initiativeId); // spawn.mutate and resume.mutate are stable (ref-backed in useRefineAgent), // so these callbacks won't change on every render. 
@@ -31,8 +31,21 @@ export function RefineAgentPanel({ initiativeId }: RefineAgentPanelProps) { ); const handleDismiss = useCallback(() => { - refresh(); - }, [refresh]); + dismiss(); + }, [dismiss]); + + // Cmd+Enter (Mac) / Ctrl+Enter (Windows) dismisses when completed + useEffect(() => { + if (state !== "completed") return; + const handler = (e: KeyboardEvent) => { + if (e.key === "Enter" && (e.metaKey || e.ctrlKey)) { + e.preventDefault(); + handleDismiss(); + } + }; + window.addEventListener("keydown", handler); + return () => window.removeEventListener("keydown", handler); + }, [state, handleDismiss]); // No active agent — show spawn button if (state === "none") { diff --git a/packages/web/src/components/execution/PhaseWithTasks.tsx b/packages/web/src/components/execution/PhaseWithTasks.tsx index 31bf945..7d5671a 100644 --- a/packages/web/src/components/execution/PhaseWithTasks.tsx +++ b/packages/web/src/components/execution/PhaseWithTasks.tsx @@ -1,7 +1,6 @@ -import { useState, useCallback, useEffect } from "react"; +import { useEffect } from "react"; import { trpc } from "@/lib/trpc"; import { PhaseAccordion } from "@/components/PhaseAccordion"; -import { PlanTasksFetcher } from "./PlanTasksFetcher"; import type { SerializedTask } from "@/components/TaskRow"; import type { TaskCounts, FlatTaskEntry } from "./ExecutionContext"; import { sortByPriorityAndQueueTime } from "@codewalk-district/shared"; @@ -30,17 +29,16 @@ export function PhaseWithTasks({ onTaskCounts, registerTasks, }: PhaseWithTasksProps) { - const plansQuery = trpc.listPlans.useQuery({ phaseId: phase.id }); + const tasksQuery = trpc.listPhaseTasks.useQuery({ phaseId: phase.id }); const depsQuery = trpc.getPhaseDependencies.useQuery({ phaseId: phase.id }); - const plans = plansQuery.data ?? []; - const planIds = plans.map((p) => p.id); + const tasks = tasksQuery.data ?? 
[]; return ( void; @@ -63,38 +61,22 @@ interface PhaseWithTasksInnerProps { function PhaseWithTasksInner({ phase, - planIds, - plansLoaded, + tasks, + tasksLoaded, phaseDependencyIds: _phaseDependencyIds, defaultExpanded, onTaskClick, onTaskCounts, registerTasks, }: PhaseWithTasksInnerProps) { - const [planTasks, setPlanTasks] = useState>( - {}, - ); - - const handlePlanTasks = useCallback( - (planId: string, tasks: SerializedTask[]) => { - setPlanTasks((prev) => { - if (prev[planId] === tasks) return prev; - return { ...prev, [planId]: tasks }; - }); - }, - [], - ); - - // Propagate derived counts and entries outside the setState updater - // to avoid synchronous setState-inside-setState cascades. + // Propagate task counts and entries useEffect(() => { - const allTasks = Object.values(planTasks).flat(); - const complete = allTasks.filter( + const complete = tasks.filter( (t) => t.status === "completed", ).length; - onTaskCounts(phase.id, { complete, total: allTasks.length }); + onTaskCounts(phase.id, { complete, total: tasks.length }); - const entries: FlatTaskEntry[] = allTasks.map((task) => ({ + const entries: FlatTaskEntry[] = tasks.map((task) => ({ task, phaseName: `Phase ${phase.number}: ${phase.name}`, agentName: null, @@ -102,10 +84,9 @@ function PhaseWithTasksInner({ dependents: [], })); registerTasks(phase.id, entries); - }, [planTasks, phase.id, phase.number, phase.name, onTaskCounts, registerTasks]); + }, [tasks, phase.id, phase.number, phase.name, onTaskCounts, registerTasks]); - const allTasks = planIds.flatMap((pid) => planTasks[pid] ?? 
[]); - const sortedTasks = sortByPriorityAndQueueTime(allTasks); + const sortedTasks = sortByPriorityAndQueueTime(tasks); const taskEntries = sortedTasks.map((task) => ({ task, agentName: null as string | null, @@ -114,24 +95,17 @@ function PhaseWithTasksInner({ const phaseDeps: Array<{ name: string; status: string }> = []; - return ( - <> - {plansLoaded && - planIds.map((planId) => ( - - ))} + if (!tasksLoaded) { + return null; + } - - + return ( + ); } \ No newline at end of file diff --git a/packages/web/src/components/execution/PlanTasksFetcher.tsx b/packages/web/src/components/execution/PlanTasksFetcher.tsx deleted file mode 100644 index 46e829e..0000000 --- a/packages/web/src/components/execution/PlanTasksFetcher.tsx +++ /dev/null @@ -1,20 +0,0 @@ -import { useEffect } from "react"; -import { trpc } from "@/lib/trpc"; -import type { SerializedTask } from "@/components/TaskRow"; - -interface PlanTasksFetcherProps { - planId: string; - onTasks: (planId: string, tasks: SerializedTask[]) => void; -} - -export function PlanTasksFetcher({ planId, onTasks }: PlanTasksFetcherProps) { - const tasksQuery = trpc.listTasks.useQuery({ planId }); - - useEffect(() => { - if (tasksQuery.data) { - onTasks(planId, tasksQuery.data as unknown as SerializedTask[]); - } - }, [tasksQuery.data, planId, onTasks]); - - return null; -} \ No newline at end of file diff --git a/packages/web/src/components/execution/index.ts b/packages/web/src/components/execution/index.ts index b16b156..b3a53da 100644 --- a/packages/web/src/components/execution/index.ts +++ b/packages/web/src/components/execution/index.ts @@ -3,7 +3,6 @@ export { BreakdownSection } from "./BreakdownSection"; export { PhaseActions } from "./PhaseActions"; export { PhasesList } from "./PhasesList"; export { PhaseWithTasks } from "./PhaseWithTasks"; -export { PlanTasksFetcher } from "./PlanTasksFetcher"; export { ProgressSidebar } from "./ProgressSidebar"; export { TaskModal } from "./TaskModal"; export type { TaskCounts, 
FlatTaskEntry, PhaseData } from "./ExecutionContext"; \ No newline at end of file diff --git a/packages/web/src/hooks/index.ts b/packages/web/src/hooks/index.ts index 42225c7..d9f9a1c 100644 --- a/packages/web/src/hooks/index.ts +++ b/packages/web/src/hooks/index.ts @@ -12,7 +12,6 @@ export { useSubscriptionWithErrorHandling } from './useSubscriptionWithErrorHand export type { RefineAgentState, - ContentProposal, SpawnRefineAgentOptions, UseRefineAgentResult, } from './useRefineAgent.js'; \ No newline at end of file diff --git a/packages/web/src/hooks/useRefineAgent.ts b/packages/web/src/hooks/useRefineAgent.ts index 5f9d657..b0171ca 100644 --- a/packages/web/src/hooks/useRefineAgent.ts +++ b/packages/web/src/hooks/useRefineAgent.ts @@ -1,16 +1,9 @@ import { useMemo, useCallback, useRef } from 'react'; import { trpc } from '@/lib/trpc'; -import type { Agent, PendingQuestions } from '@codewalk-district/shared'; +import type { Agent, PendingQuestions, Proposal } from '@codewalk-district/shared'; export type RefineAgentState = 'none' | 'running' | 'waiting' | 'completed' | 'crashed'; -export interface ContentProposal { - pageId: string; - pageTitle: string; - summary: string; - markdown: string; -} - export interface SpawnRefineAgentOptions { initiativeId: string; instruction?: string; @@ -23,8 +16,8 @@ export interface UseRefineAgentResult { state: RefineAgentState; /** Questions from the agent (when state is 'waiting') */ questions: PendingQuestions | null; - /** Parsed content proposals (when state is 'completed') */ - proposals: ContentProposal[] | null; + /** Proposal rows from the DB (when state is 'completed') */ + proposals: Proposal[] | null; /** Raw result message (when state is 'completed') */ result: string | null; /** Mutation for spawning a new refine agent */ @@ -39,6 +32,8 @@ export interface UseRefineAgentResult { isPending: boolean; error: Error | null; }; + /** Dismiss the current agent (sets userDismissedAt so it disappears) */ + dismiss: () => 
void; /** Whether any queries are loading */ isLoading: boolean; /** Function to refresh agent data */ @@ -50,55 +45,6 @@ export interface UseRefineAgentResult { * * Encapsulates the logic for finding, spawning, and interacting with refine agents * that analyze and suggest improvements to initiative content. - * - * @param initiativeId - The ID of the initiative to manage refine agents for - * @returns Object with agent state, mutations, and helper functions - * - * @example - * ```tsx - * function RefineSection({ initiativeId }: { initiativeId: string }) { - * const { - * state, - * agent, - * questions, - * proposals, - * spawn, - * resume, - * refresh - * } = useRefineAgent(initiativeId); - * - * const handleSpawn = () => { - * spawn.mutate({ - * initiativeId, - * instruction: 'Focus on clarity and structure' - * }); - * }; - * - * if (state === 'none') { - * return ( - * - * ); - * } - * - * if (state === 'waiting' && questions) { - * return ( - * resume.mutate(answers)} - * isSubmitting={resume.isPending} - * /> - * ); - * } - * - * if (state === 'completed' && proposals) { - * return ; - * } - * - * return
Agent is {state}...
; - * } - * ``` */ export function useRefineAgent(initiativeId: string): UseRefineAgentResult { const utils = trpc.useUtils(); @@ -146,38 +92,28 @@ export function useRefineAgent(initiativeId: string): UseRefineAgentResult { { enabled: state === 'waiting' && !!agent }, ); + // Fetch proposals from DB when completed + const proposalsQuery = trpc.listProposals.useQuery( + { agentId: agent?.id ?? '' }, + { enabled: state === 'completed' && !!agent }, + ); + // Fetch result when completed const resultQuery = trpc.getAgentResult.useQuery( { id: agent?.id ?? '' }, { enabled: state === 'completed' && !!agent }, ); - // Parse proposals from result - const { proposals, result } = useMemo(() => { - if (!resultQuery.data?.success || !resultQuery.data.message) { - return { proposals: null, result: null }; - } + // Filter to only pending proposals + const proposals = useMemo(() => { + if (!proposalsQuery.data || proposalsQuery.data.length === 0) return null; + const pending = proposalsQuery.data.filter((p) => p.status === 'pending'); + return pending.length > 0 ? pending : null; + }, [proposalsQuery.data]); - const message = resultQuery.data.message; - - try { - const parsed = JSON.parse(message); - if (parsed.proposals && Array.isArray(parsed.proposals)) { - const proposals: ContentProposal[] = parsed.proposals.map( - (p: { pageId: string; title?: string; pageTitle?: string; summary: string; body?: string; markdown?: string }) => ({ - pageId: p.pageId, - pageTitle: p.pageTitle ?? p.title ?? '', - summary: p.summary, - markdown: p.markdown ?? p.body ?? 
'', - }), - ); - return { proposals, result: message }; - } - } catch { - // Not JSON — treat as regular result - } - - return { proposals: null, result: message }; + const result = useMemo(() => { + if (!resultQuery.data?.success || !resultQuery.data.message) return null; + return resultQuery.data.message; }, [resultQuery.data]); // Spawn mutation @@ -194,16 +130,26 @@ export function useRefineAgent(initiativeId: string): UseRefineAgentResult { }, }); + // Dismiss mutation — sets userDismissedAt so agent disappears from the list + const dismissMutation = trpc.dismissAgent.useMutation({ + onSuccess: () => { + // Force immediate refetch of agents to update UI + void utils.listAgents.invalidate(); + void utils.listAgents.refetch(); + void utils.listProposals.invalidate(); + }, + }); + // Keep mutation functions in refs so the returned spawn/resume objects are - // stable across renders. tRPC mutation objects change identity every render, - // which cascades into unstable callbacks → unstable props → Radix Dialog - // re-renders that trigger the React 19 compose-refs infinite loop. + // stable across renders. 
const spawnMutateRef = useRef(spawnMutation.mutate); spawnMutateRef.current = spawnMutation.mutate; const agentRef = useRef(agent); agentRef.current = agent; const resumeMutateRef = useRef(resumeMutation.mutate); resumeMutateRef.current = resumeMutation.mutate; + const dismissMutateRef = useRef(dismissMutation.mutate); + dismissMutateRef.current = dismissMutation.mutate; const spawnFn = useCallback(({ initiativeId, instruction }: SpawnRefineAgentOptions) => { spawnMutateRef.current({ @@ -231,13 +177,21 @@ export function useRefineAgent(initiativeId: string): UseRefineAgentResult { error: resumeMutation.error, }), [resumeFn, resumeMutation.isPending, resumeMutation.error]); + const dismiss = useCallback(() => { + const a = agentRef.current; + if (a) { + dismissMutateRef.current({ id: a.id }); + } + }, []); + const refresh = useCallback(() => { void utils.listAgents.invalidate(); + void utils.listProposals.invalidate(); }, [utils]); const isLoading = agentsQuery.isLoading || (state === 'waiting' && questionsQuery.isLoading) || - (state === 'completed' && resultQuery.isLoading); + (state === 'completed' && (resultQuery.isLoading || proposalsQuery.isLoading)); return { agent, @@ -247,7 +201,8 @@ export function useRefineAgent(initiativeId: string): UseRefineAgentResult { result, spawn, resume, + dismiss, isLoading, refresh, }; -} \ No newline at end of file +} diff --git a/packages/web/src/routes/initiatives/$id.tsx b/packages/web/src/routes/initiatives/$id.tsx index 7bcac70..566e042 100644 --- a/packages/web/src/routes/initiatives/$id.tsx +++ b/packages/web/src/routes/initiatives/$id.tsx @@ -31,7 +31,6 @@ function InitiativeDetailPage() { onData: () => { void utils.listPhases.invalidate(); void utils.listTasks.invalidate(); - void utils.listPlans.invalidate(); }, onError: (error) => { toast.error("Live updates disconnected. 
Refresh to reconnect.", { diff --git a/src/agent/cleanup-manager.ts b/src/agent/cleanup-manager.ts index daed4f3..e5b43b1 100644 --- a/src/agent/cleanup-manager.ts +++ b/src/agent/cleanup-manager.ts @@ -388,14 +388,35 @@ export class CleanupManager { if (rawOutput.trim()) { const provider = getProvider(agent.provider); if (provider) { - await onAgentOutput(agent.id, rawOutput, provider); - continue; + // Check if agent actually completed successfully before processing + const hasCompletionResult = this.checkForCompletionResult(rawOutput); + if (hasCompletionResult) { + log.info({ agentId: agent.id }, 'reconcile: processing completed agent output'); + try { + await onAgentOutput(agent.id, rawOutput, provider); + continue; + } catch (err) { + log.error({ + agentId: agent.id, + err: err instanceof Error ? err.message : String(err) + }, 'reconcile: failed to process completed agent output'); + // Mark as crashed since processing failed + await this.repository.update(agent.id, { status: 'crashed' }); + this.emitCrashed(agent, `Failed to process output: ${err instanceof Error ? err.message : String(err)}`); + continue; + } + } } } - } catch { /* file missing or empty */ } - log.warn({ agentId: agent.id }, 'reconcile: marking agent crashed'); + } catch (readErr) { + log.warn({ + agentId: agent.id, + err: readErr instanceof Error ? 
readErr.message : String(readErr) + }, 'reconcile: failed to read output file'); + } + log.warn({ agentId: agent.id }, 'reconcile: marking agent crashed (no valid output)'); await this.repository.update(agent.id, { status: 'crashed' }); - this.emitCrashed(agent, 'Server restarted, agent output not found'); + this.emitCrashed(agent, 'Server restarted, agent output not found or invalid'); } else { log.warn({ agentId: agent.id }, 'reconcile: marking agent crashed'); await this.repository.update(agent.id, { status: 'crashed' }); @@ -415,6 +436,30 @@ export class CleanupManager { } } + /** + * Check if the agent output contains a completion result line. + * This indicates the agent finished successfully, even if processing fails. + */ + private checkForCompletionResult(rawOutput: string): boolean { + try { + const lines = rawOutput.trim().split('\n'); + for (const line of lines) { + try { + const parsed = JSON.parse(line); + // Look for Claude CLI result events with success status + if (parsed.type === 'result' && parsed.subtype === 'success') { + return true; + } + // Look for other providers' completion indicators + if (parsed.status === 'done' || parsed.status === 'questions') { + return true; + } + } catch { /* skip non-JSON lines */ } + } + } catch { /* invalid output format */ } + return false; + } + /** * Emit a crashed event for an agent. 
*/ diff --git a/src/agent/completion-detection.test.ts b/src/agent/completion-detection.test.ts new file mode 100644 index 0000000..361420e --- /dev/null +++ b/src/agent/completion-detection.test.ts @@ -0,0 +1,146 @@ +/** + * Test for Phase 1 completion detection fix + */ + +import { describe, test, expect, beforeEach, afterEach, vi } from 'vitest'; +import { mkdtemp, writeFile, mkdir } from 'node:fs/promises'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { rmSync } from 'node:fs'; +import { OutputHandler } from './output-handler.js'; +import type { AgentRepository } from '../db/repositories/agent-repository.js'; +import type { ProposalRepository } from '../db/repositories/proposal-repository.js'; + +describe('Completion Detection Fix', () => { + let tempDir: string; + let outputHandler: OutputHandler; + let mockAgentRepo: AgentRepository; + let mockProposalRepo: ProposalRepository; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'completion-test-')); + + // Mock repositories + mockAgentRepo = { + update: vi.fn(), + findById: vi.fn().mockResolvedValue({ id: 'test-agent', mode: 'refine' }), + } as any; + + mockProposalRepo = { + create: vi.fn(), + } as any; + + outputHandler = new OutputHandler(mockAgentRepo, undefined, mockProposalRepo); + }); + + afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); + }); + + test('detects completion from signal.json with "questions" status', async () => { + const agentId = 'test-agent'; + const agentWorkdir = join(tempDir, agentId); + const cwDir = join(agentWorkdir, '.cw/output'); + + // Create agent workdir structure + await mkdir(cwDir, { recursive: true }); + + // Create a signal.json file with questions status + const signalContent = JSON.stringify({ + status: 'questions', + questions: [{ id: 'q1', text: 'Do you want to proceed?' 
}] + }); + await writeFile(join(cwDir, 'signal.json'), signalContent); + + // Test the private method via reflection (testing the fix) + const checkSignalCompletion = (outputHandler as any).checkSignalCompletion.bind(outputHandler); + const result = await checkSignalCompletion(agentWorkdir); + + expect(result).toBe(true); + }); + + test('detects completion from signal.json with "done" status', async () => { + const agentId = 'test-agent'; + const agentWorkdir = join(tempDir, agentId); + const cwDir = join(agentWorkdir, '.cw/output'); + + await mkdir(cwDir, { recursive: true }); + + const signalContent = JSON.stringify({ + status: 'done', + result: 'Task completed successfully' + }); + await writeFile(join(cwDir, 'signal.json'), signalContent); + + const checkSignalCompletion = (outputHandler as any).checkSignalCompletion.bind(outputHandler); + const result = await checkSignalCompletion(agentWorkdir); + + expect(result).toBe(true); + }); + + test('detects completion from signal.json with "error" status', async () => { + const agentId = 'test-agent'; + const agentWorkdir = join(tempDir, agentId); + const cwDir = join(agentWorkdir, '.cw/output'); + + await mkdir(cwDir, { recursive: true }); + + const signalContent = JSON.stringify({ + status: 'error', + error: 'Something went wrong' + }); + await writeFile(join(cwDir, 'signal.json'), signalContent); + + const checkSignalCompletion = (outputHandler as any).checkSignalCompletion.bind(outputHandler); + const result = await checkSignalCompletion(agentWorkdir); + + expect(result).toBe(true); + }); + + test('returns false when signal.json does not exist', async () => { + const agentId = 'test-agent'; + const agentWorkdir = join(tempDir, agentId); + + // Don't create any files + + const checkSignalCompletion = (outputHandler as any).checkSignalCompletion.bind(outputHandler); + const result = await checkSignalCompletion(agentWorkdir); + + expect(result).toBe(false); + }); + + test('returns false for incomplete status', async 
() => { + const agentId = 'test-agent'; + const agentWorkdir = join(tempDir, agentId); + const cwDir = join(agentWorkdir, '.cw/output'); + + await mkdir(cwDir, { recursive: true }); + + const signalContent = JSON.stringify({ + status: 'running', + progress: 'Still working...' + }); + await writeFile(join(cwDir, 'signal.json'), signalContent); + + const checkSignalCompletion = (outputHandler as any).checkSignalCompletion.bind(outputHandler); + const result = await checkSignalCompletion(agentWorkdir); + + expect(result).toBe(false); + }); + + test('handles malformed signal.json gracefully', async () => { + const agentId = 'test-agent'; + const agentWorkdir = join(tempDir, agentId); + const cwDir = join(agentWorkdir, '.cw/output'); + + await mkdir(cwDir, { recursive: true }); + + // Create malformed JSON + await writeFile(join(cwDir, 'signal.json'), '{ invalid json }'); + + const checkSignalCompletion = (outputHandler as any).checkSignalCompletion.bind(outputHandler); + const result = await checkSignalCompletion(agentWorkdir); + + expect(result).toBe(false); + }); +}); \ No newline at end of file diff --git a/src/agent/credential-handler.ts b/src/agent/credential-handler.ts index 32257c2..103d254 100644 --- a/src/agent/credential-handler.ts +++ b/src/agent/credential-handler.ts @@ -6,7 +6,7 @@ * ensuring they're fresh, and marking accounts as exhausted on failure. */ -import { readFileSync } from 'node:fs'; +import { readFileSync, existsSync } from 'node:fs'; import { join } from 'node:path'; import type { AccountRepository } from '../db/repositories/account-repository.js'; import type { AccountCredentialManager } from './credentials/types.js'; @@ -92,6 +92,23 @@ export class CredentialHandler { return { valid, refreshed: false }; } + /** + * Read the access token from a config directory's .credentials.json. + * Returns null if credentials file is missing or malformed. + * Used for CLAUDE_CODE_OAUTH_TOKEN env var injection. 
+ */ + readAccessToken(configDir: string): string | null { + try { + const credPath = join(configDir, '.credentials.json'); + if (!existsSync(credPath)) return null; + const raw = readFileSync(credPath, 'utf-8'); + const parsed = JSON.parse(raw); + return parsed.claudeAiOauth?.accessToken ?? null; + } catch { + return null; + } + } + /** * Check if an error message indicates usage limit exhaustion. */ diff --git a/src/agent/file-io.ts b/src/agent/file-io.ts index e3e395a..1a15578 100644 --- a/src/agent/file-io.ts +++ b/src/agent/file-io.ts @@ -108,6 +108,15 @@ export function writeInputFiles(options: WriteInputFilesOptions): void { const inputDir = join(options.agentWorkdir, '.cw', 'input'); mkdirSync(inputDir, { recursive: true }); + // Write expected working directory marker for verification + writeFileSync( + join(inputDir, '../expected-pwd.txt'), + options.agentWorkdir, + 'utf-8' + ); + + const manifestFiles: string[] = []; + if (options.initiative) { const ini = options.initiative; const content = formatFrontmatter( @@ -121,6 +130,7 @@ export function writeInputFiles(options: WriteInputFilesOptions): void { '', ); writeFileSync(join(inputDir, 'initiative.md'), content, 'utf-8'); + manifestFiles.push('initiative.md'); } if (options.pages && options.pages.length > 0) { @@ -146,7 +156,9 @@ export function writeInputFiles(options: WriteInputFilesOptions): void { }, bodyMarkdown, ); + const filename = `pages/${page.id}.md`; writeFileSync(join(pagesDir, `${page.id}.md`), content, 'utf-8'); + manifestFiles.push(filename); } } @@ -162,6 +174,7 @@ export function writeInputFiles(options: WriteInputFilesOptions): void { ph.description ?? '', ); writeFileSync(join(inputDir, 'phase.md'), content, 'utf-8'); + manifestFiles.push('phase.md'); } if (options.task) { @@ -178,14 +191,22 @@ export function writeInputFiles(options: WriteInputFilesOptions): void { t.description ?? 
'', ); writeFileSync(join(inputDir, 'task.md'), content, 'utf-8'); + manifestFiles.push('task.md'); } + + // Write manifest listing exactly which files were created + writeFileSync( + join(inputDir, 'manifest.json'), + JSON.stringify({ files: manifestFiles }) + '\n', + 'utf-8', + ); } // ============================================================================= // OUTPUT FILE READING // ============================================================================= -function readFrontmatterFile(filePath: string): { data: Record; body: string } | null { +export function readFrontmatterFile(filePath: string): { data: Record; body: string } | null { try { const raw = readFileSync(filePath, 'utf-8'); const parsed = matter(raw); diff --git a/src/agent/manager.test.ts b/src/agent/manager.test.ts index 35d99a4..d364523 100644 --- a/src/agent/manager.test.ts +++ b/src/agent/manager.test.ts @@ -52,6 +52,7 @@ vi.mock('node:fs', async () => { mkdirSync: vi.fn(), writeFileSync: vi.fn(), createWriteStream: vi.fn().mockReturnValue(mockWriteStream), + existsSync: vi.fn().mockReturnValue(true), // Default to true for our new validation }; }); @@ -220,6 +221,49 @@ describe('MultiProviderAgentManager', () => { ).toBe('gastown'); }); + it('writes diagnostic files for workdir verification', async () => { + const mockChild = createMockChildProcess(); + mockSpawn.mockReturnValue(mockChild); + + // Mock fs.writeFileSync to capture diagnostic file writing + const { writeFileSync } = await import('node:fs'); + const mockWriteFileSync = vi.mocked(writeFileSync); + + // The existsSync is already mocked globally to return true + + await manager.spawn({ + name: 'gastown', + taskId: 'task-456', + prompt: 'Test task', + }); + + // Verify diagnostic file was written + const diagnosticCalls = mockWriteFileSync.mock.calls.filter(call => + call[0].toString().includes('spawn-diagnostic.json') + ); + expect(diagnosticCalls).toHaveLength(1); + + // Parse the diagnostic data to verify structure + const 
diagnosticCall = diagnosticCalls[0]; + const diagnosticData = JSON.parse(diagnosticCall[1] as string); + + expect(diagnosticData).toMatchObject({ + agentId: expect.any(String), + alias: 'gastown', + intendedCwd: expect.stringContaining('/agent-workdirs/gastown/workspace'), + worktreeId: 'gastown', + provider: 'claude', + command: expect.any(String), + args: expect.any(Array), + env: expect.any(Object), + cwdExistsAtSpawn: true, + initiativeId: null, + customCwdProvided: false, + accountId: null, + timestamp: expect.any(String), + }); + }); + it('uses custom cwd if provided', async () => { const mockChild = createMockChildProcess(); mockSpawn.mockReturnValue(mockChild); diff --git a/src/agent/manager.ts b/src/agent/manager.ts index d976413..f1ef6aa 100644 --- a/src/agent/manager.ts +++ b/src/agent/manager.ts @@ -29,11 +29,13 @@ import type { AgentStoppedEvent, AgentResumedEvent, AgentDeletedEvent, + ProcessCrashedEvent, } from '../events/index.js'; import { writeInputFiles } from './file-io.js'; import { getProvider } from './providers/registry.js'; import { createModuleLogger } from '../logger/index.js'; import { join } from 'node:path'; +import { unlink } from 'node:fs/promises'; import type { AccountCredentialManager } from './credentials/types.js'; import { ProcessManager } from './process-manager.js'; import { CredentialHandler } from './credential-handler.js'; @@ -67,6 +69,13 @@ export class MultiProviderAgentManager implements AgentManager { this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager); this.outputHandler = new OutputHandler(repository, eventBus, proposalRepository); this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug); + + // Listen for process crashed events to handle agents specially + if (eventBus) { + eventBus.on('process:crashed', async (event: ProcessCrashedEvent) => { + await this.handleProcessCrashed(event.payload.processId, 
event.payload.exitCode, event.payload.signal); + }); + } } /** @@ -476,6 +485,16 @@ export class MultiProviderAgentManager implements AgentManager { const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId); const prompt = this.outputHandler.formatAnswersAsPrompt(answers); + + // Clear previous signal.json to ensure clean completion detection + const signalPath = join(agentCwd, '.cw/output/signal.json'); + try { + await unlink(signalPath); + log.debug({ agentId, signalPath }, 'cleared previous signal.json for resume'); + } catch { + // File might not exist, which is fine + } + await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null }); const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt); @@ -650,6 +669,118 @@ export class MultiProviderAgentManager implements AgentManager { ); } + /** + * Handle process crashed event specifically for agents. + * Check if the agent actually completed successfully despite the non-zero exit code. 
+ */ + private async handleProcessCrashed(processId: string, exitCode: number | null, signal: string | null): Promise { + try { + // Check if this is an agent process + const agent = await this.repository.findById(processId); + if (!agent) { + return; // Not our agent + } + + // Store exit code and signal for debugging + await this.repository.update(processId, { exitCode }); + + log.info({ + agentId: processId, + name: agent.name, + exitCode, + signal, + outputFilePath: agent.outputFilePath + }, 'agent process crashed, analyzing completion status'); + + // Check if the agent has output that indicates successful completion + if (agent.outputFilePath) { + const hasCompletion = await this.checkAgentCompletionResult(agent.outputFilePath); + if (hasCompletion) { + log.info({ + agentId: processId, + name: agent.name, + exitCode, + signal + }, 'agent marked as crashed but completed successfully - completion already handled by polling'); + + // Note: We don't call handleCompletion() here because the polling handler + // (handleDetachedAgentCompletion) already processes completions. The mutex + // in OutputHandler.handleCompletion() prevents duplicate processing. + + log.info({ + agentId: processId, + name: agent.name, + exitCode + }, 'completion detection confirmed - deferring to polling handler'); + } else { + log.warn({ + agentId: processId, + name: agent.name, + exitCode, + signal, + outputFilePath: agent.outputFilePath + }, 'agent crashed and no successful completion detected - marking as truly crashed'); + + // Only mark as crashed if agent truly crashed (no completion detected) + await this.repository.update(processId, { status: 'crashed' }); + } + } else { + log.warn({ + agentId: processId, + name: agent.name, + exitCode, + signal + }, 'agent crashed with no output file path - marking as crashed'); + + await this.repository.update(processId, { status: 'crashed' }); + } + } catch (err) { + log.error({ + processId, + exitCode, + signal, + err: err instanceof Error ? 
err.message : String(err) + }, 'failed to check agent completion after crash'); + } + } + + /** + * Check if agent completed successfully by reading signal.json file. + */ + private async checkAgentCompletionResult(outputFilePath: string): Promise { + try { + const { readFile } = await import('node:fs/promises'); + const { existsSync } = await import('node:fs'); + const { dirname } = await import('node:path'); + + const agentDir = dirname(outputFilePath); + const signalPath = join(agentDir, '.cw/output/signal.json'); + + if (!existsSync(signalPath)) { + log.debug({ outputFilePath, signalPath }, 'no signal.json found - agent not completed'); + return false; + } + + const signalContent = await readFile(signalPath, 'utf-8'); + const signal = JSON.parse(signalContent); + + // Agent completed if status is done, questions, or error + const completed = signal.status === 'done' || signal.status === 'questions' || signal.status === 'error'; + + if (completed) { + log.debug({ outputFilePath, signal }, 'agent completion detected via signal.json'); + } else { + log.debug({ outputFilePath, signal }, 'signal.json found but status indicates incomplete'); + } + + return completed; + + } catch (err) { + log.warn({ outputFilePath, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json'); + return false; + } + } + /** * Convert database agent record to AgentInfo. */ diff --git a/src/agent/markdown-to-tiptap.ts b/src/agent/markdown-to-tiptap.ts new file mode 100644 index 0000000..7f18568 --- /dev/null +++ b/src/agent/markdown-to-tiptap.ts @@ -0,0 +1,32 @@ +/** + * Server-side Markdown → Tiptap JSON converter. + * + * Uses @tiptap/markdown's MarkdownManager.parse() — the same approach + * as content-serializer.ts but in reverse direction. + * No DOM needed, no new dependencies. 
+ */ + +import StarterKit from '@tiptap/starter-kit'; +import Link from '@tiptap/extension-link'; +import { MarkdownManager } from '@tiptap/markdown'; + +let _manager: MarkdownManager | null = null; + +function getManager(): MarkdownManager { + if (!_manager) { + _manager = new MarkdownManager({ + extensions: [StarterKit, Link], + }); + } + return _manager; +} + +/** + * Convert a markdown string to Tiptap JSON document. + */ +export function markdownToTiptapJson(markdown: string): object { + if (!markdown.trim()) { + return { type: 'doc', content: [{ type: 'paragraph' }] }; + } + return getManager().parse(markdown).toJSON(); +} diff --git a/src/agent/mutex-completion.test.ts b/src/agent/mutex-completion.test.ts new file mode 100644 index 0000000..58e6930 --- /dev/null +++ b/src/agent/mutex-completion.test.ts @@ -0,0 +1,152 @@ +/** + * Focused test for completion handler mutex functionality. + * Tests the race condition fix without complex mocking. + */ + +import { describe, it, beforeEach, expect } from 'vitest'; +import { OutputHandler } from './output-handler.js'; +import type { AgentRepository } from '../db/repositories/agent-repository.js'; + +describe('OutputHandler completion mutex', () => { + let outputHandler: OutputHandler; + let completionCallCount: number; + let callOrder: string[]; + + // Simple mock that tracks completion attempts + const mockRepository: AgentRepository = { + async findById() { + return null; // Return null to cause early exit after mutex check + }, + async update() {}, + async create() { throw new Error('Not implemented'); }, + async findAll() { throw new Error('Not implemented'); }, + async findByStatus() { throw new Error('Not implemented'); }, + async findByTaskId() { throw new Error('Not implemented'); }, + async findByInitiativeId() { throw new Error('Not implemented'); }, + async deleteById() { throw new Error('Not implemented'); }, + async findPending() { throw new Error('Not implemented'); } + }; + + beforeEach(() => { + 
outputHandler = new OutputHandler(mockRepository); + completionCallCount = 0; + callOrder = []; + }); + + it('should prevent concurrent completion handling with mutex', async () => { + const agentId = 'test-agent'; + + // Mock the findById method to track calls and simulate processing time + let firstCallCompleted = false; + (mockRepository as any).findById = async (id: string) => { + completionCallCount++; + const callIndex = completionCallCount; + callOrder.push(`call-${callIndex}-start`); + + if (callIndex === 1) { + // First call - simulate some processing time + await new Promise(resolve => setTimeout(resolve, 50)); + firstCallCompleted = true; + } + + callOrder.push(`call-${callIndex}-end`); + return null; // Return null to exit early + }; + + // Start two concurrent completion handlers + const getAgentWorkdir = () => '/test/workdir'; + const completion1Promise = outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir); + const completion2Promise = outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir); + + await Promise.all([completion1Promise, completion2Promise]); + + // Verify only one completion handler executed + expect(completionCallCount, 'Should only execute one completion handler').toBe(1); + expect(firstCallCompleted, 'First handler should have completed').toBe(true); + expect(callOrder).toEqual(['call-1-start', 'call-1-end']); + }); + + it('should allow sequential completion handling after first completes', async () => { + const agentId = 'test-agent'; + + // Mock findById to track calls + (mockRepository as any).findById = async (id: string) => { + completionCallCount++; + callOrder.push(`call-${completionCallCount}`); + return null; // Return null to exit early + }; + + const getAgentWorkdir = () => '/test/workdir'; + + // First completion + await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir); + + // Second completion (after first is done) + await outputHandler.handleCompletion(agentId, undefined, 
getAgentWorkdir); + + // Both should execute sequentially + expect(completionCallCount, 'Should execute both handlers sequentially').toBe(2); + expect(callOrder).toEqual(['call-1', 'call-2']); + }); + + it('should clean up mutex lock even when exception is thrown', async () => { + const agentId = 'test-agent'; + + let firstCallMadeThrowCall = false; + let secondCallCompleted = false; + + // First call throws an error + (mockRepository as any).findById = async (id: string) => { + if (!firstCallMadeThrowCall) { + firstCallMadeThrowCall = true; + throw new Error('Database error'); + } else { + secondCallCompleted = true; + return null; + } + }; + + const getAgentWorkdir = () => '/test/workdir'; + + // First call should throw but clean up mutex + await expect(outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir)) + .rejects.toThrow('Database error'); + + expect(firstCallMadeThrowCall, 'First call should have thrown').toBe(true); + + // Second call should succeed (proving mutex was cleaned up) + await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir); + expect(secondCallCompleted, 'Second call should have completed').toBe(true); + }); + + it('should use agent ID as mutex key', async () => { + const agentId1 = 'agent-1'; + const agentId2 = 'agent-2'; + + // Both agents can process concurrently since they have different IDs + let agent1Started = false; + let agent2Started = false; + + (mockRepository as any).findById = async (id: string) => { + if (id === agentId1) { + agent1Started = true; + await new Promise(resolve => setTimeout(resolve, 30)); + } else if (id === agentId2) { + agent2Started = true; + await new Promise(resolve => setTimeout(resolve, 30)); + } + return null; + }; + + const getAgentWorkdir = () => '/test/workdir'; + + // Start both agents concurrently - they should NOT block each other + const agent1Promise = outputHandler.handleCompletion(agentId1, undefined, getAgentWorkdir); + const agent2Promise = 
outputHandler.handleCompletion(agentId2, undefined, getAgentWorkdir); + + await Promise.all([agent1Promise, agent2Promise]); + + expect(agent1Started, 'Agent 1 should have started').toBe(true); + expect(agent2Started, 'Agent 2 should have started').toBe(true); + }); +}); \ No newline at end of file diff --git a/src/agent/output-handler.test.ts b/src/agent/output-handler.test.ts new file mode 100644 index 0000000..20f01c1 --- /dev/null +++ b/src/agent/output-handler.test.ts @@ -0,0 +1,280 @@ +/** + * OutputHandler Tests + * + * Test suite for the OutputHandler class, specifically focusing on + * question parsing and agent completion handling. + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { OutputHandler } from './output-handler.js'; +import type { AgentRepository } from '../db/repositories/agent-repository.js'; +import type { ProposalRepository } from '../db/repositories/proposal-repository.js'; +import type { EventBus, DomainEvent, AgentWaitingEvent } from '../events/types.js'; +import { getProvider } from './providers/registry.js'; + +// ============================================================================= +// Test Helpers +// ============================================================================= + +function createMockEventBus(): EventBus & { emittedEvents: DomainEvent[] } { + const emittedEvents: DomainEvent[] = []; + + const mockBus = { + emittedEvents, + emit: vi.fn().mockImplementation((event: T): void => { + emittedEvents.push(event); + }), + on: vi.fn(), + off: vi.fn(), + once: vi.fn(), + }; + + return mockBus; +} + +function createMockAgentRepository() { + return { + findById: vi.fn(), + update: vi.fn(), + create: vi.fn(), + findByName: vi.fn(), + findByStatus: vi.fn(), + findAll: vi.fn(), + delete: vi.fn(), + }; +} + +function createMockProposalRepository() { + return { + createMany: vi.fn(), + findByAgentId: vi.fn(), + findByInitiativeId: vi.fn(), + findById: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + 
create: vi.fn(), + findAll: vi.fn(), + }; +} + +// ============================================================================= +// Tests +// ============================================================================= + +describe('OutputHandler', () => { + let outputHandler: OutputHandler; + let mockAgentRepo: ReturnType; + let mockProposalRepo: ReturnType; + let eventBus: ReturnType; + + const mockAgent = { + id: 'agent-123', + name: 'test-agent', + taskId: 'task-456', + sessionId: 'session-789', + provider: 'claude', + mode: 'refine', + }; + + beforeEach(() => { + mockAgentRepo = createMockAgentRepository(); + mockProposalRepo = createMockProposalRepository(); + eventBus = createMockEventBus(); + + outputHandler = new OutputHandler( + mockAgentRepo as any, + eventBus, + mockProposalRepo as any + ); + + // Setup default mock behavior + mockAgentRepo.findById.mockResolvedValue(mockAgent); + }); + + describe('processAgentOutput', () => { + it('should correctly parse and handle questions from Claude CLI output', async () => { + // Arrange: Create realistic Claude CLI output with questions (like fantastic-crane) + const questionsResult = { + status: "questions", + questions: [ + { + id: "q1", + question: "What specific components are in the current admin UI? (e.g., tables, forms, modals, navigation)" + }, + { + id: "q2", + question: "What does 'modern look' mean for you? (e.g., dark mode support, specific color scheme, animations)" + }, + { + id: "q3", + question: "Are there any specific shadcn components you want to use or prioritize?" 
+ } + ] + }; + + const claudeOutput = JSON.stringify({ + type: "result", + subtype: "success", + is_error: false, + session_id: "test-session-123", + result: JSON.stringify(questionsResult), + total_cost_usd: 0.05 + }); + + const getAgentWorkdir = vi.fn().mockReturnValue('/test/workdir'); + const provider = getProvider('claude')!; + + // Act + await outputHandler.processAgentOutput( + mockAgent.id, + claudeOutput, + provider, + getAgentWorkdir + ); + + // Assert: Agent should be updated with questions and waiting_for_input status + expect(mockAgentRepo.update).toHaveBeenCalledWith(mockAgent.id, { + pendingQuestions: JSON.stringify({ + questions: [ + { + id: 'q1', + question: 'What specific components are in the current admin UI? (e.g., tables, forms, modals, navigation)' + }, + { + id: 'q2', + question: 'What does \'modern look\' mean for you? (e.g., dark mode support, specific color scheme, animations)' + }, + { + id: 'q3', + question: 'Are there any specific shadcn components you want to use or prioritize?' + } + ] + }), + status: 'waiting_for_input' + }); + + // Should be called at least once (could be once or twice depending on session ID extraction) + expect(mockAgentRepo.update).toHaveBeenCalledTimes(1); + + // Assert: AgentWaitingEvent should be emitted + const waitingEvents = eventBus.emittedEvents.filter(e => e.type === 'agent:waiting') as AgentWaitingEvent[]; + expect(waitingEvents).toHaveLength(1); + expect(waitingEvents[0].payload.questions).toEqual([ + { + id: 'q1', + question: 'What specific components are in the current admin UI? (e.g., tables, forms, modals, navigation)' + }, + { + id: 'q2', + question: 'What does \'modern look\' mean for you? (e.g., dark mode support, specific color scheme, animations)' + }, + { + id: 'q3', + question: 'Are there any specific shadcn components you want to use or prioritize?' 
+ } + ]); + }); + + it('should handle malformed questions gracefully', async () => { + // Arrange: Create output with malformed questions JSON + const malformedOutput = JSON.stringify({ + type: "result", + subtype: "success", + is_error: false, + session_id: "test-session", + result: '{"status": "questions", "questions": [malformed json]}', + total_cost_usd: 0.05 + }); + + const getAgentWorkdir = vi.fn().mockReturnValue('/test/workdir'); + const provider = getProvider('claude')!; + + // Act & Assert: Should not throw, should handle error gracefully + await expect( + outputHandler.processAgentOutput( + mockAgent.id, + malformedOutput, + provider, + getAgentWorkdir + ) + ).resolves.not.toThrow(); + + // Should update status to crashed due to malformed JSON + const updateCalls = mockAgentRepo.update.mock.calls; + const crashedCall = updateCalls.find(call => call[1]?.status === 'crashed'); + expect(crashedCall).toBeDefined(); + }); + + it('should correctly handle "done" status without questions', async () => { + // Arrange: Create output with done status + const doneOutput = JSON.stringify({ + type: "result", + subtype: "success", + is_error: false, + session_id: "test-session", + result: JSON.stringify({ + status: "done", + message: "Task completed successfully" + }), + total_cost_usd: 0.05 + }); + + const getAgentWorkdir = vi.fn().mockReturnValue('/test/workdir'); + const provider = getProvider('claude')!; + + // Act + await outputHandler.processAgentOutput( + mockAgent.id, + doneOutput, + provider, + getAgentWorkdir + ); + + // Assert: Should not set waiting_for_input status or pendingQuestions + const updateCalls = mockAgentRepo.update.mock.calls; + const waitingCall = updateCalls.find(call => call[1]?.status === 'waiting_for_input'); + expect(waitingCall).toBeUndefined(); + + const questionsCall = updateCalls.find(call => call[1]?.pendingQuestions); + expect(questionsCall).toBeUndefined(); + }); + }); + + describe('getPendingQuestions', () => { + it('should 
retrieve and parse stored pending questions', async () => { + // Arrange + const questionsPayload = { + questions: [ + { id: 'q1', question: 'Test question 1?' }, + { id: 'q2', question: 'Test question 2?' } + ] + }; + + mockAgentRepo.findById.mockResolvedValue({ + ...mockAgent, + pendingQuestions: JSON.stringify(questionsPayload) + }); + + // Act + const result = await outputHandler.getPendingQuestions(mockAgent.id); + + // Assert + expect(result).toEqual(questionsPayload); + expect(mockAgentRepo.findById).toHaveBeenCalledWith(mockAgent.id); + }); + + it('should return null when no pending questions exist', async () => { + // Arrange + mockAgentRepo.findById.mockResolvedValue({ + ...mockAgent, + pendingQuestions: null + }); + + // Act + const result = await outputHandler.getPendingQuestions(mockAgent.id); + + // Assert + expect(result).toBeNull(); + }); + }); +}); \ No newline at end of file diff --git a/src/agent/output-handler.ts b/src/agent/output-handler.ts index 7ca616a..ca2480f 100644 --- a/src/agent/output-handler.ts +++ b/src/agent/output-handler.ts @@ -75,6 +75,7 @@ interface ClaudeCliResult { export class OutputHandler { private filePositions = new Map(); + private completionLocks = new Set(); // Track agents currently being processed constructor( private repository: AgentRepository, @@ -199,95 +200,123 @@ export class OutputHandler { /** * Handle completion of a detached agent. * Processes the final result from the stream data captured by the tailer. + * + * RACE CONDITION FIX: Uses a completion lock to prevent duplicate processing. + * Both the polling handler (handleDetachedAgentCompletion) and crash handler + * (handleProcessCrashed) can call this method when a process exits with non-zero code. + * The mutex ensures only one handler processes the completion per agent. 
*/ async handleCompletion( agentId: string, active: ActiveAgent | undefined, getAgentWorkdir: (alias: string) => string, ): Promise { - const agent = await this.repository.findById(agentId); - if (!agent) return; - - const provider = getProvider(agent.provider); - if (!provider) return; - - log.debug({ agentId }, 'detached agent completed'); - - // Verify agent worked in correct location by checking for output files - const agentWorkdir = getAgentWorkdir(agent.worktreeId); - const outputDir = join(agentWorkdir, '.cw', 'output'); - const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt'); - const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json'); - - const outputDirExists = existsSync(outputDir); - const expectedPwdExists = existsSync(expectedPwdFile); - const diagnosticExists = existsSync(diagnosticFile); - - log.info({ - agentId, - agentWorkdir, - outputDirExists, - expectedPwdExists, - diagnosticExists, - verification: outputDirExists ? 'PASS' : 'FAIL' - }, 'agent workdir verification completed'); - - if (!outputDirExists) { - log.warn({ - agentId, - agentWorkdir - }, 'No output files found in agent workdir! Agent may have run in wrong location.'); - } - - let signalText = active?.streamResultText; - - // If the stream result indicated an error (e.g. auth failure, usage limit), - // route directly to error handling instead of trying to parse as signal JSON - if (signalText && active?.streamIsError) { - log.warn({ agentId, error: signalText }, 'agent returned error result'); - await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir); + // CRITICAL: Prevent race condition - only one completion handler per agent + if (this.completionLocks.has(agentId)) { + log.debug({ agentId }, 'completion already being processed - skipping duplicate'); return; } - if (!signalText) { - try { - const outputFilePath = active?.outputFilePath ?? 
''; - if (outputFilePath) { - // Read only complete lines from the file, avoiding race conditions - const lastPosition = this.filePositions.get(agentId) || 0; - const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition); + this.completionLocks.add(agentId); - if (fileContent.trim()) { - this.filePositions.set(agentId, newPosition); - await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir); - return; - } + try { + const agent = await this.repository.findById(agentId); + if (!agent) return; - // If no new complete lines, but file might still be writing, try again with validation - if (await this.validateSignalFile(outputFilePath)) { - const fullContent = await readFile(outputFilePath, 'utf-8'); - if (fullContent.trim() && fullContent.length > newPosition) { - // File is complete and has content beyond what we've read + const provider = getProvider(agent.provider); + if (!provider) return; + + log.debug({ agentId }, 'detached agent completed'); + + // Verify agent worked in correct location by checking for output files + const agentWorkdir = getAgentWorkdir(agent.worktreeId); + const outputDir = join(agentWorkdir, '.cw', 'output'); + const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt'); + const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json'); + + const outputDirExists = existsSync(outputDir); + const expectedPwdExists = existsSync(expectedPwdFile); + const diagnosticExists = existsSync(diagnosticFile); + + log.info({ + agentId, + agentWorkdir, + outputDirExists, + expectedPwdExists, + diagnosticExists, + verification: outputDirExists ? 'PASS' : 'FAIL' + }, 'agent workdir verification completed'); + + if (!outputDirExists) { + log.warn({ + agentId, + agentWorkdir + }, 'No output files found in agent workdir! Agent may have run in wrong location.'); + } + + let signalText = active?.streamResultText; + + // If the stream result indicated an error (e.g. 
auth failure, usage limit), + // route directly to error handling instead of trying to parse as signal JSON + if (signalText && active?.streamIsError) { + log.warn({ agentId, error: signalText }, 'agent returned error result'); + await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir); + return; + } + + if (!signalText) { + try { + const outputFilePath = active?.outputFilePath ?? ''; + if (outputFilePath) { + // First, check for robust signal.json completion before attempting incremental reading + const agentWorkdir = getAgentWorkdir(agentId); + if (await this.checkSignalCompletion(agentWorkdir)) { + const signalPath = join(agentWorkdir, '.cw/output/signal.json'); + const signalContent = await readFile(signalPath, 'utf-8'); + log.debug({ agentId, signalPath }, 'detected completion via signal.json'); this.filePositions.delete(agentId); // Clean up tracking - await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir); + await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId); return; } + + // Read only complete lines from the file, avoiding race conditions + const lastPosition = this.filePositions.get(agentId) || 0; + const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition); + + if (fileContent.trim()) { + this.filePositions.set(agentId, newPosition); + await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir); + return; + } + + // If no new complete lines, but file might still be writing, try again with validation + if (await this.validateSignalFile(outputFilePath)) { + const fullContent = await readFile(outputFilePath, 'utf-8'); + if (fullContent.trim() && fullContent.length > newPosition) { + // File is complete and has content beyond what we've read + this.filePositions.delete(agentId); // Clean up tracking + await this.processAgentOutput(agentId, fullContent, 
provider, getAgentWorkdir); + return; + } + } } - } - } catch { /* file empty or missing */ } + } catch { /* file empty or missing */ } - log.warn({ agentId }, 'no result text from stream or file'); - await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir); - return; + log.warn({ agentId }, 'no result text from stream or file'); + await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir); + return; + } + + await this.processSignalAndFiles( + agentId, + signalText, + agent.mode as AgentMode, + getAgentWorkdir, + active?.streamSessionId, + ); + } finally { + this.completionLocks.delete(agentId); // Always clean up } - - await this.processSignalAndFiles( - agentId, - signalText, - agent.mode as AgentMode, - getAgentWorkdir, - active?.streamSessionId, - ); } /** @@ -724,6 +753,33 @@ export class OutputHandler { // Private Helpers // ========================================================================= + /** + * Check if agent completed successfully by reading signal.json file. + * This is the robust completion detection logic that handles all completion statuses. + */ + private async checkSignalCompletion(agentWorkdir: string): Promise { + try { + const { existsSync } = await import('node:fs'); + const signalPath = join(agentWorkdir, '.cw/output/signal.json'); + + if (!existsSync(signalPath)) { + return false; + } + + const signalContent = await readFile(signalPath, 'utf-8'); + const signal = JSON.parse(signalContent); + + // Agent completed if status is done, questions, or error + const completed = signal.status === 'done' || signal.status === 'questions' || signal.status === 'error'; + + return completed; + + } catch (err) { + log.warn({ agentWorkdir, err: err instanceof Error ? 
err.message : String(err) }, 'failed to read or parse signal.json'); + return false; + } + } + private emitCrashed(agent: { id: string; name: string; taskId: string | null }, error: string): void { if (this.eventBus) { const event: AgentCrashedEvent = { diff --git a/src/agent/process-manager.test.ts b/src/agent/process-manager.test.ts new file mode 100644 index 0000000..15c9a9f --- /dev/null +++ b/src/agent/process-manager.test.ts @@ -0,0 +1,423 @@ +/** + * ProcessManager Unit Tests + * + * Tests for ProcessManager class focusing on working directory handling, + * command building, and spawn validation. + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { ProcessManager } from './process-manager.js'; +import type { ProjectRepository } from '../db/repositories/project-repository.js'; +import type { EventBus } from '../events/index.js'; + +// Mock child_process.spawn +vi.mock('node:child_process', () => ({ + spawn: vi.fn(), +})); + +// Mock fs operations +vi.mock('node:fs', () => ({ + writeFileSync: vi.fn(), + mkdirSync: vi.fn(), + openSync: vi.fn((path) => { + // Return different fd numbers for stdout and stderr + if (path.includes('output.jsonl')) return 99; + if (path.includes('stderr.log')) return 100; + return 101; + }), + closeSync: vi.fn(), + existsSync: vi.fn(), +})); + +// Mock FileTailer +vi.mock('./file-tailer.js', () => ({ + FileTailer: class MockFileTailer { + start = vi.fn().mockResolvedValue(undefined); + stop = vi.fn().mockResolvedValue(undefined); + }, +})); + +// Mock SimpleGitWorktreeManager +const mockCreate = vi.fn(); +vi.mock('../git/manager.js', () => ({ + SimpleGitWorktreeManager: class MockWorktreeManager { + create = mockCreate; + }, +})); + +// Mock project clones +vi.mock('../git/project-clones.js', () => ({ + ensureProjectClone: vi.fn().mockResolvedValue('/mock/clone/path'), + getProjectCloneDir: vi.fn().mockReturnValue('/mock/clone/path'), +})); + +// Mock providers 
+vi.mock('./providers/parsers/index.js', () => ({ + getStreamParser: vi.fn().mockReturnValue({ parse: vi.fn() }), +})); + +import { spawn } from 'node:child_process'; +import { existsSync, writeFileSync, mkdirSync, openSync, closeSync } from 'node:fs'; +import { ensureProjectClone } from '../git/project-clones.js'; + +const mockSpawn = vi.mocked(spawn); +const mockExistsSync = vi.mocked(existsSync); +const mockWriteFileSync = vi.mocked(writeFileSync); +const mockMkdirSync = vi.mocked(mkdirSync); +const mockOpenSync = vi.mocked(openSync); +const mockCloseSync = vi.mocked(closeSync); + +describe('ProcessManager', () => { + let processManager: ProcessManager; + let mockProjectRepository: ProjectRepository; + let mockEventBus: EventBus; + + const workspaceRoot = '/test/workspace'; + + beforeEach(() => { + vi.clearAllMocks(); + + // Mock child process + const mockChild = { + pid: 12345, + unref: vi.fn(), + on: vi.fn(), + kill: vi.fn(), + }; + mockSpawn.mockReturnValue(mockChild as any); + + // Mock project repository + mockProjectRepository = { + findProjectsByInitiativeId: vi.fn().mockResolvedValue([]), + create: vi.fn(), + findAll: vi.fn(), + findById: vi.fn(), + findByName: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + setInitiativeProjects: vi.fn(), + addProjectToInitiative: vi.fn(), + removeProjectFromInitiative: vi.fn(), + }; + + // Mock event bus + mockEventBus = { + emit: vi.fn(), + on: vi.fn(), + off: vi.fn(), + once: vi.fn(), + }; + + processManager = new ProcessManager(workspaceRoot, mockProjectRepository, mockEventBus); + }); + + afterEach(() => { + vi.resetAllMocks(); + }); + + describe('getAgentWorkdir', () => { + it('returns correct agent workdir path', () => { + const alias = 'test-agent'; + const expected = '/test/workspace/agent-workdirs/test-agent'; + + const result = processManager.getAgentWorkdir(alias); + + expect(result).toBe(expected); + }); + }); + + describe('createProjectWorktrees', () => { + beforeEach(() => { + // Mock the global worktree 
create function + mockCreate.mockResolvedValue({ + id: 'project1', + path: '/test/workspace/agent-workdirs/test-agent/project1', + branch: 'agent/test-agent', + isMainWorktree: false, + }); + + // Mock project repository + vi.mocked(mockProjectRepository.findProjectsByInitiativeId).mockResolvedValue([ + { id: '1', name: 'project1', url: 'https://github.com/user/project1.git', createdAt: new Date(), updatedAt: new Date() } + ]); + + // Mock existsSync to return true for worktree paths + mockExistsSync.mockImplementation((path) => { + return path.toString().includes('/agent-workdirs/'); + }); + }); + + it('creates worktrees for initiative projects', async () => { + const alias = 'test-agent'; + const initiativeId = 'init-123'; + + const result = await processManager.createProjectWorktrees(alias, initiativeId); + + expect(result).toBe('/test/workspace/agent-workdirs/test-agent'); + expect(mockProjectRepository.findProjectsByInitiativeId).toHaveBeenCalledWith('init-123'); + expect(ensureProjectClone).toHaveBeenCalled(); + }); + + it('throws error when worktree creation fails', async () => { + // Mock worktree path to not exist after creation + mockExistsSync.mockReturnValue(false); + + const alias = 'test-agent'; + const initiativeId = 'init-123'; + + await expect(processManager.createProjectWorktrees(alias, initiativeId)) + .rejects.toThrow('Worktree creation failed:'); + }); + + it('logs comprehensive worktree creation details', async () => { + const alias = 'test-agent'; + const initiativeId = 'init-123'; + + await processManager.createProjectWorktrees(alias, initiativeId); + + // Verify logging (implementation would need to capture log calls) + // For now, just verify the method completes successfully + expect(mockProjectRepository.findProjectsByInitiativeId).toHaveBeenCalledWith('init-123'); + }); + }); + + describe('createStandaloneWorktree', () => { + beforeEach(() => { + mockCreate.mockResolvedValue({ + id: 'workspace', + path: 
'/test/workspace/agent-workdirs/test-agent/workspace', + branch: 'agent/test-agent', + isMainWorktree: false, + }); + + mockExistsSync.mockImplementation((path) => { + return path.toString().includes('/workspace'); + }); + }); + + it('creates standalone worktree', async () => { + const alias = 'test-agent'; + + const result = await processManager.createStandaloneWorktree(alias); + + expect(result).toBe('/test/workspace/agent-workdirs/test-agent/workspace'); + }); + + it('throws error when standalone worktree creation fails', async () => { + mockExistsSync.mockReturnValue(false); + + const alias = 'test-agent'; + + await expect(processManager.createStandaloneWorktree(alias)) + .rejects.toThrow('Standalone worktree creation failed:'); + }); + }); + + describe('spawnDetached', () => { + beforeEach(() => { + mockExistsSync.mockReturnValue(true); // CWD exists + }); + + it('validates cwd exists before spawn', () => { + const agentId = 'agent-123'; + const command = 'claude'; + const args = ['--help']; + const cwd = '/test/workspace/agent-workdirs/test-agent'; + const env = { TEST_VAR: 'value' }; + const providerName = 'claude'; + + processManager.spawnDetached(agentId, command, args, cwd, env, providerName); + + expect(mockExistsSync).toHaveBeenCalledWith(cwd); + expect(mockSpawn).toHaveBeenCalledWith(command, args, { + cwd, + env: expect.objectContaining(env), + detached: true, + stdio: ['ignore', 99, 100], + }); + }); + + it('throws error when cwd does not exist', () => { + mockExistsSync.mockReturnValue(false); + + const agentId = 'agent-123'; + const command = 'claude'; + const args = ['--help']; + const cwd = '/nonexistent/path'; + const env = {}; + const providerName = 'claude'; + + expect(() => { + processManager.spawnDetached(agentId, command, args, cwd, env, providerName); + }).toThrow('Agent working directory does not exist: /nonexistent/path'); + }); + + it('passes correct cwd parameter to spawn', () => { + const agentId = 'agent-123'; + const command = 
'claude'; + const args = ['--help']; + const cwd = '/test/workspace/agent-workdirs/test-agent'; + const env = { CLAUDE_CONFIG_DIR: '/config' }; + const providerName = 'claude'; + + processManager.spawnDetached(agentId, command, args, cwd, env, providerName); + + expect(mockSpawn).toHaveBeenCalledTimes(1); + const spawnCall = mockSpawn.mock.calls[0]; + expect(spawnCall[0]).toBe(command); + expect(spawnCall[1]).toEqual(args); + expect(spawnCall[2]).toEqual({ + cwd, + env: expect.objectContaining({ + ...process.env, + CLAUDE_CONFIG_DIR: '/config', + }), + detached: true, + stdio: ['ignore', 99, 100], + }); + }); + + it('logs comprehensive spawn information', () => { + const agentId = 'agent-123'; + const command = 'claude'; + const args = ['--json-schema', 'schema.json']; + const cwd = '/test/workspace/agent-workdirs/test-agent'; + const env = { CLAUDE_CONFIG_DIR: '/config' }; + const providerName = 'claude'; + + const result = processManager.spawnDetached(agentId, command, args, cwd, env, providerName); + + expect(result).toHaveProperty('pid', 12345); + expect(result).toHaveProperty('outputFilePath'); + expect(result).toHaveProperty('tailer'); + + // Verify log directory creation + expect(mockMkdirSync).toHaveBeenCalledWith( + '/test/workspace/.cw/agent-logs/agent-123', + { recursive: true } + ); + }); + + it('writes prompt file when provided', () => { + const agentId = 'agent-123'; + const command = 'claude'; + const args = ['--help']; + const cwd = '/test/workspace/agent-workdirs/test-agent'; + const env = {}; + const providerName = 'claude'; + const prompt = 'Test prompt'; + + processManager.spawnDetached(agentId, command, args, cwd, env, providerName, prompt); + + expect(mockWriteFileSync).toHaveBeenCalledWith( + '/test/workspace/.cw/agent-logs/agent-123/PROMPT.md', + 'Test prompt', + 'utf-8' + ); + }); + }); + + describe('buildSpawnCommand', () => { + it('builds command with native prompt mode', () => { + const provider = { + name: 'claude', + command: 'claude', 
+ args: ['--json-schema', 'schema.json'], + env: {}, + promptMode: 'native' as const, + processNames: ['claude'], + resumeStyle: 'flag' as const, + resumeFlag: '--resume', + nonInteractive: { + subcommand: 'chat', + promptFlag: '-p', + outputFlag: '--output-format json', + }, + }; + const prompt = 'Test prompt'; + + const result = processManager.buildSpawnCommand(provider, prompt); + + expect(result).toEqual({ + command: 'claude', + args: ['chat', '--json-schema', 'schema.json', '-p', 'Test prompt', '--output-format', 'json'], + env: {}, + }); + }); + + it('builds command with flag prompt mode', () => { + const provider = { + name: 'codex', + command: 'codex', + args: ['--format', 'json'], + env: {}, + promptMode: 'flag' as const, + processNames: ['codex'], + resumeStyle: 'subcommand' as const, + resumeFlag: 'resume', + nonInteractive: { + subcommand: 'run', + promptFlag: '--prompt', + outputFlag: '--json', + }, + }; + const prompt = 'Test prompt'; + + const result = processManager.buildSpawnCommand(provider, prompt); + + expect(result).toEqual({ + command: 'codex', + args: ['run', '--format', 'json', '--prompt', 'Test prompt', '--json'], + env: {}, + }); + }); + }); + + describe('buildResumeCommand', () => { + it('builds resume command with flag style', () => { + const provider = { + name: 'claude', + command: 'claude', + args: [], + env: {}, + promptMode: 'native' as const, + processNames: ['claude'], + resumeStyle: 'flag' as const, + resumeFlag: '--resume', + nonInteractive: { + subcommand: 'chat', + promptFlag: '-p', + outputFlag: '--json', + }, + }; + const sessionId = 'session-123'; + const prompt = 'Continue working'; + + const result = processManager.buildResumeCommand(provider, sessionId, prompt); + + expect(result).toEqual({ + command: 'claude', + args: ['--resume', 'session-123', '-p', 'Continue working', '--json'], + env: {}, + }); + }); + + it('throws error for providers without resume support', () => { + const provider = { + name: 'noresume', + 
command: 'noresume', + args: [], + env: {}, + promptMode: 'native' as const, + processNames: ['noresume'], + resumeStyle: 'none' as const, + }; + const sessionId = 'session-123'; + const prompt = 'Continue working'; + + expect(() => { + processManager.buildResumeCommand(provider, sessionId, prompt); + }).toThrow("Provider 'noresume' does not support resume"); + }); + }); +}); \ No newline at end of file diff --git a/src/agent/prompts.ts b/src/agent/prompts.ts index 9432b03..6cf06c2 100644 --- a/src/agent/prompts.ts +++ b/src/agent/prompts.ts @@ -9,22 +9,27 @@ const SIGNAL_FORMAT = ` ## Signal Output -When done, output ONLY this JSON (no other text before or after): +When done, write \`.cw/output/signal.json\` with: { "status": "done" } -If you need clarification, output: +If you need clarification, write: { "status": "questions", "questions": [{ "id": "q1", "question": "Your question" }] } -If you hit an unrecoverable error, output: -{ "status": "error", "error": "Description of what went wrong" }`; +If you hit an unrecoverable error, write: +{ "status": "error", "error": "Description of what went wrong" } + +IMPORTANT: Always write this file as your final action before terminating.`; const INPUT_FILES = ` ## Input Files -Read context from \`.cw/input/\`: +Read \`.cw/input/manifest.json\` first — it lists exactly which input files exist. +Then read only those files from \`.cw/input/\`. 
+ +Possible files: - \`initiative.md\` — Initiative details (frontmatter: id, name, status) -- \`phase.md\` — Phase details if applicable (frontmatter: id, number, name, status; body: description) -- \`task.md\` — Task details if applicable (frontmatter: id, name, category, type, priority, status; body: description) +- \`phase.md\` — Phase details (frontmatter: id, number, name, status; body: description) +- \`task.md\` — Task details (frontmatter: id, name, category, type, priority, status; body: description) - \`pages/\` — Initiative pages (one file per page; frontmatter: title, parentPageId, sortOrder; body: markdown content)`; const SUMMARY_REQUIREMENT = ` diff --git a/src/agent/providers/parsers/claude.ts b/src/agent/providers/parsers/claude.ts index 0abf1b8..8bb2ade 100644 --- a/src/agent/providers/parsers/claude.ts +++ b/src/agent/providers/parsers/claude.ts @@ -76,8 +76,9 @@ export class ClaudeStreamParser implements StreamParser { return []; } - // Check for error first (can appear on any event type) - if ('is_error' in parsed && parsed.is_error && 'result' in parsed) { + // Check for error on non-result events (e.g. stream errors) + // Result events with is_error are handled in the 'result' case below + if ('is_error' in parsed && parsed.is_error && 'result' in parsed && parsed.type !== 'result') { return [{ type: 'error', message: String(parsed.result) }]; } @@ -148,6 +149,7 @@ export class ClaudeStreamParser implements StreamParser { text: resultEvent.result || '', sessionId: resultEvent.session_id, costUsd: resultEvent.total_cost_usd, + isError: resultEvent.is_error === true, }); break; } diff --git a/src/agent/providers/stream-types.ts b/src/agent/providers/stream-types.ts index 7704926..6e8241a 100644 --- a/src/agent/providers/stream-types.ts +++ b/src/agent/providers/stream-types.ts @@ -42,6 +42,8 @@ export interface StreamResultEvent { text: string; sessionId?: string; costUsd?: number; + /** True when the CLI returned an error result (e.g. 
auth failure, usage limit) */ + isError?: boolean; } /** Error event */ diff --git a/src/agent/types.ts b/src/agent/types.ts index 8ecae2f..3babdb7 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -77,6 +77,8 @@ export interface AgentInfo { createdAt: Date; /** Last activity timestamp */ updatedAt: Date; + /** When the user dismissed this agent (null if not dismissed) */ + userDismissedAt?: Date | null; } /** diff --git a/src/cli/index.ts b/src/cli/index.ts index 1de4808..f82b735 100644 --- a/src/cli/index.ts +++ b/src/cli/index.ts @@ -21,14 +21,14 @@ const CW_PORT_ENV = 'CW_PORT'; * Starts the coordination server in foreground mode. * Server runs until terminated via SIGTERM/SIGINT. */ -async function startServer(port?: number): Promise { +async function startServer(port?: number, debug?: boolean): Promise { // Get port from option, env var, or default const serverPort = port ?? (process.env[CW_PORT_ENV] ? parseInt(process.env[CW_PORT_ENV], 10) : undefined); const log = createModuleLogger('server'); // Create full dependency graph - const container = await createContainer(); + const container = await createContainer({ debug }); // Create and start server const server = new CoordinationServer( @@ -66,10 +66,11 @@ export function createCli(serverHandler?: (port?: number) => Promise): Com .description('Multi-agent workspace for orchestrating multiple Claude Code agents') .version(VERSION, '-v, --version', 'Display version number'); - // Server mode option (global flag) + // Server mode options (global flags) program .option('-s, --server', 'Start the coordination server') - .option('-p, --port ', 'Port for the server (default: 3847, env: CW_PORT)', parseInt); + .option('-p, --port ', 'Port for the server (default: 3847, env: CW_PORT)', parseInt) + .option('-d, --debug', 'Enable debug mode (archive agent workdirs before cleanup)'); // Handle the case where --server is provided without a command // This makes --server work as a standalone action @@ -1118,14 
+1119,34 @@ export function createCli(serverHandler?: (port?: number) => Promise): Com const existing = await client.listAccounts.query(); const alreadyRegistered = existing.find((a: any) => a.email === extracted.email); if (alreadyRegistered) { - // Upsert: update credentials on existing account + // Compare refresh tokens to detect staleness + let credentialsChanged = true; + try { + const dbCreds = alreadyRegistered.credentials ? JSON.parse(alreadyRegistered.credentials) : null; + const sourceCreds = JSON.parse(extracted.credentials); + const dbRefreshToken = dbCreds?.claudeAiOauth?.refreshToken; + const sourceRefreshToken = sourceCreds?.claudeAiOauth?.refreshToken; + credentialsChanged = dbRefreshToken !== sourceRefreshToken; + } catch { + // Parse error — assume changed, update to be safe + } + + // Upsert: always update to be safe await client.updateAccountAuth.mutate({ id: alreadyRegistered.id, configJson: JSON.stringify(extracted.configJson), credentials: extracted.credentials, }); - console.log(`Updated credentials for account: ${alreadyRegistered.id}`); - console.log(` Email: ${extracted.email}`); + + if (credentialsChanged) { + console.log(`Updated credentials for account: ${alreadyRegistered.id}`); + console.log(` Email: ${extracted.email}`); + console.log(` Refresh token changed (source had fresher credentials)`); + } else { + console.log(`Credentials current for account: ${alreadyRegistered.id}`); + console.log(` Email: ${extracted.email}`); + console.log(` Refresh token unchanged`); + } return; } @@ -1217,7 +1238,9 @@ export async function runCli(): Promise { ? 
parseInt(process.argv[portIndex + 1], 10) : undefined; - await startServer(port); + const debug = process.argv.includes('--debug') || process.argv.includes('-d'); + + await startServer(port, debug); // Server runs indefinitely until signal return; } diff --git a/src/container.ts b/src/container.ts index 1ef1098..a479125 100644 --- a/src/container.ts +++ b/src/container.ts @@ -17,6 +17,7 @@ import { DrizzlePageRepository, DrizzleProjectRepository, DrizzleAccountRepository, + DrizzleProposalRepository, } from './db/index.js'; import type { InitiativeRepository } from './db/repositories/initiative-repository.js'; import type { PhaseRepository } from './db/repositories/phase-repository.js'; @@ -26,6 +27,7 @@ import type { AgentRepository } from './db/repositories/agent-repository.js'; import type { PageRepository } from './db/repositories/page-repository.js'; import type { ProjectRepository } from './db/repositories/project-repository.js'; import type { AccountRepository } from './db/repositories/account-repository.js'; +import type { ProposalRepository } from './db/repositories/proposal-repository.js'; import type { EventBus } from './events/index.js'; import { createEventBus } from './events/index.js'; import { ProcessManager, ProcessRegistry } from './process/index.js'; @@ -42,7 +44,7 @@ import type { ServerContextDeps } from './server/index.js'; // ============================================================================= /** - * All 8 repository ports. + * All 9 repository ports. */ export interface Repositories { initiativeRepository: InitiativeRepository; @@ -53,10 +55,11 @@ export interface Repositories { pageRepository: PageRepository; projectRepository: ProjectRepository; accountRepository: AccountRepository; + proposalRepository: ProposalRepository; } /** - * Create all 8 Drizzle repository adapters from a database instance. + * Create all 9 Drizzle repository adapters from a database instance. 
* Reusable by both the production server and the test harness.
  */
 export function createRepositories(db: DrizzleDatabase): Repositories {
@@ -69,6 +72,7 @@ export function createRepositories(db: DrizzleDatabase): Repositories {
     pageRepository: new DrizzlePageRepository(db),
     projectRepository: new DrizzleProjectRepository(db),
     accountRepository: new DrizzleAccountRepository(db),
+    proposalRepository: new DrizzleProposalRepository(db),
   };
 }
 
@@ -92,6 +96,13 @@ export interface Container extends Repositories {
   toContextDeps(): ServerContextDeps;
 }
 
+/**
+ * Options for container creation.
+ */
+export interface ContainerOptions {
+  debug?: boolean;
+}
+
 /**
  * Create the full dependency container.
  *
@@ -99,7 +110,7 @@ export interface Container extends Repositories {
  * Database → Repositories → CredentialManager → AgentManager.
  * Runs ensureSchema() and reconcileAfterRestart() before returning.
  */
-export async function createContainer(): Promise<Container> {
+export async function createContainer(options?: ContainerOptions): Promise<Container> {
   const log = createModuleLogger('container');
 
   // Infrastructure
@@ -133,6 +144,8 @@ export async function createContainer(): Promise<Container> {
     repos.accountRepository,
     eventBus,
     credentialManager,
+    repos.proposalRepository,
+    options?.debug ?? false,
   );
   log.info('agent manager created');
 
diff --git a/src/db/repositories/agent-repository.ts b/src/db/repositories/agent-repository.ts
index e363f72..c54ca40 100644
--- a/src/db/repositories/agent-repository.ts
+++ b/src/db/repositories/agent-repository.ts
@@ -44,6 +44,7 @@ export interface UpdateAgentData {
   provider?: string;
   accountId?: string | null;
   pid?: number | null;
+  exitCode?: number | null;
   outputFilePath?: string | null;
   result?: string | null;
   pendingQuestions?: string | null;
diff --git a/src/db/repositories/drizzle/index.ts b/src/db/repositories/drizzle/index.ts
index 4c6f033..5966d24 100644
--- a/src/db/repositories/drizzle/index.ts
+++ b/src/db/repositories/drizzle/index.ts
@@ -13,3 +13,4 @@ export { DrizzleMessageRepository } from './message.js';
 export { DrizzlePageRepository } from './page.js';
 export { DrizzleProjectRepository } from './project.js';
 export { DrizzleAccountRepository } from './account.js';
+export { DrizzleProposalRepository } from './proposal.js';
diff --git a/src/db/repositories/drizzle/proposal.ts b/src/db/repositories/drizzle/proposal.ts
new file mode 100644
index 0000000..4ce4855
--- /dev/null
+++ b/src/db/repositories/drizzle/proposal.ts
@@ -0,0 +1,133 @@
+/**
+ * Drizzle Proposal Repository Adapter
+ *
+ * Implements ProposalRepository interface using Drizzle ORM.
+ */
+
+import { eq, and, count, asc } from 'drizzle-orm';
+import { nanoid } from 'nanoid';
+import type { DrizzleDatabase } from '../../index.js';
+import { proposals, type Proposal } from '../../schema.js';
+import type {
+  ProposalRepository,
+  CreateProposalData,
+  UpdateProposalData,
+} from '../proposal-repository.js';
+
+export class DrizzleProposalRepository implements ProposalRepository {
+  constructor(private db: DrizzleDatabase) {}
+
+  async create(data: CreateProposalData): Promise<Proposal> {
+    const id = nanoid();
+    const now = new Date();
+
+    const [created] = await this.db.insert(proposals).values({
+      id,
+      ...data,
+      createdAt: now,
+      updatedAt: now,
+    }).returning();
+
+    return created;
+  }
+
+  async createMany(data: CreateProposalData[]): Promise<Proposal[]> {
+    if (data.length === 0) return [];
+
+    const now = new Date();
+    const rows = data.map((d) => ({
+      id: nanoid(),
+      ...d,
+      createdAt: now,
+      updatedAt: now,
+    }));
+
+    return this.db.insert(proposals).values(rows).returning();
+  }
+
+  async findById(id: string): Promise<Proposal | null> {
+    const result = await this.db
+      .select()
+      .from(proposals)
+      .where(eq(proposals.id, id))
+      .limit(1);
+
+    return result[0] ?? null;
+  }
+
+  async findByAgentId(agentId: string): Promise<Proposal[]> {
+    return this.db
+      .select()
+      .from(proposals)
+      .where(eq(proposals.agentId, agentId))
+      .orderBy(asc(proposals.sortOrder));
+  }
+
+  async findByInitiativeId(initiativeId: string): Promise<Proposal[]> {
+    return this.db
+      .select()
+      .from(proposals)
+      .where(eq(proposals.initiativeId, initiativeId))
+      .orderBy(asc(proposals.sortOrder));
+  }
+
+  async findByAgentIdAndStatus(agentId: string, status: string): Promise<Proposal[]> {
+    return this.db
+      .select()
+      .from(proposals)
+      .where(
+        and(
+          eq(proposals.agentId, agentId),
+          eq(proposals.status, status as Proposal['status']),
+        ),
+      )
+      .orderBy(asc(proposals.sortOrder));
+  }
+
+  async update(id: string, data: UpdateProposalData): Promise<Proposal> {
+    const [updated] = await this.db
+      .update(proposals)
+      .set({ ...data, updatedAt: new Date() })
+      .where(eq(proposals.id, id))
+      .returning();
+
+    if (!updated) {
+      throw new Error(`Proposal not found: ${id}`);
+    }
+
+    return updated;
+  }
+
+  async updateManyByAgentId(agentId: string, data: UpdateProposalData): Promise<void> {
+    await this.db
+      .update(proposals)
+      .set({ ...data, updatedAt: new Date() })
+      .where(eq(proposals.agentId, agentId));
+  }
+
+  async updateManyByAgentIdAndStatus(agentId: string, currentStatus: string, data: UpdateProposalData): Promise<void> {
+    await this.db
+      .update(proposals)
+      .set({ ...data, updatedAt: new Date() })
+      .where(
+        and(
+          eq(proposals.agentId, agentId),
+          eq(proposals.status, currentStatus as Proposal['status']),
+        ),
+      );
+  }
+
+  async countByAgentIdAndStatus(agentId: string, status: string): Promise<number> {
+    const result = await this.db
+      .select({ count: count() })
+      .from(proposals)
+      .where(
+        and(
+          eq(proposals.agentId, agentId),
+          eq(proposals.status, status as Proposal['status']),
+        ),
+      );
+
+    return result[0]?.count ?? 0;
+  }
+}
diff --git a/src/db/repositories/index.ts b/src/db/repositories/index.ts
index 00c4aa9..6b40ce7 100644
--- a/src/db/repositories/index.ts
+++ b/src/db/repositories/index.ts
@@ -56,3 +56,9 @@ export type {
   AccountRepository,
   CreateAccountData,
 } from './account-repository.js';
+
+export type {
+  ProposalRepository,
+  CreateProposalData,
+  UpdateProposalData,
+} from './proposal-repository.js';
diff --git a/src/db/repositories/proposal-repository.ts b/src/db/repositories/proposal-repository.ts
new file mode 100644
index 0000000..0745e5c
--- /dev/null
+++ b/src/db/repositories/proposal-repository.ts
@@ -0,0 +1,35 @@
+/**
+ * Proposal Repository Port Interface
+ *
+ * Port for Proposal aggregate operations.
+ * Implementations (Drizzle, etc.) are adapters.
+ */
+
+import type { Proposal, NewProposal } from '../schema.js';
+
+/**
+ * Data for creating a new proposal.
+ * Omits system-managed fields (id, createdAt, updatedAt).
+ */
+export type CreateProposalData = Omit<NewProposal, 'id' | 'createdAt' | 'updatedAt'>;
+
+/**
+ * Data for updating a proposal.
+ */
+export type UpdateProposalData = Partial<Omit<NewProposal, 'id' | 'createdAt' | 'updatedAt'>>; // NOTE(review): type arguments were lost in extraction; reconstructed from CreateProposalData — confirm the exact key list
+
+/**
+ * Proposal Repository Port
+ */
+export interface ProposalRepository {
+  create(data: CreateProposalData): Promise<Proposal>;
+  createMany(data: CreateProposalData[]): Promise<Proposal[]>;
+  findById(id: string): Promise<Proposal | null>;
+  findByAgentId(agentId: string): Promise<Proposal[]>;
+  findByInitiativeId(initiativeId: string): Promise<Proposal[]>;
+  findByAgentIdAndStatus(agentId: string, status: string): Promise<Proposal[]>;
+  update(id: string, data: UpdateProposalData): Promise<Proposal>;
+  updateManyByAgentId(agentId: string, data: UpdateProposalData): Promise<void>;
+  updateManyByAgentIdAndStatus(agentId: string, currentStatus: string, data: UpdateProposalData): Promise<void>;
+  countByAgentIdAndStatus(agentId: string, status: string): Promise<number>;
+}
diff --git a/src/db/schema.ts b/src/db/schema.ts
index c365d3f..a5b3ab0 100644
--- a/src/db/schema.ts
+++ b/src/db/schema.ts
@@ -35,6 +35,7 @@ export const initiativesRelations = relations(initiatives, ({ many }) => ({
   pages: many(pages),
   initiativeProjects: many(initiativeProjects),
   tasks: many(tasks),
+  proposals: many(proposals),
 }));
 
 export type Initiative = InferSelectModel<typeof initiatives>;
@@ -264,6 +265,7 @@ export const agents = sqliteTable('agents', {
     .notNull()
     .default('execute'),
   pid: integer('pid'),
+  exitCode: integer('exit_code'), // Process exit code for debugging crashes
   outputFilePath: text('output_file_path'),
   result: text('result'),
   pendingQuestions: text('pending_questions'),
@@ -272,7 +274,7 @@ export const agents = sqliteTable('agents', {
   userDismissedAt: integer('user_dismissed_at', { mode: 'timestamp' }),
 });
 
-export const agentsRelations = relations(agents, ({ one }) => ({
+export const agentsRelations = relations(agents, ({ one, many }) => ({
   task: one(tasks, {
     fields: [agents.taskId],
     references: [tasks.id],
@@ -285,11 +287,52 @@ export const agentsRelations = relations(agents, ({ one }) => ({
     fields: [agents.accountId],
     references: [accounts.id],
   }),
+  proposals: many(proposals),
 }));
 
 export type Agent = InferSelectModel<typeof agents>;
 export type NewAgent = InferInsertModel<typeof agents>;
 
+// ============================================================================
+// PROPOSALS
+// ============================================================================
+
+export const proposals = sqliteTable('proposals', {
+  id: text('id').primaryKey(),
+  agentId: text('agent_id')
+    .notNull()
+    .references(() => agents.id, { onDelete: 'cascade' }),
+  initiativeId: text('initiative_id')
+    .notNull()
+    .references(() => initiatives.id, { onDelete: 'cascade' }),
+  targetType: text('target_type', { enum: ['page', 'phase', 'task'] }).notNull(),
+  targetId: text('target_id'), // existing entity ID (e.g. pageId for updates), null for creates
+  title: text('title').notNull(),
+  summary: text('summary'),
+  content: text('content'), // markdown body (pages), description (phases/tasks)
+  metadata: text('metadata'), // JSON: type-specific data (phase number, task category, deps)
+  status: text('status', { enum: ['pending', 'accepted', 'dismissed'] })
+    .notNull()
+    .default('pending'),
+  sortOrder: integer('sort_order').notNull().default(0),
+  createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
+  updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
+});
+
+export const proposalsRelations = relations(proposals, ({ one }) => ({
+  agent: one(agents, {
+    fields: [proposals.agentId],
+    references: [agents.id],
+  }),
+  initiative: one(initiatives, {
+    fields: [proposals.initiativeId],
+    references: [initiatives.id],
+  }),
+}));
+
+export type Proposal = InferSelectModel<typeof proposals>;
+export type NewProposal = InferInsertModel<typeof proposals>;
+
 // ============================================================================
 // MESSAGES
 // ============================================================================
diff --git a/src/events/types.ts b/src/events/types.ts
index 2c9b345..3d8725c 100644
--- a/src/events/types.ts
+++ b/src/events/types.ts
@@ -55,6 +55,7 @@ export interface ProcessCrashedEvent extends DomainEvent {
   payload: {
     processId:
string; pid: number; + exitCode: number | null; signal: string | null; }; } diff --git a/src/process/manager.test.ts b/src/process/manager.test.ts index 4784306..ee22ae0 100644 --- a/src/process/manager.test.ts +++ b/src/process/manager.test.ts @@ -389,6 +389,7 @@ describe('ProcessManager', () => { payload: { processId: 'proc-1', pid: 12345, + exitCode: 1, signal: 'SIGTERM', }, }) diff --git a/src/process/manager.ts b/src/process/manager.ts index 71ffb04..e48901f 100644 --- a/src/process/manager.ts +++ b/src/process/manager.ts @@ -124,6 +124,7 @@ export class ProcessManager { payload: { processId: id, pid, + exitCode: code, signal, }, }; diff --git a/src/server/trpc-adapter.ts b/src/server/trpc-adapter.ts index 97085de..128a518 100644 --- a/src/server/trpc-adapter.ts +++ b/src/server/trpc-adapter.ts @@ -17,6 +17,7 @@ import type { PhaseRepository } from '../db/repositories/phase-repository.js'; import type { PageRepository } from '../db/repositories/page-repository.js'; import type { ProjectRepository } from '../db/repositories/project-repository.js'; import type { AccountRepository } from '../db/repositories/account-repository.js'; +import type { ProposalRepository } from '../db/repositories/proposal-repository.js'; import type { AccountCredentialManager } from '../agent/credentials/types.js'; import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js'; import type { CoordinationManager } from '../coordination/types.js'; @@ -53,6 +54,8 @@ export interface TrpcAdapterOptions { projectRepository?: ProjectRepository; /** Account repository for account CRUD and load balancing */ accountRepository?: AccountRepository; + /** Proposal repository for agent proposal CRUD operations */ + proposalRepository?: ProposalRepository; /** Credential manager for account OAuth token management */ credentialManager?: AccountCredentialManager; /** Absolute path to the workspace root (.cwrc directory) */ @@ -129,6 +132,7 @@ export function 
createTrpcHandler(options: TrpcAdapterOptions) {
         pageRepository: options.pageRepository,
         projectRepository: options.projectRepository,
         accountRepository: options.accountRepository,
+        proposalRepository: options.proposalRepository,
         credentialManager: options.credentialManager,
         workspaceRoot: options.workspaceRoot,
       }),
diff --git a/src/test/integration/agent-workdir-verification.test.ts b/src/test/integration/agent-workdir-verification.test.ts
new file mode 100644
index 0000000..f8d629c
--- /dev/null
+++ b/src/test/integration/agent-workdir-verification.test.ts
@@ -0,0 +1,203 @@
+/**
+ * Agent Working Directory Verification Tests
+ *
+ * Tests that verify agents actually run in their intended working directories.
+ * These tests use simple shell commands to prove the agent execution location.
+ *
+ * IMPORTANT: These tests spawn real CLI processes and may incur API costs.
+ * They are SKIPPED by default to prevent accidental charges.
+ *
+ * To run these tests:
+ * ```bash
+ * REAL_WORKDIR_TESTS=1 npm test -- src/test/integration/agent-workdir-verification.test.ts --test-timeout=120000
+ * ```
+ */
+
+import { describe, it, expect, beforeAll, afterAll } from 'vitest';
+import { mkdtemp, rm, readFile } from 'node:fs/promises';
+import { existsSync } from 'node:fs';
+import { tmpdir } from 'node:os';
+import { join } from 'node:path';
+import { MultiProviderAgentManager } from '../../agent/manager.js';
+import { createTestDatabase } from '../../db/repositories/drizzle/test-helpers.js';
+import {
+  DrizzleAgentRepository,
+  DrizzleProjectRepository,
+  DrizzleAccountRepository,
+  DrizzleInitiativeRepository,
+} from '../../db/repositories/drizzle/index.js';
+import { EventEmitterBus } from '../../events/bus.js';
+
+const SHOULD_SKIP = !process.env.REAL_WORKDIR_TESTS;
+const TEST_TIMEOUT = 120000; // 2 minutes: matches the documented --test-timeout and leaves headroom over the 60 s poll loop
+
+describe.skipIf(SHOULD_SKIP)('Agent Working Directory Verification', () => {
+  let tempDir: string;
+  let agentManager: MultiProviderAgentManager;
+  let agentRepository: DrizzleAgentRepository;
+
+  beforeAll(async () => {
+    if (SHOULD_SKIP) return;
+
+    console.log('\n=== Running Agent Working Directory Tests ===');
+    console.log('These tests verify agents run in correct working directories.\n');
+
+    // Create temp directory for test workspace
+    tempDir = await mkdtemp(join(tmpdir(), 'cw-workdir-test-'));
+
+    // Set up test database and repositories
+    const db = await createTestDatabase();
+    const eventBus = new EventEmitterBus();
+
+    agentRepository = new DrizzleAgentRepository(db);
+    const projectRepository = new DrizzleProjectRepository(db);
+    const accountRepository = new DrizzleAccountRepository(db);
+
+    agentManager = new MultiProviderAgentManager(
+      agentRepository,
+      tempDir,
+      projectRepository,
+      accountRepository,
+      eventBus,
+    );
+  });
+
+  afterAll(async () => {
+    if (SHOULD_SKIP || !tempDir) return;
+    try {
+      await rm(tempDir, { recursive: true });
+    } catch (err) {
+      console.warn('Failed to cleanup temp directory:', err);
+    }
+  });
+
+  it('spawns agent in correct standalone working directory', async () => {
+    const prompt = `
+Write your current working directory to a file called 'verify-pwd.txt'.
+Use this exact bash command:
+
+pwd > verify-pwd.txt
+
+Then output the signal: {"done": true}
+`.trim();
+
+    // Spawn standalone agent
+    const agent = await agentManager.spawn({
+      taskId: null,
+      prompt,
+      mode: 'execute',
+      provider: 'claude',
+    });
+
+    expect(agent.id).toBeTruthy();
+    expect(agent.status).toBe('running');
+
+    // Wait for completion (poll agent status)
+    let attempts = 0;
+    const maxAttempts = 60; // 60 seconds timeout
+
+    while (attempts < maxAttempts) {
+      await new Promise(resolve => setTimeout(resolve, 1000));
+      attempts++;
+
+      const currentAgent = await agentRepository.findById(agent.id);
+      if (!currentAgent || currentAgent.status !== 'running') {
+        break;
+      }
+    }
+
+    // Verify final agent state
+    const completedAgent = await agentRepository.findById(agent.id);
+    expect(completedAgent).toBeTruthy();
+    expect(completedAgent!.status).not.toBe('running');
+
+    // Get the agent's expected working directory
+    const expectedWorkdir = join(tempDir, 'agent-workdirs', agent.name, 'workspace');
+
+    // Read diagnostic files
+    const diagnosticFile = join(expectedWorkdir, '.cw', 'spawn-diagnostic.json');
+    const expectedPwdFile = join(expectedWorkdir, '.cw', 'expected-pwd.txt');
+    const verifyPwdFile = join(expectedWorkdir, 'verify-pwd.txt');
+
+    // Verify diagnostic files exist
+    expect(existsSync(diagnosticFile), 'spawn diagnostic file should exist').toBe(true);
+    expect(existsSync(expectedPwdFile), 'expected pwd file should exist').toBe(true);
+
+    // Read diagnostic data
+    const diagnostic = JSON.parse(await readFile(diagnosticFile, 'utf-8'));
+    const expectedPwd = (await readFile(expectedPwdFile, 'utf-8')).trim();
+
+    console.log('Diagnostic data:', diagnostic);
+    console.log('Expected working directory:', expectedPwd);
+
+    // Verify diagnostic consistency
+    expect(diagnostic.intendedCwd).toBe(expectedWorkdir);
+    expect(diagnostic.cwdExistsAtSpawn).toBe(true);
+    expect(expectedPwd).toBe(expectedWorkdir);
+
+    // The critical test: verify the agent actually wrote the file in the expected location
+    if (existsSync(verifyPwdFile)) {
+      const actualPwd = (await readFile(verifyPwdFile, 'utf-8')).trim();
+      console.log('Agent reported working directory:', actualPwd);
+
+      // This is the key verification: the pwd reported by the agent should match expected
+      expect(actualPwd).toBe(expectedWorkdir);
+    } else {
+      // If the file doesn't exist, the agent either failed or ran somewhere else
+      console.warn('Agent did not create verify-pwd.txt file');
+      console.log('Expected at:', verifyPwdFile);
+
+      // Let's check if it was created elsewhere (debugging)
+      const alternativeLocations = [
+        join(tempDir, 'verify-pwd.txt'),
+        join(process.cwd(), 'verify-pwd.txt'),
+      ];
+
+      for (const loc of alternativeLocations) {
+        if (existsSync(loc)) {
+          const content = await readFile(loc, 'utf-8');
+          console.log(`Found verify-pwd.txt at unexpected location ${loc}:`, content.trim());
+        }
+      }
+
+      throw new Error('Agent did not create pwd verification file in expected location');
+    }
+  }, TEST_TIMEOUT);
+
+  it('creates diagnostic files with correct metadata', async () => {
+    const prompt = `Output the signal: {"done": true}`;
+
+    const agent = await agentManager.spawn({
+      taskId: null,
+      prompt,
+      mode: 'execute',
+      provider: 'claude',
+    });
+
+    // Wait a bit for spawn to complete
+    await new Promise(resolve => setTimeout(resolve, 2000));
+
+    const expectedWorkdir = join(tempDir, 'agent-workdirs', agent.name, 'workspace');
+    const diagnosticFile = join(expectedWorkdir, '.cw', 'spawn-diagnostic.json');
+    const expectedPwdFile = join(expectedWorkdir, '.cw', 'expected-pwd.txt');
+
+    // Verify files exist immediately after spawn
+    expect(existsSync(diagnosticFile), 'diagnostic file should be created after spawn').toBe(true);
+    expect(existsSync(expectedPwdFile), 'expected pwd file should be created').toBe(true);
+
+    // Verify diagnostic content
+    const diagnostic = JSON.parse(await readFile(diagnosticFile, 'utf-8'));
+    const expectedPwd = (await readFile(expectedPwdFile, 'utf-8')).trim();
+
+    expect(diagnostic.agentId).toBe(agent.id);
+    expect(diagnostic.alias).toBe(agent.name);
+    expect(diagnostic.intendedCwd).toBe(expectedWorkdir);
+    expect(diagnostic.provider).toBe('claude');
+    expect(diagnostic.cwdExistsAtSpawn).toBe(true);
+    expect(diagnostic.customCwdProvided).toBe(false);
+    expect(typeof diagnostic.timestamp).toBe('string');
+    expect(Array.isArray(diagnostic.args)).toBe(true);
+
+    expect(expectedPwd).toBe(expectedWorkdir);
+  }, TEST_TIMEOUT);
+});
\ No newline at end of file
diff --git a/src/test/integration/real-providers/harness.ts b/src/test/integration/real-providers/harness.ts
index 28f28ab..f872229 100644
--- a/src/test/integration/real-providers/harness.ts
+++ b/src/test/integration/real-providers/harness.ts
@@ -358,12 +358,12 @@ export const shouldRunRealCodexTests = process.env.REAL_CODEX_TESTS === '1';
 /**
  * Skip wrapper for Claude tests - skips unless REAL_CLAUDE_TESTS=1.
  */
-export const describeRealClaude: typeof describe = shouldRunRealClaudeTests ? describe : describe.skip;
+export const describeRealClaude: typeof describe = shouldRunRealClaudeTests ? describe : (describe.skip as typeof describe);
 
 /**
  * Skip wrapper for Codex tests - skips unless REAL_CODEX_TESTS=1.
  */
-export const describeRealCodex: typeof describe = shouldRunRealCodexTests ? describe : describe.skip;
+export const describeRealCodex: typeof describe = shouldRunRealCodexTests ? describe : (describe.skip as typeof describe);
 
 /**
  * Default test timeout for real CLI tests (2 minutes).
diff --git a/src/trpc/context.ts b/src/trpc/context.ts index cc8b10c..4027b25 100644 --- a/src/trpc/context.ts +++ b/src/trpc/context.ts @@ -14,6 +14,7 @@ import type { PhaseRepository } from '../db/repositories/phase-repository.js'; import type { PageRepository } from '../db/repositories/page-repository.js'; import type { ProjectRepository } from '../db/repositories/project-repository.js'; import type { AccountRepository } from '../db/repositories/account-repository.js'; +import type { ProposalRepository } from '../db/repositories/proposal-repository.js'; import type { AccountCredentialManager } from '../agent/credentials/types.js'; import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js'; import type { CoordinationManager } from '../coordination/types.js'; @@ -53,6 +54,8 @@ export interface TRPCContext { projectRepository?: ProjectRepository; /** Account repository for account CRUD and load balancing */ accountRepository?: AccountRepository; + /** Proposal repository for agent proposal CRUD operations */ + proposalRepository?: ProposalRepository; /** Credential manager for account OAuth token management */ credentialManager?: AccountCredentialManager; /** Absolute path to the workspace root (.cwrc directory) */ @@ -77,6 +80,7 @@ export interface CreateContextOptions { pageRepository?: PageRepository; projectRepository?: ProjectRepository; accountRepository?: AccountRepository; + proposalRepository?: ProposalRepository; credentialManager?: AccountCredentialManager; workspaceRoot?: string; } @@ -103,6 +107,7 @@ export function createContext(options: CreateContextOptions): TRPCContext { pageRepository: options.pageRepository, projectRepository: options.projectRepository, accountRepository: options.accountRepository, + proposalRepository: options.proposalRepository, credentialManager: options.credentialManager, workspaceRoot: options.workspaceRoot, }; diff --git a/src/trpc/router.ts b/src/trpc/router.ts index 4e9587d..608d2cb 100644 --- 
a/src/trpc/router.ts +++ b/src/trpc/router.ts @@ -19,6 +19,7 @@ import { architectProcedures } from './routers/architect.js'; import { projectProcedures } from './routers/project.js'; import { pageProcedures } from './routers/page.js'; import { accountProcedures } from './routers/account.js'; +import { proposalProcedures } from './routers/proposal.js'; import { subscriptionProcedures } from './routers/subscription.js'; // Re-export tRPC primitives (preserves existing import paths) @@ -54,6 +55,7 @@ export const appRouter = router({ ...projectProcedures(publicProcedure), ...pageProcedures(publicProcedure), ...accountProcedures(publicProcedure), + ...proposalProcedures(publicProcedure), ...subscriptionProcedures(publicProcedure), }); diff --git a/src/trpc/routers/_helpers.ts b/src/trpc/routers/_helpers.ts index 006f0e1..97bf8bd 100644 --- a/src/trpc/routers/_helpers.ts +++ b/src/trpc/routers/_helpers.ts @@ -14,6 +14,7 @@ import type { PhaseRepository } from '../../db/repositories/phase-repository.js' import type { PageRepository } from '../../db/repositories/page-repository.js'; import type { ProjectRepository } from '../../db/repositories/project-repository.js'; import type { AccountRepository } from '../../db/repositories/account-repository.js'; +import type { ProposalRepository } from '../../db/repositories/proposal-repository.js'; import type { DispatchManager, PhaseDispatchManager } from '../../dispatch/types.js'; import type { CoordinationManager } from '../../coordination/types.js'; @@ -126,3 +127,13 @@ export function requireAccountRepository(ctx: TRPCContext): AccountRepository { } return ctx.accountRepository; } + +export function requireProposalRepository(ctx: TRPCContext): ProposalRepository { + if (!ctx.proposalRepository) { + throw new TRPCError({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Proposal repository not available', + }); + } + return ctx.proposalRepository; +} diff --git a/src/trpc/routers/architect.ts b/src/trpc/routers/architect.ts index 
717da93..0e4b7de 100644 --- a/src/trpc/routers/architect.ts +++ b/src/trpc/routers/architect.ts @@ -132,6 +132,33 @@ export function architectProcedures(publicProcedure: ProcedureBuilder) { }); } + // Bug #10: Auto-dismiss stale (crashed/idle) refine agents before checking for active ones + const allAgents = await agentManager.list(); + const staleAgents = allAgents.filter( + (a) => + a.mode === 'refine' && + a.initiativeId === input.initiativeId && + ['crashed', 'idle'].includes(a.status) && + !a.userDismissedAt, + ); + for (const stale of staleAgents) { + await agentManager.dismiss(stale.id); + } + + // Bug #9: Prevent concurrent refine agents on the same initiative + const activeRefineAgents = allAgents.filter( + (a) => + a.mode === 'refine' && + a.initiativeId === input.initiativeId && + ['running', 'waiting_for_input'].includes(a.status), + ); + if (activeRefineAgents.length > 0) { + throw new TRPCError({ + code: 'CONFLICT', + message: `A refine agent is already running for this initiative`, + }); + } + const pages = await pageRepo.findByInitiativeId(input.initiativeId); if (pages.length === 0) { diff --git a/src/trpc/routers/initiative.ts b/src/trpc/routers/initiative.ts index 074cc59..c351c9f 100644 --- a/src/trpc/routers/initiative.ts +++ b/src/trpc/routers/initiative.ts @@ -5,7 +5,7 @@ import { TRPCError } from '@trpc/server'; import { z } from 'zod'; import type { ProcedureBuilder } from '../trpc.js'; -import { requireInitiativeRepository, requireProjectRepository, requirePageRepository } from './_helpers.js'; +import { requireInitiativeRepository, requireProjectRepository } from './_helpers.js'; export function initiativeProcedures(publicProcedure: ProcedureBuilder) { return { diff --git a/src/trpc/routers/proposal.ts b/src/trpc/routers/proposal.ts new file mode 100644 index 0000000..27beb5e --- /dev/null +++ b/src/trpc/routers/proposal.ts @@ -0,0 +1,170 @@ +/** + * Proposal Router — CRUD + accept/dismiss workflows + */ + +import { TRPCError } from 
'@trpc/server';
+import { z } from 'zod';
+import type { ProcedureBuilder } from '../trpc.js';
+import type { TRPCContext } from '../context.js';
+import type { Proposal } from '../../db/schema.js';
+import {
+  requireProposalRepository,
+  requirePageRepository,
+  requirePhaseRepository,
+  requireTaskRepository,
+  requireAgentManager,
+} from './_helpers.js';
+import { markdownToTiptapJson } from '../../agent/markdown-to-tiptap.js';
+
+/**
+ * Accept a single proposal: apply side effects based on targetType (page: update; phase/task: create). A page proposal lacking targetId or content is a no-op.
+ */
+async function applyProposal(proposal: Proposal, ctx: TRPCContext): Promise<void> {
+  switch (proposal.targetType) {
+    case 'page': {
+      if (!proposal.targetId || !proposal.content) break;
+      const pageRepo = requirePageRepository(ctx);
+      const tiptapJson = markdownToTiptapJson(proposal.content);
+      await pageRepo.update(proposal.targetId, {
+        content: JSON.stringify(tiptapJson),
+        title: proposal.title,
+      });
+      ctx.eventBus.emit({
+        type: 'page:updated',
+        timestamp: new Date(),
+        payload: { pageId: proposal.targetId, initiativeId: proposal.initiativeId, title: proposal.title },
+      });
+      break;
+    }
+    case 'phase': {
+      const phaseRepo = requirePhaseRepository(ctx);
+      const meta = (proposal.metadata ? JSON.parse(proposal.metadata) : null) ?? {}; // metadata stored as the JSON literal null falls back to {}
+      await phaseRepo.create({
+        initiativeId: proposal.initiativeId,
+        number: meta.number ?? 0,
+        name: proposal.title,
+        description: proposal.content ?? undefined,
+      });
+      break;
+    }
+    case 'task': {
+      const taskRepo = requireTaskRepository(ctx);
+      const meta = (proposal.metadata ? JSON.parse(proposal.metadata) : null) ?? {}; // metadata stored as the JSON literal null falls back to {}
+      await taskRepo.create({
+        initiativeId: proposal.initiativeId,
+        phaseId: meta.phaseId ?? null,
+        parentTaskId: meta.parentTaskId ?? null,
+        name: proposal.title,
+        description: proposal.content ?? undefined,
+        category: meta.category ?? 'execute',
+        type: meta.type ?? 'auto',
+      });
+      break;
+    }
+  }
+}
+
+/**
+ * After every accept/dismiss, check if all proposals for the agent are resolved.
+ * If so, auto-dismiss the agent.
+ */
+async function maybeAutoDismiss(agentId: string, ctx: TRPCContext): Promise<void> {
+  const proposalRepo = requireProposalRepository(ctx);
+  const pendingCount = await proposalRepo.countByAgentIdAndStatus(agentId, 'pending');
+  if (pendingCount === 0) {
+    try {
+      const agentManager = requireAgentManager(ctx);
+      await agentManager.dismiss(agentId);
+    } catch {
+      // Agent manager not available or agent already dismissed — not critical
+    }
+  }
+}
+
+export function proposalProcedures(publicProcedure: ProcedureBuilder) {
+  return {
+    listProposals: publicProcedure
+      .input(z.object({
+        agentId: z.string().min(1).optional(),
+        initiativeId: z.string().min(1).optional(),
+      }))
+      .query(async ({ ctx, input }) => {
+        const repo = requireProposalRepository(ctx);
+        if (input.agentId) {
+          return repo.findByAgentId(input.agentId);
+        }
+        if (input.initiativeId) {
+          return repo.findByInitiativeId(input.initiativeId);
+        }
+        throw new TRPCError({
+          code: 'BAD_REQUEST',
+          message: 'Either agentId or initiativeId is required',
+        });
+      }),
+
+    acceptProposal: publicProcedure
+      .input(z.object({ id: z.string().min(1) }))
+      .mutation(async ({ ctx, input }) => {
+        const repo = requireProposalRepository(ctx);
+        const proposal = await repo.findById(input.id);
+        if (!proposal) {
+          throw new TRPCError({ code: 'NOT_FOUND', message: `Proposal '${input.id}' not found` });
+        }
+        if (proposal.status !== 'pending') {
+          throw new TRPCError({ code: 'BAD_REQUEST', message: `Proposal is already ${proposal.status}` });
+        }
+        await applyProposal(proposal, ctx);
+        const updated = await repo.update(input.id, { status: 'accepted' });
+        await maybeAutoDismiss(proposal.agentId, ctx);
+        return updated;
+      }),
+
+    dismissProposal: publicProcedure
+      .input(z.object({ id: z.string().min(1) }))
+      .mutation(async ({ ctx, input }) => {
+        const repo = requireProposalRepository(ctx);
+        const proposal = await repo.findById(input.id);
+        if (!proposal) {
+          throw new TRPCError({ code: 'NOT_FOUND', message: `Proposal '${input.id}' not found` });
+        }
+        if (proposal.status !== 'pending') {
+          throw new TRPCError({ code: 'BAD_REQUEST', message: `Proposal is already ${proposal.status}` });
+        }
+        const updated = await repo.update(input.id, { status: 'dismissed' });
+        await maybeAutoDismiss(proposal.agentId, ctx);
+        return updated;
+      }),
+
+    acceptAllProposals: publicProcedure
+      .input(z.object({ agentId: z.string().min(1) }))
+      .mutation(async ({ ctx, input }) => {
+        const repo = requireProposalRepository(ctx);
+        const pending = await repo.findByAgentIdAndStatus(input.agentId, 'pending');
+        let successCount = 0;
+        let failedCount = 0;
+        const errorMessages: string[] = [];
+        for (const proposal of pending) {
+          try {
+            await applyProposal(proposal, ctx);
+            await repo.update(proposal.id, { status: 'accepted' });
+            successCount++;
+          } catch (err) {
+            failedCount++;
+            const message = err instanceof Error ? err.message : String(err);
+            errorMessages.push(`${proposal.title}: ${message}`);
+          }
+        }
+        await maybeAutoDismiss(input.agentId, ctx);
+        return { accepted: successCount, failed: failedCount, errors: errorMessages };
+      }),
+
+    dismissAllProposals: publicProcedure
+      .input(z.object({ agentId: z.string().min(1) }))
+      .mutation(async ({ ctx, input }) => {
+        const repo = requireProposalRepository(ctx);
+        await repo.updateManyByAgentIdAndStatus(input.agentId, 'pending', { status: 'dismissed' });
+        await maybeAutoDismiss(input.agentId, ctx);
+        return { success: true };
+      }),
+  };
+}