/**
 * Full-Flow Test Harness
 *
 * Wires up the complete system with real agents for end-to-end multi-agent
 * workflow testing: discuss → plan → detail → execute.
 *
 * Unlike the standard TestHarness (MockAgentManager) or RealProviderHarness
 * (agents only), this harness adds:
 * - All 11 repositories
 * - tRPC caller for architect/agent procedures
 * - A self-contained fixture git repo (todo-api) for agents to work on
 * - Helpers for driving agents through question/answer loops
 *
 * Used by full-flow-cassette.test.ts (replay) and for manual recording runs.
 */

import { mkdtemp, rm, cp, stat } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import { execSync } from 'node:child_process';
import type { DrizzleDatabase } from '../../../db/index.js';
import type { DomainEvent } from '../../../events/types.js';
import { EventEmitterBus } from '../../../events/bus.js';
import { MultiProviderAgentManager } from '../../../agent/manager.js';
import type { AgentResult, PendingQuestions } from '../../../agent/types.js';
import type { Initiative, Project, Phase, Task } from '../../../db/schema.js';
import type { InitiativeRepository } from '../../../db/repositories/initiative-repository.js';
import type { PhaseRepository } from '../../../db/repositories/phase-repository.js';
import type { TaskRepository } from '../../../db/repositories/task-repository.js';
import type { MessageRepository } from '../../../db/repositories/message-repository.js';
import type { AgentRepository } from '../../../db/repositories/agent-repository.js';
import type { PageRepository } from '../../../db/repositories/page-repository.js';
import type { ProjectRepository } from '../../../db/repositories/project-repository.js';
import type { AccountRepository } from '../../../db/repositories/account-repository.js';
import type { ChangeSetRepository } from '../../../db/repositories/change-set-repository.js';
import type { LogChunkRepository } from '../../../db/repositories/log-chunk-repository.js';
import type { ConversationRepository } from '../../../db/repositories/conversation-repository.js';
import type { ProcessManager } from '../../../agent/process-manager.js';
import { createTestDatabase } from '../../../db/repositories/drizzle/test-helpers.js';
import { createRepositories } from '../../../container.js';
import { DefaultDispatchManager } from '../../../dispatch/manager.js';
import { appRouter, createCallerFactory } from '../../../trpc/router.js';
import { createContext } from '../../../trpc/context.js';

// =============================================================================
// CapturingEventBus
// =============================================================================

/**
 * EventEmitterBus that records every emitted event (in emission order) before
 * forwarding it to the normal bus, so tests can assert on what was published.
 */
export class CapturingEventBus extends EventEmitterBus {
  /** Every event passed to emit(), oldest first. Reset by clearEvents(). */
  emittedEvents: DomainEvent[] = [];

  override emit<T extends DomainEvent>(event: T): void {
    this.emittedEvents.push(event);
    super.emit(event);
  }

  /** Return the captured events whose `type` equals the given type. */
  getEventsByType<T extends DomainEvent>(type: T['type']): T[] {
    return this.emittedEvents.filter((e) => e.type === type) as T[];
  }

  /** Forget all captured events. */
  clearEvents(): void {
    this.emittedEvents = [];
  }
}

// =============================================================================
// Sleep helper
// =============================================================================

/** Resolve after `ms` milliseconds. */
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// =============================================================================
// tRPC caller type
// =============================================================================

const createCaller = createCallerFactory(appRouter);
export type FullFlowCaller = ReturnType<typeof createCaller>;

// =============================================================================
// FullFlowHarness interface
// =============================================================================

/** Status of an agent that requires attention: done, waiting for answers, or crashed */
export type AgentAttentionStatus = 'done' | 'waiting' | 'crashed';

export interface FullFlowHarness {
  /** Absolute path to the CW workspace (worktrees are created here) */
  workspaceRoot: string;
  /** Absolute path to the cloned todo-api fixture git repo */
  fixtureRoot: string;
  /** The registered todo-api project */
  project: Project;
  /** The initiative created for the test run */
  initiative: Initiative;
  /** tRPC caller (all procedures available) */
  caller: FullFlowCaller;
  /** Real MultiProviderAgentManager */
  agentManager: MultiProviderAgentManager;
  /** In-memory SQLite database */
  db: DrizzleDatabase;
  /** Event bus with capture capability */
  eventBus: CapturingEventBus;

  // All 11 repositories
  initiativeRepository: InitiativeRepository;
  phaseRepository: PhaseRepository;
  taskRepository: TaskRepository;
  messageRepository: MessageRepository;
  agentRepository: AgentRepository;
  pageRepository: PageRepository;
  projectRepository: ProjectRepository;
  accountRepository: AccountRepository;
  changeSetRepository: ChangeSetRepository;
  logChunkRepository: LogChunkRepository;
  conversationRepository: ConversationRepository;

  /**
   * Wait for an agent to reach a terminal status (idle/stopped/crashed) and
   * return its result.
   * Returns null if the agent enters waiting_for_input or no longer exists.
   * Throws if neither happens within `timeoutMs` (default 120s).
   */
  waitForAgentCompletion(agentId: string, timeoutMs?: number): Promise<AgentResult | null>;

  /**
   * Poll until the agent needs attention: done (idle/stopped), waiting for input, or crashed.
   * A missing agent is reported as 'crashed'.
   * Throws if none of these happens within `timeoutMs` (default 120s).
   * Useful for the question/answer loop in discuss mode.
   */
  waitForAgentAttention(agentId: string, timeoutMs?: number): Promise<AgentAttentionStatus>;

  /**
   * Drive an agent to full completion, answering any questions along the way.
   * Answers all questions with the provided answer string (or a default).
   * Throws if the agent has not finished within `timeoutMs` (default 10 minutes).
   */
  driveToCompletion(
    agentId: string,
    answer?: string,
    timeoutMs?: number,
  ): Promise<AgentResult | null>;

  /**
   * Get captured events filtered by type.
   */
  getEventsByType<T extends DomainEvent>(type: T['type']): T[];

  /**
   * Stop all running agents, restore the CLAUDECODE env var and remove temp
   * directories. The env var and temp directories are released even if
   * stopping agents fails.
   */
  cleanup(): Promise<void>;
}

// =============================================================================
// Poll interval
// =============================================================================

const POLL_INTERVAL_MS = 1500;

/**
 * Length of each waitForAgentAttention() window inside driveToCompletion().
 * A window timing out just means the agent is still working.
 */
const ATTENTION_POLL_WINDOW_MS = 3 * 60_000;

/**
 * Thrown by the wait helpers when their polling deadline passes. It is a
 * distinct class so driveToCompletion() can tell "still running" apart from
 * real failures (e.g. repository errors), which must not be swallowed.
 */
class AgentWaitTimeoutError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'AgentWaitTimeoutError';
  }
}

// =============================================================================
// Factory
// =============================================================================

const __dirname = dirname(fileURLToPath(import.meta.url));
const FIXTURES_DIR = join(__dirname, '../../fixtures/todo-api');

export interface FullFlowHarnessOptions {
  /** Factory called after workspaceRoot + repos are created. Return a custom ProcessManager. */
  processManagerFactory?: (workspaceRoot: string, projectRepo: ProjectRepository) => ProcessManager;
}

/** Process-level state the harness borrows and must give back on teardown. */
interface HarnessResources {
  /** Temp directories to delete on teardown; setup appends as it creates them. */
  tempDirs: string[];
  /** Restore CLAUDECODE and remove every directory in tempDirs. Never rejects. */
  release: () => Promise<void>;
}

/**
 * Create a full-flow test harness.
 *
 * Setup steps:
 * 0. Remove CLAUDECODE from the environment so nested claude CLIs can start.
 * 1. Copy todo-api fixture into a temp git repo (fixtureRoot).
 * 2. Create workspace temp dir (workspaceRoot) for CW operations.
 * 3. Init in-memory DB + all 11 repos.
 * 4. Create the capturing event bus.
 * 5. Wire real MultiProviderAgentManager with all repos.
 * 6. Wire DefaultDispatchManager for execute stage.
 * 7. Create tRPC caller with full context.
 * 8. Register project in DB directly (url = fixtureRoot).
 * 9. Create initiative via tRPC (links project, creates root page).
 *
 * If any step fails, CLAUDECODE is restored and the temp directories created
 * so far are removed before the error is rethrown.
 */
export async function createFullFlowHarness(
  initiativeName = 'Add complete() method to TodoStore',
  options?: FullFlowHarnessOptions,
): Promise<FullFlowHarness> {
  // ── 0. Allow nested claude invocations ────────────────────────────────────
  // Claude Code sets CLAUDECODE in the environment, which prevents nested
  // claude CLI calls from starting ("cannot be launched inside another Claude
  // Code session"). Save and remove it so spawned agents can run normally.
  // It is restored in cleanup(), or below if setup fails.
  const savedClaudeCodeEnv = process.env.CLAUDECODE;
  delete process.env.CLAUDECODE;

  const tempDirs: string[] = [];
  const resources: HarnessResources = {
    tempDirs,
    async release() {
      if (savedClaudeCodeEnv !== undefined) {
        process.env.CLAUDECODE = savedClaudeCodeEnv;
      }
      await Promise.allSettled(
        tempDirs.map((dir) => rm(dir, { recursive: true, force: true })),
      );
    },
  };

  try {
    return await buildHarness(initiativeName, options, resources);
  } catch (err) {
    // Setup failed part-way: don't leak the env change or the temp dirs.
    await resources.release();
    throw err;
  }
}

/**
 * Copy the todo-api fixture to `fixtureRoot` and commit it as a fresh git repo.
 *
 * @throws if package.json did not land at the top level of `fixtureRoot`, or if a git command fails.
 */
async function initFixtureRepo(fixtureRoot: string): Promise<void> {
  // IMPORTANT: cp(src, dest) puts src INSIDE dest when dest already exists
  // (like `cp -r src dest/` → creates dest/src/). We need dest to NOT exist
  // yet so that cp creates it as a copy of src directly.
  await cp(FIXTURES_DIR, fixtureRoot, { recursive: true });

  // Verify files landed at the right level before git operations. This uses fs
  // rather than a `test -f` shell call so the temp path is never shell-parsed.
  const manifestPath = join(fixtureRoot, 'package.json');
  const manifest = await stat(manifestPath).catch(() => null);
  if (!manifest?.isFile()) {
    throw new Error(`Fixture copy failed: ${manifestPath} is missing or not a regular file`);
  }

  const git = (args: string): void => {
    execSync(`git ${args}`, { cwd: fixtureRoot, stdio: 'pipe' });
  };
  git('init');
  git('config user.email "test@test.com"');
  git('config user.name "Test"');
  git('add .');
  git('commit -m "initial todo-api with missing complete()"');
}

/**
 * Run setup steps 1–9 and assemble the harness.
 * Every temp directory created here is registered in `resources.tempDirs`
 * straight away, so a failure at any later step can clean it up.
 */
async function buildHarness(
  initiativeName: string,
  options: FullFlowHarnessOptions | undefined,
  resources: HarnessResources,
): Promise<FullFlowHarness> {
  // ── 1. Fixture project ────────────────────────────────────────────────────
  const fixtureBase = await mkdtemp(join(tmpdir(), 'cw-fixture-'));
  resources.tempDirs.push(fixtureBase);
  const fixtureRoot = join(fixtureBase, 'todo-api'); // does not exist yet
  await initFixtureRepo(fixtureRoot);

  // ── 2. Workspace root ─────────────────────────────────────────────────────
  // Just a plain temp directory — agent worktrees live under repos/ inside it.
  // No git init needed; the project clone under repos/ is the git repo.
  const workspaceRoot = await mkdtemp(join(tmpdir(), 'cw-workspace-'));
  resources.tempDirs.push(workspaceRoot);

  // ── 3. Database + repositories ────────────────────────────────────────────
  const db = createTestDatabase();
  const repos = createRepositories(db);

  // ── 4. Event bus ──────────────────────────────────────────────────────────
  const eventBus = new CapturingEventBus();

  // ── 5. Real agent manager ─────────────────────────────────────────────────
  const customProcessManager = options?.processManagerFactory?.(workspaceRoot, repos.projectRepository);
  const agentManager = new MultiProviderAgentManager(
    repos.agentRepository,
    workspaceRoot,
    repos.projectRepository,
    repos.accountRepository,
    eventBus,
    undefined, // no credential manager needed for default claude account
    repos.changeSetRepository,
    repos.phaseRepository,
    repos.taskRepository,
    repos.pageRepository,
    repos.logChunkRepository,
    false, // debug
    customProcessManager, // processManagerOverride
  );

  // ── 6. Dispatch manager (for execute stage) ───────────────────────────────
  const dispatchManager = new DefaultDispatchManager(
    repos.taskRepository,
    repos.messageRepository,
    agentManager,
    eventBus,
    repos.initiativeRepository,
    repos.phaseRepository,
  );

  // ── 7. tRPC caller ────────────────────────────────────────────────────────
  const ctx = createContext({
    eventBus,
    serverStartedAt: new Date(),
    processCount: 0,
    agentManager,
    dispatchManager,
    workspaceRoot,
    ...repos,
  });
  const caller = createCaller(ctx);

  // ── 8. Register project directly in DB (bypass tRPC clone) ───────────────
  const project = await repos.projectRepository.create({
    name: 'todo-api',
    url: fixtureRoot,
  });

  // ── 9. Create initiative via tRPC (creates root page automatically) ───────
  const initiative = await caller.createInitiative({
    name: initiativeName,
    projectIds: [project.id],
  });

  // ── Helpers ───────────────────────────────────────────────────────────────

  async function waitForAgentCompletion(
    agentId: string,
    timeoutMs = 120_000,
  ): Promise<AgentResult | null> {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      const agent = await repos.agentRepository.findById(agentId);
      if (!agent) return null;
      if (agent.status === 'idle' || agent.status === 'stopped' || agent.status === 'crashed') {
        return agentManager.getResult(agentId);
      }
      if (agent.status === 'waiting_for_input') return null;
      await sleep(POLL_INTERVAL_MS);
    }
    throw new AgentWaitTimeoutError(`Timeout: agent ${agentId} did not complete within ${timeoutMs}ms`);
  }

  async function waitForAgentAttention(
    agentId: string,
    timeoutMs = 120_000,
  ): Promise<AgentAttentionStatus> {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      const agent = await repos.agentRepository.findById(agentId);
      if (!agent) return 'crashed';
      if (agent.status === 'idle' || agent.status === 'stopped') return 'done';
      if (agent.status === 'crashed') return 'crashed';
      if (agent.status === 'waiting_for_input') return 'waiting';
      await sleep(POLL_INTERVAL_MS);
    }
    throw new AgentWaitTimeoutError(
      `Timeout: agent ${agentId} did not reach attention state within ${timeoutMs}ms`,
    );
  }

  async function driveToCompletion(
    agentId: string,
    answer = 'Use your best judgment and keep it simple.',
    timeoutMs = 10 * 60_000,
  ): Promise<AgentResult | null> {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      const remaining = deadline - Date.now();
      if (remaining <= 0) break;

      let status: AgentAttentionStatus;
      try {
        status = await waitForAgentAttention(agentId, Math.min(remaining, ATTENTION_POLL_WINDOW_MS));
      } catch (err) {
        // A window timeout means the agent is still running (normal for
        // long-running execute agents); keep polling until the outer deadline.
        // Anything else (e.g. a repository failure) is a real error.
        if (err instanceof AgentWaitTimeoutError) continue;
        throw err;
      }

      if (status === 'done' || status === 'crashed') {
        return agentManager.getResult(agentId);
      }

      if (status === 'waiting') {
        const pending = await agentManager.getPendingQuestions(agentId);
        if (!pending || pending.questions.length === 0) {
          // Shouldn't happen, but guard against it
          await sleep(POLL_INTERVAL_MS);
          continue;
        }
        const answers = Object.fromEntries(
          pending.questions.map((q) => [q.id, answer]),
        );
        // NOTE(review): assumes resume() moves the agent out of waiting_for_input
        // before it resolves; otherwise the next poll could re-answer the same questions.
        await agentManager.resume(agentId, answers);
      }
    }
    throw new Error(`driveToCompletion: agent ${agentId} did not finish within ${timeoutMs}ms`);
  }

  // ── Build and return harness ───────────────────────────────────────────────

  const harness: FullFlowHarness = {
    workspaceRoot,
    fixtureRoot,
    project,
    initiative,
    caller,
    agentManager,
    db,
    eventBus,
    ...repos,
    waitForAgentCompletion,
    waitForAgentAttention,
    driveToCompletion,
    getEventsByType<T extends DomainEvent>(type: T['type']): T[] {
      return eventBus.getEventsByType<T>(type);
    },
    async cleanup() {
      try {
        // Kill any running agents
        const agents = await repos.agentRepository.findAll();
        await Promise.allSettled(
          agents
            .filter((a) => a.status === 'running')
            .map((a) => agentManager.stop(a.id)),
        );
      } finally {
        // Restore CLAUDECODE and remove temp directories (fixtureBase contains
        // fixtureRoot), even if listing or stopping agents failed.
        await resources.release();
      }
    },
  };

  return harness;
}