fix: Stabilize full-flow cassette keys and restore output files on replay
Three issues discovered and fixed after initial recording: 1. Agent workdir names not normalized — random animal names (e.g. "available-sheep") embedded in workspace paths caused key drift. Added AGENT_WORKDIR_RE to replace agent-workdirs/<name> with agent-workdirs/__AGENT__ in normalizer.ts. 2. Phase/task files missing on replay — plan/detail agents write output to .cw/output/ (phases/, tasks/) which the server reads on completion. The replay worker only emits JSONL; it doesn't re-execute file writes. Extended cassette format with outputFiles field and added capture (walkOutputDir) + restore (restoreOutputFiles) logic to process-manager. 3. Recording timeout too short — fixed CASSETTE_FLOW_TIMEOUT to be mode-aware: 60 min for recording runs, 5 min for replay. Also commit the 4 recorded cassettes (discuss/plan/detail/execute) that make the full-flow cassette test runnable in CI without API costs.
This commit is contained in:
@@ -12,6 +12,10 @@ const NANOID_RE = /(?<![A-Za-z0-9])[A-Za-z0-9_-]{21}(?![A-Za-z0-9_-])/g;
|
||||
const ISO_TIMESTAMP_RE = /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?/g;
|
||||
const UNIX_EPOCH_MS_RE = /\b1[0-9]{12}\b/g;
|
||||
const SESSION_NUM_RE = /\bsession[_\s-]?\d+\b/gi;
|
||||
// Agent worktree paths: agent-workdirs/<random-agent-name> (with or without trailing slash)
|
||||
// The agent name (e.g. "available-sheep") changes every run but is not a UUID or nanoid.
|
||||
// Stop at the first slash so the project name after it is preserved.
|
||||
const AGENT_WORKDIR_RE = /agent-workdirs\/[^\s/\\]+/g;
|
||||
|
||||
/**
|
||||
* Normalize a prompt for stable cassette key generation.
|
||||
@@ -23,6 +27,7 @@ const SESSION_NUM_RE = /\bsession[_\s-]?\d+\b/gi;
|
||||
* 3. ISO 8601 timestamps → __TIMESTAMP__
|
||||
* 4. Unix epoch milliseconds → __EPOCH__
|
||||
* 5. Session numbers → session__N__
|
||||
* 6. Agent worktree path segment → agent-workdirs/__AGENT__/
|
||||
*/
|
||||
export function normalizePrompt(prompt: string, workspaceRoot: string): string {
|
||||
let normalized = prompt;
|
||||
@@ -36,6 +41,7 @@ export function normalizePrompt(prompt: string, workspaceRoot: string): string {
|
||||
normalized = normalized.replace(ISO_TIMESTAMP_RE, '__TIMESTAMP__');
|
||||
normalized = normalized.replace(UNIX_EPOCH_MS_RE, '__EPOCH__');
|
||||
normalized = normalized.replace(SESSION_NUM_RE, 'session__N__');
|
||||
normalized = normalized.replace(AGENT_WORKDIR_RE, 'agent-workdirs/__AGENT__');
|
||||
|
||||
return normalized;
|
||||
}
|
||||
|
||||
@@ -16,8 +16,8 @@
|
||||
* - Worktree file hash (detects content drift for execute-mode agents)
|
||||
*/
|
||||
|
||||
import { readFileSync, existsSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import { readFileSync, existsSync, mkdirSync, writeFileSync, readdirSync } from 'node:fs';
|
||||
import { join, dirname, relative } from 'node:path';
|
||||
import { ProcessManager } from '../../agent/process-manager.js';
|
||||
import type { StreamEvent } from '../../agent/providers/parsers/index.js';
|
||||
import type { FileTailer } from '../../agent/file-tailer.js';
|
||||
@@ -35,10 +35,16 @@ interface PendingRecording {
|
||||
agentCwd: string;
|
||||
}
|
||||
|
||||
interface PendingReplay {
|
||||
cassette: CassetteEntry;
|
||||
agentCwd: string;
|
||||
}
|
||||
|
||||
export class CassetteProcessManager extends ProcessManager {
|
||||
private readonly _workspaceRoot: string;
|
||||
private readonly replayWorkerPath: string;
|
||||
private readonly pendingRecordings = new Map<number, PendingRecording>();
|
||||
private readonly pendingReplays = new Map<number, PendingReplay>();
|
||||
|
||||
constructor(
|
||||
workspaceRoot: string,
|
||||
@@ -74,7 +80,9 @@ export class CassetteProcessManager extends ProcessManager {
|
||||
const existing = this.cassetteMode !== 'record' ? this.store.find(key) : null;
|
||||
|
||||
if (existing) {
|
||||
return this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
|
||||
const result = this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
|
||||
this.pendingReplays.set(result.pid, { cassette: existing, agentCwd: cwd });
|
||||
return result;
|
||||
}
|
||||
|
||||
if (this.cassetteMode === 'replay') {
|
||||
@@ -97,18 +105,28 @@ export class CassetteProcessManager extends ProcessManager {
|
||||
onComplete: () => Promise<void>,
|
||||
getTailer: () => FileTailer | undefined,
|
||||
): { cancel: () => void } {
|
||||
const pending = this.pendingRecordings.get(pid);
|
||||
if (!pending) {
|
||||
// Replay mode — no recording to save; delegate to base implementation.
|
||||
return super.pollForCompletion(agentId, pid, onComplete, getTailer);
|
||||
const recording = this.pendingRecordings.get(pid);
|
||||
if (recording) {
|
||||
// Record mode — wrap onComplete to save the cassette before handing off.
|
||||
return super.pollForCompletion(agentId, pid, async () => {
|
||||
await this.saveCassette(recording);
|
||||
this.pendingRecordings.delete(pid);
|
||||
await onComplete();
|
||||
}, getTailer);
|
||||
}
|
||||
|
||||
// Record mode — wrap onComplete to save the cassette before handing off.
|
||||
return super.pollForCompletion(agentId, pid, async () => {
|
||||
await this.saveCassette(pending);
|
||||
this.pendingRecordings.delete(pid);
|
||||
await onComplete();
|
||||
}, getTailer);
|
||||
const replay = this.pendingReplays.get(pid);
|
||||
if (replay) {
|
||||
// Replay mode — restore .cw/output/ files before onComplete so that
|
||||
// readPhaseFiles / readTaskFiles / readProposalFiles find their data.
|
||||
return super.pollForCompletion(agentId, pid, async () => {
|
||||
this.restoreOutputFiles(replay.cassette, replay.agentCwd);
|
||||
this.pendingReplays.delete(pid);
|
||||
await onComplete();
|
||||
}, getTailer);
|
||||
}
|
||||
|
||||
return super.pollForCompletion(agentId, pid, onComplete, getTailer);
|
||||
}
|
||||
|
||||
private async saveCassette(pending: PendingRecording): Promise<void> {
|
||||
@@ -123,7 +141,8 @@ export class CassetteProcessManager extends ProcessManager {
|
||||
|
||||
// Read signal.json from the agent working directory.
|
||||
let signalJson: Record<string, unknown> | null = null;
|
||||
const signalPath = join(pending.agentCwd, '.cw', 'output', 'signal.json');
|
||||
const outputDir = join(pending.agentCwd, '.cw', 'output');
|
||||
const signalPath = join(outputDir, 'signal.json');
|
||||
if (existsSync(signalPath)) {
|
||||
try {
|
||||
signalJson = JSON.parse(readFileSync(signalPath, 'utf-8')) as Record<string, unknown>;
|
||||
@@ -132,6 +151,16 @@ export class CassetteProcessManager extends ProcessManager {
|
||||
}
|
||||
}
|
||||
|
||||
// Capture all other files in .cw/output/ (phase files, task files, etc.)
|
||||
const outputFiles: Record<string, string> = {};
|
||||
if (existsSync(outputDir)) {
|
||||
this.walkOutputDir(outputDir, outputDir, (relPath, content) => {
|
||||
if (relPath !== 'signal.json') {
|
||||
outputFiles[relPath] = content;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const entry: CassetteEntry = {
|
||||
version: 1,
|
||||
key: pending.key,
|
||||
@@ -140,12 +169,67 @@ export class CassetteProcessManager extends ProcessManager {
|
||||
signalJson,
|
||||
exitCode: 0,
|
||||
recordedAt: new Date().toISOString(),
|
||||
outputFiles,
|
||||
},
|
||||
};
|
||||
|
||||
this.store.save(pending.key, entry);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restore captured .cw/output/ files to the new agent working directory.
|
||||
* Called before onComplete so that downstream readers (readPhaseFiles, etc.)
|
||||
* find the expected files in place.
|
||||
*/
|
||||
private restoreOutputFiles(cassette: CassetteEntry, agentCwd: string): void {
|
||||
const { outputFiles, signalJson } = cassette.recording;
|
||||
const outputDir = join(agentCwd, '.cw', 'output');
|
||||
|
||||
// Restore captured output files
|
||||
if (outputFiles) {
|
||||
for (const [relPath, content] of Object.entries(outputFiles)) {
|
||||
const fullPath = join(outputDir, relPath);
|
||||
mkdirSync(dirname(fullPath), { recursive: true });
|
||||
writeFileSync(fullPath, content, 'utf-8');
|
||||
}
|
||||
}
|
||||
|
||||
// Write signal.json (the manager reads this to detect completion status)
|
||||
if (signalJson) {
|
||||
mkdirSync(outputDir, { recursive: true });
|
||||
writeFileSync(join(outputDir, 'signal.json'), JSON.stringify(signalJson), 'utf-8');
|
||||
}
|
||||
}
|
||||
|
||||
private walkOutputDir(
|
||||
baseDir: string,
|
||||
currentDir: string,
|
||||
callback: (relPath: string, content: string) => void,
|
||||
): void {
|
||||
let entries;
|
||||
try {
|
||||
entries = readdirSync(currentDir, { withFileTypes: true });
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const entry of entries) {
|
||||
const fullPath = join(currentDir, entry.name);
|
||||
const relPath = relative(baseDir, fullPath);
|
||||
|
||||
if (entry.isDirectory()) {
|
||||
this.walkOutputDir(baseDir, fullPath, callback);
|
||||
} else if (entry.isFile()) {
|
||||
try {
|
||||
const content = readFileSync(fullPath, 'utf-8');
|
||||
callback(relPath, content);
|
||||
} catch {
|
||||
// Skip unreadable files
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private replayFromCassette(
|
||||
agentId: string,
|
||||
agentName: string,
|
||||
|
||||
@@ -26,6 +26,13 @@ export interface CassetteRecording {
|
||||
exitCode: number;
|
||||
/** ISO timestamp when this cassette was recorded. */
|
||||
recordedAt: string;
|
||||
/**
|
||||
* All files the agent wrote to .cw/output/ (relative path → UTF-8 content),
|
||||
* excluding signal.json (which is captured separately in signalJson).
|
||||
* Restored during replay before onComplete fires so downstream readers
|
||||
* (e.g. readPhaseFiles, readTaskFiles) see the expected directory contents.
|
||||
*/
|
||||
outputFiles?: Record<string, string>;
|
||||
}
|
||||
|
||||
export interface CassetteEntry {
|
||||
|
||||
65
src/test/cassettes/1cd13ac7ceb5fffb6a8fd52fe5825dd5.json
Normal file
65
src/test/cassettes/1cd13ac7ceb5fffb6a8fd52fe5825dd5.json
Normal file
File diff suppressed because one or more lines are too long
60
src/test/cassettes/3ebb6b15ba29592585517881a8deabcd.json
Normal file
60
src/test/cassettes/3ebb6b15ba29592585517881a8deabcd.json
Normal file
File diff suppressed because one or more lines are too long
79
src/test/cassettes/80831e59bdc5ad35515a4c68d5d4ed22.json
Normal file
79
src/test/cassettes/80831e59bdc5ad35515a4c68d5d4ed22.json
Normal file
File diff suppressed because one or more lines are too long
61
src/test/cassettes/ff2b1ae8f39a02ab1009de52b5fbd8de.json
Normal file
61
src/test/cassettes/ff2b1ae8f39a02ab1009de52b5fbd8de.json
Normal file
File diff suppressed because one or more lines are too long
@@ -43,8 +43,15 @@ import {
|
||||
// Constants
|
||||
// =============================================================================
|
||||
|
||||
/** Total test timeout: 5 minutes (replay=seconds; 5min covers accidental record) */
|
||||
const CASSETTE_FLOW_TIMEOUT = 5 * 60_000;
|
||||
const RECORDING =
|
||||
process.env.CW_CASSETTE_FORCE_RECORD === '1' || process.env.CW_CASSETTE_RECORD === '1';
|
||||
|
||||
/**
|
||||
* Test timeout.
|
||||
* - Replay: 5 min (cassettes complete in seconds; cap is generous headroom)
|
||||
* - Record: 60 min (real agents doing discuss/plan/detail/execute take API time)
|
||||
*/
|
||||
const CASSETTE_FLOW_TIMEOUT = RECORDING ? 60 * 60_000 : 5 * 60_000;
|
||||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
const CASSETTE_DIR =
|
||||
|
||||
Reference in New Issue
Block a user