Errands now create worktrees via ProcessManager.createWorktreesForProjects() into agent-workdirs/<alias>/<project.name>/ instead of repos/<project>/.cw-worktrees/<errandId>. This makes getAgentWorkdir + resolveAgentCwd work correctly for all agent types.

Key changes:
- Extract createWorktreesForProjects() from createProjectWorktrees() in ProcessManager
- Add resolveAgentCwd() to ProcessManager (probes for .cw/output in subdirs)
- Add projectId to SpawnAgentOptions for single-project agents (errands)
- Skip auto-cleanup for errand agents (worktrees persist for merge/abandon)
- Errand router uses agentManager.delete() for cleanup instead of SimpleGitWorktreeManager
- Remove cwd parameter from sendUserMessage (resolves via worktreeId)
- Add pruneProjectRepos() to CleanupManager for errand worktree refs
476 lines
15 KiB
TypeScript
/**
 * ProcessManager — Subprocess lifecycle, worktree creation, command building.
 *
 * Extracted from MultiProviderAgentManager. Manages the spawning of detached
 * subprocesses, worktree creation per project, and provider-specific command
 * construction.
 */
import { spawn } from 'node:child_process';
|
|
import { openSync, closeSync, existsSync, readdirSync } from 'node:fs';
|
|
import { mkdir, writeFile } from 'node:fs/promises';
|
|
import { join } from 'node:path';
|
|
import type { ProjectRepository } from '../db/repositories/project-repository.js';
|
|
import type { AgentProviderConfig } from './providers/types.js';
|
|
import type { StreamEvent } from './providers/parsers/index.js';
|
|
import { getStreamParser } from './providers/parsers/index.js';
|
|
import { SimpleGitWorktreeManager } from '../git/manager.js';
|
|
import { ensureProjectClone, getProjectCloneDir } from '../git/project-clones.js';
|
|
import { FileTailer } from './file-tailer.js';
|
|
import { createModuleLogger } from '../logger/index.js';
|
|
|
|
/**
 * File-wide logger, obtained from the shared logger factory under the
 * 'process-manager' module name.
 */
const log = createModuleLogger(
  'process-manager',
);
/**
 * Check if a process with the given PID is still alive.
 *
 * Probes with signal 0, which performs the kernel's existence/permission
 * check without delivering a signal.
 * - An `EPERM` failure means the process exists but belongs to another user,
 *   so it is reported as alive.
 * - Any other failure (typically `ESRCH`) means it does not exist.
 * - Non-integer and non-positive PIDs are rejected up front: `kill(0, …)` and
 *   `kill(-n, …)` address process groups, not a single process, and would
 *   otherwise report a bogus "alive".
 *
 * @param pid - Process ID to probe.
 * @returns `true` if a process with this PID exists, `false` otherwise.
 */
export function isPidAlive(pid: number): boolean {
  if (!Number.isInteger(pid) || pid <= 0) return false;

  try {
    process.kill(pid, 0);
    return true;
  } catch (err: unknown) {
    const code =
      typeof err === 'object' && err !== null && 'code' in err
        ? (err as { code?: unknown }).code
        : undefined;
    // EPERM: the process exists; we just lack permission to signal it.
    return code === 'EPERM';
  }
}
/**
 * Owns the OS-level side of agent execution:
 * - per-agent working directories and git worktrees,
 * - provider CLI command construction (spawn and resume),
 * - detached subprocess spawning with stdout/stderr redirected to files,
 * - liveness polling for those subprocesses.
 */
export class ProcessManager {
  constructor(
    private readonly workspaceRoot: string,
    private readonly projectRepository: ProjectRepository,
  ) {}

  /**
   * Resolve the agent's working directory path.
   *
   * @param alias - Agent alias; used verbatim as the directory name.
   * @returns `<workspaceRoot>/agent-workdirs/<alias>` (not checked for existence).
   */
  getAgentWorkdir(alias: string): string {
    return join(this.workspaceRoot, 'agent-workdirs', alias);
  }

  /**
   * Resolve the actual working directory for an agent by probing
   * for the subdirectory that contains its `.cw/` state.
   *
   * Probe order:
   * 1. `<base>/.cw/output` → `<base>`
   * 2. `<base>/workspace/.cw` → `<base>/workspace` (standalone agents)
   * 3. the first `<base>/<sub>/.cw/output`, with subdirectories visited in
   *    sorted name order so the result is deterministic when several
   *    project worktrees qualify (initiative/errand agents).
   *
   * Falls back to `<base>` when nothing matches or `<base>` does not exist.
   *
   * @param alias - Agent alias.
   * @returns Absolute path to use as the agent's cwd.
   */
  resolveAgentCwd(alias: string): string {
    const base = this.getAgentWorkdir(alias);

    // Fast path: .cw/output exists at the base level
    if (existsSync(join(base, '.cw', 'output'))) return base;

    // Standalone agents use a workspace/ subdirectory
    const workspaceSub = join(base, 'workspace');
    if (existsSync(join(workspaceSub, '.cw'))) return workspaceSub;

    // Initiative/errand agents may have written .cw/ inside a project
    // subdirectory (e.g. agent-workdirs/<name>/<project-name>/.cw/).
    let subdirs: string[];
    try {
      subdirs = readdirSync(base, { withFileTypes: true })
        .filter((entry) => entry.isDirectory() && entry.name !== '.cw')
        .map((entry) => entry.name)
        .sort();
    } catch {
      // base dir may not exist
      return base;
    }

    for (const name of subdirs) {
      const sub = join(base, name);
      if (existsSync(join(sub, '.cw', 'output'))) return sub;
    }

    return base;
  }

  /**
   * Create worktrees for all projects linked to an initiative.
   *
   * When the initiative has no linked projects, falls back to a single
   * standalone worktree so the agent always has a git-backed working directory.
   *
   * @param alias - Agent alias (determines workdir and default branch name).
   * @param initiativeId - Initiative whose projects are looked up.
   * @param baseBranch - Branch to fork from; defaults to each project's default branch.
   * @param branchName - Branch to create; defaults to `agent/<alias>`.
   * @returns The base agent workdir path, or the standalone worktree path in the
   *   no-projects fallback.
   */
  async createProjectWorktrees(
    alias: string,
    initiativeId: string,
    baseBranch?: string,
    branchName?: string,
  ): Promise<string> {
    const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);

    log.debug({
      alias,
      initiativeId,
      projectCount: projects.length,
      baseBranch
    }, 'creating project worktrees');

    if (projects.length === 0) {
      log.info({ alias, initiativeId }, 'initiative has no linked projects, falling back to standalone worktree');
      return this.createStandaloneWorktree(alias);
    }

    return this.createWorktreesForProjects(alias, projects, baseBranch, branchName);
  }

  /**
   * Create worktrees for a given list of projects under agent-workdirs/<alias>/.
   * Used by both initiative-based and single-project (errand) agents.
   *
   * Projects are processed sequentially, and each worktree is placed at
   * `agent-workdirs/<alias>/<project.name>`. Nothing is created for an empty
   * list, so the returned directory may not exist in that case.
   *
   * @param alias - Agent alias.
   * @param projects - Projects to clone (if needed) and branch.
   * @param baseBranch - Branch to fork from; defaults to each project's `defaultBranch`.
   * @param branchName - Branch to create; defaults to `agent/<alias>`.
   * @returns The base agent workdir path.
   * @throws Error if a worktree path is missing after creation.
   */
  async createWorktreesForProjects(
    alias: string,
    projects: ReadonlyArray<{ name: string; url: string; id: string; defaultBranch: string }>,
    baseBranch?: string,
    branchName?: string,
  ): Promise<string> {
    const agentWorkdir = this.getAgentWorkdir(alias);
    const targetBranch = branchName ?? `agent/${alias}`;

    for (const project of projects) {
      const clonePath = await ensureProjectClone(project, this.workspaceRoot);
      const worktreeManager = new SimpleGitWorktreeManager(clonePath, undefined, agentWorkdir);
      const worktree = await worktreeManager.create(
        project.name,
        targetBranch,
        baseBranch ?? project.defaultBranch,
      );
      const worktreePath = worktree.path;
      const pathExists = existsSync(worktreePath);

      log.debug({
        alias,
        agentWorkdir,
        projectName: project.name,
        worktreePath,
        pathExists
      }, 'worktree created');

      if (!pathExists) {
        log.error({ worktreePath }, 'Worktree path does not exist after creation!');
        throw new Error(`Worktree creation failed: ${worktreePath}`);
      }
    }

    return agentWorkdir;
  }

  /**
   * Fallback: create a single "workspace" worktree for standalone agents.
   *
   * The worktree is created from the workspace root repository on branch
   * `agent/<alias>`, at `agent-workdirs/<alias>/workspace`.
   *
   * @param alias - Agent alias.
   * @returns The worktree path (not the base agent workdir).
   * @throws Error if the worktree path is missing after creation.
   */
  async createStandaloneWorktree(alias: string): Promise<string> {
    const agentWorkdir = this.getAgentWorkdir(alias);
    const worktreeManager = new SimpleGitWorktreeManager(this.workspaceRoot, undefined, agentWorkdir);

    log.debug({ alias, agentWorkdir }, 'creating standalone worktree');

    const worktree = await worktreeManager.create('workspace', `agent/${alias}`);
    const worktreePath = worktree.path;
    const pathExists = existsSync(worktreePath);

    log.debug({
      alias,
      agentWorkdir,
      worktreePath,
      pathExists
    }, 'standalone worktree created');

    if (!pathExists) {
      log.error({ worktreePath }, 'Standalone worktree path does not exist after creation!');
      throw new Error(`Standalone worktree creation failed: ${worktreePath}`);
    }

    return worktreePath;
  }

  /**
   * Build the spawn command for a given provider configuration.
   *
   * Argument order:
   * 1. the non-interactive subcommand (if any),
   * 2. the provider's base args,
   * 3. the prompt (`-p` for native mode, or the provider's prompt flag),
   * 4. the provider's output flags.
   *
   * @returns Command, argv (without the command itself), and extra env vars.
   */
  buildSpawnCommand(
    provider: AgentProviderConfig,
    prompt: string,
  ): { command: string; args: string[]; env: Record<string, string> } {
    const args = [...provider.args];
    const env: Record<string, string> = { ...provider.env };

    if (provider.nonInteractive?.subcommand) {
      args.unshift(provider.nonInteractive.subcommand);
    }

    this.appendPromptAndOutputArgs(provider, args, prompt);

    return { command: provider.command, args, env };
  }

  /**
   * Build the resume command for a given provider configuration.
   *
   * @returns Command, argv (without the command itself), and extra env vars.
   * @throws Error if the provider's `resumeStyle` is `'none'`, or if it needs a
   *   `resumeFlag` that is not configured.
   */
  buildResumeCommand(
    provider: AgentProviderConfig,
    sessionId: string,
    prompt: string,
  ): { command: string; args: string[]; env: Record<string, string> } {
    const args = [...provider.args];
    const env: Record<string, string> = { ...provider.env };

    switch (provider.resumeStyle) {
      case 'flag':
        args.push(this.requireResumeFlag(provider), sessionId);
        break;
      case 'subcommand':
        if (provider.nonInteractive?.subcommand) {
          args.unshift(provider.nonInteractive.subcommand);
        }
        args.push(this.requireResumeFlag(provider), sessionId);
        break;
      case 'none':
        throw new Error(`Provider '${provider.name}' does not support resume`);
    }

    this.appendPromptAndOutputArgs(provider, args, prompt);

    return { command: provider.command, args, env };
  }

  /**
   * Extract session ID from CLI output based on provider config.
   *
   * - `extractFrom: 'result'`: `output` is parsed as a single JSON object and
   *   `field` is read from it.
   * - `extractFrom: 'event'`: `output` is treated as JSON lines. The first event
   *   whose `type` equals `eventType` and that carries a non-empty string
   *   `field` wins. Non-JSON lines are skipped.
   *
   * @returns The session ID, or `null` if the provider has no session config or
   *   no usable (non-empty string) value is found.
   */
  extractSessionId(
    provider: AgentProviderConfig,
    output: string,
  ): string | null {
    const config = provider.sessionId;
    if (!config) return null;

    if (config.extractFrom === 'result') {
      const parsed = ProcessManager.parseJsonObject(output);
      return ProcessManager.asSessionId(parsed?.[config.field]);
    }

    if (config.extractFrom === 'event') {
      for (const line of output.trim().split('\n')) {
        const event = ProcessManager.parseJsonObject(line);
        if (!event || event.type !== config.eventType) continue;
        const sessionId = ProcessManager.asSessionId(event[config.field]);
        if (sessionId !== null) return sessionId;
      }
    }

    return null;
  }

  /**
   * Spawn a detached subprocess with file redirection for crash resilience.
   * The subprocess writes directly to files and survives server crashes.
   * A FileTailer watches the output file and emits events in real-time.
   *
   * Output goes to `<workspaceRoot>/.cw/agent-logs/<agentName>/`:
   * - `output.jsonl` — stdout,
   * - `stderr.log` — stderr,
   * - `PROMPT.md` — the prompt, when one is given.
   * Existing files there are truncated.
   *
   * @param onEvent - Callback for stream events from the tailer
   * @param onRawContent - Callback for raw content read by the tailer
   * @returns The child's PID, the stdout file path, and the started tailer.
   * @throws Error if `cwd` does not exist or the process fails to spawn.
   */
  async spawnDetached(
    agentId: string,
    agentName: string,
    command: string,
    args: string[],
    cwd: string,
    env: Record<string, string>,
    providerName: string,
    prompt?: string,
    onEvent?: (event: StreamEvent) => void,
    onRawContent?: (content: string) => void,
  ): Promise<{ pid: number; outputFilePath: string; tailer: FileTailer }> {
    // Pre-spawn validation and logging
    const cwdExists = existsSync(cwd);
    // NOTE(review): args usually contain the full prompt, so this logs it at info level.
    const commandWithArgs = [command, ...args].join(' ');

    // Log environment variables that might affect working directory
    const environmentInfo = {
      PWD: process.env.PWD,
      HOME: process.env.HOME,
      CLAUDE_CONFIG_DIR: env.CLAUDE_CONFIG_DIR,
      CW_CONFIG_DIR: env.CW_CONFIG_DIR
    };

    log.info({
      agentId,
      cwd,
      cwdExists,
      commandWithArgs,
      providerName,
      environmentInfo
    }, 'spawning detached process with workdir validation');

    if (!cwdExists) {
      log.error({ cwd }, 'CWD does not exist before spawn!');
      throw new Error(`Agent working directory does not exist: ${cwd}`);
    }

    const logDir = join(this.workspaceRoot, '.cw', 'agent-logs', agentName);
    await mkdir(logDir, { recursive: true });
    const outputFilePath = join(logDir, 'output.jsonl');
    const stderrFilePath = join(logDir, 'stderr.log');

    if (prompt) {
      await writeFile(join(logDir, 'PROMPT.md'), prompt, 'utf-8');
    }

    const child = this.spawnRedirected(command, args, cwd, env, outputFilePath, stderrFilePath);

    // Without an 'error' listener, a failed spawn (e.g. ENOENT for a missing
    // binary) is re-thrown by EventEmitter as an uncaught exception.
    child.on('error', (err) => {
      log.error({ agentId, command, err: err.message }, 'detached process emitted error');
    });
    child.unref();

    // `pid` is undefined when the OS could not start the process.
    const pid = child.pid;
    if (pid === undefined) {
      log.error({ agentId, command, cwd }, 'detached process failed to spawn (no pid)');
      throw new Error(`Failed to spawn agent process '${command}' for agent ${agentId}`);
    }

    log.info({
      agentId,
      pid,
      command,
      args: args.join(' '),
      cwd,
      spawnSuccess: true
    }, 'spawned detached process successfully');

    const parser = getStreamParser(providerName);
    const tailer = new FileTailer({
      filePath: outputFilePath,
      agentId,
      parser,
      onEvent: onEvent ?? (() => {}),
      startFromBeginning: true,
      onRawContent,
    });

    tailer.start().catch((err) => {
      log.warn({ agentId, err: ProcessManager.errorMessage(err) }, 'failed to start tailer');
    });

    return { pid, outputFilePath, tailer };
  }

  /**
   * Poll for process completion by checking if PID is still alive.
   * When the process exits, calls onComplete callback.
   * Returns a cancel handle to stop polling (e.g. on agent cleanup or re-resume).
   *
   * Optionally checks signal.json after a grace period to detect hung processes
   * that completed work but failed to exit. If a valid signal is found while the
   * process is still alive, SIGTERM is sent and normal completion proceeds.
   *
   * Polling runs every second. Errors thrown while finalizing (tailer stop or
   * `onComplete`) are logged rather than surfacing as unhandled rejections.
   * Polling stops after such an error so `onComplete` is never invoked twice.
   *
   * @param onComplete - Called when the process is no longer alive (skipped if cancelled)
   * @param getTailer - Function to get the current tailer for final flush
   * @param checkEarlyCompletion - Optional callback that returns true if signal.json indicates completion
   */
  pollForCompletion(
    agentId: string,
    pid: number,
    onComplete: () => Promise<void>,
    getTailer: () => FileTailer | undefined,
    checkEarlyCompletion?: () => Promise<boolean>,
  ): { cancel: () => void } {
    const POLL_INTERVAL_MS = 1_000;
    const GRACE_PERIOD_MS = 60_000;
    const SIGNAL_CHECK_INTERVAL_MS = 30_000;
    const startTime = Date.now();
    let lastSignalCheck = 0;
    let cancelled = false;
    let timer: ReturnType<typeof setTimeout> | undefined;

    const finalize = async (): Promise<void> => {
      const tailer = getTailer();
      if (tailer) {
        // Give the tailer a moment to read bytes flushed just before exit.
        await ProcessManager.delay(500);
        try {
          await tailer.stop();
        } catch (err) {
          log.warn({ agentId, err: ProcessManager.errorMessage(err) }, 'failed to stop tailer');
        }
      }
      if (!cancelled) await onComplete();
    };

    // One polling step; resolves true once polling should stop.
    const step = async (): Promise<boolean> => {
      if (cancelled) return true;
      if (!isPidAlive(pid)) {
        await finalize();
        return true;
      }

      // Defensive signal check: after grace period, periodically check signal.json
      if (!checkEarlyCompletion) return false;
      const now = Date.now();
      const elapsed = now - startTime;
      if (elapsed < GRACE_PERIOD_MS || now - lastSignalCheck < SIGNAL_CHECK_INTERVAL_MS) {
        return false;
      }
      lastSignalCheck = now;

      let hasSignal = false;
      try {
        hasSignal = await checkEarlyCompletion();
      } catch (err) {
        log.debug({ agentId, err: ProcessManager.errorMessage(err) }, 'early completion check failed');
        return false;
      }
      // Re-check cancellation: don't SIGTERM a process whose poll was cancelled meanwhile.
      if (!hasSignal || cancelled) return cancelled;

      log.warn({ agentId, pid, elapsedMs: elapsed }, 'signal.json found but process still alive — sending SIGTERM');
      try { process.kill(pid, 'SIGTERM'); } catch { /* already dead */ }
      await ProcessManager.delay(2000);
      await finalize();
      return true;
    };

    const tick = async (): Promise<void> => {
      let done: boolean;
      try {
        done = await step();
      } catch (err) {
        // Don't retry: finalize/onComplete may already have partially run.
        log.error({ agentId, pid, err: ProcessManager.errorMessage(err) }, 'completion polling failed');
        return;
      }
      if (!done && !cancelled) {
        timer = setTimeout(() => void tick(), POLL_INTERVAL_MS);
      }
    };

    void tick();

    return {
      cancel: () => {
        cancelled = true;
        if (timer !== undefined) clearTimeout(timer);
      },
    };
  }

  /**
   * Wait for a process to complete with Promise-based API.
   * Returns when the process is no longer alive.
   *
   * @param timeoutMs - Maximum time to wait before rejecting (default 5 minutes).
   * @returns Always `{ exitCode: null }` — exit codes of detached processes are
   *   not observable via PID polling.
   * @throws Error (via rejection) if the process is still alive after `timeoutMs`.
   */
  async waitForProcessCompletion(pid: number, timeoutMs = 300_000): Promise<{ exitCode: number | null }> {
    return new Promise((resolve, reject) => {
      const startTime = Date.now();

      const check = () => {
        if (!isPidAlive(pid)) {
          // Process has exited, try to get exit code
          // Note: Getting exact exit code from detached process is limited
          resolve({ exitCode: null });
          return;
        }

        if (Date.now() - startTime > timeoutMs) {
          reject(new Error(`Process ${pid} did not complete within ${timeoutMs}ms`));
          return;
        }

        setTimeout(check, 1000);
      };

      check();
    });
  }

  /**
   * Get the exit code of a completed process.
   * Limited implementation since we use detached processes.
   *
   * @returns Always `null`.
   */
  async getExitCode(pid: number): Promise<number | null> {
    // For detached processes, we can't easily get the exit code
    // This would need to be enhanced with process tracking
    return null;
  }

  /** Append prompt and output-format arguments shared by spawn and resume commands. */
  private appendPromptAndOutputArgs(
    provider: AgentProviderConfig,
    args: string[],
    prompt: string,
  ): void {
    const nonInteractive = provider.nonInteractive;

    if (provider.promptMode === 'native') {
      args.push('-p', prompt);
    } else if (provider.promptMode === 'flag' && nonInteractive?.promptFlag) {
      args.push(nonInteractive.promptFlag, prompt);
    }

    if (nonInteractive?.outputFlag) {
      // Split on any whitespace run so doubled/trailing spaces don't produce empty argv entries.
      args.push(...nonInteractive.outputFlag.split(/\s+/).filter((part) => part.length > 0));
    }
  }

  /** Return the provider's resume flag, failing loudly instead of pushing `undefined` into argv. */
  private requireResumeFlag(provider: AgentProviderConfig): string {
    const flag = provider.resumeFlag;
    if (!flag) {
      throw new Error(
        `Provider '${provider.name}' uses resumeStyle '${provider.resumeStyle}' but has no resumeFlag configured`,
      );
    }
    return flag;
  }

  /**
   * Spawn `command` detached, with stdout/stderr redirected to the given files.
   * The parent's copies of the file descriptors are always closed, even if
   * opening the second file or spawning throws (the child keeps its own copies).
   */
  private spawnRedirected(
    command: string,
    args: string[],
    cwd: string,
    env: Record<string, string>,
    stdoutPath: string,
    stderrPath: string,
  ) {
    const stdoutFd = openSync(stdoutPath, 'w');
    try {
      const stderrFd = openSync(stderrPath, 'w');
      try {
        return spawn(command, args, {
          cwd,
          env: { ...process.env, ...env },
          detached: true,
          stdio: ['ignore', stdoutFd, stderrFd],
        });
      } finally {
        closeSync(stderrFd);
      }
    } finally {
      closeSync(stdoutFd);
    }
  }

  /** Parse `text` as JSON, returning it only if it is a non-null object. */
  private static parseJsonObject(text: string): Record<string, unknown> | null {
    try {
      const value: unknown = JSON.parse(text);
      return typeof value === 'object' && value !== null
        ? (value as Record<string, unknown>)
        : null;
    } catch {
      return null;
    }
  }

  /** Accept only non-empty strings as session IDs. */
  private static asSessionId(value: unknown): string | null {
    return typeof value === 'string' && value.length > 0 ? value : null;
  }

  /** Render an unknown thrown value as a log-friendly message. */
  private static errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /** Resolve after `ms` milliseconds. */
  private static delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
|