fix: Stabilize full-flow cassette keys and restore output files on replay

Three issues discovered and fixed after initial recording:

1. Agent workdir names not normalized — random animal names (e.g.
   "available-sheep") embedded in workspace paths caused key drift.
   Added AGENT_WORKDIR_RE to replace agent-workdirs/<name> with
   agent-workdirs/__AGENT__ in normalizer.ts.

2. Phase/task files missing on replay — plan/detail agents write output
   to .cw/output/ (phases/, tasks/) which the server reads on completion.
   The replay worker only emits JSONL; it doesn't re-execute file writes.
   Extended cassette format with outputFiles field and added capture
   (walkOutputDir) + restore (restoreOutputFiles) logic to process-manager.

3. Recording timeout too short — fixed CASSETTE_FLOW_TIMEOUT to be
   mode-aware: 60 min for recording runs, 5 min for replay.

Also commit the 4 recorded cassettes (discuss/plan/detail/execute)
that make the full-flow cassette test runnable in CI without API costs.
This commit is contained in:
Lukas May
2026-03-03 10:35:13 +01:00
parent 1e374abcd6
commit 25360e1711
8 changed files with 385 additions and 16 deletions

View File

@@ -12,6 +12,10 @@ const NANOID_RE = /(?<![A-Za-z0-9])[A-Za-z0-9_-]{21}(?![A-Za-z0-9_-])/g;
const ISO_TIMESTAMP_RE = /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?/g;
const UNIX_EPOCH_MS_RE = /\b1[0-9]{12}\b/g;
const SESSION_NUM_RE = /\bsession[_\s-]?\d+\b/gi;
// Agent worktree paths: agent-workdirs/<random-agent-name> (with or without trailing slash)
// The agent name (e.g. "available-sheep") changes every run but is not a UUID or nanoid.
// Stop at the first slash so the project name after it is preserved.
const AGENT_WORKDIR_RE = /agent-workdirs\/[^\s/\\]+/g;
/**
* Normalize a prompt for stable cassette key generation.
@@ -23,6 +27,7 @@ const SESSION_NUM_RE = /\bsession[_\s-]?\d+\b/gi;
* 3. ISO 8601 timestamps → __TIMESTAMP__
* 4. Unix epoch milliseconds → __EPOCH__
* 5. Session numbers → session__N__
* 6. Agent worktree path segment → agent-workdirs/__AGENT__/
*/
export function normalizePrompt(prompt: string, workspaceRoot: string): string {
let normalized = prompt;
@@ -36,6 +41,7 @@ export function normalizePrompt(prompt: string, workspaceRoot: string): string {
normalized = normalized.replace(ISO_TIMESTAMP_RE, '__TIMESTAMP__');
normalized = normalized.replace(UNIX_EPOCH_MS_RE, '__EPOCH__');
normalized = normalized.replace(SESSION_NUM_RE, 'session__N__');
normalized = normalized.replace(AGENT_WORKDIR_RE, 'agent-workdirs/__AGENT__');
return normalized;
}

View File

@@ -16,8 +16,8 @@
* - Worktree file hash (detects content drift for execute-mode agents)
*/
import { readFileSync, existsSync } from 'node:fs';
import { join } from 'node:path';
import { readFileSync, existsSync, mkdirSync, writeFileSync, readdirSync } from 'node:fs';
import { join, dirname, relative } from 'node:path';
import { ProcessManager } from '../../agent/process-manager.js';
import type { StreamEvent } from '../../agent/providers/parsers/index.js';
import type { FileTailer } from '../../agent/file-tailer.js';
@@ -35,10 +35,16 @@ interface PendingRecording {
agentCwd: string;
}
interface PendingReplay {
cassette: CassetteEntry;
agentCwd: string;
}
export class CassetteProcessManager extends ProcessManager {
private readonly _workspaceRoot: string;
private readonly replayWorkerPath: string;
private readonly pendingRecordings = new Map<number, PendingRecording>();
private readonly pendingReplays = new Map<number, PendingReplay>();
constructor(
workspaceRoot: string,
@@ -74,7 +80,9 @@ export class CassetteProcessManager extends ProcessManager {
const existing = this.cassetteMode !== 'record' ? this.store.find(key) : null;
if (existing) {
return this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
const result = this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
this.pendingReplays.set(result.pid, { cassette: existing, agentCwd: cwd });
return result;
}
if (this.cassetteMode === 'replay') {
@@ -97,18 +105,28 @@ export class CassetteProcessManager extends ProcessManager {
onComplete: () => Promise<void>,
getTailer: () => FileTailer | undefined,
): { cancel: () => void } {
const pending = this.pendingRecordings.get(pid);
if (!pending) {
// Replay mode — no recording to save; delegate to base implementation.
return super.pollForCompletion(agentId, pid, onComplete, getTailer);
const recording = this.pendingRecordings.get(pid);
if (recording) {
// Record mode — wrap onComplete to save the cassette before handing off.
return super.pollForCompletion(agentId, pid, async () => {
await this.saveCassette(recording);
this.pendingRecordings.delete(pid);
await onComplete();
}, getTailer);
}
// Record mode — wrap onComplete to save the cassette before handing off.
return super.pollForCompletion(agentId, pid, async () => {
await this.saveCassette(pending);
this.pendingRecordings.delete(pid);
await onComplete();
}, getTailer);
const replay = this.pendingReplays.get(pid);
if (replay) {
// Replay mode — restore .cw/output/ files before onComplete so that
// readPhaseFiles / readTaskFiles / readProposalFiles find their data.
return super.pollForCompletion(agentId, pid, async () => {
this.restoreOutputFiles(replay.cassette, replay.agentCwd);
this.pendingReplays.delete(pid);
await onComplete();
}, getTailer);
}
return super.pollForCompletion(agentId, pid, onComplete, getTailer);
}
private async saveCassette(pending: PendingRecording): Promise<void> {
@@ -123,7 +141,8 @@ export class CassetteProcessManager extends ProcessManager {
// Read signal.json from the agent working directory.
let signalJson: Record<string, unknown> | null = null;
const signalPath = join(pending.agentCwd, '.cw', 'output', 'signal.json');
const outputDir = join(pending.agentCwd, '.cw', 'output');
const signalPath = join(outputDir, 'signal.json');
if (existsSync(signalPath)) {
try {
signalJson = JSON.parse(readFileSync(signalPath, 'utf-8')) as Record<string, unknown>;
@@ -132,6 +151,16 @@ export class CassetteProcessManager extends ProcessManager {
}
}
// Capture all other files in .cw/output/ (phase files, task files, etc.)
const outputFiles: Record<string, string> = {};
if (existsSync(outputDir)) {
this.walkOutputDir(outputDir, outputDir, (relPath, content) => {
if (relPath !== 'signal.json') {
outputFiles[relPath] = content;
}
});
}
const entry: CassetteEntry = {
version: 1,
key: pending.key,
@@ -140,12 +169,67 @@ export class CassetteProcessManager extends ProcessManager {
signalJson,
exitCode: 0,
recordedAt: new Date().toISOString(),
outputFiles,
},
};
this.store.save(pending.key, entry);
}
/**
* Restore captured .cw/output/ files to the new agent working directory.
* Called before onComplete so that downstream readers (readPhaseFiles, etc.)
* find the expected files in place.
*/
private restoreOutputFiles(cassette: CassetteEntry, agentCwd: string): void {
const { outputFiles, signalJson } = cassette.recording;
const outputDir = join(agentCwd, '.cw', 'output');
// Restore captured output files
if (outputFiles) {
for (const [relPath, content] of Object.entries(outputFiles)) {
const fullPath = join(outputDir, relPath);
mkdirSync(dirname(fullPath), { recursive: true });
writeFileSync(fullPath, content, 'utf-8');
}
}
// Write signal.json (the manager reads this to detect completion status)
if (signalJson) {
mkdirSync(outputDir, { recursive: true });
writeFileSync(join(outputDir, 'signal.json'), JSON.stringify(signalJson), 'utf-8');
}
}
private walkOutputDir(
baseDir: string,
currentDir: string,
callback: (relPath: string, content: string) => void,
): void {
let entries;
try {
entries = readdirSync(currentDir, { withFileTypes: true });
} catch {
return;
}
for (const entry of entries) {
const fullPath = join(currentDir, entry.name);
const relPath = relative(baseDir, fullPath);
if (entry.isDirectory()) {
this.walkOutputDir(baseDir, fullPath, callback);
} else if (entry.isFile()) {
try {
const content = readFileSync(fullPath, 'utf-8');
callback(relPath, content);
} catch {
// Skip unreadable files
}
}
}
}
private replayFromCassette(
agentId: string,
agentName: string,

View File

@@ -26,6 +26,13 @@ export interface CassetteRecording {
exitCode: number;
/** ISO timestamp when this cassette was recorded. */
recordedAt: string;
/**
* All files the agent wrote to .cw/output/ (relative path → UTF-8 content),
* excluding signal.json (which is captured separately in signalJson).
* Restored during replay before onComplete fires so downstream readers
* (e.g. readPhaseFiles, readTaskFiles) see the expected directory contents.
*/
outputFiles?: Record<string, string>;
}
export interface CassetteEntry {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -43,8 +43,15 @@ import {
// Constants
// =============================================================================
/** Total test timeout: 5 minutes (replay=seconds; 5min covers accidental record) */
const CASSETTE_FLOW_TIMEOUT = 5 * 60_000;
const RECORDING =
process.env.CW_CASSETTE_FORCE_RECORD === '1' || process.env.CW_CASSETTE_RECORD === '1';
/**
* Test timeout.
* - Replay: 5 min (cassettes complete in seconds; cap is generous headroom)
* - Record: 60 min (real agents doing discuss/plan/detail/execute take API time)
*/
const CASSETTE_FLOW_TIMEOUT = RECORDING ? 60 * 60_000 : 5 * 60_000;
const __dirname = dirname(fileURLToPath(import.meta.url));
const CASSETTE_DIR =