Files
Codewalkers/src/agent/mutex-completion.test.ts
Lukas May fab7706f5c feat: Phase schema refactor, agent lifecycle module, and log chunks
Phase model changes:
- Drop `number` column (ordering now by createdAt + dependency DAG)
- Replace `description` (plain text) with `content` (Tiptap JSON)
- Add `approved` status as dispatch gate
- Add phase dependency management (list, remove, dependents)
- Approval gate in PhaseDispatchManager.queuePhase()

Agent log chunks:
- New `agent_log_chunks` table for DB-first output persistence
- LogChunkRepository port + DrizzleLogChunkRepository adapter
- FileTailer onRawContent callback streams chunks to DB
- getAgentOutput reads from DB first, falls back to file

Agent lifecycle module (src/agent/lifecycle/):
- SignalManager: atomic signal.json read/write/wait operations
- RetryPolicy: exponential backoff with error-specific strategies
- ErrorAnalyzer: pattern-based error classification
- CleanupStrategy: debug archival vs production cleanup
- AgentLifecycleController: orchestrates retry/recovery flow
- Missing signal recovery with instruction injection

Completion detection fixes:
- Read signal.json file instead of parsing stdout as JSON
- Cancellable pollForCompletion with { cancel } handle
- Centralized state cleanup via cleanupAgentState()
- Credential handler consolidation (prepareProcessEnv)

Prompts refactor:
- Split monolithic prompts.ts into per-mode modules
- Add workspace layout section to agent prompts
- Fix markdown-to-tiptap double-serialization

Server/tRPC:
- Subscription heartbeat (30s) and bounded queue (1000 max)
- Phase CRUD: approvePhase, deletePhase, dependency queries
- Page: findByIds, getPageUpdatedAtMap
- Wire new repositories through container and context
2026-02-09 22:33:28 +01:00

174 lines
6.0 KiB
TypeScript

/**
* Focused test for completion handler mutex functionality.
* Tests the race condition fix without complex mocking.
*/
import { describe, it, beforeEach, expect } from 'vitest';
import { OutputHandler } from './output-handler.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
describe('OutputHandler completion mutex', () => {
let outputHandler: OutputHandler;
let completionCallCount: number;
let callOrder: string[];
// Default agent for update return value
const defaultAgent = {
id: 'test-agent',
name: 'test-agent',
taskId: null,
provider: 'claude',
mode: 'execute' as const,
status: 'idle' as const,
worktreeId: 'test-worktree',
outputFilePath: null,
sessionId: null,
result: null,
pendingQuestions: null,
initiativeId: null,
accountId: null,
userDismissedAt: null,
pid: null,
exitCode: null,
createdAt: new Date(),
updatedAt: new Date(),
};
// Simple mock that tracks completion attempts
const mockRepository: AgentRepository = {
async findById() {
return null; // Return null to cause early exit after mutex check
},
async update(_id: string, data: any) { return { ...defaultAgent, ...data }; },
async create() { throw new Error('Not implemented'); },
async findAll() { throw new Error('Not implemented'); },
async findByStatus() { throw new Error('Not implemented'); },
async findByTaskId() { throw new Error('Not implemented'); },
async findByName() { throw new Error('Not implemented'); },
async findBySessionId() { throw new Error('Not implemented'); },
async delete() { throw new Error('Not implemented'); }
};
beforeEach(() => {
outputHandler = new OutputHandler(mockRepository);
completionCallCount = 0;
callOrder = [];
});
it('should prevent concurrent completion handling with mutex', async () => {
const agentId = 'test-agent';
// Mock the findById method to track calls and simulate processing time
let firstCallCompleted = false;
(mockRepository as any).findById = async (id: string) => {
completionCallCount++;
const callIndex = completionCallCount;
callOrder.push(`call-${callIndex}-start`);
if (callIndex === 1) {
// First call - simulate some processing time
await new Promise(resolve => setTimeout(resolve, 50));
firstCallCompleted = true;
}
callOrder.push(`call-${callIndex}-end`);
return null; // Return null to exit early
};
// Start two concurrent completion handlers
const getAgentWorkdir = () => '/test/workdir';
const completion1Promise = outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
const completion2Promise = outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
await Promise.all([completion1Promise, completion2Promise]);
// Verify only one completion handler executed
expect(completionCallCount, 'Should only execute one completion handler').toBe(1);
expect(firstCallCompleted, 'First handler should have completed').toBe(true);
expect(callOrder).toEqual(['call-1-start', 'call-1-end']);
});
it('should allow sequential completion handling after first completes', async () => {
const agentId = 'test-agent';
// Mock findById to track calls
(mockRepository as any).findById = async (id: string) => {
completionCallCount++;
callOrder.push(`call-${completionCallCount}`);
return null; // Return null to exit early
};
const getAgentWorkdir = () => '/test/workdir';
// First completion
await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
// Second completion (after first is done)
await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
// Both should execute sequentially
expect(completionCallCount, 'Should execute both handlers sequentially').toBe(2);
expect(callOrder).toEqual(['call-1', 'call-2']);
});
it('should clean up mutex lock even when exception is thrown', async () => {
const agentId = 'test-agent';
let firstCallMadeThrowCall = false;
let secondCallCompleted = false;
// First call throws an error
(mockRepository as any).findById = async (id: string) => {
if (!firstCallMadeThrowCall) {
firstCallMadeThrowCall = true;
throw new Error('Database error');
} else {
secondCallCompleted = true;
return null;
}
};
const getAgentWorkdir = () => '/test/workdir';
// First call should throw but clean up mutex
await expect(outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir))
.rejects.toThrow('Database error');
expect(firstCallMadeThrowCall, 'First call should have thrown').toBe(true);
// Second call should succeed (proving mutex was cleaned up)
await outputHandler.handleCompletion(agentId, undefined, getAgentWorkdir);
expect(secondCallCompleted, 'Second call should have completed').toBe(true);
});
it('should use agent ID as mutex key', async () => {
const agentId1 = 'agent-1';
const agentId2 = 'agent-2';
// Both agents can process concurrently since they have different IDs
let agent1Started = false;
let agent2Started = false;
(mockRepository as any).findById = async (id: string) => {
if (id === agentId1) {
agent1Started = true;
await new Promise(resolve => setTimeout(resolve, 30));
} else if (id === agentId2) {
agent2Started = true;
await new Promise(resolve => setTimeout(resolve, 30));
}
return null;
};
const getAgentWorkdir = () => '/test/workdir';
// Start both agents concurrently - they should NOT block each other
const agent1Promise = outputHandler.handleCompletion(agentId1, undefined, getAgentWorkdir);
const agent2Promise = outputHandler.handleCompletion(agentId2, undefined, getAgentWorkdir);
await Promise.all([agent1Promise, agent2Promise]);
expect(agent1Started, 'Agent 1 should have started').toBe(true);
expect(agent2Started, 'Agent 2 should have started').toBe(true);
});
});