Files
Codewalkers/apps/server/agent/manager.ts
Lukas May b853b28751 fix: Resolve agent workdir probing for initiative project subdirectories
Conflict-resolution agents (and any initiative-based agent) can write
.cw/output/signal.json inside a project subdirectory (e.g.
agent-workdirs/<name>/codewalk-district/.cw/output/) rather than the
parent agent workdir. This caused two failures:

1. spawnInternal wrote spawn-diagnostic.json before registering the
   agent in activeAgents and starting pollForCompletion. If the .cw/
   directory didn't exist (no inputContext provided), the write threw
   ENOENT, orphaning the running process with no completion monitoring.

2. resolveAgentCwd in cleanup-manager and output-handler only probed
   for a workspace/ subdirectory (standalone agents) but not project
   subdirectories, so reconciliation and completion handling couldn't
   find signal.json and marked the agent as crashed.

Fixes:
- Move activeAgents registration and pollForCompletion setup before
  the diagnostic write; make the write non-fatal with mkdir -p
- Add project subdirectory probing to resolveAgentCwd in both
  cleanup-manager.ts and output-handler.ts
2026-03-06 12:03:20 +01:00

1117 lines
42 KiB
TypeScript

/**
* Multi-Provider Agent Manager — Orchestrator
*
* Implementation of AgentManager port supporting multiple CLI providers.
* Delegates to extracted helpers:
* - ProcessManager: subprocess spawn/kill/poll, worktree creation, command building
* - CredentialHandler: account selection, credential write/refresh, exhaustion handling
* - OutputHandler: stream events, signal parsing, file reading, result capture
* - CleanupManager: worktree/branch/log removal, orphan cleanup, reconciliation
*/
import type {
AgentManager,
AgentInfo,
SpawnAgentOptions,
AgentResult,
AgentStatus,
AgentMode,
PendingQuestions,
} from './types.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
import type { ReviewCommentRepository } from '../db/repositories/review-comment-repository.js';
import { generateUniqueAlias } from './alias.js';
import type {
EventBus,
AgentSpawnedEvent,
AgentStoppedEvent,
AgentResumedEvent,
AgentDeletedEvent,
ProcessCrashedEvent,
} from '../events/index.js';
import { writeInputFiles } from './file-io.js';
import { buildWorkspaceLayout, buildInterAgentCommunication, buildPreviewInstructions } from './prompts/index.js';
import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
import { getProjectCloneDir } from '../git/project-clones.js';
import { join } from 'node:path';
import { unlink, readFile, writeFile as writeFileAsync, mkdir } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import type { AccountCredentialManager } from './credentials/types.js';
import { ProcessManager } from './process-manager.js';
import { CredentialHandler } from './credential-handler.js';
import { OutputHandler, type ActiveAgent } from './output-handler.js';
import { CleanupManager } from './cleanup-manager.js';
import { createLifecycleController } from './lifecycle/factory.js';
import type { AgentLifecycleController } from './lifecycle/controller.js';
import { AgentExhaustedError, AgentFailureError } from './lifecycle/retry-policy.js';
import { FileSystemSignalManager } from './lifecycle/signal-manager.js';
import type { SignalManager } from './lifecycle/signal-manager.js';
// Module-level logger created under the name 'agent-manager'; used by every method in this file.
const log = createModuleLogger('agent-manager');
export class MultiProviderAgentManager implements AgentManager {
/** Maximum number of times tryAutoCleanup() resumes an agent to commit leftover changes. */
private static readonly MAX_COMMIT_RETRIES = 1;
/** In-memory state for agents with a spawned subprocess, keyed by agent ID; entries are removed by cleanupAgentState(). */
private activeAgents: Map<string, ActiveAgent> = new Map();
/** Number of commit-resume attempts already made, keyed by agent ID. */
private commitRetryCount: Map<string, number> = new Map();
/** Subprocess spawn/kill/poll, worktree creation and command building. */
private processManager: ProcessManager;
/** Account selection, credential write/refresh and exhaustion handling. */
private credentialHandler: CredentialHandler;
/** Stream events, signal parsing, file reading and result capture. */
private outputHandler: OutputHandler;
/** Worktree/branch/log removal, orphan cleanup and reconciliation. */
private cleanupManager: CleanupManager;
/** Drives spawnWithLifecycle() and resumeWithLifecycle() via spawnWithRetry() / resumeWithRetry(). */
private lifecycleController: AgentLifecycleController;
/** Signal manager handed to both OutputHandler and CleanupManager; the constructor creates a FileSystemSignalManager. */
private signalManager: SignalManager;
/**
 * Wires the manager to its collaborators.
 *
 * Parameters declared `private` become instance fields. One
 * FileSystemSignalManager is created and handed to both the output handler
 * and the cleanup manager. When an event bus is supplied, every
 * `process:crashed` event is forwarded to handleProcessCrashed().
 *
 * @param processManagerOverride Used in place of a newly constructed
 *   ProcessManager when provided.
 */
constructor(
  private repository: AgentRepository,
  private workspaceRoot: string,
  private projectRepository: ProjectRepository,
  private accountRepository?: AccountRepository,
  private eventBus?: EventBus,
  private credentialManager?: AccountCredentialManager,
  private changeSetRepository?: ChangeSetRepository,
  private phaseRepository?: PhaseRepository,
  private taskRepository?: TaskRepository,
  private pageRepository?: PageRepository,
  private logChunkRepository?: LogChunkRepository,
  private debug: boolean = false,
  processManagerOverride?: ProcessManager,
  private chatSessionRepository?: ChatSessionRepository,
  private reviewCommentRepository?: ReviewCommentRepository,
) {
  // The signal manager must exist before the helpers that receive it are built.
  const signalManager: SignalManager = new FileSystemSignalManager();
  this.signalManager = signalManager;

  this.processManager = processManagerOverride ?? new ProcessManager(workspaceRoot, projectRepository);
  this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
  this.outputHandler = new OutputHandler(
    repository,
    eventBus,
    changeSetRepository,
    phaseRepository,
    taskRepository,
    pageRepository,
    signalManager,
    chatSessionRepository,
    reviewCommentRepository,
  );
  this.cleanupManager = new CleanupManager(
    workspaceRoot,
    repository,
    projectRepository,
    eventBus,
    debug,
    signalManager,
  );

  const { processManager, cleanupManager } = this;
  this.lifecycleController = createLifecycleController({
    repository,
    processManager,
    cleanupManager,
    accountRepository,
    debug,
  });

  // Listen for process crashed events to handle agents specially
  eventBus?.on('process:crashed', async ({ payload }: ProcessCrashedEvent) => {
    const { processId, exitCode, signal } = payload;
    await this.handleProcessCrashed(processId, exitCode, signal);
  });
}
/**
 * Drops every piece of in-memory bookkeeping held for one agent: the
 * completion poll is cancelled (when one was registered) and the entry is
 * removed from activeAgents.
 *
 * commitRetryCount is deliberately left alone — tryAutoCleanup() owns it, and
 * stop()/delete() clear it explicitly, so retries are not reset mid-cycle.
 *
 * @param agentId ID of the agent whose state is discarded.
 */
private cleanupAgentState(agentId: string): void {
  const entry = this.activeAgents.get(agentId);
  entry?.cancelPoll?.();
  this.activeAgents.delete(agentId);
}
/**
 * Builds the fire-and-forget callback that stores raw output chunks in the DB.
 *
 * Each chunk is inserted through the log chunk repository; once the insert
 * resolves, an `agent:output` event is emitted if an event bus is configured.
 * Failures are logged as warnings and never propagate to the caller.
 *
 * @param agentId Agent the chunks belong to.
 * @param agentName Agent name stored with each chunk.
 * @param sessionNumber Session number stored with each chunk.
 * @returns The callback, or undefined when no logChunkRepository is configured.
 */
private createLogChunkCallback(
  agentId: string,
  agentName: string,
  sessionNumber: number,
): ((content: string) => void) | undefined {
  const repo = this.logChunkRepository;
  if (!repo) return undefined;

  const publish = (content: string): void => {
    this.eventBus?.emit({
      type: 'agent:output' as const,
      timestamp: new Date(),
      payload: { agentId, stream: 'stdout', data: content },
    });
  };

  const reportFailure = (err: unknown): void => {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'failed to persist log chunk');
  };

  return (content) => {
    repo
      .insertChunk({ agentId, agentName, sessionNumber, content })
      .then(() => publish(content))
      .catch(reportFailure);
  };
}
/**
 * Spawns an agent through the lifecycle controller's spawnWithRetry().
 *
 * The controller is handed a callback that runs spawnInternal() and reports a
 * summary of the new agent; the full AgentInfo from the most recent callback
 * run is what this method returns.
 *
 * @param options Spawn options, passed to the lifecycle controller.
 * @returns Info for the spawned agent.
 */
async spawnWithLifecycle(options: SpawnAgentOptions): Promise<AgentInfo> {
  const logContext = {
    taskId: options.taskId,
    provider: options.provider,
    initiativeId: options.initiativeId,
    mode: options.mode,
  };
  log.info(logContext, 'spawning agent with unified lifecycle management');

  let spawnedAgent: AgentInfo | undefined;

  await this.lifecycleController.spawnWithRetry(
    async (opts) => {
      const agent = await this.spawnInternal(opts);
      spawnedAgent = agent;
      const { id, name, status, initiativeId, worktreeId } = agent;
      return { id, name, status, initiativeId, worktreeId };
    },
    options,
  );

  return spawnedAgent!;
}
/**
 * Spawn a new agent to work on a task (legacy entry point).
 *
 * Delegates straight to spawnInternal(), so the lifecycle controller is not
 * involved. Consider spawnWithLifecycle() for better error handling.
 *
 * @param options Spawn options, forwarded unchanged.
 * @returns Info for the spawned agent.
 */
async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
return this.spawnInternal(options);
}
/**
 * Internal spawn implementation without lifecycle management.
 * Used by both the legacy spawn() and the lifecycle-managed spawnWithLifecycle().
 *
 * Steps, in order: resolve the provider and agent name, select an account and
 * write its credentials, create the worktree(s), create the DB record, finish
 * the prompt, write input files, spawn the detached subprocess, register it in
 * activeAgents, start completion polling, and finally write a best-effort
 * spawn diagnostic file.
 *
 * @param options Spawn options; `mode` defaults to 'execute' and `provider` to 'claude'.
 * @returns Info for the newly created agent record.
 * @throws Error if the provider is unknown or `options.name` is already in use.
 *   Errors raised by the collaborators (credential handler, process manager,
 *   repository, input-file writer) are not caught here and propagate.
 */
private async spawnInternal(options: SpawnAgentOptions): Promise<AgentInfo> {
  const { taskId, cwd, mode = 'execute', provider: providerName = 'claude', initiativeId, baseBranch, branchName } = options;
  let { prompt } = options;
  log.info({ taskId, provider: providerName, initiativeId, mode, baseBranch, branchName }, 'spawn requested');

  const provider = getProvider(providerName);
  if (!provider) {
    throw new Error(`Unknown provider: '${providerName}'. Available: claude, codex, gemini, cursor, auggie, amp, opencode`);
  }

  // Generate or validate name
  let name: string;
  if (options.name) {
    name = options.name;
    const existing = await this.repository.findByName(name);
    if (existing) {
      throw new Error(`Agent with name '${name}' already exists`);
    }
  } else {
    name = await generateUniqueAlias(this.repository);
  }
  const alias = name;
  log.debug({ alias }, 'alias generated');

  // 1. Account selection
  let accountId: string | null = null;
  let accountConfigDir: string | null = null;
  const accountResult = await this.credentialHandler.selectAccount(providerName);
  if (accountResult) {
    accountId = accountResult.accountId;
    accountConfigDir = accountResult.configDir;
    await this.credentialHandler.writeCredentialsToDisk(accountResult.account, accountConfigDir);
    const { valid, refreshed } = await this.credentialHandler.ensureCredentials(accountConfigDir, accountId);
    if (!valid) {
      log.warn({ alias, accountId }, 'failed to refresh account credentials, proceeding anyway');
    }
    if (refreshed) {
      await this.credentialHandler.persistRefreshedCredentials(accountId, accountConfigDir);
    }
  }
  if (accountId) {
    log.info({ alias, accountId }, 'account selected');
  } else {
    log.debug('no accounts available, spawning without account');
  }

  // 2. Create isolated worktrees
  let agentCwd: string;
  if (initiativeId) {
    log.debug({ alias, initiativeId, baseBranch, branchName }, 'creating initiative-based worktrees');
    agentCwd = await this.processManager.createProjectWorktrees(alias, initiativeId, baseBranch, branchName);
    // Log projects linked to the initiative
    const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
    log.info({
      alias,
      initiativeId,
      projectCount: projects.length,
      projects: projects.map(p => ({ name: p.name, url: p.url })),
      agentCwd
    }, 'initiative-based agent workdir created');
  } else {
    log.debug({ alias }, 'creating standalone worktree');
    agentCwd = await this.processManager.createStandaloneWorktree(alias);
    log.info({ alias, agentCwd }, 'standalone agent workdir created');
  }

  // Verify the final agentCwd exists
  const cwdVerified = existsSync(agentCwd);
  log.info({
    alias,
    agentCwd,
    cwdVerified,
    initiativeBasedAgent: !!initiativeId
  }, 'agent workdir setup completed');

  // 2b. Append workspace layout to prompt now that worktrees exist
  const workspaceSection = buildWorkspaceLayout(agentCwd);
  if (workspaceSection) {
    prompt = prompt + workspaceSection;
  }

  // 3. Create agent record
  const agent = await this.repository.create({
    name: alias,
    taskId: taskId ?? null,
    initiativeId: initiativeId ?? null,
    sessionId: null,
    worktreeId: alias,
    status: 'running',
    mode,
    provider: providerName,
    accountId,
  });
  const agentId = agent.id;

  // 3a. Append inter-agent communication instructions with actual agent ID
  prompt = prompt + buildInterAgentCommunication(agentId, mode);

  // 3b. Append preview deployment instructions if applicable
  if (['execute', 'refine', 'discuss'].includes(mode) && initiativeId) {
    const shouldInject = await this.shouldInjectPreviewInstructions(initiativeId);
    if (shouldInject) {
      prompt = prompt + buildPreviewInstructions(agentId);
    }
  }

  // 3c. Write input files (after agent creation so we can include agentId/agentName)
  if (options.inputContext) {
    await writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });
    log.debug({ alias }, 'input files written');
  }

  // 4. Build spawn command
  const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
  const finalCwd = cwd ?? agentCwd;
  log.info({
    agentId,
    alias,
    command,
    args: args.join(' '),
    finalCwd,
    customCwdProvided: !!cwd,
    providerEnv: Object.keys(providerEnv)
  }, 'spawn command built');

  // 5. Prepare process environment with credentials
  const { processEnv } = await this.credentialHandler.prepareProcessEnv(providerEnv, provider, accountId);
  log.debug({
    agentId,
    finalProcessEnv: Object.keys(processEnv),
    hasAccountConfig: !!accountId,
    hasOAuthToken: !!processEnv['CLAUDE_CODE_OAUTH_TOKEN'],
  }, 'process environment prepared');

  // Record whether the working directory exists *before* anything below can
  // create it: the diagnostic step runs mkdir({ recursive: true }) on
  // `<finalCwd>/.cw`, which would make a later check always report true.
  const cwdExistsAtSpawn = existsSync(finalCwd);

  // 6. Spawn detached subprocess
  const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
    agentId, alias, command, args, finalCwd, processEnv, providerName, prompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    this.createLogChunkCallback(agentId, alias, 1),
  );
  await this.repository.update(agentId, { pid, outputFilePath });

  // Register agent and start polling BEFORE non-critical I/O so that a
  // diagnostic-write failure can never orphan a running process.
  const activeEntry: ActiveAgent = { agentId, pid, tailer, outputFilePath, agentCwd: finalCwd };
  this.activeAgents.set(agentId, activeEntry);

  // Emit spawned event
  if (this.eventBus) {
    const event: AgentSpawnedEvent = {
      type: 'agent:spawned',
      timestamp: new Date(),
      payload: { agentId, name: alias, taskId: taskId ?? null, worktreeId: alias, provider: providerName },
    };
    this.eventBus.emit(event);
  }

  // Start polling for completion
  const { cancel } = this.processManager.pollForCompletion(
    agentId, pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
  );
  activeEntry.cancelPoll = cancel;

  // Write spawn diagnostic file (non-fatal — .cw/ may not exist yet for
  // agents spawned without inputContext, e.g. conflict-resolution agents)
  try {
    const diagnosticDir = join(finalCwd, '.cw');
    await mkdir(diagnosticDir, { recursive: true });

    // The process environment can carry credentials (e.g. CLAUDE_CODE_OAUTH_TOKEN).
    // The diagnostic lives inside the agent's working tree, so values of
    // secret-looking variables are masked; names and other values are kept.
    const secretName = /(^|_)(token|secret|password|passwd|credentials?|key|auth)(_|$)/i;
    const redactedEnv: Record<string, string | undefined> = {};
    for (const [key, value] of Object.entries(processEnv)) {
      redactedEnv[key] = value !== undefined && secretName.test(key) ? '[REDACTED]' : value;
    }

    const diagnostic = {
      timestamp: new Date().toISOString(),
      agentId,
      alias,
      intendedCwd: finalCwd,
      worktreeId: agent.worktreeId,
      provider: providerName,
      command,
      args,
      env: redactedEnv,
      cwdExistsAtSpawn,
      initiativeId: initiativeId || null,
      customCwdProvided: !!cwd,
      accountId: accountId || null,
    };
    await writeFileAsync(
      join(diagnosticDir, 'spawn-diagnostic.json'),
      JSON.stringify(diagnostic, null, 2),
      'utf-8'
    );
  } catch (err) {
    log.warn({ agentId, alias, err: err instanceof Error ? err.message : String(err) }, 'failed to write spawn diagnostic');
  }

  log.info({ agentId, alias, pid }, 'detached subprocess started');
  return this.toAgentInfo(agent);
}
/**
 * Runs the post-exit pipeline for a detached agent process.
 *
 * Does nothing when the agent is no longer present in activeAgents.
 * Otherwise: the output handler processes the completion, credentials are
 * synced back to the DB, in-memory state is dropped, and auto-cleanup of the
 * workdir is attempted.
 *
 * @param agentId ID of the agent whose process completed.
 */
private async handleDetachedAgentCompletion(agentId: string): Promise<void> {
  const stillTracked = this.activeAgents.has(agentId);
  if (!stillTracked) return;

  const entry = this.activeAgents.get(agentId);
  await this.outputHandler.handleCompletion(
    agentId,
    entry,
    (alias) => this.processManager.getAgentWorkdir(alias),
  );

  // Sync credentials back to DB if the agent had an account
  await this.syncCredentialsPostCompletion(agentId);

  this.cleanupAgentState(agentId);

  // Auto-cleanup workdir after completion
  await this.tryAutoCleanup(agentId);
}
/**
 * Attempts to remove an agent's workdir once it has completed.
 *
 * Only agents whose status is 'idle' are considered. If the workdir was
 * removed, the retry counter is cleared. If the workdir is dirty and retries
 * remain, the agent is resumed via resumeForCommit() so it can commit its
 * changes; when that is not possible the workdir is left in place and the
 * counter is cleared. Any error is logged as a warning and swallowed.
 *
 * @param agentId ID of the agent to clean up after.
 */
private async tryAutoCleanup(agentId: string): Promise<void> {
  try {
    const agent = await this.repository.findById(agentId);
    if (!agent) return;
    if (agent.status !== 'idle') return;

    const alias = agent.name;
    const outcome = await this.cleanupManager.autoCleanupAfterCompletion(
      agentId, alias, agent.initiativeId,
    );

    if (outcome.removed) {
      this.commitRetryCount.delete(agentId);
      log.info({ agentId, alias }, 'auto-cleanup completed');
      return;
    }

    // Clean but not removed: nothing further to do.
    if (outcome.clean) return;

    const attemptsSoFar = this.commitRetryCount.get(agentId) ?? 0;
    if (attemptsSoFar < MultiProviderAgentManager.MAX_COMMIT_RETRIES) {
      const attempt = attemptsSoFar + 1;
      this.commitRetryCount.set(agentId, attempt);
      const resumed = await this.resumeForCommit(agentId);
      if (resumed) {
        log.info({ agentId, alias, retry: attempt }, 'resumed agent to commit uncommitted changes');
        return;
      }
    }

    log.warn({ agentId, alias }, 'agent workdir has uncommitted changes after max retries, leaving in place');
    this.commitRetryCount.delete(agentId);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'auto-cleanup failed');
    this.commitRetryCount.delete(agentId);
  }
}
/**
 * Resumes an agent's existing session with a prompt asking it to commit
 * leftover changes in its dirty worktrees.
 *
 * NOTE(review): the status is set to 'running' before the subprocess is
 * spawned and is not rolled back here if a later step throws — confirm the
 * caller's handling is sufficient.
 *
 * @param agentId ID of the agent to resume.
 * @returns true once a new subprocess has been started and registered; false
 *   when the agent has no session, the provider is unknown or has resumeStyle
 *   'none', or no worktree is dirty.
 */
private async resumeForCommit(agentId: string): Promise<boolean> {
  const agent = await this.repository.findById(agentId);
  if (!agent?.sessionId) return false;

  const provider = getProvider(agent.provider);
  if (!provider) return false;
  if (provider.resumeStyle === 'none') return false;

  // Check which specific worktrees are dirty — skip resume if all clean
  const dirtyPaths = await this.cleanupManager.getDirtyWorktreePaths(agent.name, agent.initiativeId);
  if (dirtyPaths.length === 0) return false;

  // Use absolute paths so the agent can't accidentally commit in the main repo
  // Use `git add -u` (tracked files only) instead of `git add -A` to avoid staging unrelated files
  const dirtyList = dirtyPaths.map(p => `- \`${p.absPath}\``).join('\n');
  const commitPrompt = [
    'You have uncommitted changes in the following directories:',
    dirtyList,
    '',
    'For each directory listed above, `cd` into the EXACT absolute path shown, then run:',
    '1. `git add -u` to stage only tracked modified files',
    '2. `git commit -m "<message>"` with a message describing the work',
    'Do not use `git add -A` or `git add .`. Do not stage untracked files. Do not make any other changes.',
  ].join('\n');

  await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

  const workdir = this.processManager.getAgentWorkdir(agent.worktreeId);
  const resumeCommand = this.processManager.buildResumeCommand(provider, agent.sessionId, commitPrompt);
  const { processEnv } = await this.credentialHandler.prepareProcessEnv(resumeCommand.env, provider, agent.accountId);

  // Shut down whatever was watching the previous session.
  const previous = this.activeAgents.get(agentId);
  previous?.cancelPoll?.();
  if (previous?.tailer) {
    await previous.tailer.stop();
  }

  // Determine session number for commit retry
  const commitSessionNumber = this.logChunkRepository
    ? (await this.logChunkRepository.getSessionCount(agentId)) + 1
    : 1;

  const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
    agentId, agent.name, resumeCommand.command, resumeCommand.args, workdir, processEnv, provider.name, commitPrompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    this.createLogChunkCallback(agentId, agent.name, commitSessionNumber),
  );
  await this.repository.update(agentId, { pid, outputFilePath });

  const entry: ActiveAgent = { agentId, pid, tailer, outputFilePath };
  this.activeAgents.set(agentId, entry);

  const poll = this.processManager.pollForCompletion(
    agentId, pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
  );
  entry.cancelPoll = poll.cancel;

  return true;
}
/** IDs of agents that currently have a conversation resume in flight. */
private conversationResumeLocks = new Set<string>();
/**
 * Resume an idle agent to answer an inter-agent conversation.
 *
 * The per-agent lock is taken synchronously, before the first `await`, so two
 * overlapping calls for the same agent cannot both pass the guard and spawn
 * two processes. It is released in `finally` on every path.
 *
 * @param agentId ID of the agent that should answer.
 * @param conversationId Conversation the question belongs to.
 * @param question Question text to relay to the agent.
 * @param fromAgentId ID of the agent that asked.
 * @returns true once the agent has been resumed; false if a resume is already
 *   in progress, the agent is missing or not idle, it has no session, the
 *   provider is unknown or does not support resume, or its worktree is gone.
 */
async resumeForConversation(
  agentId: string,
  conversationId: string,
  question: string,
  fromAgentId: string,
): Promise<boolean> {
  // Concurrency guard — prevent double-resume race
  if (this.conversationResumeLocks.has(agentId)) {
    log.info({ agentId, conversationId }, 'conversation resume already in progress, skipping');
    return false;
  }
  // Check-and-set with no await in between; otherwise a second caller could
  // slip through while the first is still waiting on the repository lookup.
  this.conversationResumeLocks.add(agentId);

  try {
    const agent = await this.repository.findById(agentId);
    if (!agent) return false;
    if (agent.status !== 'idle') {
      log.debug({ agentId, status: agent.status }, 'agent not idle, skipping conversation resume');
      return false;
    }
    if (!agent.sessionId) {
      log.debug({ agentId }, 'no session ID, cannot resume for conversation');
      return false;
    }

    const provider = getProvider(agent.provider);
    if (!provider || provider.resumeStyle === 'none') {
      log.debug({ agentId, provider: agent.provider }, 'provider does not support resume');
      return false;
    }

    const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
    if (!existsSync(agentCwd)) {
      log.debug({ agentId, agentCwd }, 'worktree no longer exists, cannot resume');
      return false;
    }

    const conversationPrompt =
      `Another agent (ID: ${fromAgentId}) asked you a question via inter-agent communication.\n\n` +
      `**Conversation ID**: ${conversationId}\n` +
      `**Question**: ${question}\n\n` +
      `Please answer this question using:\n` +
      ` cw answer "<your answer>" --conversation-id ${conversationId}\n\n` +
      `After answering, check for any other pending conversations:\n` +
      ` cw listen --agent-id ${agentId}\n\n` +
      `Answer any additional pending conversations the same way, then complete your session.`;

    // Clear previous signal.json
    const signalPath = join(agentCwd, '.cw/output/signal.json');
    try {
      await unlink(signalPath);
    } catch {
      // File might not exist
    }

    await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

    const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, conversationPrompt);
    const { processEnv } = await this.credentialHandler.prepareProcessEnv(providerEnv, provider, agent.accountId);

    // Stop previous tailer/poll
    const prevActive = this.activeAgents.get(agentId);
    prevActive?.cancelPoll?.();
    if (prevActive?.tailer) {
      await prevActive.tailer.stop();
    }

    let sessionNumber = 1;
    if (this.logChunkRepository) {
      sessionNumber = (await this.logChunkRepository.getSessionCount(agentId)) + 1;
    }

    const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
      agentId, agent.name, command, args, agentCwd, processEnv, provider.name, conversationPrompt,
      (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
      this.createLogChunkCallback(agentId, agent.name, sessionNumber),
    );
    await this.repository.update(agentId, { pid, outputFilePath });

    const activeEntry: ActiveAgent = { agentId, pid, tailer, outputFilePath };
    this.activeAgents.set(agentId, activeEntry);

    if (this.eventBus) {
      const event: AgentResumedEvent = {
        type: 'agent:resumed',
        timestamp: new Date(),
        payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', sessionId: agent.sessionId },
      };
      this.eventBus.emit(event);
    }

    const { cancel } = this.processManager.pollForCompletion(
      agentId, pid,
      () => this.handleDetachedAgentCompletion(agentId),
      () => this.activeAgents.get(agentId)?.tailer,
    );
    activeEntry.cancelPoll = cancel;

    log.info({ agentId, conversationId, pid }, 'resumed idle agent for conversation');
    return true;
  } finally {
    this.conversationResumeLocks.delete(agentId);
  }
}
/**
 * Copies credentials from the agent's account config dir back into the DB.
 *
 * The subprocess may have refreshed tokens mid-session; syncing afterwards
 * keeps the DB current so the next spawn uses fresh tokens. This is a no-op
 * without an account repository or when the agent has no account. Failures
 * are logged as warnings and never thrown.
 *
 * @param agentId ID of the agent whose account credentials are synced.
 */
private async syncCredentialsPostCompletion(agentId: string): Promise<void> {
  if (!this.accountRepository) return;

  try {
    const record = await this.repository.findById(agentId);
    const accountId = record?.accountId;
    if (!accountId) return;

    const { getAccountConfigDir } = await import('./accounts/paths.js');
    const configDir = getAccountConfigDir(this.workspaceRoot, accountId);
    await this.credentialHandler.persistRefreshedCredentials(accountId, configDir);

    log.debug({ agentId, accountId }, 'post-completion credential sync done');
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'post-completion credential sync failed');
  }
}
/**
 * Stops a running agent.
 *
 * If the agent is tracked in memory, its process is sent SIGTERM and its
 * tailer is stopped. In-memory state and the commit retry counter are then
 * cleared, credentials are synced, the record is marked 'stopped', and an
 * `agent:stopped` event is emitted when an event bus is configured.
 *
 * @param agentId ID of the agent to stop.
 * @throws Error if no agent with this ID exists.
 */
async stop(agentId: string): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) {
    throw new Error(`Agent '${agentId}' not found`);
  }
  const { name } = agent;
  log.info({ agentId, name }, 'stopping agent');

  const tracked = this.activeAgents.get(agentId);
  if (tracked) {
    try {
      process.kill(tracked.pid, 'SIGTERM');
    } catch {
      /* already exited */
    }
    await tracked.tailer.stop();
  }

  this.cleanupAgentState(agentId);
  this.commitRetryCount.delete(agentId);

  // Sync credentials before marking stopped
  await this.syncCredentialsPostCompletion(agentId);
  await this.repository.update(agentId, { status: 'stopped', pendingQuestions: null });

  if (this.eventBus) {
    const stoppedEvent: AgentStoppedEvent = {
      type: 'agent:stopped',
      timestamp: new Date(),
      payload: { agentId, name, taskId: agent.taskId ?? '', reason: 'user_requested' },
    };
    this.eventBus.emit(stoppedEvent);
  }
}
/**
 * Lists every agent known to the repository, converted to AgentInfo.
 *
 * @returns One AgentInfo per stored agent, in repository order.
 */
async list(): Promise<AgentInfo[]> {
  const records = await this.repository.findAll();
  const infos: AgentInfo[] = [];
  for (const record of records) {
    infos.push(this.toAgentInfo(record));
  }
  return infos;
}
/**
 * Looks up a single agent by ID.
 *
 * @param agentId ID of the agent to fetch.
 * @returns The agent's info, or null when no such agent exists.
 */
async get(agentId: string): Promise<AgentInfo | null> {
  const record = await this.repository.findById(agentId);
  if (!record) {
    return null;
  }
  return this.toAgentInfo(record);
}
/**
 * Looks up a single agent by name.
 *
 * @param name Name of the agent to fetch.
 * @returns The agent's info, or null when no agent has that name.
 */
async getByName(name: string): Promise<AgentInfo | null> {
  const record = await this.repository.findByName(name);
  if (!record) {
    return null;
  }
  return this.toAgentInfo(record);
}
/**
 * Resumes an agent through the lifecycle controller's resumeWithRetry().
 *
 * The controller receives a callback that runs resumeInternal() with the
 * agent ID and answers it supplies.
 *
 * @param agentId ID of the agent to resume.
 * @param answers Answers to the agent's pending questions.
 */
async resumeWithLifecycle(agentId: string, answers: Record<string, string>): Promise<void> {
  const answerKeys = Object.keys(answers);
  log.info({ agentId, answerKeys }, 'resuming agent with unified lifecycle management');

  const request = { agentId, answers };
  await this.lifecycleController.resumeWithRetry(
    (id, modifiedAnswers) => this.resumeInternal(id, modifiedAnswers),
    request,
  );
}
/**
 * Resume an agent that's waiting for input (legacy entry point).
 *
 * Delegates straight to resumeInternal(), so the lifecycle controller is not
 * involved. Consider resumeWithLifecycle() for better error handling.
 *
 * @param agentId ID of the agent to resume.
 * @param answers Answers to the agent's pending questions, forwarded unchanged.
 */
async resume(agentId: string, answers: Record<string, string>): Promise<void> {
return this.resumeInternal(agentId, answers);
}
/**
 * Internal resume implementation without lifecycle management.
 * Shared by the legacy resume() and the lifecycle-managed resumeWithLifecycle().
 *
 * Validates the agent, clears any previous signal.json, marks the agent
 * 'running', spawns a resume subprocess with the formatted answers as its
 * prompt, registers the new process in activeAgents, emits `agent:resumed`
 * when an event bus is configured, and starts completion polling.
 *
 * @param agentId ID of the agent to resume.
 * @param answers Answers to the agent's pending questions.
 * @throws Error if the agent does not exist, is not 'waiting_for_input', has
 *   no session, or its provider is unknown or does not support resume.
 */
private async resumeInternal(agentId: string, answers: Record<string, string>): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) {
    throw new Error(`Agent '${agentId}' not found`);
  }
  if (agent.status !== 'waiting_for_input') {
    throw new Error(`Agent '${agent.name}' is not waiting for input (status: ${agent.status})`);
  }
  if (!agent.sessionId) {
    throw new Error(`Agent '${agent.name}' has no session to resume`);
  }
  log.info({ agentId, sessionId: agent.sessionId, provider: agent.provider }, 'resuming agent');

  const provider = getProvider(agent.provider);
  if (!provider) {
    throw new Error(`Unknown provider: '${agent.provider}'`);
  }
  if (provider.resumeStyle === 'none') {
    throw new Error(`Provider '${provider.name}' does not support resume`);
  }

  const workdir = this.processManager.getAgentWorkdir(agent.worktreeId);
  const prompt = this.outputHandler.formatAnswersAsPrompt(answers);

  // Clear previous signal.json to ensure clean completion detection
  const signalPath = join(workdir, '.cw/output/signal.json');
  try {
    await unlink(signalPath);
    log.debug({ agentId, signalPath }, 'cleared previous signal.json for resume');
  } catch {
    // File might not exist, which is fine
  }

  await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

  const resumeCommand = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt);
  log.debug({ command: resumeCommand.command, args: resumeCommand.args.join(' ') }, 'resume command built');

  // Prepare process environment with credentials
  const { processEnv } = await this.credentialHandler.prepareProcessEnv(resumeCommand.env, provider, agent.accountId);

  // Stop previous tailer and cancel previous poll
  const previous = this.activeAgents.get(agentId);
  previous?.cancelPoll?.();
  if (previous?.tailer) {
    await previous.tailer.stop();
  }

  // Determine session number for this resume
  const resumeSessionNumber = this.logChunkRepository
    ? (await this.logChunkRepository.getSessionCount(agentId)) + 1
    : 1;

  const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
    agentId, agent.name, resumeCommand.command, resumeCommand.args, workdir, processEnv, provider.name, prompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    this.createLogChunkCallback(agentId, agent.name, resumeSessionNumber),
  );
  await this.repository.update(agentId, { pid, outputFilePath });

  const entry: ActiveAgent = { agentId, pid, tailer, outputFilePath };
  this.activeAgents.set(agentId, entry);
  log.info({ agentId, pid }, 'resume detached subprocess started');

  if (this.eventBus) {
    const resumedEvent: AgentResumedEvent = {
      type: 'agent:resumed',
      timestamp: new Date(),
      payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', sessionId: agent.sessionId },
    };
    this.eventBus.emit(resumedEvent);
  }

  const poll = this.processManager.pollForCompletion(
    agentId, pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
  );
  entry.cancelPoll = poll.cancel;
}
/**
 * Get the result of an agent's work.
 *
 * Delegates to the output handler, passing the agent's in-memory entry from
 * activeAgents (undefined when the agent is not currently tracked).
 *
 * @param agentId ID of the agent whose result is requested.
 * @returns Whatever the output handler resolves to: a result or null.
 */
async getResult(agentId: string): Promise<AgentResult | null> {
return this.outputHandler.getResult(agentId, this.activeAgents.get(agentId));
}
/**
 * Get pending questions for an agent waiting for input.
 *
 * Delegates to the output handler, passing the agent's in-memory entry from
 * activeAgents (undefined when the agent is not currently tracked).
 *
 * @param agentId ID of the agent whose questions are requested.
 * @returns Whatever the output handler resolves to: pending questions or null.
 */
async getPendingQuestions(agentId: string): Promise<PendingQuestions | null> {
return this.outputHandler.getPendingQuestions(agentId, this.activeAgents.get(agentId));
}
/**
 * Deletes an agent and cleans up all associated resources.
 *
 * Order of operations: terminate the process and stop the tailer (when the
 * agent is tracked in memory), clear in-memory state, remove worktrees,
 * branches, logs and stored log chunks on a best-effort basis (each failure
 * is logged and skipped), delete the DB record, then emit `agent:deleted`
 * when an event bus is configured.
 *
 * @param agentId ID of the agent to delete.
 * @throws Error if no agent with this ID exists; errors from deleting the DB
 *   record are not caught.
 */
async delete(agentId: string): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) {
    throw new Error(`Agent '${agentId}' not found`);
  }
  const { name, initiativeId } = agent;
  log.info({ agentId, name }, 'deleting agent');

  // Runs one cleanup step; a failure is logged with the given message and swallowed.
  const bestEffort = async (failureMessage: string, step: () => unknown): Promise<void> => {
    try {
      await step();
    } catch (err) {
      log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, failureMessage);
    }
  };

  // 1. Kill process, stop tailer, clear all in-memory state
  const tracked = this.activeAgents.get(agentId);
  if (tracked) {
    try {
      process.kill(tracked.pid, 'SIGTERM');
    } catch {
      /* already exited */
    }
    await tracked.tailer.stop();
  }
  this.cleanupAgentState(agentId);
  this.commitRetryCount.delete(agentId);

  // 2. Best-effort filesystem and branch cleanup
  await bestEffort('failed to remove worktrees', () => this.cleanupManager.removeAgentWorktrees(name, initiativeId));
  await bestEffort('failed to remove branches', () => this.cleanupManager.removeAgentBranches(name, initiativeId));
  await bestEffort('failed to remove logs', () => this.cleanupManager.removeAgentLogs(name));

  // 3. Delete log chunks from DB
  const chunkRepo = this.logChunkRepository;
  if (chunkRepo) {
    await bestEffort('failed to delete log chunks', () => chunkRepo.deleteByAgentId(agentId));
  }

  // 4. Delete DB record
  await this.repository.delete(agentId);

  // 5. Emit deleted event
  if (this.eventBus) {
    const deletedEvent: AgentDeletedEvent = {
      type: 'agent:deleted',
      timestamp: new Date(),
      payload: { agentId, name },
    };
    this.eventBus.emit(deletedEvent);
  }

  log.info({ agentId, name }, 'agent deleted');
}
/**
 * Dismiss an agent: release its local resources but keep its database record.
 *
 * Clears the manager's in-memory state for the agent, removes its worktrees,
 * branches and log files on a best-effort basis (failures are logged as
 * warnings, never thrown), and finally stamps the record with
 * `userDismissedAt`. This method itself sends no signal to the agent process
 * and emits no event.
 *
 * @param agentId - ID of the agent to dismiss.
 * @throws Error if no agent with this ID exists. Errors from the final
 *         repository update also propagate.
 */
async dismiss(agentId: string): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) throw new Error(`Agent '${agentId}' not found`);
  log.info({ agentId, name: agent.name }, 'dismissing agent');

  this.cleanupAgentState(agentId);
  this.commitRetryCount.delete(agentId);

  // Best-effort filesystem cleanup: each step is independent, so a failure in
  // one must not prevent the others (or the dismissal itself) from happening.
  try {
    await this.cleanupManager.removeAgentWorktrees(agent.name, agent.initiativeId);
  } catch (err) {
    log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'dismiss: failed to remove worktrees');
  }
  try {
    await this.cleanupManager.removeAgentBranches(agent.name, agent.initiativeId);
  } catch (err) {
    log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'dismiss: failed to remove branches');
  }
  try {
    await this.cleanupManager.removeAgentLogs(agent.name);
  } catch (err) {
    log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'dismiss: failed to remove logs');
  }

  // Derive both timestamps from a single instant so the two columns can never
  // disagree; separate Date objects avoid aliasing one instance across fields.
  const dismissedAtMs = Date.now();
  await this.repository.update(agentId, {
    userDismissedAt: new Date(dismissedAtMs),
    updatedAt: new Date(dismissedAtMs),
  });

  log.info({ agentId, name: agent.name }, 'agent dismissed');
}
/**
 * Remove agent workdirs that no longer belong to any agent.
 *
 * Pure delegation to the cleanup manager; resolves once it has finished.
 */
async cleanupOrphanedWorkdirs(): Promise<void> {
  await this.cleanupManager.cleanupOrphanedWorkdirs();
}
/**
 * Remove agent log directories that no longer belong to any agent.
 *
 * Pure delegation to the cleanup manager; resolves once it has finished.
 */
async cleanupOrphanedLogs(): Promise<void> {
  await this.cleanupManager.cleanupOrphanedLogs();
}
/**
 * Reconcile agent state after a server restart.
 *
 * The reconciliation itself lives in the cleanup manager. This method only
 * supplies the callbacks it needs to reach back into the manager:
 *   - forwarding stream events to the output handler,
 *   - processing raw agent output,
 *   - starting completion polling for a process and storing its cancel handle,
 *   - persisting a log chunk (only when a log chunk repository is configured).
 */
async reconcileAfterRestart(): Promise<void> {
  const chunkRepo = this.logChunkRepository;

  await this.cleanupManager.reconcileAfterRestart(
    this.activeAgents,

    // Stream events are handled together with the agent's in-memory entry.
    (agentId, event) => {
      const entry = this.activeAgents.get(agentId);
      return this.outputHandler.handleStreamEvent(agentId, event, entry);
    },

    // Raw output is processed with a resolver that maps an alias to its workdir.
    (agentId, rawOutput, provider) =>
      this.outputHandler.processAgentOutput(
        agentId,
        rawOutput,
        provider,
        (alias) => this.processManager.getAgentWorkdir(alias),
      ),

    // Start polling for completion, then remember how to cancel it.
    (agentId, pid) => {
      const poller = this.processManager.pollForCompletion(
        agentId,
        pid,
        () => this.handleDetachedAgentCompletion(agentId),
        () => this.activeAgents.get(agentId)?.tailer,
      );
      const entry = this.activeAgents.get(agentId);
      if (entry) {
        entry.cancelPoll = poller.cancel;
      }
    },

    // Log chunk persistence is optional and deliberately fire-and-forget:
    // the promise is not awaited and failures are only logged.
    !chunkRepo
      ? undefined
      : (agentId, agentName, content) => {
          chunkRepo
            .getSessionCount(agentId)
            .then((priorSessions) =>
              chunkRepo.insertChunk({
                agentId,
                agentName,
                sessionNumber: priorSessions + 1,
                content,
              }),
            )
            .catch((err) =>
              log.warn(
                { agentId, err: err instanceof Error ? err.message : String(err) },
                'failed to persist log chunk during reconciliation',
              ),
            );
        },
  );
}
/**
 * React to a "process crashed" notification for a process that may be an agent.
 *
 * A non-zero exit does not necessarily mean the agent failed, so the agent is
 * only marked `crashed` when no completion can be detected. Any error raised
 * along the way is logged and swallowed; this method never rejects.
 *
 * @param processId - ID from the crash notification; looked up as an agent ID.
 * @param exitCode - Exit code of the process; stored on the agent record.
 * @param signal - Terminating signal, if any; used for logging only.
 */
private async handleProcessCrashed(processId: string, exitCode: number | null, signal: string | null): Promise<void> {
  try {
    const agent = await this.repository.findById(processId);
    if (!agent) {
      // The process does not belong to one of our agents.
      return;
    }

    // Persist the exit code for later debugging.
    await this.repository.update(processId, { exitCode });

    const { name, outputFilePath } = agent;
    log.info(
      { agentId: processId, name, exitCode, signal, outputFilePath },
      'agent process crashed, analyzing completion status',
    );

    // Without an output file path no completion check is attempted.
    if (!outputFilePath) {
      log.warn(
        { agentId: processId, name, exitCode, signal },
        'agent crashed with no output file path - marking as crashed',
      );
      await this.repository.update(processId, { status: 'crashed' });
      return;
    }

    const completedAnyway = await this.checkAgentCompletionResult(agent.worktreeId);
    if (!completedAnyway) {
      log.warn(
        { agentId: processId, name, exitCode, signal, outputFilePath },
        'agent crashed and no successful completion detected - marking as truly crashed',
      );
      // Only a crash without any detected completion is recorded as crashed.
      await this.repository.update(processId, { status: 'crashed' });
      return;
    }

    log.info(
      { agentId: processId, name, exitCode, signal },
      'agent marked as crashed but completed successfully - completion already handled by polling',
    );
    // handleCompletion() is intentionally not called here: the polling handler
    // (handleDetachedAgentCompletion) already processes completions, and the
    // mutex in OutputHandler.handleCompletion() prevents duplicate processing.
    log.info(
      { agentId: processId, name, exitCode },
      'completion detection confirmed - deferring to polling handler',
    );
  } catch (err) {
    log.error(
      {
        processId,
        exitCode,
        signal,
        err: err instanceof Error ? err.message : String(err),
      },
      'failed to check agent completion after crash',
    );
  }
}
/**
 * Check whether an agent completed by reading its signal.json file.
 *
 * The directory expected to hold `.cw/output/signal.json` is resolved in
 * this order:
 *   1. the agent workdir itself, when it contains `.cw/output`;
 *   2. its `workspace/` subdirectory, when that contains `.cw`
 *      (standalone agents);
 *   3. the first immediate subdirectory (by name) that contains
 *      `.cw/output/signal.json` — initiative-based agents can write their
 *      signal inside a project subdirectory rather than the parent workdir.
 *
 * @param worktreeId - Identifier handed to the process manager to resolve
 *        the agent workdir.
 * @returns true when signal.json exists and its `status` is 'done',
 *          'questions' or 'error'; false in every other case, including any
 *          read or parse failure (which is logged as a warning).
 */
private async checkAgentCompletionResult(worktreeId: string): Promise<boolean> {
  try {
    const baseWorkdir = this.processManager.getAgentWorkdir(worktreeId);
    let agentWorkdir = baseWorkdir;

    if (!existsSync(join(baseWorkdir, '.cw', 'output'))) {
      const workspaceSub = join(baseWorkdir, 'workspace');
      if (existsSync(join(workspaceSub, '.cw'))) {
        // Standalone agents have .cw inside the workspace/ subdir
        agentWorkdir = workspaceSub;
      } else {
        // Initiative agents: look one level down for a project subdirectory
        // that actually holds a signal file.
        let childNames: string[] = [];
        try {
          // Loaded locally so this method does not depend on the file's import list.
          const { readdir } = await import('node:fs/promises');
          // Sorted so the choice is deterministic if several candidates exist.
          childNames = (await readdir(baseWorkdir)).sort();
        } catch {
          // Workdir missing or unreadable: fall through to the
          // "no signal.json found" path below.
        }
        const projectDir = childNames.find((child) =>
          existsSync(join(baseWorkdir, child, '.cw', 'output', 'signal.json')),
        );
        if (projectDir !== undefined) {
          agentWorkdir = join(baseWorkdir, projectDir);
        }
      }
    }

    const signalPath = join(agentWorkdir, '.cw/output/signal.json');
    if (!existsSync(signalPath)) {
      log.debug({ worktreeId, signalPath }, 'no signal.json found - agent not completed');
      return false;
    }

    const signal: unknown = JSON.parse(await readFile(signalPath, 'utf-8'));
    // The file content is untrusted: only read `status` from a real object.
    const status =
      typeof signal === 'object' && signal !== null
        ? (signal as { status?: unknown }).status
        : undefined;

    // Agent completed if status is done, questions, or error
    const completed = status === 'done' || status === 'questions' || status === 'error';
    if (completed) {
      log.debug({ worktreeId, signal }, 'agent completion detected via signal.json');
    } else {
      log.debug({ worktreeId, signal }, 'signal.json found but status indicates incomplete');
    }
    return completed;
  } catch (err) {
    log.warn({ worktreeId, err: err instanceof Error ? err.message : String(err) }, 'failed to read or parse signal.json');
    return false;
  }
}
/**
 * Decide whether preview instructions should be injected for an initiative.
 *
 * @param initiativeId - Initiative whose linked projects are inspected.
 * @returns true only when exactly one project is linked to the initiative
 *          and its clone directory contains a `.cw-preview.yml` file; false
 *          otherwise, including when the lookup throws.
 */
private async shouldInjectPreviewInstructions(initiativeId: string): Promise<boolean> {
  try {
    const linkedProjects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
    if (linkedProjects.length === 1) {
      const [onlyProject] = linkedProjects;
      const cloneRoot = join(this.workspaceRoot, getProjectCloneDir(onlyProject.name, onlyProject.id));
      const previewConfigPath = join(cloneRoot, '.cw-preview.yml');
      return existsSync(previewConfigPath);
    }
    // Zero or several projects: no single preview config to refer to.
    return false;
  } catch {
    return false;
  }
}
/**
 * Map a database agent record onto the AgentInfo shape.
 *
 * Only the fields listed below are copied, so anything else present on the
 * record is left out. A null `taskId` becomes an empty string, and `status`
 * and `mode` are cast from plain strings without runtime validation.
 *
 * @param agent - Agent record as stored in the database.
 * @returns The corresponding AgentInfo.
 */
private toAgentInfo(agent: {
  id: string;
  name: string;
  taskId: string | null;
  initiativeId: string | null;
  sessionId: string | null;
  worktreeId: string;
  status: string;
  mode: string;
  provider: string;
  accountId: string | null;
  createdAt: Date;
  updatedAt: Date;
  userDismissedAt?: Date | null;
}): AgentInfo {
  const {
    id,
    name,
    taskId,
    initiativeId,
    sessionId,
    worktreeId,
    status,
    mode,
    provider,
    accountId,
    createdAt,
    updatedAt,
    userDismissedAt,
  } = agent;

  const info: AgentInfo = {
    id,
    name,
    taskId: taskId ?? '',
    initiativeId,
    sessionId,
    worktreeId,
    status: status as AgentStatus,
    mode: mode as AgentMode,
    provider,
    accountId,
    createdAt,
    updatedAt,
    userDismissedAt,
  };
  return info;
}
}