writeInputFiles, spawnDetached, and diagnostic writes now use fs/promises (mkdir, writeFile) instead of mkdirSync/writeFileSync. File writes in writeInputFiles are batched with Promise.all. openSync/closeSync for child process stdio FDs remain sync as spawn() requires the FDs immediately.
259 lines
9.0 KiB
TypeScript
259 lines
9.0 KiB
TypeScript
/**
|
|
* CassetteProcessManager
|
|
*
|
|
* Extends ProcessManager to intercept subprocess spawning and either:
|
|
* - Replay a recorded cassette (no API cost, deterministic)
|
|
* - Record a new cassette by running the real agent and capturing its output
|
|
*
|
|
* Modes:
|
|
* - 'replay': cassette MUST exist; throws if missing (safe for CI)
|
|
* - 'record': always runs real agent; saves/overwrites cassette on completion
|
|
* - 'auto': replays if cassette exists; falls through to record if missing
|
|
*
|
|
* The cassette key is built from:
|
|
* - Normalized prompt (dynamic content replaced with placeholders)
|
|
* - Provider name and stable CLI args (prompt value stripped)
|
|
* - Worktree file hash (detects content drift for execute-mode agents)
|
|
*/
|
|
|
|
import { readFileSync, existsSync, mkdirSync, writeFileSync, readdirSync } from 'node:fs';
|
|
import { join, dirname, relative } from 'node:path';
|
|
import { ProcessManager } from '../../agent/process-manager.js';
|
|
import type { StreamEvent } from '../../agent/providers/parsers/index.js';
|
|
import type { FileTailer } from '../../agent/file-tailer.js';
|
|
import type { ProjectRepository } from '../../db/repositories/project-repository.js';
|
|
import type { CassetteKey, CassetteEntry } from './types.js';
|
|
import type { CassetteStore } from './store.js';
|
|
import { normalizePrompt, stripPromptFromArgs } from './normalizer.js';
|
|
import { hashWorktreeFiles } from './key.js';
|
|
|
|
export type CassetteMode = 'replay' | 'record' | 'auto';
|
|
|
|
interface PendingRecording {
|
|
key: CassetteKey;
|
|
outputFilePath: string;
|
|
agentCwd: string;
|
|
}
|
|
|
|
interface PendingReplay {
|
|
cassette: CassetteEntry;
|
|
agentCwd: string;
|
|
}
|
|
|
|
export class CassetteProcessManager extends ProcessManager {
|
|
private readonly _workspaceRoot: string;
|
|
private readonly replayWorkerPath: string;
|
|
private readonly pendingRecordings = new Map<number, PendingRecording>();
|
|
private readonly pendingReplays = new Map<number, PendingReplay>();
|
|
|
|
constructor(
|
|
workspaceRoot: string,
|
|
projectRepository: ProjectRepository,
|
|
private readonly store: CassetteStore,
|
|
private readonly cassetteMode: CassetteMode = 'auto',
|
|
) {
|
|
super(workspaceRoot, projectRepository);
|
|
this._workspaceRoot = workspaceRoot;
|
|
this.replayWorkerPath = new URL('./replay-worker.mjs', import.meta.url).pathname;
|
|
}
|
|
|
|
override async spawnDetached(
|
|
agentId: string,
|
|
agentName: string,
|
|
command: string,
|
|
args: string[],
|
|
cwd: string,
|
|
env: Record<string, string>,
|
|
providerName: string,
|
|
prompt?: string,
|
|
onEvent?: (event: StreamEvent) => void,
|
|
onRawContent?: (content: string) => void,
|
|
): Promise<{ pid: number; outputFilePath: string; tailer: FileTailer }> {
|
|
const key: CassetteKey = {
|
|
normalizedPrompt: normalizePrompt(prompt ?? '', this._workspaceRoot),
|
|
providerName,
|
|
modelArgs: stripPromptFromArgs(args, prompt ?? ''),
|
|
worktreeHash: hashWorktreeFiles(cwd),
|
|
};
|
|
|
|
// In record mode we always skip the store lookup and go straight to real spawn.
|
|
const existing = this.cassetteMode !== 'record' ? this.store.find(key) : null;
|
|
|
|
if (existing) {
|
|
const result = await this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
|
|
this.pendingReplays.set(result.pid, { cassette: existing, agentCwd: cwd });
|
|
return result;
|
|
}
|
|
|
|
if (this.cassetteMode === 'replay') {
|
|
throw new Error(
|
|
`[cassette] No cassette found for agent '${agentName}' (provider=${providerName}, mode=replay).\n` +
|
|
`Run with CW_CASSETTE_RECORD=1 to record it.`,
|
|
);
|
|
}
|
|
|
|
// auto or record: run the real agent and record the cassette on completion.
|
|
console.log(`[cassette] recording new cassette for agent '${agentName}' (${providerName})`);
|
|
const result = await super.spawnDetached(agentId, agentName, command, args, cwd, env, providerName, prompt, onEvent, onRawContent);
|
|
this.pendingRecordings.set(result.pid, { key, outputFilePath: result.outputFilePath, agentCwd: cwd });
|
|
return result;
|
|
}
|
|
|
|
override pollForCompletion(
|
|
agentId: string,
|
|
pid: number,
|
|
onComplete: () => Promise<void>,
|
|
getTailer: () => FileTailer | undefined,
|
|
): { cancel: () => void } {
|
|
const recording = this.pendingRecordings.get(pid);
|
|
if (recording) {
|
|
// Record mode — wrap onComplete to save the cassette before handing off.
|
|
return super.pollForCompletion(agentId, pid, async () => {
|
|
await this.saveCassette(recording);
|
|
this.pendingRecordings.delete(pid);
|
|
await onComplete();
|
|
}, getTailer);
|
|
}
|
|
|
|
const replay = this.pendingReplays.get(pid);
|
|
if (replay) {
|
|
// Replay mode — restore .cw/output/ files before onComplete so that
|
|
// readPhaseFiles / readTaskFiles / readProposalFiles find their data.
|
|
return super.pollForCompletion(agentId, pid, async () => {
|
|
this.restoreOutputFiles(replay.cassette, replay.agentCwd);
|
|
this.pendingReplays.delete(pid);
|
|
await onComplete();
|
|
}, getTailer);
|
|
}
|
|
|
|
return super.pollForCompletion(agentId, pid, onComplete, getTailer);
|
|
}
|
|
|
|
private async saveCassette(pending: PendingRecording): Promise<void> {
|
|
// Read all JSONL lines from the output file the agent wrote to.
|
|
let jsonlLines: string[] = [];
|
|
try {
|
|
const content = readFileSync(pending.outputFilePath, 'utf-8');
|
|
jsonlLines = content.split('\n').filter(l => l.trim() !== '');
|
|
} catch {
|
|
// No output produced — record an empty cassette.
|
|
}
|
|
|
|
// Read signal.json from the agent working directory.
|
|
let signalJson: Record<string, unknown> | null = null;
|
|
const outputDir = join(pending.agentCwd, '.cw', 'output');
|
|
const signalPath = join(outputDir, 'signal.json');
|
|
if (existsSync(signalPath)) {
|
|
try {
|
|
signalJson = JSON.parse(readFileSync(signalPath, 'utf-8')) as Record<string, unknown>;
|
|
} catch {
|
|
// Corrupt signal file — record null.
|
|
}
|
|
}
|
|
|
|
// Capture all other files in .cw/output/ (phase files, task files, etc.)
|
|
const outputFiles: Record<string, string> = {};
|
|
if (existsSync(outputDir)) {
|
|
this.walkOutputDir(outputDir, outputDir, (relPath, content) => {
|
|
if (relPath !== 'signal.json') {
|
|
outputFiles[relPath] = content;
|
|
}
|
|
});
|
|
}
|
|
|
|
const entry: CassetteEntry = {
|
|
version: 1,
|
|
key: pending.key,
|
|
recording: {
|
|
jsonlLines,
|
|
signalJson,
|
|
exitCode: 0,
|
|
recordedAt: new Date().toISOString(),
|
|
outputFiles,
|
|
},
|
|
};
|
|
|
|
this.store.save(pending.key, entry);
|
|
}
|
|
|
|
/**
|
|
* Restore captured .cw/output/ files to the new agent working directory.
|
|
* Called before onComplete so that downstream readers (readPhaseFiles, etc.)
|
|
* find the expected files in place.
|
|
*/
|
|
private restoreOutputFiles(cassette: CassetteEntry, agentCwd: string): void {
|
|
const { outputFiles, signalJson } = cassette.recording;
|
|
const outputDir = join(agentCwd, '.cw', 'output');
|
|
|
|
// Restore captured output files
|
|
if (outputFiles) {
|
|
for (const [relPath, content] of Object.entries(outputFiles)) {
|
|
const fullPath = join(outputDir, relPath);
|
|
mkdirSync(dirname(fullPath), { recursive: true });
|
|
writeFileSync(fullPath, content, 'utf-8');
|
|
}
|
|
}
|
|
|
|
// Write signal.json (the manager reads this to detect completion status)
|
|
if (signalJson) {
|
|
mkdirSync(outputDir, { recursive: true });
|
|
writeFileSync(join(outputDir, 'signal.json'), JSON.stringify(signalJson), 'utf-8');
|
|
}
|
|
}
|
|
|
|
private walkOutputDir(
|
|
baseDir: string,
|
|
currentDir: string,
|
|
callback: (relPath: string, content: string) => void,
|
|
): void {
|
|
let entries;
|
|
try {
|
|
entries = readdirSync(currentDir, { withFileTypes: true });
|
|
} catch {
|
|
return;
|
|
}
|
|
|
|
for (const entry of entries) {
|
|
const fullPath = join(currentDir, entry.name);
|
|
const relPath = relative(baseDir, fullPath);
|
|
|
|
if (entry.isDirectory()) {
|
|
this.walkOutputDir(baseDir, fullPath, callback);
|
|
} else if (entry.isFile()) {
|
|
try {
|
|
const content = readFileSync(fullPath, 'utf-8');
|
|
callback(relPath, content);
|
|
} catch {
|
|
// Skip unreadable files
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private async replayFromCassette(
|
|
agentId: string,
|
|
agentName: string,
|
|
cwd: string,
|
|
env: Record<string, string>,
|
|
providerName: string,
|
|
cassette: CassetteEntry,
|
|
onEvent?: (event: StreamEvent) => void,
|
|
onRawContent?: (content: string) => void,
|
|
): Promise<{ pid: number; outputFilePath: string; tailer: FileTailer }> {
|
|
console.log(`[cassette] replaying cassette for agent '${agentName}' (${cassette.recording.jsonlLines.length} lines)`);
|
|
|
|
return super.spawnDetached(
|
|
agentId,
|
|
agentName,
|
|
process.execPath, // use the running node binary
|
|
[this.replayWorkerPath], // replay-worker.mjs
|
|
cwd,
|
|
{ ...env, CW_CASSETTE_DATA: JSON.stringify(cassette.recording) },
|
|
providerName, // use original provider's parser for the tailer
|
|
undefined, // no prompt — worker handles output directly
|
|
onEvent,
|
|
onRawContent,
|
|
);
|
|
}
|
|
}
|