Files
Codewalkers/apps/server/test/cassette/process-manager.ts
Lukas May 34578d39c6 refactor: Restructure monorepo to apps/server/ and apps/web/ layout
Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt
standard monorepo conventions (apps/ for runnable apps, packages/
for reusable libraries). Update all config files, shared package
imports, test fixtures, and documentation to reflect new paths.

Key fixes:
- Update workspace config to ["apps/*", "packages/*"]
- Update tsconfig.json rootDir/include for apps/server/
- Add apps/web/** to vitest exclude list
- Update drizzle.config.ts schema path
- Fix ensure-schema.ts migration path detection (3 levels up in dev,
  2 levels up in dist)
- Fix tests/integration/cli-server.test.ts import paths
- Update packages/shared imports to apps/server/ paths
- Update all docs/ files with new paths
2026-03-03 11:22:53 +01:00

259 lines
9.0 KiB
TypeScript

/**
* CassetteProcessManager
*
* Extends ProcessManager to intercept subprocess spawning and either:
* - Replay a recorded cassette (no API cost, deterministic)
* - Record a new cassette by running the real agent and capturing its output
*
* Modes:
* - 'replay': cassette MUST exist; throws if missing (safe for CI)
* - 'record': always runs real agent; saves/overwrites cassette on completion
* - 'auto': replays if cassette exists; falls through to record if missing
*
* The cassette key is built from:
* - Normalized prompt (dynamic content replaced with placeholders)
* - Provider name and stable CLI args (prompt value stripped)
* - Worktree file hash (detects content drift for execute-mode agents)
*/
import { readFileSync, existsSync, mkdirSync, writeFileSync, readdirSync } from 'node:fs';
import { join, dirname, relative } from 'node:path';
import { fileURLToPath } from 'node:url';
import { ProcessManager } from '../../agent/process-manager.js';
import type { StreamEvent } from '../../agent/providers/parsers/index.js';
import type { FileTailer } from '../../agent/file-tailer.js';
import type { ProjectRepository } from '../../db/repositories/project-repository.js';
import type { CassetteKey, CassetteEntry } from './types.js';
import type { CassetteStore } from './store.js';
import { normalizePrompt, stripPromptFromArgs } from './normalizer.js';
import { hashWorktreeFiles } from './key.js';
/**
 * Selects how CassetteProcessManager treats each spawn request:
 * - 'auto': replay when the store has a matching cassette, otherwise run the
 *   real agent and record.
 * - 'record': skip the store lookup and always run the real agent.
 * - 'replay': replay only; a missing cassette is an error.
 */
export type CassetteMode =
  | 'auto'
  | 'record'
  | 'replay';
/** State kept for a real agent run until its output is saved as a cassette. */
interface PendingRecording {
  /** Working directory the agent was spawned in. */
  agentCwd: string;
  /** Path of the output file reported by the spawn call. */
  outputFilePath: string;
  /** Cassette key computed when the agent was spawned. */
  key: CassetteKey;
}
/** State kept for a replayed run until its recorded output files are restored. */
interface PendingReplay {
  /** Working directory the replay worker was spawned in. */
  agentCwd: string;
  /** The stored cassette being replayed. */
  cassette: CassetteEntry;
}
/**
 * ProcessManager variant that routes agent spawns through a CassetteStore.
 *
 * `spawnDetached` decides per call whether to replay a stored cassette (by
 * spawning the replay worker) or to run the real command. `pollForCompletion`
 * then wraps the caller's completion callback so that a new cassette is saved,
 * or the recorded `.cw/output/` files are restored, before the callback runs.
 */
export class CassetteProcessManager extends ProcessManager {
  /** Workspace root given to the constructor; passed to prompt normalization. */
  private readonly _workspaceRoot: string;
  /** Filesystem path of replay-worker.mjs, resolved relative to this module. */
  private readonly replayWorkerPath: string;
  /** Real-agent runs awaiting cassette capture, keyed by subprocess pid. */
  private readonly pendingRecordings = new Map<number, PendingRecording>();
  /** Replay runs awaiting output-file restoration, keyed by subprocess pid. */
  private readonly pendingReplays = new Map<number, PendingReplay>();

  /**
   * @param workspaceRoot Forwarded to the base class and used for prompt normalization.
   * @param projectRepository Forwarded to the base class.
   * @param store Cassette storage used for lookups and saves.
   * @param cassetteMode Replay/record policy; defaults to 'auto'.
   */
  constructor(
    workspaceRoot: string,
    projectRepository: ProjectRepository,
    private readonly store: CassetteStore,
    private readonly cassetteMode: CassetteMode = 'auto',
  ) {
    super(workspaceRoot, projectRepository);
    this._workspaceRoot = workspaceRoot;
    // URL.pathname is percent-encoded and, on Windows, has a leading slash
    // before the drive letter, so it is not a usable filesystem path.
    // fileURLToPath performs the correct conversion on every platform.
    this.replayWorkerPath = fileURLToPath(new URL('./replay-worker.mjs', import.meta.url));
  }

  /**
   * Spawn either the replay worker (when a matching cassette is found) or the
   * real agent command (whose output is recorded once it completes).
   *
   * The cassette key is built from the normalized prompt, the provider name,
   * the args with the prompt stripped, and a hash of the files under `cwd`.
   *
   * @returns The pid, output file path and tailer reported by the base spawn.
   * @throws Error in 'replay' mode when the store has no matching cassette.
   */
  override spawnDetached(
    agentId: string,
    agentName: string,
    command: string,
    args: string[],
    cwd: string,
    env: Record<string, string>,
    providerName: string,
    prompt?: string,
    onEvent?: (event: StreamEvent) => void,
    onRawContent?: (content: string) => void,
  ): { pid: number; outputFilePath: string; tailer: FileTailer } {
    const key: CassetteKey = {
      normalizedPrompt: normalizePrompt(prompt ?? '', this._workspaceRoot),
      providerName,
      modelArgs: stripPromptFromArgs(args, prompt ?? ''),
      worktreeHash: hashWorktreeFiles(cwd),
    };

    // In record mode we always skip the store lookup and go straight to real spawn.
    const existing = this.cassetteMode !== 'record' ? this.store.find(key) : null;
    if (existing) {
      const result = this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
      this.pendingReplays.set(result.pid, { cassette: existing, agentCwd: cwd });
      return result;
    }

    if (this.cassetteMode === 'replay') {
      throw new Error(
        `[cassette] No cassette found for agent '${agentName}' (provider=${providerName}, mode=replay).\n` +
        `Run with CW_CASSETTE_RECORD=1 to record it.`,
      );
    }

    // auto or record: run the real agent and record the cassette on completion.
    console.log(`[cassette] recording new cassette for agent '${agentName}' (${providerName})`);
    const result = super.spawnDetached(agentId, agentName, command, args, cwd, env, providerName, prompt, onEvent, onRawContent);
    this.pendingRecordings.set(result.pid, { key, outputFilePath: result.outputFilePath, agentCwd: cwd });
    return result;
  }

  /**
   * Delegate to the base poller, wrapping `onComplete` when `pid` belongs to a
   * pending recording (save the cassette first) or a pending replay (restore
   * the recorded output files first). Unknown pids are passed through as-is.
   *
   * The pending entry is removed even when saving or restoring throws, so a
   * failed run does not leave a stale entry behind; the error still propagates.
   */
  override pollForCompletion(
    agentId: string,
    pid: number,
    onComplete: () => Promise<void>,
    getTailer: () => FileTailer | undefined,
  ): { cancel: () => void } {
    const recording = this.pendingRecordings.get(pid);
    if (recording) {
      // Record mode — wrap onComplete to save the cassette before handing off.
      return super.pollForCompletion(agentId, pid, async () => {
        try {
          await this.saveCassette(recording);
        } finally {
          this.pendingRecordings.delete(pid);
        }
        await onComplete();
      }, getTailer);
    }

    const replay = this.pendingReplays.get(pid);
    if (replay) {
      // Replay mode — restore .cw/output/ files before onComplete so that
      // readPhaseFiles / readTaskFiles / readProposalFiles find their data.
      return super.pollForCompletion(agentId, pid, async () => {
        try {
          this.restoreOutputFiles(replay.cassette, replay.agentCwd);
        } finally {
          this.pendingReplays.delete(pid);
        }
        await onComplete();
      }, getTailer);
    }

    return super.pollForCompletion(agentId, pid, onComplete, getTailer);
  }

  /**
   * Build a cassette entry from a finished real-agent run and save it.
   *
   * Captures the non-blank lines of the output file, the parsed
   * `.cw/output/signal.json` (null when absent or unparsable), and the text of
   * every other file under `.cw/output/`. The exit code is always recorded as 0.
   */
  private async saveCassette(pending: PendingRecording): Promise<void> {
    // Read all JSONL lines from the output file the agent wrote to.
    let jsonlLines: string[] = [];
    try {
      const content = readFileSync(pending.outputFilePath, 'utf-8');
      jsonlLines = content.split('\n').filter(l => l.trim() !== '');
    } catch {
      // No output produced — record an empty cassette.
    }

    // Read signal.json from the agent working directory.
    let signalJson: Record<string, unknown> | null = null;
    const outputDir = join(pending.agentCwd, '.cw', 'output');
    const signalPath = join(outputDir, 'signal.json');
    if (existsSync(signalPath)) {
      try {
        signalJson = JSON.parse(readFileSync(signalPath, 'utf-8')) as Record<string, unknown>;
      } catch {
        // Corrupt signal file — record null.
      }
    }

    // Capture all other files in .cw/output/ (phase files, task files, etc.)
    const outputFiles: Record<string, string> = {};
    if (existsSync(outputDir)) {
      this.walkOutputDir(outputDir, outputDir, (relPath, content) => {
        // signal.json is stored separately in `signalJson`.
        if (relPath !== 'signal.json') {
          outputFiles[relPath] = content;
        }
      });
    }

    const entry: CassetteEntry = {
      version: 1,
      key: pending.key,
      recording: {
        jsonlLines,
        signalJson,
        exitCode: 0,
        recordedAt: new Date().toISOString(),
        outputFiles,
      },
    };
    this.store.save(pending.key, entry);
  }

  /**
   * Restore captured .cw/output/ files to the new agent working directory.
   * Called before onComplete so that downstream readers (readPhaseFiles, etc.)
   * find the expected files in place.
   */
  private restoreOutputFiles(cassette: CassetteEntry, agentCwd: string): void {
    const { outputFiles, signalJson } = cassette.recording;
    const outputDir = join(agentCwd, '.cw', 'output');

    // Restore captured output files, creating parent directories as needed.
    if (outputFiles) {
      for (const [relPath, content] of Object.entries(outputFiles)) {
        const fullPath = join(outputDir, relPath);
        mkdirSync(dirname(fullPath), { recursive: true });
        writeFileSync(fullPath, content, 'utf-8');
      }
    }

    // Write signal.json (the manager reads this to detect completion status)
    if (signalJson) {
      mkdirSync(outputDir, { recursive: true });
      writeFileSync(join(outputDir, 'signal.json'), JSON.stringify(signalJson), 'utf-8');
    }
  }

  /**
   * Recursively visit every regular file under `currentDir`, invoking
   * `callback` with the file's path relative to `baseDir` and its UTF-8 text.
   * Unreadable directories and files are skipped silently.
   */
  private walkOutputDir(
    baseDir: string,
    currentDir: string,
    callback: (relPath: string, content: string) => void,
  ): void {
    let entries;
    try {
      entries = readdirSync(currentDir, { withFileTypes: true });
    } catch {
      return;
    }

    for (const entry of entries) {
      const fullPath = join(currentDir, entry.name);
      const relPath = relative(baseDir, fullPath);
      if (entry.isDirectory()) {
        this.walkOutputDir(baseDir, fullPath, callback);
      } else if (entry.isFile()) {
        try {
          const content = readFileSync(fullPath, 'utf-8');
          callback(relPath, content);
        } catch {
          // Skip unreadable files
        }
      }
    }
  }

  /**
   * Spawn the replay worker under the running node binary in place of the real
   * agent, handing it the recording as JSON in the CW_CASSETTE_DATA variable.
   */
  private replayFromCassette(
    agentId: string,
    agentName: string,
    cwd: string,
    env: Record<string, string>,
    providerName: string,
    cassette: CassetteEntry,
    onEvent?: (event: StreamEvent) => void,
    onRawContent?: (content: string) => void,
  ): { pid: number; outputFilePath: string; tailer: FileTailer } {
    console.log(`[cassette] replaying cassette for agent '${agentName}' (${cassette.recording.jsonlLines.length} lines)`);

    // NOTE(review): the whole recording travels in a single environment
    // variable; very large recordings may exceed OS limits on environment
    // size — confirm this is acceptable for the cassettes in use.
    return super.spawnDetached(
      agentId,
      agentName,
      process.execPath, // use the running node binary
      [this.replayWorkerPath], // replay-worker.mjs
      cwd,
      { ...env, CW_CASSETTE_DATA: JSON.stringify(cassette.recording) },
      providerName, // use original provider's parser for the tailer
      undefined, // no prompt — worker handles output directly
      onEvent,
      onRawContent,
    );
  }
}