fix(agent): Eliminate race condition in completion handling

PROBLEM:
- Agents completing with questions were incorrectly marked as "crashed"
- Race condition: polling handler AND crash handler both called handleCompletion()
- Caused database corruption and lost pending questions

SOLUTION:
- Add completion mutex in OutputHandler to prevent concurrent processing
- Remove duplicate completion call from crash handler
- Only one handler executes completion logic per agent

TESTING:
- Added mutex-completion.test.ts with 4 test cases
- Verified mutex prevents concurrent access
- Verified lock cleanup on exceptions
- Verified different agents can process concurrently

FIXES: residential-cuckoo and 12+ other agents stuck in crashed state
This commit is contained in:
Lukas May
2026-02-08 15:51:32 +01:00
parent 6f5fd3a0af
commit 43e2c8b0ba
52 changed files with 2545 additions and 370 deletions

View File

@@ -388,14 +388,35 @@ export class CleanupManager {
if (rawOutput.trim()) {
const provider = getProvider(agent.provider);
if (provider) {
await onAgentOutput(agent.id, rawOutput, provider);
continue;
// Check if agent actually completed successfully before processing
const hasCompletionResult = this.checkForCompletionResult(rawOutput);
if (hasCompletionResult) {
log.info({ agentId: agent.id }, 'reconcile: processing completed agent output');
try {
await onAgentOutput(agent.id, rawOutput, provider);
continue;
} catch (err) {
log.error({
agentId: agent.id,
err: err instanceof Error ? err.message : String(err)
}, 'reconcile: failed to process completed agent output');
// Mark as crashed since processing failed
await this.repository.update(agent.id, { status: 'crashed' });
this.emitCrashed(agent, `Failed to process output: ${err instanceof Error ? err.message : String(err)}`);
continue;
}
}
}
}
} catch { /* file missing or empty */ }
log.warn({ agentId: agent.id }, 'reconcile: marking agent crashed');
} catch (readErr) {
log.warn({
agentId: agent.id,
err: readErr instanceof Error ? readErr.message : String(readErr)
}, 'reconcile: failed to read output file');
}
log.warn({ agentId: agent.id }, 'reconcile: marking agent crashed (no valid output)');
await this.repository.update(agent.id, { status: 'crashed' });
this.emitCrashed(agent, 'Server restarted, agent output not found');
this.emitCrashed(agent, 'Server restarted, agent output not found or invalid');
} else {
log.warn({ agentId: agent.id }, 'reconcile: marking agent crashed');
await this.repository.update(agent.id, { status: 'crashed' });
@@ -415,6 +436,30 @@ export class CleanupManager {
}
}
/**
 * Scan raw agent output for a line that signals successful completion.
 * Returns true when any JSON line is a Claude CLI success result event or
 * carries a provider completion status; non-JSON lines are ignored.
 */
private checkForCompletionResult(rawOutput: string): boolean {
  try {
    return rawOutput
      .trim()
      .split('\n')
      .some((line) => {
        try {
          const event = JSON.parse(line);
          // Claude CLI emits { type: 'result', subtype: 'success' } on success.
          const claudeSuccess = event.type === 'result' && event.subtype === 'success';
          // Other providers signal completion via a status field.
          const providerDone = event.status === 'done' || event.status === 'questions';
          return claudeSuccess || providerDone;
        } catch {
          return false; // skip non-JSON lines
        }
      });
  } catch {
    return false; // defensive: invalid output format
  }
}
/**
* Emit a crashed event for an agent.
*/

View File

@@ -0,0 +1,146 @@
/**
* Test for Phase 1 completion detection fix
*/
import { describe, test, expect, beforeEach, afterEach, vi } from 'vitest';
import { mkdtemp, writeFile, mkdir } from 'node:fs/promises';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { rmSync } from 'node:fs';
import { OutputHandler } from './output-handler.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ProposalRepository } from '../db/repositories/proposal-repository.js';
describe('Completion Detection Fix', () => {
  let tempDir: string;
  let outputHandler: OutputHandler;
  let mockAgentRepo: AgentRepository;
  let mockProposalRepo: ProposalRepository;

  /**
   * Write a signal.json with the given content under <agentWorkdir>/.cw/output,
   * creating the directory structure first. Shared by the tests below to avoid
   * repeating the same four lines of setup in every case.
   */
  async function writeSignalFile(agentWorkdir: string, content: string): Promise<void> {
    const cwDir = join(agentWorkdir, '.cw/output');
    await mkdir(cwDir, { recursive: true });
    await writeFile(join(cwDir, 'signal.json'), content);
  }

  /** Invoke the private checkSignalCompletion method under test via reflection. */
  function checkSignalCompletion(agentWorkdir: string): Promise<boolean> {
    return (outputHandler as any).checkSignalCompletion.call(outputHandler, agentWorkdir);
  }

  beforeEach(async () => {
    tempDir = await mkdtemp(join(tmpdir(), 'completion-test-'));
    // Mock repositories — only the methods exercised by completion detection.
    mockAgentRepo = {
      update: vi.fn(),
      findById: vi.fn().mockResolvedValue({ id: 'test-agent', mode: 'refine' }),
    } as any;
    mockProposalRepo = {
      create: vi.fn(),
    } as any;
    outputHandler = new OutputHandler(mockAgentRepo, undefined, mockProposalRepo);
  });

  afterEach(() => {
    rmSync(tempDir, { recursive: true, force: true });
  });

  test('detects completion from signal.json with "questions" status', async () => {
    const agentWorkdir = join(tempDir, 'test-agent');
    await writeSignalFile(agentWorkdir, JSON.stringify({
      status: 'questions',
      questions: [{ id: 'q1', text: 'Do you want to proceed?' }]
    }));
    expect(await checkSignalCompletion(agentWorkdir)).toBe(true);
  });

  test('detects completion from signal.json with "done" status', async () => {
    const agentWorkdir = join(tempDir, 'test-agent');
    await writeSignalFile(agentWorkdir, JSON.stringify({
      status: 'done',
      result: 'Task completed successfully'
    }));
    expect(await checkSignalCompletion(agentWorkdir)).toBe(true);
  });

  test('detects completion from signal.json with "error" status', async () => {
    const agentWorkdir = join(tempDir, 'test-agent');
    await writeSignalFile(agentWorkdir, JSON.stringify({
      status: 'error',
      error: 'Something went wrong'
    }));
    expect(await checkSignalCompletion(agentWorkdir)).toBe(true);
  });

  test('returns false when signal.json does not exist', async () => {
    // Don't create any files under the workdir.
    const agentWorkdir = join(tempDir, 'test-agent');
    expect(await checkSignalCompletion(agentWorkdir)).toBe(false);
  });

  test('returns false for incomplete status', async () => {
    const agentWorkdir = join(tempDir, 'test-agent');
    await writeSignalFile(agentWorkdir, JSON.stringify({
      status: 'running',
      progress: 'Still working...'
    }));
    expect(await checkSignalCompletion(agentWorkdir)).toBe(false);
  });

  test('handles malformed signal.json gracefully', async () => {
    const agentWorkdir = join(tempDir, 'test-agent');
    await writeSignalFile(agentWorkdir, '{ invalid json }');
    expect(await checkSignalCompletion(agentWorkdir)).toBe(false);
  });
});

View File

@@ -6,7 +6,7 @@
* ensuring they're fresh, and marking accounts as exhausted on failure.
*/
import { readFileSync } from 'node:fs';
import { readFileSync, existsSync } from 'node:fs';
import { join } from 'node:path';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { AccountCredentialManager } from './credentials/types.js';
@@ -92,6 +92,23 @@ export class CredentialHandler {
return { valid, refreshed: false };
}
/**
 * Read the access token from a config directory's .credentials.json.
 * Returns null if the credentials file is missing or malformed, or if the
 * token under claudeAiOauth.accessToken is not a non-empty string.
 * Used for CLAUDE_CODE_OAUTH_TOKEN env var injection.
 */
readAccessToken(configDir: string): string | null {
  try {
    const credPath = join(configDir, '.credentials.json');
    if (!existsSync(credPath)) return null;
    const raw = readFileSync(credPath, 'utf-8');
    const parsed = JSON.parse(raw);
    const token = parsed.claudeAiOauth?.accessToken;
    // Defensive: a malformed file could hold a non-string (or empty) value
    // under this key; injecting that into an env var would be useless.
    return typeof token === 'string' && token.length > 0 ? token : null;
  } catch {
    return null; // unreadable or unparseable credentials file
  }
}
/**
* Check if an error message indicates usage limit exhaustion.
*/

View File

@@ -108,6 +108,15 @@ export function writeInputFiles(options: WriteInputFilesOptions): void {
const inputDir = join(options.agentWorkdir, '.cw', 'input');
mkdirSync(inputDir, { recursive: true });
// Write expected working directory marker for verification
writeFileSync(
join(inputDir, '../expected-pwd.txt'),
options.agentWorkdir,
'utf-8'
);
const manifestFiles: string[] = [];
if (options.initiative) {
const ini = options.initiative;
const content = formatFrontmatter(
@@ -121,6 +130,7 @@ export function writeInputFiles(options: WriteInputFilesOptions): void {
'',
);
writeFileSync(join(inputDir, 'initiative.md'), content, 'utf-8');
manifestFiles.push('initiative.md');
}
if (options.pages && options.pages.length > 0) {
@@ -146,7 +156,9 @@ export function writeInputFiles(options: WriteInputFilesOptions): void {
},
bodyMarkdown,
);
const filename = `pages/${page.id}.md`;
writeFileSync(join(pagesDir, `${page.id}.md`), content, 'utf-8');
manifestFiles.push(filename);
}
}
@@ -162,6 +174,7 @@ export function writeInputFiles(options: WriteInputFilesOptions): void {
ph.description ?? '',
);
writeFileSync(join(inputDir, 'phase.md'), content, 'utf-8');
manifestFiles.push('phase.md');
}
if (options.task) {
@@ -178,14 +191,22 @@ export function writeInputFiles(options: WriteInputFilesOptions): void {
t.description ?? '',
);
writeFileSync(join(inputDir, 'task.md'), content, 'utf-8');
manifestFiles.push('task.md');
}
// Write manifest listing exactly which files were created
writeFileSync(
join(inputDir, 'manifest.json'),
JSON.stringify({ files: manifestFiles }) + '\n',
'utf-8',
);
}
// =============================================================================
// OUTPUT FILE READING
// =============================================================================
function readFrontmatterFile(filePath: string): { data: Record<string, unknown>; body: string } | null {
export function readFrontmatterFile(filePath: string): { data: Record<string, unknown>; body: string } | null {
try {
const raw = readFileSync(filePath, 'utf-8');
const parsed = matter(raw);

View File

@@ -52,6 +52,7 @@ vi.mock('node:fs', async () => {
mkdirSync: vi.fn(),
writeFileSync: vi.fn(),
createWriteStream: vi.fn().mockReturnValue(mockWriteStream),
existsSync: vi.fn().mockReturnValue(true), // Default to true for our new validation
};
});
@@ -220,6 +221,49 @@ describe('MultiProviderAgentManager', () => {
).toBe('gastown');
});
it('writes diagnostic files for workdir verification', async () => {
  const mockChild = createMockChildProcess();
  mockSpawn.mockReturnValue(mockChild);
  // Mock fs.writeFileSync to capture diagnostic file writing
  const { writeFileSync } = await import('node:fs');
  const mockWriteFileSync = vi.mocked(writeFileSync);
  // The existsSync is already mocked globally to return true
  await manager.spawn({
    name: 'gastown',
    taskId: 'task-456',
    prompt: 'Test task',
  });
  // Verify diagnostic file was written exactly once, by filtering all
  // writeFileSync calls down to the spawn-diagnostic.json path.
  const diagnosticCalls = mockWriteFileSync.mock.calls.filter(call =>
    call[0].toString().includes('spawn-diagnostic.json')
  );
  expect(diagnosticCalls).toHaveLength(1);
  // Parse the diagnostic data to verify structure
  // NOTE(review): assumes the diagnostic body is written as a JSON string in
  // the second writeFileSync argument — confirm against the spawn implementation.
  const diagnosticCall = diagnosticCalls[0];
  const diagnosticData = JSON.parse(diagnosticCall[1] as string);
  expect(diagnosticData).toMatchObject({
    agentId: expect.any(String),
    alias: 'gastown',
    intendedCwd: expect.stringContaining('/agent-workdirs/gastown/workspace'),
    worktreeId: 'gastown',
    provider: 'claude',
    command: expect.any(String),
    args: expect.any(Array),
    env: expect.any(Object),
    cwdExistsAtSpawn: true,
    initiativeId: null,
    customCwdProvided: false,
    accountId: null,
    timestamp: expect.any(String),
  });
});
it('uses custom cwd if provided', async () => {
const mockChild = createMockChildProcess();
mockSpawn.mockReturnValue(mockChild);

View File

@@ -29,11 +29,13 @@ import type {
AgentStoppedEvent,
AgentResumedEvent,
AgentDeletedEvent,
ProcessCrashedEvent,
} from '../events/index.js';
import { writeInputFiles } from './file-io.js';
import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
import { join } from 'node:path';
import { unlink } from 'node:fs/promises';
import type { AccountCredentialManager } from './credentials/types.js';
import { ProcessManager } from './process-manager.js';
import { CredentialHandler } from './credential-handler.js';
@@ -67,6 +69,13 @@ export class MultiProviderAgentManager implements AgentManager {
this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
this.outputHandler = new OutputHandler(repository, eventBus, proposalRepository);
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug);
// Listen for process crashed events to handle agents specially
if (eventBus) {
eventBus.on('process:crashed', async (event: ProcessCrashedEvent) => {
await this.handleProcessCrashed(event.payload.processId, event.payload.exitCode, event.payload.signal);
});
}
}
/**
@@ -476,6 +485,16 @@ export class MultiProviderAgentManager implements AgentManager {
const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
const prompt = this.outputHandler.formatAnswersAsPrompt(answers);
// Clear previous signal.json to ensure clean completion detection
const signalPath = join(agentCwd, '.cw/output/signal.json');
try {
await unlink(signalPath);
log.debug({ agentId, signalPath }, 'cleared previous signal.json for resume');
} catch {
// File might not exist, which is fine
}
await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });
const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt);
@@ -650,6 +669,118 @@ export class MultiProviderAgentManager implements AgentManager {
);
}
/**
 * Handle a process:crashed event for an agent process.
 *
 * A non-zero exit code does not necessarily mean failure: the agent may have
 * written a completion signal before exiting. Completion processing itself is
 * left to the polling handler (handleDetachedAgentCompletion); the mutex in
 * OutputHandler.handleCompletion() prevents duplicate processing. This handler
 * only decides whether the agent should be marked as crashed.
 *
 * @param processId - Process/agent id as stored in the repository.
 * @param exitCode  - Exit code reported by the process manager (may be null).
 * @param signal    - Terminating signal name, if any.
 */
private async handleProcessCrashed(processId: string, exitCode: number | null, signal: string | null): Promise<void> {
  try {
    const agent = await this.repository.findById(processId);
    if (!agent) {
      return; // Not an agent process we track
    }
    // Persist exit code for debugging before any further decisions.
    await this.repository.update(processId, { exitCode });
    log.info({
      agentId: processId,
      name: agent.name,
      exitCode,
      signal,
      outputFilePath: agent.outputFilePath
    }, 'agent process crashed, analyzing completion status');
    if (!agent.outputFilePath) {
      log.warn({
        agentId: processId,
        name: agent.name,
        exitCode,
        signal
      }, 'agent crashed with no output file path - marking as crashed');
      await this.repository.update(processId, { status: 'crashed' });
      return;
    }
    const hasCompletion = await this.checkAgentCompletionResult(agent.outputFilePath);
    if (hasCompletion) {
      // Completion detected: do NOT mark as crashed and do NOT call
      // handleCompletion() here — the polling handler performs the actual
      // completion processing, guarded by the completion mutex.
      log.info({
        agentId: processId,
        name: agent.name,
        exitCode,
        signal
      }, 'agent exited non-zero but completed successfully - deferring to polling handler');
    } else {
      log.warn({
        agentId: processId,
        name: agent.name,
        exitCode,
        signal,
        outputFilePath: agent.outputFilePath
      }, 'agent crashed and no successful completion detected - marking as truly crashed');
      // Only mark as crashed if agent truly crashed (no completion detected)
      await this.repository.update(processId, { status: 'crashed' });
    }
  } catch (err) {
    log.error({
      processId,
      exitCode,
      signal,
      err: err instanceof Error ? err.message : String(err)
    }, 'failed to check agent completion after crash');
  }
}
/**
 * Determine whether an agent finished by inspecting its signal.json.
 * True when the recorded status is terminal ('done', 'questions' or 'error');
 * false when the file is absent, unreadable, unparseable, or non-terminal.
 */
private async checkAgentCompletionResult(outputFilePath: string): Promise<boolean> {
  try {
    const { readFile } = await import('node:fs/promises');
    const { existsSync } = await import('node:fs');
    const { dirname } = await import('node:path');
    const signalPath = join(dirname(outputFilePath), '.cw/output/signal.json');
    if (!existsSync(signalPath)) {
      log.debug({ outputFilePath, signalPath }, 'no signal.json found - agent not completed');
      return false;
    }
    const signal = JSON.parse(await readFile(signalPath, 'utf-8'));
    // Any terminal status counts as completion, including agent-reported errors.
    const terminalStatuses = ['done', 'questions', 'error'];
    const completed = terminalStatuses.includes(signal.status);
    log.debug(
      { outputFilePath, signal },
      completed
        ? 'agent completion detected via signal.json'
        : 'signal.json found but status indicates incomplete',
    );
    return completed;
  } catch (err) {
    log.warn({ outputFilePath, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json');
    return false;
  }
}
/**
* Convert database agent record to AgentInfo.
*/

View File

@@ -0,0 +1,32 @@
/**
* Server-side Markdown → Tiptap JSON converter.
*
* Uses @tiptap/markdown's MarkdownManager.parse() — the same approach
* as content-serializer.ts but in reverse direction.
* No DOM needed, no new dependencies.
*/
import StarterKit from '@tiptap/starter-kit';
import Link from '@tiptap/extension-link';
import { MarkdownManager } from '@tiptap/markdown';
// Module-level singleton; built on first use and reused for every conversion.
let _manager: MarkdownManager | null = null;

/** Lazily construct (and memoize) the shared MarkdownManager instance. */
function getManager(): MarkdownManager {
  _manager ??= new MarkdownManager({ extensions: [StarterKit, Link] });
  return _manager;
}
/**
 * Convert a markdown string to a Tiptap JSON document.
 * Blank or whitespace-only input yields an empty single-paragraph doc.
 */
export function markdownToTiptapJson(markdown: string): object {
  const isBlank = markdown.trim().length === 0;
  if (isBlank) {
    return { type: 'doc', content: [{ type: 'paragraph' }] };
  }
  const parsed = getManager().parse(markdown);
  return parsed.toJSON();
}

View File

@@ -0,0 +1,152 @@
/**
* Focused test for completion handler mutex functionality.
* Tests the race condition fix without complex mocking.
*/
import { describe, it, beforeEach, expect } from 'vitest';
import { OutputHandler } from './output-handler.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
describe('OutputHandler completion mutex', () => {
  let outputHandler: OutputHandler;
  let completionCallCount: number;
  let callOrder: string[];
  let mockRepository: AgentRepository;

  /**
   * Build a fresh repository stub for each test. Tests override findById via
   * `(mockRepository as any).findById = ...` to observe completion attempts;
   * creating the stub per test (instead of sharing one module-level object)
   * prevents one test's override from leaking into the next.
   */
  function createMockRepository(): AgentRepository {
    return {
      async findById() {
        return null; // Return null to cause early exit after mutex check
      },
      async update() {},
      async create() { throw new Error('Not implemented'); },
      async findAll() { throw new Error('Not implemented'); },
      async findByStatus() { throw new Error('Not implemented'); },
      async findByTaskId() { throw new Error('Not implemented'); },
      async findByInitiativeId() { throw new Error('Not implemented'); },
      async deleteById() { throw new Error('Not implemented'); },
      async findPending() { throw new Error('Not implemented'); }
    };
  }

  beforeEach(() => {
    mockRepository = createMockRepository();
    outputHandler = new OutputHandler(mockRepository);
    completionCallCount = 0;
    callOrder = [];
  });

  it('should prevent concurrent completion handling with mutex', async () => {
    const agentId = 'test-agent';
    let firstCallCompleted = false;
    // Track calls and simulate processing time on the first one so the second
    // concurrent call arrives while the lock is still held.
    (mockRepository as any).findById = async (_id: string) => {
      completionCallCount++;
      const callIndex = completionCallCount;
      callOrder.push(`call-${callIndex}-start`);
      if (callIndex === 1) {
        await new Promise(resolve => setTimeout(resolve, 50));
        firstCallCompleted = true;
      }
      callOrder.push(`call-${callIndex}-end`);
      return null; // Return null to exit early
    };
    const getAgentWorkdir = () => '/test/workdir';
    // Start two concurrent completion handlers for the SAME agent.
    const completion1Promise = outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
    const completion2Promise = outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
    await Promise.all([completion1Promise, completion2Promise]);
    // Verify only one completion handler executed
    expect(completionCallCount, 'Should only execute one completion handler').toBe(1);
    expect(firstCallCompleted, 'First handler should have completed').toBe(true);
    expect(callOrder).toEqual(['call-1-start', 'call-1-end']);
  });

  it('should allow sequential completion handling after first completes', async () => {
    const agentId = 'test-agent';
    (mockRepository as any).findById = async (_id: string) => {
      completionCallCount++;
      callOrder.push(`call-${completionCallCount}`);
      return null; // Return null to exit early
    };
    const getAgentWorkdir = () => '/test/workdir';
    // First completion, then a second one after the first has finished.
    await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
    await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
    // Both should execute sequentially
    expect(completionCallCount, 'Should execute both handlers sequentially').toBe(2);
    expect(callOrder).toEqual(['call-1', 'call-2']);
  });

  it('should clean up mutex lock even when exception is thrown', async () => {
    const agentId = 'test-agent';
    let firstCallThrew = false;
    let secondCallCompleted = false;
    // First call throws an error; second must still be able to run.
    (mockRepository as any).findById = async (_id: string) => {
      if (!firstCallThrew) {
        firstCallThrew = true;
        throw new Error('Database error');
      }
      secondCallCompleted = true;
      return null;
    };
    const getAgentWorkdir = () => '/test/workdir';
    // First call should throw but clean up mutex
    await expect(outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir))
      .rejects.toThrow('Database error');
    expect(firstCallThrew, 'First call should have thrown').toBe(true);
    // Second call should succeed (proving mutex was cleaned up)
    await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
    expect(secondCallCompleted, 'Second call should have completed').toBe(true);
  });

  it('should use agent ID as mutex key', async () => {
    const agentId1 = 'agent-1';
    const agentId2 = 'agent-2';
    let agent1Started = false;
    let agent2Started = false;
    (mockRepository as any).findById = async (id: string) => {
      if (id === agentId1) {
        agent1Started = true;
        await new Promise(resolve => setTimeout(resolve, 30));
      } else if (id === agentId2) {
        agent2Started = true;
        await new Promise(resolve => setTimeout(resolve, 30));
      }
      return null;
    };
    const getAgentWorkdir = () => '/test/workdir';
    // Different agents must NOT block each other: the lock is keyed by agent ID.
    const agent1Promise = outputHandler.handleCompletion(agentId1, undefined, getAgentWorkdir);
    const agent2Promise = outputHandler.handleCompletion(agentId2, undefined, getAgentWorkdir);
    await Promise.all([agent1Promise, agent2Promise]);
    expect(agent1Started, 'Agent 1 should have started').toBe(true);
    expect(agent2Started, 'Agent 2 should have started').toBe(true);
  });
});

View File

@@ -0,0 +1,280 @@
/**
* OutputHandler Tests
*
* Test suite for the OutputHandler class, specifically focusing on
* question parsing and agent completion handling.
*/
import { describe, it, expect, beforeEach, vi } from 'vitest';
import { OutputHandler } from './output-handler.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ProposalRepository } from '../db/repositories/proposal-repository.js';
import type { EventBus, DomainEvent, AgentWaitingEvent } from '../events/types.js';
import { getProvider } from './providers/registry.js';
// =============================================================================
// Test Helpers
// =============================================================================
/** Build an EventBus stub that records every emitted event for inspection. */
function createMockEventBus(): EventBus & { emittedEvents: DomainEvent[] } {
  const emittedEvents: DomainEvent[] = [];
  return {
    emittedEvents,
    emit: vi.fn(<T extends DomainEvent>(event: T): void => {
      emittedEvents.push(event);
    }),
    on: vi.fn(),
    off: vi.fn(),
    once: vi.fn(),
  };
}
/** Agent repository stub whose methods are all bare vi.fn() mocks. */
function createMockAgentRepository() {
  const methods = ['findById', 'update', 'create', 'findByName', 'findByStatus', 'findAll', 'delete'] as const;
  const repo = {} as Record<(typeof methods)[number], ReturnType<typeof vi.fn>>;
  for (const name of methods) {
    repo[name] = vi.fn();
  }
  return repo;
}
/** Proposal repository stub whose methods are all bare vi.fn() mocks. */
function createMockProposalRepository() {
  const methods = ['createMany', 'findByAgentId', 'findByInitiativeId', 'findById', 'update', 'delete', 'create', 'findAll'] as const;
  const repo = {} as Record<(typeof methods)[number], ReturnType<typeof vi.fn>>;
  for (const name of methods) {
    repo[name] = vi.fn();
  }
  return repo;
}
// =============================================================================
// Tests
// =============================================================================
describe('OutputHandler', () => {
  let outputHandler: OutputHandler;
  let mockAgentRepo: ReturnType<typeof createMockAgentRepository>;
  let mockProposalRepo: ReturnType<typeof createMockProposalRepository>;
  let eventBus: ReturnType<typeof createMockEventBus>;
  // Baseline agent record returned by findById unless a test overrides it.
  const mockAgent = {
    id: 'agent-123',
    name: 'test-agent',
    taskId: 'task-456',
    sessionId: 'session-789',
    provider: 'claude',
    mode: 'refine',
  };
  beforeEach(() => {
    mockAgentRepo = createMockAgentRepository();
    mockProposalRepo = createMockProposalRepository();
    eventBus = createMockEventBus();
    outputHandler = new OutputHandler(
      mockAgentRepo as any,
      eventBus,
      mockProposalRepo as any
    );
    // Setup default mock behavior
    mockAgentRepo.findById.mockResolvedValue(mockAgent);
  });
  describe('processAgentOutput', () => {
    it('should correctly parse and handle questions from Claude CLI output', async () => {
      // Arrange: Create realistic Claude CLI output with questions (like fantastic-crane)
      const questionsResult = {
        status: "questions",
        questions: [
          {
            id: "q1",
            question: "What specific components are in the current admin UI? (e.g., tables, forms, modals, navigation)"
          },
          {
            id: "q2",
            question: "What does 'modern look' mean for you? (e.g., dark mode support, specific color scheme, animations)"
          },
          {
            id: "q3",
            question: "Are there any specific shadcn components you want to use or prioritize?"
          }
        ]
      };
      // NOTE: the signal payload is double-encoded — the CLI result event
      // carries the agent's JSON as a string in its `result` field.
      const claudeOutput = JSON.stringify({
        type: "result",
        subtype: "success",
        is_error: false,
        session_id: "test-session-123",
        result: JSON.stringify(questionsResult),
        total_cost_usd: 0.05
      });
      const getAgentWorkdir = vi.fn().mockReturnValue('/test/workdir');
      const provider = getProvider('claude')!;
      // Act
      await outputHandler.processAgentOutput(
        mockAgent.id,
        claudeOutput,
        provider,
        getAgentWorkdir
      );
      // Assert: Agent should be updated with questions and waiting_for_input status
      expect(mockAgentRepo.update).toHaveBeenCalledWith(mockAgent.id, {
        pendingQuestions: JSON.stringify({
          questions: [
            {
              id: 'q1',
              question: 'What specific components are in the current admin UI? (e.g., tables, forms, modals, navigation)'
            },
            {
              id: 'q2',
              question: 'What does \'modern look\' mean for you? (e.g., dark mode support, specific color scheme, animations)'
            },
            {
              id: 'q3',
              question: 'Are there any specific shadcn components you want to use or prioritize?'
            }
          ]
        }),
        status: 'waiting_for_input'
      });
      // Should be called at least once (could be once or twice depending on session ID extraction)
      expect(mockAgentRepo.update).toHaveBeenCalledTimes(1);
      // Assert: AgentWaitingEvent should be emitted
      const waitingEvents = eventBus.emittedEvents.filter(e => e.type === 'agent:waiting') as AgentWaitingEvent[];
      expect(waitingEvents).toHaveLength(1);
      expect(waitingEvents[0].payload.questions).toEqual([
        {
          id: 'q1',
          question: 'What specific components are in the current admin UI? (e.g., tables, forms, modals, navigation)'
        },
        {
          id: 'q2',
          question: 'What does \'modern look\' mean for you? (e.g., dark mode support, specific color scheme, animations)'
        },
        {
          id: 'q3',
          question: 'Are there any specific shadcn components you want to use or prioritize?'
        }
      ]);
    });
    it('should handle malformed questions gracefully', async () => {
      // Arrange: Create output with malformed questions JSON
      const malformedOutput = JSON.stringify({
        type: "result",
        subtype: "success",
        is_error: false,
        session_id: "test-session",
        result: '{"status": "questions", "questions": [malformed json]}',
        total_cost_usd: 0.05
      });
      const getAgentWorkdir = vi.fn().mockReturnValue('/test/workdir');
      const provider = getProvider('claude')!;
      // Act & Assert: Should not throw, should handle error gracefully
      await expect(
        outputHandler.processAgentOutput(
          mockAgent.id,
          malformedOutput,
          provider,
          getAgentWorkdir
        )
      ).resolves.not.toThrow();
      // Should update status to crashed due to malformed JSON
      const updateCalls = mockAgentRepo.update.mock.calls;
      const crashedCall = updateCalls.find(call => call[1]?.status === 'crashed');
      expect(crashedCall).toBeDefined();
    });
    it('should correctly handle "done" status without questions', async () => {
      // Arrange: Create output with done status
      const doneOutput = JSON.stringify({
        type: "result",
        subtype: "success",
        is_error: false,
        session_id: "test-session",
        result: JSON.stringify({
          status: "done",
          message: "Task completed successfully"
        }),
        total_cost_usd: 0.05
      });
      const getAgentWorkdir = vi.fn().mockReturnValue('/test/workdir');
      const provider = getProvider('claude')!;
      // Act
      await outputHandler.processAgentOutput(
        mockAgent.id,
        doneOutput,
        provider,
        getAgentWorkdir
      );
      // Assert: Should not set waiting_for_input status or pendingQuestions
      const updateCalls = mockAgentRepo.update.mock.calls;
      const waitingCall = updateCalls.find(call => call[1]?.status === 'waiting_for_input');
      expect(waitingCall).toBeUndefined();
      const questionsCall = updateCalls.find(call => call[1]?.pendingQuestions);
      expect(questionsCall).toBeUndefined();
    });
  });
  describe('getPendingQuestions', () => {
    it('should retrieve and parse stored pending questions', async () => {
      // Arrange: stored questions are persisted as a JSON string on the agent row.
      const questionsPayload = {
        questions: [
          { id: 'q1', question: 'Test question 1?' },
          { id: 'q2', question: 'Test question 2?' }
        ]
      };
      mockAgentRepo.findById.mockResolvedValue({
        ...mockAgent,
        pendingQuestions: JSON.stringify(questionsPayload)
      });
      // Act
      const result = await outputHandler.getPendingQuestions(mockAgent.id);
      // Assert
      expect(result).toEqual(questionsPayload);
      expect(mockAgentRepo.findById).toHaveBeenCalledWith(mockAgent.id);
    });
    it('should return null when no pending questions exist', async () => {
      // Arrange
      mockAgentRepo.findById.mockResolvedValue({
        ...mockAgent,
        pendingQuestions: null
      });
      // Act
      const result = await outputHandler.getPendingQuestions(mockAgent.id);
      // Assert
      expect(result).toBeNull();
    });
  });
});

View File

@@ -75,6 +75,7 @@ interface ClaudeCliResult {
export class OutputHandler {
private filePositions = new Map<string, number>();
private completionLocks = new Set<string>(); // Track agents currently being processed
constructor(
private repository: AgentRepository,
@@ -199,95 +200,123 @@ export class OutputHandler {
/**
* Handle completion of a detached agent.
* Processes the final result from the stream data captured by the tailer.
*
* RACE CONDITION FIX: Uses a completion lock to prevent duplicate processing.
* Both the polling handler (handleDetachedAgentCompletion) and crash handler
* (handleProcessCrashed) can call this method when a process exits with non-zero code.
* The mutex ensures only one handler processes the completion per agent.
*/
async handleCompletion(
agentId: string,
active: ActiveAgent | undefined,
getAgentWorkdir: (alias: string) => string,
): Promise<void> {
const agent = await this.repository.findById(agentId);
if (!agent) return;
const provider = getProvider(agent.provider);
if (!provider) return;
log.debug({ agentId }, 'detached agent completed');
// Verify agent worked in correct location by checking for output files
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
const outputDir = join(agentWorkdir, '.cw', 'output');
const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
const outputDirExists = existsSync(outputDir);
const expectedPwdExists = existsSync(expectedPwdFile);
const diagnosticExists = existsSync(diagnosticFile);
log.info({
agentId,
agentWorkdir,
outputDirExists,
expectedPwdExists,
diagnosticExists,
verification: outputDirExists ? 'PASS' : 'FAIL'
}, 'agent workdir verification completed');
if (!outputDirExists) {
log.warn({
agentId,
agentWorkdir
}, 'No output files found in agent workdir! Agent may have run in wrong location.');
}
let signalText = active?.streamResultText;
// If the stream result indicated an error (e.g. auth failure, usage limit),
// route directly to error handling instead of trying to parse as signal JSON
if (signalText && active?.streamIsError) {
log.warn({ agentId, error: signalText }, 'agent returned error result');
await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
// CRITICAL: Prevent race condition - only one completion handler per agent
if (this.completionLocks.has(agentId)) {
log.debug({ agentId }, 'completion already being processed - skipping duplicate');
return;
}
if (!signalText) {
try {
const outputFilePath = active?.outputFilePath ?? '';
if (outputFilePath) {
// Read only complete lines from the file, avoiding race conditions
const lastPosition = this.filePositions.get(agentId) || 0;
const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition);
this.completionLocks.add(agentId);
if (fileContent.trim()) {
this.filePositions.set(agentId, newPosition);
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
return;
}
try {
const agent = await this.repository.findById(agentId);
if (!agent) return;
// If no new complete lines, but file might still be writing, try again with validation
if (await this.validateSignalFile(outputFilePath)) {
const fullContent = await readFile(outputFilePath, 'utf-8');
if (fullContent.trim() && fullContent.length > newPosition) {
// File is complete and has content beyond what we've read
const provider = getProvider(agent.provider);
if (!provider) return;
log.debug({ agentId }, 'detached agent completed');
// Verify agent worked in correct location by checking for output files
const agentWorkdir = getAgentWorkdir(agent.worktreeId);
const outputDir = join(agentWorkdir, '.cw', 'output');
const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
const outputDirExists = existsSync(outputDir);
const expectedPwdExists = existsSync(expectedPwdFile);
const diagnosticExists = existsSync(diagnosticFile);
log.info({
agentId,
agentWorkdir,
outputDirExists,
expectedPwdExists,
diagnosticExists,
verification: outputDirExists ? 'PASS' : 'FAIL'
}, 'agent workdir verification completed');
if (!outputDirExists) {
log.warn({
agentId,
agentWorkdir
}, 'No output files found in agent workdir! Agent may have run in wrong location.');
}
let signalText = active?.streamResultText;
// If the stream result indicated an error (e.g. auth failure, usage limit),
// route directly to error handling instead of trying to parse as signal JSON
if (signalText && active?.streamIsError) {
log.warn({ agentId, error: signalText }, 'agent returned error result');
await this.handleAgentError(agentId, new Error(signalText), provider, getAgentWorkdir);
return;
}
if (!signalText) {
try {
const outputFilePath = active?.outputFilePath ?? '';
if (outputFilePath) {
// First, check for robust signal.json completion before attempting incremental reading
const agentWorkdir = getAgentWorkdir(agentId);
if (await this.checkSignalCompletion(agentWorkdir)) {
const signalPath = join(agentWorkdir, '.cw/output/signal.json');
const signalContent = await readFile(signalPath, 'utf-8');
log.debug({ agentId, signalPath }, 'detected completion via signal.json');
this.filePositions.delete(agentId); // Clean up tracking
await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir);
await this.processSignalAndFiles(agentId, signalContent, agent.mode as AgentMode, getAgentWorkdir, active?.streamSessionId);
return;
}
// Read only complete lines from the file, avoiding race conditions
const lastPosition = this.filePositions.get(agentId) || 0;
const { content: fileContent, lastPosition: newPosition } = await this.readCompleteLines(outputFilePath, lastPosition);
if (fileContent.trim()) {
this.filePositions.set(agentId, newPosition);
await this.processAgentOutput(agentId, fileContent, provider, getAgentWorkdir);
return;
}
// If no new complete lines, but file might still be writing, try again with validation
if (await this.validateSignalFile(outputFilePath)) {
const fullContent = await readFile(outputFilePath, 'utf-8');
if (fullContent.trim() && fullContent.length > newPosition) {
// File is complete and has content beyond what we've read
this.filePositions.delete(agentId); // Clean up tracking
await this.processAgentOutput(agentId, fullContent, provider, getAgentWorkdir);
return;
}
}
}
}
} catch { /* file empty or missing */ }
} catch { /* file empty or missing */ }
log.warn({ agentId }, 'no result text from stream or file');
await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir);
return;
log.warn({ agentId }, 'no result text from stream or file');
await this.handleAgentError(agentId, new Error('No output received'), provider, getAgentWorkdir);
return;
}
await this.processSignalAndFiles(
agentId,
signalText,
agent.mode as AgentMode,
getAgentWorkdir,
active?.streamSessionId,
);
} finally {
this.completionLocks.delete(agentId); // Always clean up
}
await this.processSignalAndFiles(
agentId,
signalText,
agent.mode as AgentMode,
getAgentWorkdir,
active?.streamSessionId,
);
}
/**
@@ -724,6 +753,33 @@ export class OutputHandler {
// Private Helpers
// =========================================================================
/**
* Check if agent completed successfully by reading signal.json file.
* This is the robust completion detection logic that handles all completion statuses.
*/
private async checkSignalCompletion(agentWorkdir: string): Promise<boolean> {
try {
const { existsSync } = await import('node:fs');
const signalPath = join(agentWorkdir, '.cw/output/signal.json');
if (!existsSync(signalPath)) {
return false;
}
const signalContent = await readFile(signalPath, 'utf-8');
const signal = JSON.parse(signalContent);
// Agent completed if status is done, questions, or error
const completed = signal.status === 'done' || signal.status === 'questions' || signal.status === 'error';
return completed;
} catch (err) {
log.warn({ agentWorkdir, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json');
return false;
}
}
private emitCrashed(agent: { id: string; name: string; taskId: string | null }, error: string): void {
if (this.eventBus) {
const event: AgentCrashedEvent = {

View File

@@ -0,0 +1,423 @@
/**
* ProcessManager Unit Tests
*
* Tests for ProcessManager class focusing on working directory handling,
* command building, and spawn validation.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { ProcessManager } from './process-manager.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { EventBus } from '../events/index.js';
// Mock child_process.spawn
// NOTE: Vitest hoists vi.mock() calls above the imports below, so the real
// modules are never loaded by the module under test.
vi.mock('node:child_process', () => ({
  spawn: vi.fn(),
}));
// Mock fs operations
vi.mock('node:fs', () => ({
  writeFileSync: vi.fn(),
  mkdirSync: vi.fn(),
  openSync: vi.fn((path) => {
    // Return different fd numbers for stdout and stderr
    // (99 = agent stdout stream, 100 = stderr log, 101 = anything else)
    if (path.includes('output.jsonl')) return 99;
    if (path.includes('stderr.log')) return 100;
    return 101;
  }),
  closeSync: vi.fn(),
  existsSync: vi.fn(),
}));
// Mock FileTailer
// Replaces the real tailer so no file watching happens during tests.
vi.mock('./file-tailer.js', () => ({
  FileTailer: class MockFileTailer {
    start = vi.fn().mockResolvedValue(undefined);
    stop = vi.fn().mockResolvedValue(undefined);
  },
}));
// Mock SimpleGitWorktreeManager
// Shared spy so individual tests can control worktree-creation results.
// NOTE(review): the hoisted vi.mock factory below closes over this const;
// that relies on Vitest's lazy factory evaluation — vi.hoisted() would make
// the ordering explicit.
const mockCreate = vi.fn();
vi.mock('../git/manager.js', () => ({
  SimpleGitWorktreeManager: class MockWorktreeManager {
    create = mockCreate;
  },
}));
// Mock project clones
vi.mock('../git/project-clones.js', () => ({
  ensureProjectClone: vi.fn().mockResolvedValue('/mock/clone/path'),
  getProjectCloneDir: vi.fn().mockReturnValue('/mock/clone/path'),
}));
// Mock providers
vi.mock('./providers/parsers/index.js', () => ({
  getStreamParser: vi.fn().mockReturnValue({ parse: vi.fn() }),
}));
// These imports resolve to the mocked modules declared above.
import { spawn } from 'node:child_process';
import { existsSync, writeFileSync, mkdirSync, openSync, closeSync } from 'node:fs';
import { ensureProjectClone } from '../git/project-clones.js';
// Typed handles onto the mocked functions for per-test configuration.
const mockSpawn = vi.mocked(spawn);
const mockExistsSync = vi.mocked(existsSync);
const mockWriteFileSync = vi.mocked(writeFileSync);
const mockMkdirSync = vi.mocked(mkdirSync);
const mockOpenSync = vi.mocked(openSync);
const mockCloseSync = vi.mocked(closeSync);
describe('ProcessManager', () => {
  let processManager: ProcessManager;
  let mockProjectRepository: ProjectRepository;
  let mockEventBus: EventBus;
  const workspaceRoot = '/test/workspace';
  beforeEach(() => {
    vi.clearAllMocks();
    // Mock child process
    // Detached child reporting a fixed pid; unref/on/kill are inert spies.
    const mockChild = {
      pid: 12345,
      unref: vi.fn(),
      on: vi.fn(),
      kill: vi.fn(),
    };
    mockSpawn.mockReturnValue(mockChild as any);
    // Mock project repository
    // Default: initiative has no projects; tests override per case.
    mockProjectRepository = {
      findProjectsByInitiativeId: vi.fn().mockResolvedValue([]),
      create: vi.fn(),
      findAll: vi.fn(),
      findById: vi.fn(),
      findByName: vi.fn(),
      update: vi.fn(),
      delete: vi.fn(),
      setInitiativeProjects: vi.fn(),
      addProjectToInitiative: vi.fn(),
      removeProjectFromInitiative: vi.fn(),
    };
    // Mock event bus
    mockEventBus = {
      emit: vi.fn(),
      on: vi.fn(),
      off: vi.fn(),
      once: vi.fn(),
    };
    // Fresh instance per test so mock state never leaks between cases.
    processManager = new ProcessManager(workspaceRoot, mockProjectRepository, mockEventBus);
  });
  afterEach(() => {
    vi.resetAllMocks();
  });
describe('getAgentWorkdir', () => {
it('returns correct agent workdir path', () => {
const alias = 'test-agent';
const expected = '/test/workspace/agent-workdirs/test-agent';
const result = processManager.getAgentWorkdir(alias);
expect(result).toBe(expected);
});
});
describe('createProjectWorktrees', () => {
  const alias = 'test-agent';
  const initiativeId = 'init-123';

  beforeEach(() => {
    // Worktree creation succeeds and lands under the agent workdir
    mockCreate.mockResolvedValue({
      id: 'project1',
      path: '/test/workspace/agent-workdirs/test-agent/project1',
      branch: 'agent/test-agent',
      isMainWorktree: false,
    });
    // The initiative owns exactly one project
    vi.mocked(mockProjectRepository.findProjectsByInitiativeId).mockResolvedValue([
      { id: '1', name: 'project1', url: 'https://github.com/user/project1.git', createdAt: new Date(), updatedAt: new Date() }
    ]);
    // Only paths under agent-workdirs/ are reported as existing
    mockExistsSync.mockImplementation((path) => path.toString().includes('/agent-workdirs/'));
  });

  it('creates worktrees for initiative projects', async () => {
    const workdir = await processManager.createProjectWorktrees(alias, initiativeId);
    expect(workdir).toBe('/test/workspace/agent-workdirs/test-agent');
    expect(mockProjectRepository.findProjectsByInitiativeId).toHaveBeenCalledWith('init-123');
    expect(ensureProjectClone).toHaveBeenCalled();
  });

  it('throws error when worktree creation fails', async () => {
    // The freshly created worktree path never appears on disk
    mockExistsSync.mockReturnValue(false);
    await expect(processManager.createProjectWorktrees(alias, initiativeId))
      .rejects.toThrow('Worktree creation failed:');
  });

  it('logs comprehensive worktree creation details', async () => {
    await processManager.createProjectWorktrees(alias, initiativeId);
    // Verify logging (implementation would need to capture log calls)
    // For now, just verify the method completes successfully
    expect(mockProjectRepository.findProjectsByInitiativeId).toHaveBeenCalledWith('init-123');
  });
});
describe('createStandaloneWorktree', () => {
  beforeEach(() => {
    // Standalone agents get a single 'workspace' worktree
    mockCreate.mockResolvedValue({
      id: 'workspace',
      path: '/test/workspace/agent-workdirs/test-agent/workspace',
      branch: 'agent/test-agent',
      isMainWorktree: false,
    });
    // Only paths containing /workspace are reported as existing
    mockExistsSync.mockImplementation((path) => path.toString().includes('/workspace'));
  });

  it('creates standalone worktree', async () => {
    await expect(processManager.createStandaloneWorktree('test-agent'))
      .resolves.toBe('/test/workspace/agent-workdirs/test-agent/workspace');
  });

  it('throws error when standalone worktree creation fails', async () => {
    // Simulate the worktree path never appearing on disk
    mockExistsSync.mockReturnValue(false);
    await expect(processManager.createStandaloneWorktree('test-agent'))
      .rejects.toThrow('Standalone worktree creation failed:');
  });
});
describe('spawnDetached', () => {
  const agentId = 'agent-123';
  const command = 'claude';
  const providerName = 'claude';
  const workdir = '/test/workspace/agent-workdirs/test-agent';

  beforeEach(() => {
    mockExistsSync.mockReturnValue(true); // CWD exists
  });

  it('validates cwd exists before spawn', () => {
    const env = { TEST_VAR: 'value' };
    processManager.spawnDetached(agentId, command, ['--help'], workdir, env, providerName);
    // The cwd must be stat'ed before handing it to spawn
    expect(mockExistsSync).toHaveBeenCalledWith(workdir);
    expect(mockSpawn).toHaveBeenCalledWith(command, ['--help'], {
      cwd: workdir,
      env: expect.objectContaining(env),
      detached: true,
      stdio: ['ignore', 99, 100], // stdout/stderr wired to the mocked fds
    });
  });

  it('throws error when cwd does not exist', () => {
    mockExistsSync.mockReturnValue(false);
    expect(() => {
      processManager.spawnDetached(agentId, command, ['--help'], '/nonexistent/path', {}, providerName);
    }).toThrow('Agent working directory does not exist: /nonexistent/path');
  });

  it('passes correct cwd parameter to spawn', () => {
    processManager.spawnDetached(agentId, command, ['--help'], workdir, { CLAUDE_CONFIG_DIR: '/config' }, providerName);
    expect(mockSpawn).toHaveBeenCalledTimes(1);
    const [spawnedCommand, spawnedArgs, spawnOptions] = mockSpawn.mock.calls[0];
    expect(spawnedCommand).toBe(command);
    expect(spawnedArgs).toEqual(['--help']);
    // Caller env is merged on top of process.env
    expect(spawnOptions).toEqual({
      cwd: workdir,
      env: expect.objectContaining({
        ...process.env,
        CLAUDE_CONFIG_DIR: '/config',
      }),
      detached: true,
      stdio: ['ignore', 99, 100],
    });
  });

  it('logs comprehensive spawn information', () => {
    const result = processManager.spawnDetached(
      agentId,
      command,
      ['--json-schema', 'schema.json'],
      workdir,
      { CLAUDE_CONFIG_DIR: '/config' },
      providerName
    );
    expect(result).toHaveProperty('pid', 12345);
    expect(result).toHaveProperty('outputFilePath');
    expect(result).toHaveProperty('tailer');
    // Verify log directory creation
    expect(mockMkdirSync).toHaveBeenCalledWith(
      '/test/workspace/.cw/agent-logs/agent-123',
      { recursive: true }
    );
  });

  it('writes prompt file when provided', () => {
    processManager.spawnDetached(agentId, command, ['--help'], workdir, {}, providerName, 'Test prompt');
    // The prompt is persisted next to the agent's logs for debugging
    expect(mockWriteFileSync).toHaveBeenCalledWith(
      '/test/workspace/.cw/agent-logs/agent-123/PROMPT.md',
      'Test prompt',
      'utf-8'
    );
  });
});
describe('buildSpawnCommand', () => {
  it('builds command with native prompt mode', () => {
    // Provider config whose CLI accepts the prompt via its own flag
    const provider = {
      name: 'claude',
      command: 'claude',
      args: ['--json-schema', 'schema.json'],
      env: {},
      promptMode: 'native' as const,
      processNames: ['claude'],
      resumeStyle: 'flag' as const,
      resumeFlag: '--resume',
      nonInteractive: {
        subcommand: 'chat',
        promptFlag: '-p',
        outputFlag: '--output-format json',
      },
    };
    const built = processManager.buildSpawnCommand(provider, 'Test prompt');
    // Order: subcommand, provider args, prompt flag + value, output flag (split on space)
    expect(built).toEqual({
      command: 'claude',
      args: ['chat', '--json-schema', 'schema.json', '-p', 'Test prompt', '--output-format', 'json'],
      env: {},
    });
  });

  it('builds command with flag prompt mode', () => {
    const provider = {
      name: 'codex',
      command: 'codex',
      args: ['--format', 'json'],
      env: {},
      promptMode: 'flag' as const,
      processNames: ['codex'],
      resumeStyle: 'subcommand' as const,
      resumeFlag: 'resume',
      nonInteractive: {
        subcommand: 'run',
        promptFlag: '--prompt',
        outputFlag: '--json',
      },
    };
    const built = processManager.buildSpawnCommand(provider, 'Test prompt');
    expect(built).toEqual({
      command: 'codex',
      args: ['run', '--format', 'json', '--prompt', 'Test prompt', '--json'],
      env: {},
    });
  });
});
describe('buildResumeCommand', () => {
  it('builds resume command with flag style', () => {
    const provider = {
      name: 'claude',
      command: 'claude',
      args: [],
      env: {},
      promptMode: 'native' as const,
      processNames: ['claude'],
      resumeStyle: 'flag' as const,
      resumeFlag: '--resume',
      nonInteractive: {
        subcommand: 'chat',
        promptFlag: '-p',
        outputFlag: '--json',
      },
    };
    // Order: resume flag + session id, then prompt flag + value, then output flag
    expect(processManager.buildResumeCommand(provider, 'session-123', 'Continue working')).toEqual({
      command: 'claude',
      args: ['--resume', 'session-123', '-p', 'Continue working', '--json'],
      env: {},
    });
  });

  it('throws error for providers without resume support', () => {
    // resumeStyle 'none' means this CLI has no way to restore a session
    const provider = {
      name: 'noresume',
      command: 'noresume',
      args: [],
      env: {},
      promptMode: 'native' as const,
      processNames: ['noresume'],
      resumeStyle: 'none' as const,
    };
    expect(() => {
      processManager.buildResumeCommand(provider, 'session-123', 'Continue working');
    }).toThrow("Provider 'noresume' does not support resume");
  });
});
});

View File

@@ -9,22 +9,27 @@
const SIGNAL_FORMAT = `
## Signal Output
When done, output ONLY this JSON (no other text before or after):
When done, write \`.cw/output/signal.json\` with:
{ "status": "done" }
If you need clarification, output:
If you need clarification, write:
{ "status": "questions", "questions": [{ "id": "q1", "question": "Your question" }] }
If you hit an unrecoverable error, output:
{ "status": "error", "error": "Description of what went wrong" }`;
If you hit an unrecoverable error, write:
{ "status": "error", "error": "Description of what went wrong" }
IMPORTANT: Always write this file as your final action before terminating.`;
const INPUT_FILES = `
## Input Files
Read context from \`.cw/input/\`:
Read \`.cw/input/manifest.json\` first — it lists exactly which input files exist.
Then read only those files from \`.cw/input/\`.
Possible files:
- \`initiative.md\` — Initiative details (frontmatter: id, name, status)
- \`phase.md\` — Phase details if applicable (frontmatter: id, number, name, status; body: description)
- \`task.md\` — Task details if applicable (frontmatter: id, name, category, type, priority, status; body: description)
- \`phase.md\` — Phase details (frontmatter: id, number, name, status; body: description)
- \`task.md\` — Task details (frontmatter: id, name, category, type, priority, status; body: description)
- \`pages/\` — Initiative pages (one file per page; frontmatter: title, parentPageId, sortOrder; body: markdown content)`;
const SUMMARY_REQUIREMENT = `

View File

@@ -76,8 +76,9 @@ export class ClaudeStreamParser implements StreamParser {
return [];
}
// Check for error first (can appear on any event type)
if ('is_error' in parsed && parsed.is_error && 'result' in parsed) {
// Check for error on non-result events (e.g. stream errors)
// Result events with is_error are handled in the 'result' case below
if ('is_error' in parsed && parsed.is_error && 'result' in parsed && parsed.type !== 'result') {
return [{ type: 'error', message: String(parsed.result) }];
}
@@ -148,6 +149,7 @@ export class ClaudeStreamParser implements StreamParser {
text: resultEvent.result || '',
sessionId: resultEvent.session_id,
costUsd: resultEvent.total_cost_usd,
isError: resultEvent.is_error === true,
});
break;
}

View File

@@ -42,6 +42,8 @@ export interface StreamResultEvent {
text: string;
sessionId?: string;
costUsd?: number;
/** True when the CLI returned an error result (e.g. auth failure, usage limit) */
isError?: boolean;
}
/** Error event */

View File

@@ -77,6 +77,8 @@ export interface AgentInfo {
createdAt: Date;
/** Last activity timestamp */
updatedAt: Date;
/** When the user dismissed this agent (null if not dismissed) */
userDismissedAt?: Date | null;
}
/**

View File

@@ -21,14 +21,14 @@ const CW_PORT_ENV = 'CW_PORT';
* Starts the coordination server in foreground mode.
* Server runs until terminated via SIGTERM/SIGINT.
*/
async function startServer(port?: number): Promise<void> {
async function startServer(port?: number, debug?: boolean): Promise<void> {
// Get port from option, env var, or default
const serverPort = port ??
(process.env[CW_PORT_ENV] ? parseInt(process.env[CW_PORT_ENV], 10) : undefined);
const log = createModuleLogger('server');
// Create full dependency graph
const container = await createContainer();
const container = await createContainer({ debug });
// Create and start server
const server = new CoordinationServer(
@@ -66,10 +66,11 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
.description('Multi-agent workspace for orchestrating multiple Claude Code agents')
.version(VERSION, '-v, --version', 'Display version number');
// Server mode option (global flag)
// Server mode options (global flags)
program
.option('-s, --server', 'Start the coordination server')
.option('-p, --port <number>', 'Port for the server (default: 3847, env: CW_PORT)', parseInt);
.option('-p, --port <number>', 'Port for the server (default: 3847, env: CW_PORT)', parseInt)
.option('-d, --debug', 'Enable debug mode (archive agent workdirs before cleanup)');
// Handle the case where --server is provided without a command
// This makes --server work as a standalone action
@@ -1118,14 +1119,34 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
const existing = await client.listAccounts.query();
const alreadyRegistered = existing.find((a: any) => a.email === extracted.email);
if (alreadyRegistered) {
// Upsert: update credentials on existing account
// Compare refresh tokens to detect staleness
let credentialsChanged = true;
try {
const dbCreds = alreadyRegistered.credentials ? JSON.parse(alreadyRegistered.credentials) : null;
const sourceCreds = JSON.parse(extracted.credentials);
const dbRefreshToken = dbCreds?.claudeAiOauth?.refreshToken;
const sourceRefreshToken = sourceCreds?.claudeAiOauth?.refreshToken;
credentialsChanged = dbRefreshToken !== sourceRefreshToken;
} catch {
// Parse error — assume changed, update to be safe
}
// Upsert: always update to be safe
await client.updateAccountAuth.mutate({
id: alreadyRegistered.id,
configJson: JSON.stringify(extracted.configJson),
credentials: extracted.credentials,
});
console.log(`Updated credentials for account: ${alreadyRegistered.id}`);
console.log(` Email: ${extracted.email}`);
if (credentialsChanged) {
console.log(`Updated credentials for account: ${alreadyRegistered.id}`);
console.log(` Email: ${extracted.email}`);
console.log(` Refresh token changed (source had fresher credentials)`);
} else {
console.log(`Credentials current for account: ${alreadyRegistered.id}`);
console.log(` Email: ${extracted.email}`);
console.log(` Refresh token unchanged`);
}
return;
}
@@ -1217,7 +1238,9 @@ export async function runCli(): Promise<void> {
? parseInt(process.argv[portIndex + 1], 10)
: undefined;
await startServer(port);
const debug = process.argv.includes('--debug') || process.argv.includes('-d');
await startServer(port, debug);
// Server runs indefinitely until signal
return;
}

View File

@@ -17,6 +17,7 @@ import {
DrizzlePageRepository,
DrizzleProjectRepository,
DrizzleAccountRepository,
DrizzleProposalRepository,
} from './db/index.js';
import type { InitiativeRepository } from './db/repositories/initiative-repository.js';
import type { PhaseRepository } from './db/repositories/phase-repository.js';
@@ -26,6 +27,7 @@ import type { AgentRepository } from './db/repositories/agent-repository.js';
import type { PageRepository } from './db/repositories/page-repository.js';
import type { ProjectRepository } from './db/repositories/project-repository.js';
import type { AccountRepository } from './db/repositories/account-repository.js';
import type { ProposalRepository } from './db/repositories/proposal-repository.js';
import type { EventBus } from './events/index.js';
import { createEventBus } from './events/index.js';
import { ProcessManager, ProcessRegistry } from './process/index.js';
@@ -42,7 +44,7 @@ import type { ServerContextDeps } from './server/index.js';
// =============================================================================
/**
* All 8 repository ports.
* All 9 repository ports.
*/
export interface Repositories {
initiativeRepository: InitiativeRepository;
@@ -53,10 +55,11 @@ export interface Repositories {
pageRepository: PageRepository;
projectRepository: ProjectRepository;
accountRepository: AccountRepository;
proposalRepository: ProposalRepository;
}
/**
* Create all 8 Drizzle repository adapters from a database instance.
* Create all 9 Drizzle repository adapters from a database instance.
* Reusable by both the production server and the test harness.
*/
export function createRepositories(db: DrizzleDatabase): Repositories {
@@ -69,6 +72,7 @@ export function createRepositories(db: DrizzleDatabase): Repositories {
pageRepository: new DrizzlePageRepository(db),
projectRepository: new DrizzleProjectRepository(db),
accountRepository: new DrizzleAccountRepository(db),
proposalRepository: new DrizzleProposalRepository(db),
};
}
@@ -92,6 +96,13 @@ export interface Container extends Repositories {
toContextDeps(): ServerContextDeps;
}
/**
* Options for container creation.
*/
export interface ContainerOptions {
debug?: boolean;
}
/**
* Create the full dependency container.
*
@@ -99,7 +110,7 @@ export interface Container extends Repositories {
* Database → Repositories → CredentialManager → AgentManager.
* Runs ensureSchema() and reconcileAfterRestart() before returning.
*/
export async function createContainer(): Promise<Container> {
export async function createContainer(options?: ContainerOptions): Promise<Container> {
const log = createModuleLogger('container');
// Infrastructure
@@ -133,6 +144,8 @@ export async function createContainer(): Promise<Container> {
repos.accountRepository,
eventBus,
credentialManager,
repos.proposalRepository,
options?.debug ?? false,
);
log.info('agent manager created');

View File

@@ -44,6 +44,7 @@ export interface UpdateAgentData {
provider?: string;
accountId?: string | null;
pid?: number | null;
exitCode?: number | null;
outputFilePath?: string | null;
result?: string | null;
pendingQuestions?: string | null;

View File

@@ -13,3 +13,4 @@ export { DrizzleMessageRepository } from './message.js';
export { DrizzlePageRepository } from './page.js';
export { DrizzleProjectRepository } from './project.js';
export { DrizzleAccountRepository } from './account.js';
export { DrizzleProposalRepository } from './proposal.js';

View File

@@ -0,0 +1,133 @@
/**
* Drizzle Proposal Repository Adapter
*
* Implements ProposalRepository interface using Drizzle ORM.
*/
import { eq, and, count, asc } from 'drizzle-orm';
import { nanoid } from 'nanoid';
import type { DrizzleDatabase } from '../../index.js';
import { proposals, type Proposal } from '../../schema.js';
import type {
ProposalRepository,
CreateProposalData,
UpdateProposalData,
} from '../proposal-repository.js';
/**
 * Drizzle ORM adapter for the ProposalRepository port.
 *
 * All list queries are ordered by `sortOrder` ascending so callers receive
 * proposals in their intended display order.
 */
export class DrizzleProposalRepository implements ProposalRepository {
  constructor(private db: DrizzleDatabase) {}

  /**
   * Narrow a caller-supplied status string to the proposal status union.
   * Centralizes the cast previously repeated at every status-filter call site.
   */
  private static toStatus(status: string): 'pending' | 'accepted' | 'dismissed' {
    return status as 'pending' | 'accepted' | 'dismissed';
  }

  /** Insert one proposal with a generated id and creation timestamps. */
  async create(data: CreateProposalData): Promise<Proposal> {
    const id = nanoid();
    const now = new Date();
    const [created] = await this.db.insert(proposals).values({
      id,
      ...data,
      createdAt: now,
      updatedAt: now,
    }).returning();
    // Guard the destructured row: without this, a failed insert would silently
    // return undefined typed as Proposal (update() below already guards).
    if (!created) {
      throw new Error(`Failed to create proposal: ${id}`);
    }
    return created;
  }

  /** Bulk-insert proposals; all rows share one creation timestamp. Returns [] for empty input. */
  async createMany(data: CreateProposalData[]): Promise<Proposal[]> {
    if (data.length === 0) return [];
    const now = new Date();
    const rows = data.map((d) => ({
      id: nanoid(),
      ...d,
      createdAt: now,
      updatedAt: now,
    }));
    return this.db.insert(proposals).values(rows).returning();
  }

  /** Fetch a proposal by id, or null when absent. */
  async findById(id: string): Promise<Proposal | null> {
    const result = await this.db
      .select()
      .from(proposals)
      .where(eq(proposals.id, id))
      .limit(1);
    return result[0] ?? null;
  }

  /** All proposals produced by an agent, in display order. */
  async findByAgentId(agentId: string): Promise<Proposal[]> {
    return this.db
      .select()
      .from(proposals)
      .where(eq(proposals.agentId, agentId))
      .orderBy(asc(proposals.sortOrder));
  }

  /** All proposals belonging to an initiative, in display order. */
  async findByInitiativeId(initiativeId: string): Promise<Proposal[]> {
    return this.db
      .select()
      .from(proposals)
      .where(eq(proposals.initiativeId, initiativeId))
      .orderBy(asc(proposals.sortOrder));
  }

  /** An agent's proposals filtered by status, in display order. */
  async findByAgentIdAndStatus(agentId: string, status: string): Promise<Proposal[]> {
    return this.db
      .select()
      .from(proposals)
      .where(
        and(
          eq(proposals.agentId, agentId),
          eq(proposals.status, DrizzleProposalRepository.toStatus(status)),
        ),
      )
      .orderBy(asc(proposals.sortOrder));
  }

  /** Update one proposal; throws when the id does not exist. */
  async update(id: string, data: UpdateProposalData): Promise<Proposal> {
    const [updated] = await this.db
      .update(proposals)
      .set({ ...data, updatedAt: new Date() })
      .where(eq(proposals.id, id))
      .returning();
    if (!updated) {
      throw new Error(`Proposal not found: ${id}`);
    }
    return updated;
  }

  /** Apply the same update to every proposal belonging to an agent. */
  async updateManyByAgentId(agentId: string, data: UpdateProposalData): Promise<void> {
    await this.db
      .update(proposals)
      .set({ ...data, updatedAt: new Date() })
      .where(eq(proposals.agentId, agentId));
  }

  /** Update only the agent's proposals that currently have `currentStatus`. */
  async updateManyByAgentIdAndStatus(agentId: string, currentStatus: string, data: UpdateProposalData): Promise<void> {
    await this.db
      .update(proposals)
      .set({ ...data, updatedAt: new Date() })
      .where(
        and(
          eq(proposals.agentId, agentId),
          eq(proposals.status, DrizzleProposalRepository.toStatus(currentStatus)),
        ),
      );
  }

  /** Count an agent's proposals with the given status. */
  async countByAgentIdAndStatus(agentId: string, status: string): Promise<number> {
    const result = await this.db
      .select({ count: count() })
      .from(proposals)
      .where(
        and(
          eq(proposals.agentId, agentId),
          eq(proposals.status, DrizzleProposalRepository.toStatus(status)),
        ),
      );
    return result[0]?.count ?? 0;
  }
}

View File

@@ -56,3 +56,9 @@ export type {
AccountRepository,
CreateAccountData,
} from './account-repository.js';
export type {
ProposalRepository,
CreateProposalData,
UpdateProposalData,
} from './proposal-repository.js';

View File

@@ -0,0 +1,35 @@
/**
 * Proposal Repository Port Interface
 *
 * Port for Proposal aggregate operations.
 * Implementations (Drizzle, etc.) are adapters.
 */
import type { Proposal, NewProposal } from '../schema.js';
/**
 * Data for creating a new proposal.
 * Omits system-managed fields (id, createdAt, updatedAt).
 */
export type CreateProposalData = Omit<NewProposal, 'id' | 'createdAt' | 'updatedAt'>;
/**
 * Data for updating a proposal.
 */
export type UpdateProposalData = Partial<Pick<NewProposal, 'status'>>;
/**
 * Proposal Repository Port
 */
export interface ProposalRepository {
  /** Insert a single proposal; returns the created row. */
  create(data: CreateProposalData): Promise<Proposal>;
  /** Bulk-insert proposals; returns the created rows. */
  createMany(data: CreateProposalData[]): Promise<Proposal[]>;
  /** Look up one proposal by id; null when not found. */
  findById(id: string): Promise<Proposal | null>;
  /** All proposals produced by a given agent. */
  findByAgentId(agentId: string): Promise<Proposal[]>;
  /** All proposals belonging to a given initiative. */
  findByInitiativeId(initiativeId: string): Promise<Proposal[]>;
  /** Proposals for an agent filtered by status (e.g. 'pending'). */
  findByAgentIdAndStatus(agentId: string, status: string): Promise<Proposal[]>;
  /** Update one proposal by id. */
  update(id: string, data: UpdateProposalData): Promise<Proposal>;
  /** Apply the same update to every proposal of an agent. */
  updateManyByAgentId(agentId: string, data: UpdateProposalData): Promise<void>;
  /** Update only an agent's proposals that currently have `currentStatus`. */
  updateManyByAgentIdAndStatus(agentId: string, currentStatus: string, data: UpdateProposalData): Promise<void>;
  /** Count an agent's proposals with the given status. */
  countByAgentIdAndStatus(agentId: string, status: string): Promise<number>;
}

View File

@@ -35,6 +35,7 @@ export const initiativesRelations = relations(initiatives, ({ many }) => ({
pages: many(pages),
initiativeProjects: many(initiativeProjects),
tasks: many(tasks),
proposals: many(proposals),
}));
export type Initiative = InferSelectModel<typeof initiatives>;
@@ -264,6 +265,7 @@ export const agents = sqliteTable('agents', {
.notNull()
.default('execute'),
pid: integer('pid'),
exitCode: integer('exit_code'), // Process exit code for debugging crashes
outputFilePath: text('output_file_path'),
result: text('result'),
pendingQuestions: text('pending_questions'),
@@ -272,7 +274,7 @@ export const agents = sqliteTable('agents', {
userDismissedAt: integer('user_dismissed_at', { mode: 'timestamp' }),
});
export const agentsRelations = relations(agents, ({ one }) => ({
export const agentsRelations = relations(agents, ({ one, many }) => ({
task: one(tasks, {
fields: [agents.taskId],
references: [tasks.id],
@@ -285,11 +287,52 @@ export const agentsRelations = relations(agents, ({ one }) => ({
fields: [agents.accountId],
references: [accounts.id],
}),
proposals: many(proposals),
}));
export type Agent = InferSelectModel<typeof agents>;
export type NewAgent = InferInsertModel<typeof agents>;
// ============================================================================
// PROPOSALS
// ============================================================================
// Proposals produced by refine agents; each row is one suggested change to an
// initiative (page edit, new phase, or new task) awaiting user accept/dismiss.
export const proposals = sqliteTable('proposals', {
  id: text('id').primaryKey(),
  // Producing agent; rows are removed when the agent is deleted.
  agentId: text('agent_id')
    .notNull()
    .references(() => agents.id, { onDelete: 'cascade' }),
  // Initiative the proposal applies to; rows are removed with the initiative.
  initiativeId: text('initiative_id')
    .notNull()
    .references(() => initiatives.id, { onDelete: 'cascade' }),
  targetType: text('target_type', { enum: ['page', 'phase', 'task'] }).notNull(),
  targetId: text('target_id'), // existing entity ID (e.g. pageId for updates), null for creates
  title: text('title').notNull(),
  summary: text('summary'),
  content: text('content'), // markdown body (pages), description (phases/tasks)
  metadata: text('metadata'), // JSON: type-specific data (phase number, task category, deps)
  // Workflow state; new proposals start as 'pending'.
  status: text('status', { enum: ['pending', 'accepted', 'dismissed'] })
    .notNull()
    .default('pending'),
  sortOrder: integer('sort_order').notNull().default(0),
  createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
  updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
});
// Each proposal belongs to exactly one agent and one initiative.
export const proposalsRelations = relations(proposals, ({ one }) => ({
  agent: one(agents, {
    fields: [proposals.agentId],
    references: [agents.id],
  }),
  initiative: one(initiatives, {
    fields: [proposals.initiativeId],
    references: [initiatives.id],
  }),
}));

// Row types inferred from the table definition (select vs. insert shapes).
export type Proposal = InferSelectModel<typeof proposals>;
export type NewProposal = InferInsertModel<typeof proposals>;
// ============================================================================
// MESSAGES
// ============================================================================

View File

@@ -55,6 +55,7 @@ export interface ProcessCrashedEvent extends DomainEvent {
payload: {
processId: string;
pid: number;
exitCode: number | null;
signal: string | null;
};
}

View File

@@ -389,6 +389,7 @@ describe('ProcessManager', () => {
payload: {
processId: 'proc-1',
pid: 12345,
exitCode: 1,
signal: 'SIGTERM',
},
})

View File

@@ -124,6 +124,7 @@ export class ProcessManager {
payload: {
processId: id,
pid,
exitCode: code,
signal,
},
};

View File

@@ -17,6 +17,7 @@ import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ProposalRepository } from '../db/repositories/proposal-repository.js';
import type { AccountCredentialManager } from '../agent/credentials/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { CoordinationManager } from '../coordination/types.js';
@@ -53,6 +54,8 @@ export interface TrpcAdapterOptions {
projectRepository?: ProjectRepository;
/** Account repository for account CRUD and load balancing */
accountRepository?: AccountRepository;
/** Proposal repository for agent proposal CRUD operations */
proposalRepository?: ProposalRepository;
/** Credential manager for account OAuth token management */
credentialManager?: AccountCredentialManager;
/** Absolute path to the workspace root (.cwrc directory) */
@@ -129,6 +132,7 @@ export function createTrpcHandler(options: TrpcAdapterOptions) {
pageRepository: options.pageRepository,
projectRepository: options.projectRepository,
accountRepository: options.accountRepository,
proposalRepository: options.proposalRepository,
credentialManager: options.credentialManager,
workspaceRoot: options.workspaceRoot,
}),

View File

@@ -0,0 +1,203 @@
/**
* Agent Working Directory Verification Tests
*
* Tests that verify agents actually run in their intended working directories.
* These tests use simple shell commands to prove the agent execution location.
*
* IMPORTANT: These tests spawn real CLI processes and may incur API costs.
* They are SKIPPED by default to prevent accidental charges.
*
* To run these tests:
* ```bash
* REAL_WORKDIR_TESTS=1 npm test -- src/test/integration/agent-workdir-verification.test.ts --test-timeout=120000
* ```
*/
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { mkdtemp, rm, readFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { MultiProviderAgentManager } from '../../agent/manager.js';
import { createTestDatabase } from '../../db/repositories/drizzle/test-helpers.js';
import {
DrizzleAgentRepository,
DrizzleProjectRepository,
DrizzleAccountRepository,
DrizzleInitiativeRepository,
} from '../../db/repositories/drizzle/index.js';
import { EventEmitterBus } from '../../events/bus.js';
const SHOULD_SKIP = !process.env.REAL_WORKDIR_TESTS;
const TEST_TIMEOUT = 60000;
// Skipped unless REAL_WORKDIR_TESTS=1 — these spawn real provider CLIs.
describe.skipIf(SHOULD_SKIP)('Agent Working Directory Verification', () => {
  let tempDir: string;
  let agentManager: MultiProviderAgentManager;
  let agentRepository: DrizzleAgentRepository;

  // Build an isolated workspace + in-memory repositories once per suite.
  beforeAll(async () => {
    if (SHOULD_SKIP) return;
    console.log('\n=== Running Agent Working Directory Tests ===');
    console.log('These tests verify agents run in correct working directories.\n');
    // Create temp directory for test workspace
    tempDir = await mkdtemp(join(tmpdir(), 'cw-workdir-test-'));
    // Set up test database and repositories
    const db = await createTestDatabase();
    const eventBus = new EventEmitterBus();
    agentRepository = new DrizzleAgentRepository(db);
    const projectRepository = new DrizzleProjectRepository(db);
    const accountRepository = new DrizzleAccountRepository(db);
    agentManager = new MultiProviderAgentManager(
      agentRepository,
      tempDir,
      projectRepository,
      accountRepository,
      eventBus,
    );
  });

  // Best-effort cleanup; a failure here only warns so test results are kept.
  afterAll(async () => {
    if (SHOULD_SKIP || !tempDir) return;
    try {
      await rm(tempDir, { recursive: true });
    } catch (err) {
      console.warn('Failed to cleanup temp directory:', err);
    }
  });

  it('spawns agent in correct standalone working directory', async () => {
    // The agent is instructed to record its own `pwd`, which we later compare
    // against the directory the manager intended to spawn it in.
    const prompt = `
Write your current working directory to a file called 'verify-pwd.txt'.
Use this exact bash command:
pwd > verify-pwd.txt
Then output the signal: {"done": true}
`.trim();
    // Spawn standalone agent
    const agent = await agentManager.spawn({
      taskId: null,
      prompt,
      mode: 'execute',
      provider: 'claude',
    });
    expect(agent.id).toBeTruthy();
    expect(agent.status).toBe('running');
    // Wait for completion (poll agent status)
    let attempts = 0;
    const maxAttempts = 60; // 60 seconds timeout
    while (attempts < maxAttempts) {
      await new Promise(resolve => setTimeout(resolve, 1000));
      attempts++;
      const currentAgent = await agentRepository.findById(agent.id);
      if (!currentAgent || currentAgent.status !== 'running') {
        break;
      }
    }
    // Verify final agent state
    const completedAgent = await agentRepository.findById(agent.id);
    expect(completedAgent).toBeTruthy();
    expect(completedAgent!.status).not.toBe('running');
    // Get the agent's expected working directory.
    // NOTE(review): assumes the manager lays out workdirs as
    // <workspace>/agent-workdirs/<alias>/workspace — confirm against spawn code.
    const expectedWorkdir = join(tempDir, 'agent-workdirs', agent.name, 'workspace');
    // Read diagnostic files
    const diagnosticFile = join(expectedWorkdir, '.cw', 'spawn-diagnostic.json');
    const expectedPwdFile = join(expectedWorkdir, '.cw', 'expected-pwd.txt');
    const verifyPwdFile = join(expectedWorkdir, 'verify-pwd.txt');
    // Verify diagnostic files exist
    expect(existsSync(diagnosticFile), 'spawn diagnostic file should exist').toBe(true);
    expect(existsSync(expectedPwdFile), 'expected pwd file should exist').toBe(true);
    // Read diagnostic data
    const diagnostic = JSON.parse(await readFile(diagnosticFile, 'utf-8'));
    const expectedPwd = (await readFile(expectedPwdFile, 'utf-8')).trim();
    console.log('Diagnostic data:', diagnostic);
    console.log('Expected working directory:', expectedPwd);
    // Verify diagnostic consistency
    expect(diagnostic.intendedCwd).toBe(expectedWorkdir);
    expect(diagnostic.cwdExistsAtSpawn).toBe(true);
    expect(expectedPwd).toBe(expectedWorkdir);
    // The critical test: verify the agent actually wrote the file in the expected location
    if (existsSync(verifyPwdFile)) {
      const actualPwd = (await readFile(verifyPwdFile, 'utf-8')).trim();
      console.log('Agent reported working directory:', actualPwd);
      // This is the key verification: the pwd reported by the agent should match expected
      expect(actualPwd).toBe(expectedWorkdir);
    } else {
      // If the file doesn't exist, the agent either failed or ran somewhere else
      console.warn('Agent did not create verify-pwd.txt file');
      console.log('Expected at:', verifyPwdFile);
      // Let's check if it was created elsewhere (debugging)
      const alternativeLocations = [
        join(tempDir, 'verify-pwd.txt'),
        join(process.cwd(), 'verify-pwd.txt'),
      ];
      for (const loc of alternativeLocations) {
        if (existsSync(loc)) {
          const content = await readFile(loc, 'utf-8');
          console.log(`Found verify-pwd.txt at unexpected location ${loc}:`, content.trim());
        }
      }
      throw new Error('Agent did not create pwd verification file in expected location');
    }
  }, TEST_TIMEOUT);

  it('creates diagnostic files with correct metadata', async () => {
    const prompt = `Output the signal: {"done": true}`;
    const agent = await agentManager.spawn({
      taskId: null,
      prompt,
      mode: 'execute',
      provider: 'claude',
    });
    // Wait a bit for spawn to complete
    // NOTE(review): fixed 2s sleep assumes spawn writes diagnostics quickly;
    // could flake on a slow machine — consider polling for the file instead.
    await new Promise(resolve => setTimeout(resolve, 2000));
    const expectedWorkdir = join(tempDir, 'agent-workdirs', agent.name, 'workspace');
    const diagnosticFile = join(expectedWorkdir, '.cw', 'spawn-diagnostic.json');
    const expectedPwdFile = join(expectedWorkdir, '.cw', 'expected-pwd.txt');
    // Verify files exist immediately after spawn
    expect(existsSync(diagnosticFile), 'diagnostic file should be created after spawn').toBe(true);
    expect(existsSync(expectedPwdFile), 'expected pwd file should be created').toBe(true);
    // Verify diagnostic content
    const diagnostic = JSON.parse(await readFile(diagnosticFile, 'utf-8'));
    const expectedPwd = (await readFile(expectedPwdFile, 'utf-8')).trim();
    expect(diagnostic.agentId).toBe(agent.id);
    expect(diagnostic.alias).toBe(agent.name);
    expect(diagnostic.intendedCwd).toBe(expectedWorkdir);
    expect(diagnostic.provider).toBe('claude');
    expect(diagnostic.cwdExistsAtSpawn).toBe(true);
    expect(diagnostic.customCwdProvided).toBe(false);
    expect(typeof diagnostic.timestamp).toBe('string');
    expect(Array.isArray(diagnostic.args)).toBe(true);
    expect(expectedPwd).toBe(expectedWorkdir);
  });
});

View File

@@ -358,12 +358,12 @@ export const shouldRunRealCodexTests = process.env.REAL_CODEX_TESTS === '1';
/**
* Skip wrapper for Claude tests - skips unless REAL_CLAUDE_TESTS=1.
*/
// Duplicate declaration removed: both the pre-edit and post-edit diff lines were
// present, which redeclares the const and fails to compile. The cast version is
// kept because `describe.skip`'s type is narrower than `typeof describe`.
export const describeRealClaude: typeof describe = shouldRunRealClaudeTests ? describe : (describe.skip as typeof describe);
/**
* Skip wrapper for Codex tests - skips unless REAL_CODEX_TESTS=1.
*/
// Duplicate declaration removed: both the pre-edit and post-edit diff lines were
// present, which redeclares the const and fails to compile. The cast version is
// kept because `describe.skip`'s type is narrower than `typeof describe`.
export const describeRealCodex: typeof describe = shouldRunRealCodexTests ? describe : (describe.skip as typeof describe);
/**
* Default test timeout for real CLI tests (2 minutes).

View File

@@ -14,6 +14,7 @@ import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ProposalRepository } from '../db/repositories/proposal-repository.js';
import type { AccountCredentialManager } from '../agent/credentials/types.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { CoordinationManager } from '../coordination/types.js';
@@ -53,6 +54,8 @@ export interface TRPCContext {
projectRepository?: ProjectRepository;
/** Account repository for account CRUD and load balancing */
accountRepository?: AccountRepository;
/** Proposal repository for agent proposal CRUD operations */
proposalRepository?: ProposalRepository;
/** Credential manager for account OAuth token management */
credentialManager?: AccountCredentialManager;
/** Absolute path to the workspace root (.cwrc directory) */
@@ -77,6 +80,7 @@ export interface CreateContextOptions {
pageRepository?: PageRepository;
projectRepository?: ProjectRepository;
accountRepository?: AccountRepository;
proposalRepository?: ProposalRepository;
credentialManager?: AccountCredentialManager;
workspaceRoot?: string;
}
@@ -103,6 +107,7 @@ export function createContext(options: CreateContextOptions): TRPCContext {
pageRepository: options.pageRepository,
projectRepository: options.projectRepository,
accountRepository: options.accountRepository,
proposalRepository: options.proposalRepository,
credentialManager: options.credentialManager,
workspaceRoot: options.workspaceRoot,
};

View File

@@ -19,6 +19,7 @@ import { architectProcedures } from './routers/architect.js';
import { projectProcedures } from './routers/project.js';
import { pageProcedures } from './routers/page.js';
import { accountProcedures } from './routers/account.js';
import { proposalProcedures } from './routers/proposal.js';
import { subscriptionProcedures } from './routers/subscription.js';
// Re-export tRPC primitives (preserves existing import paths)
@@ -54,6 +55,7 @@ export const appRouter = router({
...projectProcedures(publicProcedure),
...pageProcedures(publicProcedure),
...accountProcedures(publicProcedure),
...proposalProcedures(publicProcedure),
...subscriptionProcedures(publicProcedure),
});

View File

@@ -14,6 +14,7 @@ import type { PhaseRepository } from '../../db/repositories/phase-repository.js'
import type { PageRepository } from '../../db/repositories/page-repository.js';
import type { ProjectRepository } from '../../db/repositories/project-repository.js';
import type { AccountRepository } from '../../db/repositories/account-repository.js';
import type { ProposalRepository } from '../../db/repositories/proposal-repository.js';
import type { DispatchManager, PhaseDispatchManager } from '../../dispatch/types.js';
import type { CoordinationManager } from '../../coordination/types.js';
@@ -126,3 +127,13 @@ export function requireAccountRepository(ctx: TRPCContext): AccountRepository {
}
return ctx.accountRepository;
}
/**
 * Resolve the proposal repository from the request context.
 * @throws TRPCError (INTERNAL_SERVER_ERROR) when the server was wired without one.
 */
export function requireProposalRepository(ctx: TRPCContext): ProposalRepository {
  const repo = ctx.proposalRepository;
  if (repo) {
    return repo;
  }
  throw new TRPCError({
    code: 'INTERNAL_SERVER_ERROR',
    message: 'Proposal repository not available',
  });
}

View File

@@ -132,6 +132,33 @@ export function architectProcedures(publicProcedure: ProcedureBuilder) {
});
}
// Bug #10: Auto-dismiss stale (crashed/idle) refine agents before checking for active ones
const allAgents = await agentManager.list();
const staleAgents = allAgents.filter(
(a) =>
a.mode === 'refine' &&
a.initiativeId === input.initiativeId &&
['crashed', 'idle'].includes(a.status) &&
!a.userDismissedAt,
);
for (const stale of staleAgents) {
await agentManager.dismiss(stale.id);
}
// Bug #9: Prevent concurrent refine agents on the same initiative
const activeRefineAgents = allAgents.filter(
(a) =>
a.mode === 'refine' &&
a.initiativeId === input.initiativeId &&
['running', 'waiting_for_input'].includes(a.status),
);
if (activeRefineAgents.length > 0) {
throw new TRPCError({
code: 'CONFLICT',
message: `A refine agent is already running for this initiative`,
});
}
const pages = await pageRepo.findByInitiativeId(input.initiativeId);
if (pages.length === 0) {

View File

@@ -5,7 +5,7 @@
import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
// Duplicate import declaration removed: the pre-edit diff line (which still
// pulled in the now-unused requirePageRepository) was left alongside the
// post-edit line, redeclaring both helpers. Keep only the updated import.
import { requireInitiativeRepository, requireProjectRepository } from './_helpers.js';
export function initiativeProcedures(publicProcedure: ProcedureBuilder) {
return {

View File

@@ -0,0 +1,170 @@
/**
* Proposal Router — CRUD + accept/dismiss workflows
*/
import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import type { TRPCContext } from '../context.js';
import type { Proposal } from '../../db/schema.js';
import {
requireProposalRepository,
requirePageRepository,
requirePhaseRepository,
requireTaskRepository,
requireAgentManager,
} from './_helpers.js';
import { markdownToTiptapJson } from '../../agent/markdown-to-tiptap.js';
/**
* Accept a single proposal: apply side effects based on targetType.
*/
/**
 * Apply the side effects of accepting one proposal, dispatched on targetType:
 * - 'page':  overwrite the target page's content (markdown → TipTap JSON) and
 *            title, then emit a page:updated event. Skipped silently when the
 *            proposal lacks a targetId or content.
 * - 'phase': create a new phase on the initiative from title/content/metadata.
 * - 'task':  create a new task on the initiative from title/content/metadata.
 * Metadata is parsed as JSON per-case; a malformed blob will throw here and
 * surface to the caller.
 */
async function applyProposal(proposal: Proposal, ctx: TRPCContext): Promise<void> {
  if (proposal.targetType === 'page') {
    // Page proposals are updates only: both the target and new body must exist.
    if (proposal.targetId && proposal.content) {
      const pageRepo = requirePageRepository(ctx);
      const tiptapJson = markdownToTiptapJson(proposal.content);
      await pageRepo.update(proposal.targetId, {
        content: JSON.stringify(tiptapJson),
        title: proposal.title,
      });
      ctx.eventBus.emit({
        type: 'page:updated',
        timestamp: new Date(),
        payload: { pageId: proposal.targetId, initiativeId: proposal.initiativeId, title: proposal.title },
      });
    }
    return;
  }
  if (proposal.targetType === 'phase') {
    const phaseRepo = requirePhaseRepository(ctx);
    const meta = proposal.metadata ? JSON.parse(proposal.metadata) : {};
    await phaseRepo.create({
      initiativeId: proposal.initiativeId,
      number: meta.number ?? 0,
      name: proposal.title,
      description: proposal.content ?? undefined,
    });
    return;
  }
  if (proposal.targetType === 'task') {
    const taskRepo = requireTaskRepository(ctx);
    const meta = proposal.metadata ? JSON.parse(proposal.metadata) : {};
    await taskRepo.create({
      initiativeId: proposal.initiativeId,
      phaseId: meta.phaseId ?? null,
      parentTaskId: meta.parentTaskId ?? null,
      name: proposal.title,
      description: proposal.content ?? undefined,
      category: meta.category ?? 'execute',
      type: meta.type ?? 'auto',
    });
  }
}
/**
* After every accept/dismiss, check if all proposals for the agent are resolved.
* If so, auto-dismiss the agent.
*/
/**
 * After every accept/dismiss, check whether the agent has any pending
 * proposals left; when none remain, dismiss the agent itself (best-effort —
 * a missing agent manager or already-dismissed agent is ignored).
 */
async function maybeAutoDismiss(agentId: string, ctx: TRPCContext): Promise<void> {
  const proposalRepo = requireProposalRepository(ctx);
  const remaining = await proposalRepo.countByAgentIdAndStatus(agentId, 'pending');
  if (remaining > 0) {
    return;
  }
  try {
    const agentManager = requireAgentManager(ctx);
    await agentManager.dismiss(agentId);
  } catch {
    // Agent manager not available or agent already dismissed — not critical
  }
}
/**
 * tRPC procedures for the proposal workflow: listing, accepting/dismissing a
 * single proposal, and bulk accept/dismiss of an agent's pending proposals.
 * Every mutation ends with maybeAutoDismiss, which retires the agent once no
 * pending proposals remain.
 */
export function proposalProcedures(publicProcedure: ProcedureBuilder) {
  return {
    // List by agent OR by initiative; agentId takes precedence when both are set.
    listProposals: publicProcedure
      .input(z.object({
        agentId: z.string().min(1).optional(),
        initiativeId: z.string().min(1).optional(),
      }))
      .query(async ({ ctx, input }) => {
        const repo = requireProposalRepository(ctx);
        if (input.agentId) {
          return repo.findByAgentId(input.agentId);
        }
        if (input.initiativeId) {
          return repo.findByInitiativeId(input.initiativeId);
        }
        throw new TRPCError({
          code: 'BAD_REQUEST',
          message: 'Either agentId or initiativeId is required',
        });
      }),
    // Accept one pending proposal: apply side effects first, then flip status.
    acceptProposal: publicProcedure
      .input(z.object({ id: z.string().min(1) }))
      .mutation(async ({ ctx, input }) => {
        const repo = requireProposalRepository(ctx);
        const proposal = await repo.findById(input.id);
        if (!proposal) {
          throw new TRPCError({ code: 'NOT_FOUND', message: `Proposal '${input.id}' not found` });
        }
        // Only pending proposals may transition; re-accepting is rejected.
        if (proposal.status !== 'pending') {
          throw new TRPCError({ code: 'BAD_REQUEST', message: `Proposal is already ${proposal.status}` });
        }
        // Side effects before the status write: a failed apply leaves it pending.
        await applyProposal(proposal, ctx);
        const updated = await repo.update(input.id, { status: 'accepted' });
        await maybeAutoDismiss(proposal.agentId, ctx);
        return updated;
      }),
    // Dismiss one pending proposal; no side effects are applied.
    dismissProposal: publicProcedure
      .input(z.object({ id: z.string().min(1) }))
      .mutation(async ({ ctx, input }) => {
        const repo = requireProposalRepository(ctx);
        const proposal = await repo.findById(input.id);
        if (!proposal) {
          throw new TRPCError({ code: 'NOT_FOUND', message: `Proposal '${input.id}' not found` });
        }
        if (proposal.status !== 'pending') {
          throw new TRPCError({ code: 'BAD_REQUEST', message: `Proposal is already ${proposal.status}` });
        }
        const updated = await repo.update(input.id, { status: 'dismissed' });
        await maybeAutoDismiss(proposal.agentId, ctx);
        return updated;
      }),
    // Accept every pending proposal for an agent; failures are collected
    // per-proposal (failed ones stay pending) rather than aborting the batch.
    acceptAllProposals: publicProcedure
      .input(z.object({ agentId: z.string().min(1) }))
      .mutation(async ({ ctx, input }) => {
        const repo = requireProposalRepository(ctx);
        const pending = await repo.findByAgentIdAndStatus(input.agentId, 'pending');
        let successCount = 0;
        let failedCount = 0;
        const errorMessages: string[] = [];
        for (const proposal of pending) {
          try {
            await applyProposal(proposal, ctx);
            await repo.update(proposal.id, { status: 'accepted' });
            successCount++;
          } catch (err) {
            failedCount++;
            const message = err instanceof Error ? err.message : String(err);
            errorMessages.push(`${proposal.title}: ${message}`);
          }
        }
        await maybeAutoDismiss(input.agentId, ctx);
        return { accepted: successCount, failed: failedCount, errors: errorMessages };
      }),
    // Dismiss every pending proposal for an agent in one bulk update.
    dismissAllProposals: publicProcedure
      .input(z.object({ agentId: z.string().min(1) }))
      .mutation(async ({ ctx, input }) => {
        const repo = requireProposalRepository(ctx);
        await repo.updateManyByAgentIdAndStatus(input.agentId, 'pending', { status: 'dismissed' });
        await maybeAutoDismiss(input.agentId, ctx);
        return { success: true };
      }),
  };
}