Phase model changes:
- Drop `number` column (ordering now by createdAt + dependency DAG)
- Replace `description` (plain text) with `content` (Tiptap JSON)
- Add `approved` status as dispatch gate
- Add phase dependency management (list, remove, dependents)
- Approval gate in PhaseDispatchManager.queuePhase()
Agent log chunks:
- New `agent_log_chunks` table for DB-first output persistence
- LogChunkRepository port + DrizzleLogChunkRepository adapter
- FileTailer onRawContent callback streams chunks to DB
- getAgentOutput reads from DB first, falls back to file
Agent lifecycle module (src/agent/lifecycle/):
- SignalManager: atomic signal.json read/write/wait operations
- RetryPolicy: exponential backoff with error-specific strategies
- ErrorAnalyzer: pattern-based error classification
- CleanupStrategy: debug archival vs production cleanup
- AgentLifecycleController: orchestrates retry/recovery flow
- Missing signal recovery with instruction injection
Completion detection fixes:
- Read signal.json file instead of parsing stdout as JSON
- Cancellable pollForCompletion with { cancel } handle
- Centralized state cleanup via cleanupAgentState()
- Credential handler consolidation (prepareProcessEnv)
Prompts refactor:
- Split monolithic prompts.ts into per-mode modules
- Add workspace layout section to agent prompts
- Fix markdown-to-tiptap double-serialization
Server/tRPC:
- Subscription heartbeat (30s) and bounded queue (1000 max)
- Phase CRUD: approvePhase, deletePhase, dependency queries
- Page: findByIds, getPageUpdatedAtMap
- Wire new repositories through container and context
233 lines · 8.0 KiB · TypeScript
/**
 * Test for completion handler race condition fix.
 * Verifies that only one completion handler executes per agent.
 */
|
|
|
|
import { describe, it, beforeEach, afterEach, expect } from 'vitest';
|
|
import { join } from 'node:path';
|
|
import { mkdtemp, writeFile, mkdir, rm } from 'node:fs/promises';
|
|
import { tmpdir } from 'node:os';
|
|
import { OutputHandler, type ActiveAgent } from './output-handler.js';
|
|
import type { AgentRepository } from '../db/repositories/agent-repository.js';
|
|
import type { EventBus } from '../events/index.js';
|
|
|
|
// Default agent record for mocks.
// Shared baseline agent row; the repository mock below returns shallow
// copies ({ ...defaultAgent }) so tests never mutate this object directly.
const defaultAgent = {
  id: 'test-agent',
  name: 'test-agent',
  taskId: null,
  provider: 'claude',
  // 'as const' keeps the literal types so the object satisfies the
  // AgentRepository record shape without widening to plain string.
  mode: 'refine' as const,
  status: 'running' as const,
  worktreeId: 'test-worktree',
  outputFilePath: '',
  sessionId: null,
  result: null,
  pendingQuestions: null,
  initiativeId: null,
  accountId: null,
  userDismissedAt: null,
  pid: null,
  exitCode: null,
  // Evaluated once at module load; tests only copy these values.
  createdAt: new Date(),
  updatedAt: new Date(),
};
|
|
|
|
// Mock agent repository
|
|
const mockAgentRepository: AgentRepository = {
|
|
async findById() {
|
|
return { ...defaultAgent };
|
|
},
|
|
async update(_id: string, data: any) {
|
|
return { ...defaultAgent, ...data };
|
|
},
|
|
async create() {
|
|
throw new Error('Not implemented');
|
|
},
|
|
async findAll() {
|
|
throw new Error('Not implemented');
|
|
},
|
|
async findByStatus() {
|
|
throw new Error('Not implemented');
|
|
},
|
|
async findByTaskId() {
|
|
throw new Error('Not implemented');
|
|
},
|
|
async findByName() {
|
|
throw new Error('Not implemented');
|
|
},
|
|
async findBySessionId() {
|
|
throw new Error('Not implemented');
|
|
},
|
|
async delete() {
|
|
throw new Error('Not implemented');
|
|
}
|
|
};
|
|
|
|
describe('OutputHandler completion race condition fix', () => {
|
|
let outputHandler: OutputHandler;
|
|
let testWorkdir: string;
|
|
|
|
beforeEach(async () => {
|
|
outputHandler = new OutputHandler(mockAgentRepository);
|
|
testWorkdir = await mkdtemp(join(tmpdir(), 'cw-completion-test-'));
|
|
});
|
|
|
|
afterEach(async () => {
|
|
await rm(testWorkdir, { recursive: true, force: true });
|
|
});
|
|
|
|
it('should prevent concurrent completion handling via mutex', async () => {
|
|
// Setup agent workdir with completion signal
|
|
const outputDir = join(testWorkdir, '.cw', 'output');
|
|
await mkdir(outputDir, { recursive: true });
|
|
await writeFile(join(outputDir, 'signal.json'), JSON.stringify({
|
|
status: 'questions',
|
|
questions: [{ id: 'q1', question: 'Test question?' }]
|
|
}));
|
|
|
|
const agentId = 'test-agent';
|
|
const getAgentWorkdir = (_alias: string) => testWorkdir;
|
|
|
|
// Provide a mock ActiveAgent with outputFilePath so the signal.json check path is reached
|
|
const mockActive = {
|
|
outputFilePath: join(testWorkdir, 'output.jsonl'),
|
|
streamSessionId: 'session-1'
|
|
};
|
|
// Create the output file so readCompleteLines doesn't error
|
|
await writeFile(mockActive.outputFilePath, '');
|
|
|
|
let completionCallCount = 0;
|
|
let firstHandlerStarted = false;
|
|
let secondHandlerBlocked = false;
|
|
|
|
// Track calls to the private processSignalAndFiles method
|
|
const originalProcessSignalAndFiles = (outputHandler as any).processSignalAndFiles;
|
|
(outputHandler as any).processSignalAndFiles = async (...args: any[]) => {
|
|
completionCallCount++;
|
|
|
|
if (completionCallCount === 1) {
|
|
firstHandlerStarted = true;
|
|
// Simulate some processing time
|
|
await new Promise(resolve => setTimeout(resolve, 50));
|
|
return originalProcessSignalAndFiles.apply(outputHandler, args);
|
|
} else {
|
|
// This should never be reached due to the mutex
|
|
secondHandlerBlocked = true;
|
|
return originalProcessSignalAndFiles.apply(outputHandler, args);
|
|
}
|
|
};
|
|
|
|
// Start two concurrent completion handlers
|
|
const completion1 = outputHandler.handleCompletion(agentId, mockActive as any, getAgentWorkdir);
|
|
const completion2 = outputHandler.handleCompletion(agentId, mockActive as any, getAgentWorkdir);
|
|
|
|
await Promise.all([completion1, completion2]);
|
|
|
|
// Verify mutex prevented duplicate processing
|
|
expect(firstHandlerStarted, 'First handler should have started').toBe(true);
|
|
expect(secondHandlerBlocked, 'Second handler should have been blocked by mutex').toBe(false);
|
|
expect(completionCallCount, 'Should only process completion once').toBe(1);
|
|
});
|
|
|
|
it('should handle completion when agent has questions status', async () => {
|
|
// Setup agent workdir with questions signal
|
|
const outputDir = join(testWorkdir, '.cw', 'output');
|
|
await mkdir(outputDir, { recursive: true });
|
|
await writeFile(join(outputDir, 'signal.json'), JSON.stringify({
|
|
status: 'questions',
|
|
questions: [{ id: 'q1', question: 'What should I do next?' }]
|
|
}));
|
|
|
|
const agentId = 'test-agent';
|
|
const getAgentWorkdir = (_alias: string) => testWorkdir;
|
|
|
|
// Provide a mock ActiveAgent with outputFilePath so the signal.json check path is reached
|
|
const mockActive = {
|
|
outputFilePath: join(testWorkdir, 'output.jsonl'),
|
|
streamSessionId: 'session-1'
|
|
};
|
|
await writeFile(mockActive.outputFilePath, '');
|
|
|
|
// Mock the update call to track status changes
|
|
let finalStatus: string | undefined;
|
|
(mockAgentRepository as any).update = async (id: string, updates: any) => {
|
|
if (updates.status) {
|
|
finalStatus = updates.status;
|
|
}
|
|
};
|
|
|
|
await outputHandler.handleCompletion(agentId, mockActive as any, getAgentWorkdir);
|
|
|
|
// Verify agent was marked as waiting for input, not crashed
|
|
expect(finalStatus, 'Agent should be waiting for input').toBe('waiting_for_input');
|
|
});
|
|
|
|
it('should handle completion when agent is done', async () => {
|
|
// Setup agent workdir with done signal
|
|
const outputDir = join(testWorkdir, '.cw', 'output');
|
|
await mkdir(outputDir, { recursive: true });
|
|
await writeFile(join(outputDir, 'signal.json'), JSON.stringify({
|
|
status: 'done'
|
|
}));
|
|
|
|
const agentId = 'test-agent';
|
|
const getAgentWorkdir = (_alias: string) => testWorkdir;
|
|
|
|
// Provide a mock ActiveAgent with outputFilePath so the signal.json check path is reached
|
|
const mockActive = {
|
|
outputFilePath: join(testWorkdir, 'output.jsonl'),
|
|
streamSessionId: 'session-1'
|
|
};
|
|
await writeFile(mockActive.outputFilePath, '');
|
|
|
|
// Mock the update call to track status changes
|
|
let finalStatus: string | undefined;
|
|
(mockAgentRepository as any).update = async (id: string, updates: any) => {
|
|
if (updates.status) {
|
|
finalStatus = updates.status;
|
|
}
|
|
};
|
|
|
|
await outputHandler.handleCompletion(agentId, mockActive as any, getAgentWorkdir);
|
|
|
|
// Verify agent was marked as idle, not crashed
|
|
expect(finalStatus, 'Agent should be idle after completion').toBe('idle');
|
|
});
|
|
|
|
it('should clean up completion lock even if processing fails', async () => {
|
|
const agentId = 'test-agent';
|
|
const getAgentWorkdir = (_alias: string) => testWorkdir;
|
|
|
|
// Force an error during processing
|
|
(mockAgentRepository as any).findById = async () => {
|
|
throw new Error('Database error');
|
|
};
|
|
|
|
// handleCompletion propagates errors but the finally block still cleans up the lock
|
|
await expect(outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir))
|
|
.rejects.toThrow('Database error');
|
|
|
|
// Verify lock was cleaned up by ensuring a second call can proceed
|
|
let secondCallStarted = false;
|
|
(mockAgentRepository as any).findById = async (id: string) => {
|
|
secondCallStarted = true;
|
|
return { ...defaultAgent };
|
|
};
|
|
|
|
// Create signal.json so the second call can complete successfully
|
|
const outputDir = join(testWorkdir, '.cw', 'output');
|
|
await mkdir(outputDir, { recursive: true });
|
|
await writeFile(join(outputDir, 'signal.json'), JSON.stringify({
|
|
status: 'done'
|
|
}));
|
|
const mockActive = {
|
|
outputFilePath: join(testWorkdir, 'output.jsonl'),
|
|
streamSessionId: 'session-1'
|
|
};
|
|
await writeFile(mockActive.outputFilePath, '');
|
|
|
|
await outputHandler.handleCompletion(agentId, mockActive as any, getAgentWorkdir);
|
|
expect(secondCallStarted, 'Second call should proceed after lock cleanup').toBe(true);
|
|
});
|
|
}); |