fix: Stabilize full-flow cassette keys and restore output files on replay
Three issues were discovered and fixed after the initial recording:

1. Agent workdir names were not normalized — random animal names (e.g. "available-sheep") embedded in workspace paths caused cassette key drift. Added AGENT_WORKDIR_RE to replace agent-workdirs/<name> with agent-workdirs/__AGENT__ in normalizer.ts.

2. Phase/task files were missing on replay — plan/detail agents write output to .cw/output/ (phases/, tasks/), which the server reads on completion. The replay worker only emits JSONL; it doesn't re-execute file writes. Extended the cassette format with an outputFiles field and added capture (walkOutputDir) and restore (restoreOutputFiles) logic to process-manager.

3. The recording timeout was too short — made CASSETTE_FLOW_TIMEOUT mode-aware: 60 min for recording runs, 5 min for replay.

Also commits the 4 recorded cassettes (discuss/plan/detail/execute) that make the full-flow cassette test runnable in CI without API costs.
This commit is contained in:
@@ -12,6 +12,10 @@ const NANOID_RE = /(?<![A-Za-z0-9])[A-Za-z0-9_-]{21}(?![A-Za-z0-9_-])/g;
|
|||||||
const ISO_TIMESTAMP_RE = /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?/g;
|
const ISO_TIMESTAMP_RE = /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?/g;
|
||||||
const UNIX_EPOCH_MS_RE = /\b1[0-9]{12}\b/g;
|
const UNIX_EPOCH_MS_RE = /\b1[0-9]{12}\b/g;
|
||||||
const SESSION_NUM_RE = /\bsession[_\s-]?\d+\b/gi;
|
const SESSION_NUM_RE = /\bsession[_\s-]?\d+\b/gi;
|
||||||
|
// Agent worktree paths: agent-workdirs/<random-agent-name> (with or without trailing slash)
|
||||||
|
// The agent name (e.g. "available-sheep") changes every run but is not a UUID or nanoid.
|
||||||
|
// Stop at the first slash so the project name after it is preserved.
|
||||||
|
const AGENT_WORKDIR_RE = /agent-workdirs\/[^\s/\\]+/g;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Normalize a prompt for stable cassette key generation.
|
* Normalize a prompt for stable cassette key generation.
|
||||||
@@ -23,6 +27,7 @@ const SESSION_NUM_RE = /\bsession[_\s-]?\d+\b/gi;
|
|||||||
* 3. ISO 8601 timestamps → __TIMESTAMP__
|
* 3. ISO 8601 timestamps → __TIMESTAMP__
|
||||||
* 4. Unix epoch milliseconds → __EPOCH__
|
* 4. Unix epoch milliseconds → __EPOCH__
|
||||||
* 5. Session numbers → session__N__
|
* 5. Session numbers → session__N__
|
||||||
|
* 6. Agent worktree path segment → agent-workdirs/__AGENT__ (the rest of the path is kept)
|
||||||
*/
|
*/
|
||||||
export function normalizePrompt(prompt: string, workspaceRoot: string): string {
|
export function normalizePrompt(prompt: string, workspaceRoot: string): string {
|
||||||
let normalized = prompt;
|
let normalized = prompt;
|
||||||
@@ -36,6 +41,7 @@ export function normalizePrompt(prompt: string, workspaceRoot: string): string {
|
|||||||
normalized = normalized.replace(ISO_TIMESTAMP_RE, '__TIMESTAMP__');
|
normalized = normalized.replace(ISO_TIMESTAMP_RE, '__TIMESTAMP__');
|
||||||
normalized = normalized.replace(UNIX_EPOCH_MS_RE, '__EPOCH__');
|
normalized = normalized.replace(UNIX_EPOCH_MS_RE, '__EPOCH__');
|
||||||
normalized = normalized.replace(SESSION_NUM_RE, 'session__N__');
|
normalized = normalized.replace(SESSION_NUM_RE, 'session__N__');
|
||||||
|
normalized = normalized.replace(AGENT_WORKDIR_RE, 'agent-workdirs/__AGENT__');
|
||||||
|
|
||||||
return normalized;
|
return normalized;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,8 +16,8 @@
|
|||||||
* - Worktree file hash (detects content drift for execute-mode agents)
|
* - Worktree file hash (detects content drift for execute-mode agents)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { readFileSync, existsSync } from 'node:fs';
|
import { readFileSync, existsSync, mkdirSync, writeFileSync, readdirSync } from 'node:fs';
|
||||||
import { join } from 'node:path';
|
import { join, dirname, relative } from 'node:path';
|
||||||
import { ProcessManager } from '../../agent/process-manager.js';
|
import { ProcessManager } from '../../agent/process-manager.js';
|
||||||
import type { StreamEvent } from '../../agent/providers/parsers/index.js';
|
import type { StreamEvent } from '../../agent/providers/parsers/index.js';
|
||||||
import type { FileTailer } from '../../agent/file-tailer.js';
|
import type { FileTailer } from '../../agent/file-tailer.js';
|
||||||
@@ -35,10 +35,16 @@ interface PendingRecording {
|
|||||||
agentCwd: string;
|
agentCwd: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Bookkeeping for an agent that is being replayed from a cassette.
 * Stored in `pendingReplays` under the pid returned by `replayFromCassette`.
 */
interface PendingReplay {
|
||||||
|
/** The cassette entry that matched this spawn; its recorded output is restored on completion. */
cassette: CassetteEntry;
|
||||||
|
/** Working directory the agent was spawned with; files are restored under its `.cw/output/`. */
agentCwd: string;
|
||||||
|
}
|
||||||
|
|
||||||
export class CassetteProcessManager extends ProcessManager {
|
export class CassetteProcessManager extends ProcessManager {
|
||||||
private readonly _workspaceRoot: string;
|
private readonly _workspaceRoot: string;
|
||||||
private readonly replayWorkerPath: string;
|
private readonly replayWorkerPath: string;
|
||||||
private readonly pendingRecordings = new Map<number, PendingRecording>();
|
private readonly pendingRecordings = new Map<number, PendingRecording>();
|
||||||
|
private readonly pendingReplays = new Map<number, PendingReplay>();
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
workspaceRoot: string,
|
workspaceRoot: string,
|
||||||
@@ -74,7 +80,9 @@ export class CassetteProcessManager extends ProcessManager {
|
|||||||
const existing = this.cassetteMode !== 'record' ? this.store.find(key) : null;
|
const existing = this.cassetteMode !== 'record' ? this.store.find(key) : null;
|
||||||
|
|
||||||
if (existing) {
|
if (existing) {
|
||||||
return this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
|
const result = this.replayFromCassette(agentId, agentName, cwd, env, providerName, existing, onEvent, onRawContent);
|
||||||
|
this.pendingReplays.set(result.pid, { cassette: existing, agentCwd: cwd });
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.cassetteMode === 'replay') {
|
if (this.cassetteMode === 'replay') {
|
||||||
@@ -97,18 +105,28 @@ export class CassetteProcessManager extends ProcessManager {
|
|||||||
onComplete: () => Promise<void>,
|
onComplete: () => Promise<void>,
|
||||||
getTailer: () => FileTailer | undefined,
|
getTailer: () => FileTailer | undefined,
|
||||||
): { cancel: () => void } {
|
): { cancel: () => void } {
|
||||||
const pending = this.pendingRecordings.get(pid);
|
const recording = this.pendingRecordings.get(pid);
|
||||||
if (!pending) {
|
if (recording) {
|
||||||
// Replay mode — no recording to save; delegate to base implementation.
|
// Record mode — wrap onComplete to save the cassette before handing off.
|
||||||
return super.pollForCompletion(agentId, pid, onComplete, getTailer);
|
return super.pollForCompletion(agentId, pid, async () => {
|
||||||
|
await this.saveCassette(recording);
|
||||||
|
this.pendingRecordings.delete(pid);
|
||||||
|
await onComplete();
|
||||||
|
}, getTailer);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record mode — wrap onComplete to save the cassette before handing off.
|
const replay = this.pendingReplays.get(pid);
|
||||||
return super.pollForCompletion(agentId, pid, async () => {
|
if (replay) {
|
||||||
await this.saveCassette(pending);
|
// Replay mode — restore .cw/output/ files before onComplete so that
|
||||||
this.pendingRecordings.delete(pid);
|
// readPhaseFiles / readTaskFiles / readProposalFiles find their data.
|
||||||
await onComplete();
|
return super.pollForCompletion(agentId, pid, async () => {
|
||||||
}, getTailer);
|
this.restoreOutputFiles(replay.cassette, replay.agentCwd);
|
||||||
|
this.pendingReplays.delete(pid);
|
||||||
|
await onComplete();
|
||||||
|
}, getTailer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return super.pollForCompletion(agentId, pid, onComplete, getTailer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async saveCassette(pending: PendingRecording): Promise<void> {
|
private async saveCassette(pending: PendingRecording): Promise<void> {
|
||||||
@@ -123,7 +141,8 @@ export class CassetteProcessManager extends ProcessManager {
|
|||||||
|
|
||||||
// Read signal.json from the agent working directory.
|
// Read signal.json from the agent working directory.
|
||||||
let signalJson: Record<string, unknown> | null = null;
|
let signalJson: Record<string, unknown> | null = null;
|
||||||
const signalPath = join(pending.agentCwd, '.cw', 'output', 'signal.json');
|
const outputDir = join(pending.agentCwd, '.cw', 'output');
|
||||||
|
const signalPath = join(outputDir, 'signal.json');
|
||||||
if (existsSync(signalPath)) {
|
if (existsSync(signalPath)) {
|
||||||
try {
|
try {
|
||||||
signalJson = JSON.parse(readFileSync(signalPath, 'utf-8')) as Record<string, unknown>;
|
signalJson = JSON.parse(readFileSync(signalPath, 'utf-8')) as Record<string, unknown>;
|
||||||
@@ -132,6 +151,16 @@ export class CassetteProcessManager extends ProcessManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture all other files in .cw/output/ (phase files, task files, etc.)
|
||||||
|
const outputFiles: Record<string, string> = {};
|
||||||
|
if (existsSync(outputDir)) {
|
||||||
|
this.walkOutputDir(outputDir, outputDir, (relPath, content) => {
|
||||||
|
if (relPath !== 'signal.json') {
|
||||||
|
outputFiles[relPath] = content;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
const entry: CassetteEntry = {
|
const entry: CassetteEntry = {
|
||||||
version: 1,
|
version: 1,
|
||||||
key: pending.key,
|
key: pending.key,
|
||||||
@@ -140,12 +169,67 @@ export class CassetteProcessManager extends ProcessManager {
|
|||||||
signalJson,
|
signalJson,
|
||||||
exitCode: 0,
|
exitCode: 0,
|
||||||
recordedAt: new Date().toISOString(),
|
recordedAt: new Date().toISOString(),
|
||||||
|
outputFiles,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
this.store.save(pending.key, entry);
|
this.store.save(pending.key, entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Restore captured `.cw/output/` files into the replayed agent's working directory.
 *
 * Called before `onComplete` so that downstream readers (readPhaseFiles, etc.)
 * find the expected files in place.
 *
 * Cassette entries are JSON loaded from disk, so every key is checked to make
 * sure it resolves to a path strictly inside the output directory. Keys that
 * would escape it (e.g. `../x`) or that resolve to the directory itself
 * (empty string, `.`) are skipped with a warning instead of being written.
 *
 * @param cassette - The cassette whose recording holds `outputFiles` and `signalJson`.
 * @param agentCwd - Working directory of the replayed agent.
 */
private restoreOutputFiles(cassette: CassetteEntry, agentCwd: string): void {
  const { outputFiles, signalJson } = cassette.recording;
  const outputDir = join(agentCwd, '.cw', 'output');

  // Older cassettes have no outputFiles field; treat that as "nothing to restore".
  for (const [relPath, content] of Object.entries(outputFiles ?? {})) {
    const fullPath = join(outputDir, relPath);

    // join() normalizes `..` segments, so the path relative to outputDir tells
    // us whether the key stayed inside the directory.
    const insidePath = relative(outputDir, fullPath);
    const escapes = insidePath.split(/[\\/]/)[0] === '..';
    if (insidePath === '' || escapes) {
      console.warn(
        `[cassette] skipping output file with invalid path ${JSON.stringify(relPath)}`,
      );
      continue;
    }

    mkdirSync(dirname(fullPath), { recursive: true });
    writeFileSync(fullPath, content, 'utf-8');
  }

  // Write signal.json (the manager reads this to detect completion status)
  if (signalJson) {
    mkdirSync(outputDir, { recursive: true });
    writeFileSync(join(outputDir, 'signal.json'), JSON.stringify(signalJson), 'utf-8');
  }
}
|
||||||
|
|
||||||
|
/**
 * Recursively visit every regular file below `currentDir` and hand its
 * path (relative to `baseDir`) and UTF-8 content to `callback`.
 *
 * - Relative paths always use `/` as separator, so cassettes recorded on
 *   Windows restore correctly on POSIX systems and vice versa.
 * - Entries are visited in sorted name order because `readdirSync` makes no
 *   ordering guarantee; this keeps the recorded key order stable between runs.
 * - Capture is best-effort: directories or files that cannot be read are
 *   skipped. Errors thrown by `callback` itself are NOT swallowed.
 * - Entries that are neither directories nor regular files are ignored.
 *
 * @param baseDir - Root that reported relative paths are computed against.
 * @param currentDir - Directory to scan in this call (equal to `baseDir` at the top level).
 * @param callback - Receives `(relPath, content)` for each readable file.
 */
private walkOutputDir(
  baseDir: string,
  currentDir: string,
  callback: (relPath: string, content: string) => void,
): void {
  let entries;
  try {
    entries = readdirSync(currentDir, { withFileTypes: true });
  } catch {
    // Directory is missing or unreadable — nothing to capture here.
    return;
  }

  // Plain code-unit comparison: deterministic and locale-independent.
  entries.sort((a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0));

  for (const entry of entries) {
    const fullPath = join(currentDir, entry.name);

    if (entry.isDirectory()) {
      this.walkOutputDir(baseDir, fullPath, callback);
      continue;
    }
    if (!entry.isFile()) {
      continue;
    }

    let content: string;
    try {
      content = readFileSync(fullPath, 'utf-8');
    } catch {
      // Skip unreadable files
      continue;
    }

    // Only Windows uses `\` as a separator; on POSIX it is a legal filename
    // character and must be left untouched.
    const nativeRel = relative(baseDir, fullPath);
    const relPath = process.platform === 'win32' ? nativeRel.replace(/\\/g, '/') : nativeRel;

    // Invoked outside the try block so a failing callback surfaces to the caller.
    callback(relPath, content);
  }
}
|
||||||
|
|
||||||
private replayFromCassette(
|
private replayFromCassette(
|
||||||
agentId: string,
|
agentId: string,
|
||||||
agentName: string,
|
agentName: string,
|
||||||
|
|||||||
@@ -26,6 +26,13 @@ export interface CassetteRecording {
|
|||||||
exitCode: number;
|
exitCode: number;
|
||||||
/** ISO timestamp when this cassette was recorded. */
|
/** ISO timestamp when this cassette was recorded. */
|
||||||
recordedAt: string;
|
recordedAt: string;
|
||||||
|
/**
|
||||||
|
* All files the agent wrote to .cw/output/ (relative path → UTF-8 content),
|
||||||
|
* excluding signal.json (which is captured separately in signalJson).
|
||||||
|
* Restored during replay before onComplete fires so downstream readers
|
||||||
|
* (e.g. readPhaseFiles, readTaskFiles) see the expected directory contents.
|
||||||
|
*/
|
||||||
|
outputFiles?: Record<string, string>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface CassetteEntry {
|
export interface CassetteEntry {
|
||||||
|
|||||||
65
src/test/cassettes/1cd13ac7ceb5fffb6a8fd52fe5825dd5.json
Normal file
65
src/test/cassettes/1cd13ac7ceb5fffb6a8fd52fe5825dd5.json
Normal file
File diff suppressed because one or more lines are too long
60
src/test/cassettes/3ebb6b15ba29592585517881a8deabcd.json
Normal file
60
src/test/cassettes/3ebb6b15ba29592585517881a8deabcd.json
Normal file
File diff suppressed because one or more lines are too long
79
src/test/cassettes/80831e59bdc5ad35515a4c68d5d4ed22.json
Normal file
79
src/test/cassettes/80831e59bdc5ad35515a4c68d5d4ed22.json
Normal file
File diff suppressed because one or more lines are too long
61
src/test/cassettes/ff2b1ae8f39a02ab1009de52b5fbd8de.json
Normal file
61
src/test/cassettes/ff2b1ae8f39a02ab1009de52b5fbd8de.json
Normal file
File diff suppressed because one or more lines are too long
@@ -43,8 +43,15 @@ import {
|
|||||||
// Constants
|
// Constants
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
|
|
||||||
/** Total test timeout: 5 minutes (replay=seconds; 5min covers accidental record) */
|
const RECORDING =
|
||||||
const CASSETTE_FLOW_TIMEOUT = 5 * 60_000;
|
process.env.CW_CASSETTE_FORCE_RECORD === '1' || process.env.CW_CASSETTE_RECORD === '1';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test timeout.
|
||||||
|
* - Replay: 5 min (cassettes complete in seconds; cap is generous headroom)
|
||||||
|
* - Record: 60 min (real agents doing discuss/plan/detail/execute take API time)
|
||||||
|
*/
|
||||||
|
const CASSETTE_FLOW_TIMEOUT = RECORDING ? 60 * 60_000 : 5 * 60_000;
|
||||||
|
|
||||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||||
const CASSETTE_DIR =
|
const CASSETTE_DIR =
|
||||||
|
|||||||
Reference in New Issue
Block a user