Files
Codewalkers/apps/server/agent/manager.ts
Lukas May 28521e1c20 chore: merge main into cw/small-change-flow
Integrates main branch changes (headquarters dashboard, task retry count,
agent prompt persistence, remote sync improvements) with the initiative's
errand agent feature. Both features coexist in the merged result.

Key resolutions:
- Schema: take main's errands table (nullable projectId, no conflictFiles,
  with errandsRelations); migrate to 0035_faulty_human_fly
- Router: keep both errandProcedures and headquartersProcedures
- Errand prompt: take main's simpler version (no question-asking flow)
- Manager: take main's status check (running|idle only, no waiting_for_input)
- Tests: update to match removed conflictFiles field and undefined vs null
2026-03-06 16:48:12 +01:00

1207 lines
46 KiB
TypeScript

/**
* Multi-Provider Agent Manager — Orchestrator
*
* Implementation of AgentManager port supporting multiple CLI providers.
* Delegates to extracted helpers:
* - ProcessManager: subprocess spawn/kill/poll, worktree creation, command building
* - CredentialHandler: account selection, credential write/refresh, exhaustion handling
* - OutputHandler: stream events, signal parsing, file reading, result capture
* - CleanupManager: worktree/branch/log removal, orphan cleanup, reconciliation
*/
import type {
AgentManager,
AgentInfo,
SpawnAgentOptions,
AgentResult,
AgentStatus,
AgentMode,
PendingQuestions,
} from './types.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { ChangeSetRepository } from '../db/repositories/change-set-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
import type { ReviewCommentRepository } from '../db/repositories/review-comment-repository.js';
import { generateUniqueAlias } from './alias.js';
import type {
EventBus,
AgentSpawnedEvent,
AgentStoppedEvent,
AgentResumedEvent,
AgentDeletedEvent,
ProcessCrashedEvent,
} from '../events/index.js';
import { writeInputFiles } from './file-io.js';
import { buildWorkspaceLayout, buildInterAgentCommunication, buildPreviewInstructions } from './prompts/index.js';
import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
import { getProjectCloneDir } from '../git/project-clones.js';
import { join } from 'node:path';
import { unlink, readFile, writeFile as writeFileAsync, mkdir } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import type { AccountCredentialManager } from './credentials/types.js';
import { ProcessManager } from './process-manager.js';
import { CredentialHandler } from './credential-handler.js';
import { OutputHandler, type ActiveAgent } from './output-handler.js';
import { CleanupManager } from './cleanup-manager.js';
import { createLifecycleController } from './lifecycle/factory.js';
import type { AgentLifecycleController } from './lifecycle/controller.js';
import { AgentExhaustedError, AgentFailureError } from './lifecycle/retry-policy.js';
import { FileSystemSignalManager } from './lifecycle/signal-manager.js';
import type { SignalManager } from './lifecycle/signal-manager.js';
const log = createModuleLogger('agent-manager');
export class MultiProviderAgentManager implements AgentManager {
/** Upper bound on how many times tryAutoCleanup() resumes an agent to commit leftover changes. */
private static readonly MAX_COMMIT_RETRIES = 1;
/** In-memory entries for agents registered by the spawn/resume paths, keyed by agent ID. */
private activeAgents: Map<string, ActiveAgent> = new Map();
/** Number of commit-retry resumes already used, keyed by agent ID. */
private commitRetryCount: Map<string, number> = new Map();
/** Subprocess spawn/poll, worktree creation and command building. */
private processManager: ProcessManager;
/** Account selection and credential write/refresh. */
private credentialHandler: CredentialHandler;
/** Stream events, completion handling and result capture. */
private outputHandler: OutputHandler;
/** Worktree/branch/log removal and post-completion cleanup. */
private cleanupManager: CleanupManager;
/** Retry-aware spawn/resume wrapper used by the *WithLifecycle methods. */
private lifecycleController: AgentLifecycleController;
/** Signal manager shared with the output handler and cleanup manager. */
private signalManager: SignalManager;
/**
 * Build the manager and wire up its helper components.
 *
 * The repositories, event bus and credential manager are forwarded to the
 * helpers that need them (ProcessManager, CredentialHandler, OutputHandler,
 * CleanupManager and the lifecycle controller). When an event bus is
 * supplied, the manager also subscribes to `process:crashed` events.
 *
 * @param repository Agent persistence.
 * @param workspaceRoot Root directory handed to the process, credential and cleanup helpers.
 * @param projectRepository Project lookup, also used to verify worktrees on spawn.
 * @param processManagerOverride Replaces the default ProcessManager when provided.
 */
constructor(
  private repository: AgentRepository,
  private workspaceRoot: string,
  private projectRepository: ProjectRepository,
  private accountRepository?: AccountRepository,
  private eventBus?: EventBus,
  private credentialManager?: AccountCredentialManager,
  private changeSetRepository?: ChangeSetRepository,
  private phaseRepository?: PhaseRepository,
  private taskRepository?: TaskRepository,
  private pageRepository?: PageRepository,
  private logChunkRepository?: LogChunkRepository,
  private debug: boolean = false,
  processManagerOverride?: ProcessManager,
  private chatSessionRepository?: ChatSessionRepository,
  private reviewCommentRepository?: ReviewCommentRepository,
) {
  this.signalManager = new FileSystemSignalManager();
  this.processManager = processManagerOverride ?? new ProcessManager(workspaceRoot, projectRepository);
  this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
  this.outputHandler = new OutputHandler(repository, eventBus, changeSetRepository, phaseRepository, taskRepository, pageRepository, this.signalManager, chatSessionRepository, reviewCommentRepository);
  this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug, this.signalManager);
  this.lifecycleController = createLifecycleController({
    repository,
    processManager: this.processManager,
    cleanupManager: this.cleanupManager,
    accountRepository,
    debug,
    eventBus,
  });
  // Listen for process crashed events to handle agents specially
  if (eventBus) {
    eventBus.on('process:crashed', async (event: ProcessCrashedEvent) => {
      const { processId, exitCode, signal } = event.payload;
      try {
        await this.handleProcessCrashed(processId, exitCode, signal);
      } catch (err) {
        // The listener's promise is not awaited here, so a rejection would
        // otherwise surface as an unhandled promise rejection.
        log.warn(
          { processId, err: err instanceof Error ? err.message : String(err) },
          'process crash handler failed',
        );
      }
    });
  }
}
/**
 * Drop the in-memory tracking held for a single agent.
 *
 * Cancels the completion poll (when one is registered) and removes the
 * agent's entry from activeAgents.
 *
 * commitRetryCount is intentionally left alone: tryAutoCleanup(), stop()
 * and delete() own that counter so retries are not reset mid-cycle.
 */
private cleanupAgentState(agentId: string): void {
  const entry = this.activeAgents.get(agentId);
  entry?.cancelPoll?.();
  this.activeAgents.delete(agentId);
}
/**
 * Build a fire-and-forget sink that stores raw output chunks in the DB.
 *
 * After a chunk is stored, an `agent:output` event is emitted when an event
 * bus is configured. Failures in that chain are logged as warnings and never
 * propagate to the caller of the sink.
 *
 * @returns The sink, or undefined when no logChunkRepository is configured.
 */
private createLogChunkCallback(
  agentId: string,
  agentName: string,
  sessionNumber: number,
): ((content: string) => void) | undefined {
  const chunkRepo = this.logChunkRepository;
  if (!chunkRepo) return undefined;

  // Announce a chunk to subscribers once it has been persisted.
  const broadcast = (content: string): void => {
    this.eventBus?.emit({
      type: 'agent:output' as const,
      timestamp: new Date(),
      payload: { agentId, stream: 'stdout', data: content },
    });
  };

  const reportFailure = (err: unknown): void => {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'failed to persist log chunk');
  };

  return (content) => {
    chunkRepo
      .insertChunk({ agentId, agentName, sessionNumber, content })
      .then(() => broadcast(content))
      .catch(reportFailure);
  };
}
/**
 * Spawn a new agent through the lifecycle controller's retry wrapper.
 *
 * The controller is handed a callback that performs the actual spawn via
 * spawnInternal(); the AgentInfo produced by the callback is what this
 * method returns.
 *
 * @param options Spawn request forwarded to the lifecycle controller.
 * @returns The AgentInfo of the agent created by the spawn callback.
 * @throws Error if the controller resolves without the spawn callback having
 *   produced an agent (previously this returned `undefined` typed as AgentInfo).
 */
async spawnWithLifecycle(options: SpawnAgentOptions): Promise<AgentInfo> {
  log.info({
    taskId: options.taskId,
    provider: options.provider,
    initiativeId: options.initiativeId,
    mode: options.mode
  }, 'spawning agent with unified lifecycle management');
  let spawnedAgent: AgentInfo | undefined;
  await this.lifecycleController.spawnWithRetry(
    async (opts) => {
      const agent = await this.spawnInternal(opts);
      spawnedAgent = agent;
      return { id: agent.id, name: agent.name, status: agent.status, initiativeId: agent.initiativeId, worktreeId: agent.worktreeId };
    },
    options
  );
  // Guard instead of a non-null assertion so a controller that never ran the
  // callback fails loudly rather than handing callers an undefined agent.
  if (!spawnedAgent) {
    throw new Error('Lifecycle controller completed without spawning an agent');
  }
  return spawnedAgent;
}
/**
 * Legacy entry point: spawn an agent without the lifecycle controller.
 * Prefer spawnWithLifecycle() for retry and error handling.
 */
async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
  const info = await this.spawnInternal(options);
  return info;
}
/**
 * Internal spawn implementation without lifecycle management.
 * Used by both legacy spawn() and new lifecycle-managed spawn.
 *
 * Steps, in order: resolve the provider and agent name, select an account
 * and write its credentials, create worktrees, persist the agent record,
 * extend the prompt, write input files, start the detached subprocess,
 * register the agent in memory, start completion polling, and finally write
 * a best-effort diagnostic file.
 *
 * @param options Spawn request. `mode` defaults to 'execute' and `provider` to 'claude'.
 * @returns The persisted agent record converted with toAgentInfo().
 * @throws Error when the provider is unknown, an explicitly requested name is
 *   already taken, or an expected worktree directory is missing after creation.
 */
private async spawnInternal(options: SpawnAgentOptions): Promise<AgentInfo> {
  const { taskId, cwd, mode = 'execute', provider: providerName = 'claude', initiativeId, baseBranch, branchName } = options;
  let { prompt } = options;
  log.info({ taskId, provider: providerName, initiativeId, mode, baseBranch, branchName }, 'spawn requested');
  const provider = getProvider(providerName);
  if (!provider) {
    throw new Error(`Unknown provider: '${providerName}'. Available: claude, codex, gemini, cursor, auggie, amp, opencode`);
  }
  // Generate or validate name
  let name: string;
  if (options.name) {
    name = options.name;
    const existing = await this.repository.findByName(name);
    if (existing) {
      throw new Error(`Agent with name '${name}' already exists`);
    }
  } else {
    name = await generateUniqueAlias(this.repository);
  }
  const alias = name;
  log.debug({ alias }, 'alias generated');
  // 1. Account selection
  let accountId: string | null = null;
  let accountConfigDir: string | null = null;
  const accountResult = await this.credentialHandler.selectAccount(providerName);
  if (accountResult) {
    accountId = accountResult.accountId;
    accountConfigDir = accountResult.configDir;
    await this.credentialHandler.writeCredentialsToDisk(accountResult.account, accountConfigDir);
    const { valid, refreshed } = await this.credentialHandler.ensureCredentials(accountConfigDir, accountId);
    if (!valid) {
      log.warn({ alias, accountId }, 'failed to refresh account credentials, proceeding anyway');
    }
    if (refreshed) {
      await this.credentialHandler.persistRefreshedCredentials(accountId, accountConfigDir);
    }
  }
  if (accountId) {
    log.info({ alias, accountId }, 'account selected');
  } else {
    log.debug('no accounts available, spawning without account');
  }
  // 2. Create isolated worktrees
  let agentCwd: string;
  if (initiativeId) {
    log.debug({ alias, initiativeId, baseBranch, branchName }, 'creating initiative-based worktrees');
    agentCwd = await this.processManager.createProjectWorktrees(alias, initiativeId, baseBranch, branchName);
    // Verify each project worktree subdirectory actually exists
    const projects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
    for (const project of projects) {
      const projectWorktreePath = join(agentCwd, project.name);
      if (!existsSync(projectWorktreePath)) {
        throw new Error(
          `Worktree subdirectory missing after createProjectWorktrees: ${projectWorktreePath}. ` +
          `Agent ${alias} cannot run without an isolated worktree.`
        );
      }
    }
    log.info({
      alias,
      initiativeId,
      projectCount: projects.length,
      projects: projects.map(p => ({ name: p.name, url: p.url })),
      agentCwd
    }, 'initiative-based agent workdir created');
  } else {
    log.debug({ alias }, 'creating standalone worktree');
    agentCwd = await this.processManager.createStandaloneWorktree(alias);
    log.info({ alias, agentCwd }, 'standalone agent workdir created');
  }
  // Verify the final agentCwd exists
  if (!existsSync(agentCwd)) {
    throw new Error(`Agent workdir does not exist after creation: ${agentCwd}`);
  }
  log.info({
    alias,
    agentCwd,
    initiativeBasedAgent: !!initiativeId
  }, 'agent workdir setup completed');
  // 2b. Append workspace layout to prompt now that worktrees exist
  const workspaceSection = buildWorkspaceLayout(agentCwd);
  if (workspaceSection) {
    prompt = prompt + workspaceSection;
  }
  // 3. Create agent record
  const agent = await this.repository.create({
    name: alias,
    taskId: taskId ?? null,
    initiativeId: initiativeId ?? null,
    sessionId: null,
    worktreeId: alias,
    status: 'running',
    mode,
    provider: providerName,
    accountId,
  });
  const agentId = agent.id;
  // 3a. Append inter-agent communication + preview instructions (skipped for focused agents)
  if (!options.skipPromptExtras) {
    prompt = prompt + buildInterAgentCommunication(agentId, mode);
    if (['execute', 'refine', 'discuss'].includes(mode) && initiativeId) {
      const shouldInject = await this.shouldInjectPreviewInstructions(initiativeId);
      if (shouldInject) {
        prompt = prompt + buildPreviewInstructions(agentId);
      }
    }
  }
  // 3c. Write input files (after agent creation so we can include agentId/agentName)
  if (options.inputContext) {
    await writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });
    log.debug({ alias }, 'input files written');
  } else {
    // Always create .cw/output/ at the agent workdir root so the agent
    // writes signal.json here rather than in a project subdirectory.
    await mkdir(join(agentCwd, '.cw', 'output'), { recursive: true });
  }
  // 4. Build spawn command
  const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
  // A caller-supplied cwd takes precedence over the freshly created workdir.
  const finalCwd = cwd ?? agentCwd;
  log.info({
    agentId,
    alias,
    command,
    args: args.join(' '),
    finalCwd,
    customCwdProvided: !!cwd,
    providerEnv: Object.keys(providerEnv)
  }, 'spawn command built');
  // 5. Prepare process environment with credentials
  const { processEnv } = await this.credentialHandler.prepareProcessEnv(providerEnv, provider, accountId);
  log.debug({
    agentId,
    finalProcessEnv: Object.keys(processEnv),
    hasAccountConfig: !!accountId,
    hasOAuthToken: !!processEnv['CLAUDE_CODE_OAUTH_TOKEN'],
  }, 'process environment prepared');
  // 6. Spawn detached subprocess
  const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
    agentId, alias, command, args, finalCwd, processEnv, providerName, prompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    this.createLogChunkCallback(agentId, alias, 1),
  );
  await this.repository.update(agentId, { pid, outputFilePath, prompt });
  // Register agent and start polling BEFORE non-critical I/O so that a
  // diagnostic-write failure can never orphan a running process.
  const activeEntry: ActiveAgent = { agentId, pid, tailer, outputFilePath, agentCwd: finalCwd };
  this.activeAgents.set(agentId, activeEntry);
  // Emit spawned event
  if (this.eventBus) {
    const event: AgentSpawnedEvent = {
      type: 'agent:spawned',
      timestamp: new Date(),
      payload: { agentId, name: alias, taskId: taskId ?? null, worktreeId: alias, provider: providerName },
    };
    this.eventBus.emit(event);
  }
  // Start polling for completion
  const { cancel } = this.processManager.pollForCompletion(
    agentId, pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
  );
  activeEntry.cancelPoll = cancel;
  // Write spawn diagnostic file (non-fatal — .cw/ may not exist yet for
  // agents spawned without inputContext, e.g. conflict-resolution agents)
  try {
    const diagnosticDir = join(finalCwd, '.cw');
    await mkdir(diagnosticDir, { recursive: true });
    // The process environment carries credentials (e.g. the OAuth token
    // checked above). The diagnostic file lives inside the agent's working
    // directory, so values of secret-looking variables are masked instead of
    // being written to disk in plaintext. Variable names are kept.
    const secretKeyPattern = /token|secret|password|passwd|credential|auth|api_?key/i;
    const redactedEnv = Object.fromEntries(
      Object.entries(processEnv).map(([key, value]) => [
        key,
        secretKeyPattern.test(key) ? '[REDACTED]' : value,
      ]),
    );
    const diagnostic = {
      timestamp: new Date().toISOString(),
      agentId,
      alias,
      intendedCwd: finalCwd,
      worktreeId: agent.worktreeId,
      provider: providerName,
      command,
      args,
      env: redactedEnv,
      cwdExistsAtSpawn: existsSync(finalCwd),
      initiativeId: initiativeId || null,
      customCwdProvided: !!cwd,
      accountId: accountId || null,
    };
    await writeFileAsync(
      join(diagnosticDir, 'spawn-diagnostic.json'),
      JSON.stringify(diagnostic, null, 2),
      'utf-8'
    );
  } catch (err) {
    log.warn({ agentId, alias, err: err instanceof Error ? err.message : String(err) }, 'failed to write spawn diagnostic');
  }
  log.info({ agentId, alias, pid }, 'detached subprocess started');
  return this.toAgentInfo(agent);
}
/**
 * Handle completion of a detached agent process.
 *
 * Runs the output handler's completion logic, syncs credentials back to the
 * DB, releases the in-memory entry and then attempts workdir auto-cleanup.
 * Does nothing when the agent is not registered in activeAgents.
 */
private async handleDetachedAgentCompletion(agentId: string): Promise<void> {
  const active = this.activeAgents.get(agentId);
  if (!active) return;
  await this.outputHandler.handleCompletion(
    agentId,
    active,
    (alias) => this.processManager.getAgentWorkdir(alias),
  );
  // Sync credentials back to DB if the agent had an account
  await this.syncCredentialsPostCompletion(agentId);
  // The resume paths replace the activeAgents entry. If one of them ran while
  // the awaits above were pending, the entry now belongs to the new run and
  // must not have its poll cancelled or be removed by this older completion.
  if (this.activeAgents.get(agentId) === active) {
    this.cleanupAgentState(agentId);
  }
  // Auto-cleanup workdir after completion
  await this.tryAutoCleanup(agentId);
}
/**
 * Try to remove an agent's workdir once it has completed.
 *
 * Only acts on agents whose status is 'idle'. When the workdir is reported
 * as not clean and the retry budget (MAX_COMMIT_RETRIES) is not exhausted,
 * the agent is resumed once more to commit its changes. Any error is logged
 * and resets the retry counter; nothing is thrown to the caller.
 */
private async tryAutoCleanup(agentId: string): Promise<void> {
  try {
    const record = await this.repository.findById(agentId);
    if (!record || record.status !== 'idle') return;

    const outcome = await this.cleanupManager.autoCleanupAfterCompletion(
      agentId, record.name, record.initiativeId,
    );

    if (outcome.removed) {
      this.commitRetryCount.delete(agentId);
      log.info({ agentId, alias: record.name }, 'auto-cleanup completed');
      return;
    }

    // Clean but not removed: nothing further to do here.
    if (outcome.clean) return;

    const attemptsSoFar = this.commitRetryCount.get(agentId) ?? 0;
    if (attemptsSoFar < MultiProviderAgentManager.MAX_COMMIT_RETRIES) {
      const attempt = attemptsSoFar + 1;
      this.commitRetryCount.set(agentId, attempt);
      const didResume = await this.resumeForCommit(agentId);
      if (didResume) {
        log.info({ agentId, alias: record.name, retry: attempt }, 'resumed agent to commit uncommitted changes');
        return;
      }
    }

    // Either the budget is spent or the agent could not be resumed.
    log.warn({ agentId, alias: record.name }, 'agent workdir has uncommitted changes after max retries, leaving in place');
    this.commitRetryCount.delete(agentId);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'auto-cleanup failed');
    this.commitRetryCount.delete(agentId);
  }
}
/**
 * Resume an agent's session with instructions to commit leftover changes.
 *
 * @returns true once a new detached process has been started and registered;
 *   false when the agent has no session, its provider is unknown or cannot
 *   resume, or no worktree is reported dirty.
 */
private async resumeForCommit(agentId: string): Promise<boolean> {
  const record = await this.repository.findById(agentId);
  if (!record?.sessionId) return false;

  const provider = getProvider(record.provider);
  if (!provider) return false;
  if (provider.resumeStyle === 'none') return false;

  // Check which specific worktrees are dirty — skip resume if all clean
  const dirtyWorktrees = await this.cleanupManager.getDirtyWorktreePaths(record.name, record.initiativeId);
  if (dirtyWorktrees.length === 0) return false;

  // Absolute paths keep the agent from committing in the main repo by accident;
  // `git add -u` (tracked files only) avoids staging unrelated files.
  const dirtyList = dirtyWorktrees.map(entry => `- \`${entry.absPath}\``).join('\n');
  const commitPrompt = [
    'You have uncommitted changes in the following directories:',
    dirtyList,
    '',
    'For each directory listed above, `cd` into the EXACT absolute path shown, then run:',
    '1. `git add -u` to stage only tracked modified files',
    '2. `git commit -m "<message>"` with a message describing the work',
    'Do not use `git add -A` or `git add .`. Do not stage untracked files. Do not make any other changes.',
  ].join('\n');

  await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

  const workdir = this.processManager.getAgentWorkdir(record.worktreeId);
  const resumeCommand = this.processManager.buildResumeCommand(provider, record.sessionId, commitPrompt);
  const { processEnv } = await this.credentialHandler.prepareProcessEnv(resumeCommand.env, provider, record.accountId);

  // Retire whatever was tracking the previous run of this agent.
  const previous = this.activeAgents.get(agentId);
  previous?.cancelPoll?.();
  if (previous?.tailer) {
    await previous.tailer.stop();
  }

  // Determine session number for commit retry
  const sessionNumber = this.logChunkRepository
    ? (await this.logChunkRepository.getSessionCount(agentId)) + 1
    : 1;

  const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
    agentId, record.name, resumeCommand.command, resumeCommand.args, workdir, processEnv, provider.name, commitPrompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    this.createLogChunkCallback(agentId, record.name, sessionNumber),
  );
  await this.repository.update(agentId, { pid, outputFilePath });

  const entry: ActiveAgent = { agentId, pid, tailer, outputFilePath };
  this.activeAgents.set(agentId, entry);
  const poll = this.processManager.pollForCompletion(
    agentId, pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
  );
  entry.cancelPoll = poll.cancel;
  return true;
}
/** Agent IDs that currently have a conversation resume in flight. */
private conversationResumeLocks = new Set<string>();
/**
 * Resume an idle agent to answer an inter-agent conversation.
 *
 * The per-agent lock is taken synchronously, before the first await, so two
 * overlapping calls for the same agent cannot both pass the guard. It is
 * released on every exit path.
 *
 * @param agentId Agent to resume.
 * @param conversationId Conversation the question belongs to.
 * @param question Question text placed into the resume prompt.
 * @param fromAgentId ID of the asking agent, mentioned in the prompt.
 * @returns true when a new detached process was started; false when a resume
 *   is already in progress, the agent is missing or not idle, it has no
 *   session, the provider cannot resume, or the worktree no longer exists.
 */
async resumeForConversation(
  agentId: string,
  conversationId: string,
  question: string,
  fromAgentId: string,
): Promise<boolean> {
  // Concurrency guard — prevent double-resume race
  if (this.conversationResumeLocks.has(agentId)) {
    log.info({ agentId, conversationId }, 'conversation resume already in progress, skipping');
    return false;
  }
  // Acquire before any await: a check-then-await-then-add sequence would let
  // a second caller slip past the guard while the first is still loading.
  this.conversationResumeLocks.add(agentId);
  try {
    const agent = await this.repository.findById(agentId);
    if (!agent) return false;
    if (agent.status !== 'idle') {
      log.debug({ agentId, status: agent.status }, 'agent not idle, skipping conversation resume');
      return false;
    }
    if (!agent.sessionId) {
      log.debug({ agentId }, 'no session ID, cannot resume for conversation');
      return false;
    }
    const provider = getProvider(agent.provider);
    if (!provider || provider.resumeStyle === 'none') {
      log.debug({ agentId, provider: agent.provider }, 'provider does not support resume');
      return false;
    }
    const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
    if (!existsSync(agentCwd)) {
      log.debug({ agentId, agentCwd }, 'worktree no longer exists, cannot resume');
      return false;
    }
    const conversationPrompt =
      `Another agent (ID: ${fromAgentId}) asked you a question via inter-agent communication.\n\n` +
      `**Conversation ID**: ${conversationId}\n` +
      `**Question**: ${question}\n\n` +
      `Please answer this question using:\n` +
      ` cw answer "<your answer>" --conversation-id ${conversationId}\n\n` +
      `After answering, check for any other pending conversations:\n` +
      ` cw listen --agent-id ${agentId}\n\n` +
      `Answer any additional pending conversations the same way, then complete your session.`;
    // Clear previous signal.json
    const signalPath = join(agentCwd, '.cw/output/signal.json');
    try {
      await unlink(signalPath);
    } catch {
      // File might not exist
    }
    await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });
    const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, conversationPrompt);
    const { processEnv } = await this.credentialHandler.prepareProcessEnv(providerEnv, provider, agent.accountId);
    // Stop previous tailer/poll
    const prevActive = this.activeAgents.get(agentId);
    prevActive?.cancelPoll?.();
    if (prevActive?.tailer) {
      await prevActive.tailer.stop();
    }
    let sessionNumber = 1;
    if (this.logChunkRepository) {
      sessionNumber = (await this.logChunkRepository.getSessionCount(agentId)) + 1;
    }
    const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
      agentId, agent.name, command, args, agentCwd, processEnv, provider.name, conversationPrompt,
      (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
      this.createLogChunkCallback(agentId, agent.name, sessionNumber),
    );
    await this.repository.update(agentId, { pid, outputFilePath });
    const activeEntry: ActiveAgent = { agentId, pid, tailer, outputFilePath };
    this.activeAgents.set(agentId, activeEntry);
    if (this.eventBus) {
      // verified: payload matches AgentResumedEvent shape (agentId, name, taskId, sessionId)
      const event: AgentResumedEvent = {
        type: 'agent:resumed',
        timestamp: new Date(),
        payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', sessionId: agent.sessionId },
      };
      this.eventBus.emit(event);
    }
    const { cancel } = this.processManager.pollForCompletion(
      agentId, pid,
      () => this.handleDetachedAgentCompletion(agentId),
      () => this.activeAgents.get(agentId)?.tailer,
    );
    activeEntry.cancelPoll = cancel;
    log.info({ agentId, conversationId, pid }, 'resumed idle agent for conversation');
    return true;
  } finally {
    this.conversationResumeLocks.delete(agentId);
  }
}
/**
 * Deliver a user message to a running or idle errand agent.
 * Does not use the conversations table — the message is injected directly
 * as the next resume prompt for the agent's session.
 *
 * @param agentId Agent to deliver the message to.
 * @param message Text used verbatim as the resume prompt.
 * @throws Error when the agent does not exist, is neither 'running' nor
 *   'idle', has no session ID, or its provider is unknown or does not
 *   support resume.
 */
async sendUserMessage(agentId: string, message: string): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) throw new Error(`Agent not found: ${agentId}`);
  if (agent.status !== 'running' && agent.status !== 'idle') {
    throw new Error(`Agent is not running (status: ${agent.status})`);
  }
  if (!agent.sessionId) {
    throw new Error('Agent has no session ID');
  }
  const provider = getProvider(agent.provider);
  if (!provider) throw new Error(`Unknown provider: ${agent.provider}`);
  // Same precondition as the other resume paths: a resume command cannot be
  // built for a provider that has no resume support.
  if (provider.resumeStyle === 'none') {
    throw new Error(`Provider '${provider.name}' does not support resume`);
  }
  const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
  // Clear previous signal.json
  const signalPath = join(agentCwd, '.cw/output/signal.json');
  try {
    await unlink(signalPath);
  } catch {
    // File might not exist
  }
  await this.repository.update(agentId, { status: 'running', result: null });
  const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, message);
  const { processEnv } = await this.credentialHandler.prepareProcessEnv(providerEnv, provider, agent.accountId);
  // Stop previous tailer/poll
  // NOTE(review): for an agent whose status is 'running', only the poll and
  // tailer of the previous process are stopped here; the process itself is
  // not signalled. Confirm that is intended.
  const prevActive = this.activeAgents.get(agentId);
  prevActive?.cancelPoll?.();
  if (prevActive?.tailer) {
    await prevActive.tailer.stop();
  }
  let sessionNumber = 1;
  if (this.logChunkRepository) {
    sessionNumber = (await this.logChunkRepository.getSessionCount(agentId)) + 1;
  }
  const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
    agentId, agent.name, command, args, agentCwd, processEnv, provider.name, message,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    this.createLogChunkCallback(agentId, agent.name, sessionNumber),
  );
  await this.repository.update(agentId, { pid, outputFilePath });
  const activeEntry: ActiveAgent = { agentId, pid, tailer, outputFilePath };
  this.activeAgents.set(agentId, activeEntry);
  const { cancel } = this.processManager.pollForCompletion(
    agentId, pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
  );
  activeEntry.cancelPoll = cancel;
  log.info({ agentId, pid }, 'resumed errand agent for user message');
}
/**
 * Copy credentials from the agent's account config dir back into the DB.
 *
 * A subprocess may refresh tokens while it runs; persisting them afterwards
 * keeps the stored credentials current for the next spawn. This is a no-op
 * without an account repository or when the agent has no account. Failures
 * are logged as warnings and never thrown.
 */
private async syncCredentialsPostCompletion(agentId: string): Promise<void> {
  if (!this.accountRepository) return;
  try {
    const record = await this.repository.findById(agentId);
    const accountId = record?.accountId;
    if (!accountId) return;

    const paths = await import('./accounts/paths.js');
    const accountDir = paths.getAccountConfigDir(this.workspaceRoot, accountId);
    await this.credentialHandler.persistRefreshedCredentials(accountId, accountDir);
    log.debug({ agentId, accountId }, 'post-completion credential sync done');
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn({ agentId, err: message }, 'post-completion credential sync failed');
  }
}
/**
 * Stop an agent on user request.
 *
 * Sends SIGTERM to the tracked process (if any), stops its tailer, clears
 * in-memory state and the commit-retry counter, syncs credentials, marks the
 * agent 'stopped' and emits `agent:stopped` when an event bus is configured.
 *
 * @throws Error when no agent with the given ID exists.
 */
async stop(agentId: string): Promise<void> {
  const record = await this.repository.findById(agentId);
  if (!record) throw new Error(`Agent '${agentId}' not found`);
  log.info({ agentId, name: record.name }, 'stopping agent');

  const running = this.activeAgents.get(agentId);
  if (running) {
    try {
      process.kill(running.pid, 'SIGTERM');
    } catch {
      // already exited
    }
    await running.tailer.stop();
  }
  this.cleanupAgentState(agentId);
  this.commitRetryCount.delete(agentId);

  // Sync credentials before marking stopped
  await this.syncCredentialsPostCompletion(agentId);
  await this.repository.update(agentId, { status: 'stopped', pendingQuestions: null });

  const bus = this.eventBus;
  if (!bus) return;
  const stoppedEvent: AgentStoppedEvent = {
    type: 'agent:stopped',
    timestamp: new Date(),
    payload: { agentId, name: record.name, taskId: record.taskId ?? '', reason: 'user_requested' },
  };
  bus.emit(stoppedEvent);
}
/**
 * Return every agent known to the repository as AgentInfo.
 */
async list(): Promise<AgentInfo[]> {
  const records = await this.repository.findAll();
  const infos = records.map((record) => this.toAgentInfo(record));
  return infos;
}
/**
 * Look up one agent by ID.
 *
 * @returns The agent as AgentInfo, or null when the repository has no match.
 */
async get(agentId: string): Promise<AgentInfo | null> {
  const record = await this.repository.findById(agentId);
  if (!record) return null;
  return this.toAgentInfo(record);
}
/**
 * Look up one agent by name.
 *
 * @returns The agent as AgentInfo, or null when the repository has no match.
 */
async getByName(name: string): Promise<AgentInfo | null> {
  const record = await this.repository.findByName(name);
  if (!record) return null;
  return this.toAgentInfo(record);
}
/**
 * Resume an agent through the lifecycle controller's retry wrapper.
 *
 * The controller receives a callback that performs the actual resume via
 * resumeInternal(), using whatever answers the controller passes to it.
 */
async resumeWithLifecycle(agentId: string, answers: Record<string, string>): Promise<void> {
  const answerKeys = Object.keys(answers);
  log.info({ agentId, answerKeys }, 'resuming agent with unified lifecycle management');

  const request = { agentId, answers };
  await this.lifecycleController.resumeWithRetry(
    (id, modifiedAnswers) => this.resumeInternal(id, modifiedAnswers),
    request,
  );
}
/**
 * Legacy entry point: resume an agent that is waiting for input, without the
 * lifecycle controller. Prefer resumeWithLifecycle() for retry and error handling.
 */
async resume(agentId: string, answers: Record<string, string>): Promise<void> {
  await this.resumeInternal(agentId, answers);
}
/**
 * Resume implementation shared by resume() and resumeWithLifecycle().
 *
 * Formats the answers into a prompt, clears the previous signal.json, marks
 * the agent 'running', starts a new detached process for the existing
 * session, registers it in memory, emits `agent:resumed` and starts polling
 * for completion.
 *
 * @throws Error when the agent is missing, is not 'waiting_for_input', has no
 *   session, or its provider is unknown or does not support resume.
 */
private async resumeInternal(agentId: string, answers: Record<string, string>): Promise<void> {
  const record = await this.repository.findById(agentId);
  if (!record) throw new Error(`Agent '${agentId}' not found`);
  if (record.status !== 'waiting_for_input') {
    throw new Error(`Agent '${record.name}' is not waiting for input (status: ${record.status})`);
  }
  if (!record.sessionId) {
    throw new Error(`Agent '${record.name}' has no session to resume`);
  }
  log.info({ agentId, sessionId: record.sessionId, provider: record.provider }, 'resuming agent');

  const provider = getProvider(record.provider);
  if (!provider) throw new Error(`Unknown provider: '${record.provider}'`);
  if (provider.resumeStyle === 'none') {
    throw new Error(`Provider '${provider.name}' does not support resume`);
  }

  const workdir = this.processManager.getAgentWorkdir(record.worktreeId);
  const answerPrompt = this.outputHandler.formatAnswersAsPrompt(answers);

  // A stale signal.json would be mistaken for the new run's completion signal.
  const signalPath = join(workdir, '.cw/output/signal.json');
  try {
    await unlink(signalPath);
    log.debug({ agentId, signalPath }, 'cleared previous signal.json for resume');
  } catch {
    // File might not exist, which is fine
  }

  await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

  const resumeCommand = this.processManager.buildResumeCommand(provider, record.sessionId, answerPrompt);
  log.debug({ command: resumeCommand.command, args: resumeCommand.args.join(' ') }, 'resume command built');

  // Prepare process environment with credentials
  const { processEnv } = await this.credentialHandler.prepareProcessEnv(resumeCommand.env, provider, record.accountId);

  // Retire the tailer and poll that belonged to the previous run.
  const previous = this.activeAgents.get(agentId);
  previous?.cancelPoll?.();
  if (previous?.tailer) {
    await previous.tailer.stop();
  }

  // Determine session number for this resume
  const sessionNumber = this.logChunkRepository
    ? (await this.logChunkRepository.getSessionCount(agentId)) + 1
    : 1;

  const { pid, outputFilePath, tailer } = await this.processManager.spawnDetached(
    agentId, record.name, resumeCommand.command, resumeCommand.args, workdir, processEnv, provider.name, answerPrompt,
    (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    this.createLogChunkCallback(agentId, record.name, sessionNumber),
  );
  await this.repository.update(agentId, { pid, outputFilePath });

  const entry: ActiveAgent = { agentId, pid, tailer, outputFilePath };
  this.activeAgents.set(agentId, entry);
  log.info({ agentId, pid }, 'resume detached subprocess started');

  if (this.eventBus) {
    // verified: payload matches AgentResumedEvent shape (agentId, name, taskId, sessionId)
    const resumedEvent: AgentResumedEvent = {
      type: 'agent:resumed',
      timestamp: new Date(),
      payload: { agentId, name: record.name, taskId: record.taskId ?? '', sessionId: record.sessionId },
    };
    this.eventBus.emit(resumedEvent);
  }

  const poll = this.processManager.pollForCompletion(
    agentId, pid,
    () => this.handleDetachedAgentCompletion(agentId),
    () => this.activeAgents.get(agentId)?.tailer,
  );
  entry.cancelPoll = poll.cancel;
}
/**
 * Fetch the result of an agent's work.
 *
 * Delegates to the output handler, handing it the in-memory active entry for
 * the agent (or `undefined` when none is registered).
 *
 * @param agentId - ID of the agent whose result is requested.
 * @returns The value resolved by the output handler: an AgentResult or null.
 */
async getResult(agentId: string): Promise<AgentResult | null> {
  const activeEntry = this.activeAgents.get(agentId);
  return this.outputHandler.getResult(agentId, activeEntry);
}
/**
 * Fetch the pending questions of an agent that is waiting for input.
 *
 * Delegates to the output handler, handing it the in-memory active entry for
 * the agent (or `undefined` when none is registered).
 *
 * @param agentId - ID of the agent whose questions are requested.
 * @returns The value resolved by the output handler: PendingQuestions or null.
 */
async getPendingQuestions(agentId: string): Promise<PendingQuestions | null> {
  const activeEntry = this.activeAgents.get(agentId);
  return this.outputHandler.getPendingQuestions(agentId, activeEntry);
}
/**
 * Delete an agent and clean up all associated resources.
 *
 * Order of operations: signal the process and stop its tailer, clear in-memory
 * state, remove worktrees/branches/logs, delete persisted log chunks, delete the
 * DB record, then emit `agent:deleted`.
 *
 * Every cleanup step ahead of the DB delete is best-effort: a failure is logged
 * as a warning and deletion continues. This includes stopping the tailer, so a
 * failing tailer can no longer leave a half-deleted agent behind (process already
 * signalled, in-memory state and DB record still present).
 *
 * @param agentId - ID of the agent to delete.
 * @throws Error if no agent with this ID exists. Errors from the repository
 *   lookup or the final DB delete also propagate to the caller.
 */
async delete(agentId: string): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) throw new Error(`Agent '${agentId}' not found`);
  log.info({ agentId, name: agent.name }, 'deleting agent');

  // Runs one cleanup step; a failure is downgraded to a warning so the
  // remaining steps (and the DB delete) still happen.
  const bestEffort = async (failureMessage: string, step: () => unknown): Promise<void> => {
    try {
      await step();
    } catch (err) {
      log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, failureMessage);
    }
  };

  // 1. Kill process, stop tailer, clear all in-memory state
  const active = this.activeAgents.get(agentId);
  if (active) {
    try { process.kill(active.pid, 'SIGTERM'); } catch { /* already exited */ }
    await bestEffort('failed to stop output tailer', () => active.tailer.stop());
  }
  this.cleanupAgentState(agentId);
  this.commitRetryCount.delete(agentId);

  // 2. Best-effort filesystem/git cleanup
  await bestEffort('failed to remove worktrees', () => this.cleanupManager.removeAgentWorktrees(agent.name, agent.initiativeId));
  await bestEffort('failed to remove branches', () => this.cleanupManager.removeAgentBranches(agent.name, agent.initiativeId));
  await bestEffort('failed to remove logs', () => this.cleanupManager.removeAgentLogs(agent.name));

  // 3. Delete log chunks from DB (only when a log chunk repository is configured)
  const logChunkRepository = this.logChunkRepository;
  if (logChunkRepository) {
    await bestEffort('failed to delete log chunks', () => logChunkRepository.deleteByAgentId(agentId));
  }

  // 4. Delete DB record (not best-effort: a failure here is surfaced to the caller)
  await this.repository.delete(agentId);

  // 5. Emit deleted event
  if (this.eventBus) {
    const event: AgentDeletedEvent = {
      type: 'agent:deleted',
      timestamp: new Date(),
      payload: { agentId, name: agent.name },
    };
    this.eventBus.emit(event);
  }
  log.info({ agentId, name: agent.name }, 'agent deleted');
}
/**
 * Dismiss an agent.
 *
 * Calls `cleanupAgentState`, drops the agent's commit-retry counter, attempts to
 * remove its worktrees, branches and logs (each failure is logged as a warning
 * and does not abort), then stamps `userDismissedAt`/`updatedAt` on the DB
 * record. Unlike deletion, the DB record is kept.
 *
 * @param agentId - ID of the agent to dismiss.
 * @throws Error if no agent with this ID exists.
 */
async dismiss(agentId: string): Promise<void> {
  const agent = await this.repository.findById(agentId);
  if (!agent) throw new Error(`Agent '${agentId}' not found`);
  const { name, initiativeId } = agent;
  log.info({ agentId, name }, 'dismissing agent');

  this.cleanupAgentState(agentId);
  this.commitRetryCount.delete(agentId);

  // Best-effort filesystem cleanup, executed strictly in this order.
  const cleanupSteps: Array<{ run: () => unknown; failureMessage: string }> = [
    {
      run: () => this.cleanupManager.removeAgentWorktrees(name, initiativeId),
      failureMessage: 'dismiss: failed to remove worktrees',
    },
    {
      run: () => this.cleanupManager.removeAgentBranches(name, initiativeId),
      failureMessage: 'dismiss: failed to remove branches',
    },
    {
      run: () => this.cleanupManager.removeAgentLogs(name),
      failureMessage: 'dismiss: failed to remove logs',
    },
  ];
  for (const { run, failureMessage } of cleanupSteps) {
    try {
      await run();
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      log.warn({ agentId, err: reason }, failureMessage);
    }
  }

  const userDismissedAt = new Date();
  const updatedAt = new Date();
  await this.repository.update(agentId, { userDismissedAt, updatedAt });
  log.info({ agentId, name }, 'agent dismissed');
}
/**
 * Clean up orphaned agent workdirs.
 *
 * Pure delegation to the cleanup manager; resolves once it has finished.
 */
async cleanupOrphanedWorkdirs(): Promise<void> {
  await this.cleanupManager.cleanupOrphanedWorkdirs();
}
/**
 * Clean up orphaned agent log directories.
 *
 * Pure delegation to the cleanup manager; resolves once it has finished.
 */
async cleanupOrphanedLogs(): Promise<void> {
  await this.cleanupManager.cleanupOrphanedLogs();
}
/**
 * Reconcile agent state after server restart.
 *
 * Delegates to the cleanup manager, supplying it with:
 *  - the in-memory active-agent map,
 *  - a stream-event handler (forwards to the output handler),
 *  - an output processor (forwards to the output handler, resolving workdirs
 *    through the process manager),
 *  - a callback that starts completion polling for an agent and stores the
 *    poll's cancel function on its active entry (when one exists),
 *  - a log-chunk persister, only when a log chunk repository is configured.
 *
 * The log-chunk persister is fire-and-forget: failures are logged as warnings
 * and never propagate to the caller.
 */
async reconcileAfterRestart(): Promise<void> {
  const reconcileLogChunkRepo = this.logChunkRepository;
  // Session number per agent, resolved once for this reconciliation pass.
  // Re-querying the session count for every chunk can drift as earlier chunks
  // get inserted, scattering one session's output over several session numbers.
  const sessionNumbers = new Map<string, Promise<number>>();
  await this.cleanupManager.reconcileAfterRestart(
    this.activeAgents,
    (agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
    (agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
    (agentId, pid) => {
      const { cancel } = this.processManager.pollForCompletion(
        agentId, pid,
        () => this.handleDetachedAgentCompletion(agentId),
        () => this.activeAgents.get(agentId)?.tailer,
      );
      const active = this.activeAgents.get(agentId);
      if (active) active.cancelPoll = cancel;
    },
    reconcileLogChunkRepo
      ? (agentId, agentName, content) => {
          let sessionNumber = sessionNumbers.get(agentId);
          if (!sessionNumber) {
            const lookup = reconcileLogChunkRepo.getSessionCount(agentId).then((count) => count + 1);
            sessionNumbers.set(agentId, lookup);
            // A failed lookup must not poison every later chunk: forget it so
            // the next chunk for this agent retries the query.
            lookup.catch(() => {
              if (sessionNumbers.get(agentId) === lookup) sessionNumbers.delete(agentId);
            });
            sessionNumber = lookup;
          }
          // Fire-and-forget: the tailer callback is synchronous, so persistence
          // errors are only logged. Chaining off the shared promise also issues
          // the inserts in the order the chunks arrived.
          sessionNumber
            .then((resolvedSessionNumber) => reconcileLogChunkRepo.insertChunk({
              agentId,
              agentName,
              sessionNumber: resolvedSessionNumber,
              content,
            }))
            .catch((err) => log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to persist log chunk during reconciliation'));
        }
      : undefined,
  );
}
/**
 * React to a "process crashed" notification for a process that may be an agent.
 *
 * Looks the process ID up as an agent ID; unknown IDs are ignored. For a known
 * agent the exit code is persisted (the signal is only logged), and the agent is
 * marked `crashed` unless its signal.json shows that it actually finished.
 * Any error raised along the way is logged and swallowed.
 *
 * @param processId - ID reported by the crash event; treated as an agent ID.
 * @param exitCode - Exit code of the process, or null.
 * @param signal - Terminating signal of the process, or null.
 */
private async handleProcessCrashed(processId: string, exitCode: number | null, signal: string | null): Promise<void> {
  try {
    const agent = await this.repository.findById(processId);
    if (!agent) {
      // Not one of our agents: nothing to do.
      return;
    }

    // Persist the exit code for debugging.
    await this.repository.update(processId, { exitCode });

    const { name, outputFilePath } = agent;
    const identity = { agentId: processId, name, exitCode };
    log.info(
      { ...identity, signal, outputFilePath },
      'agent process crashed, analyzing completion status',
    );

    // Without an output file there is nothing that could indicate completion.
    if (!outputFilePath) {
      log.warn({ ...identity, signal }, 'agent crashed with no output file path - marking as crashed');
      await this.repository.update(processId, { status: 'crashed' });
      return;
    }

    const finishedDespiteCrash = await this.checkAgentCompletionResult(agent.worktreeId);
    if (!finishedDespiteCrash) {
      log.warn(
        { ...identity, signal, outputFilePath },
        'agent crashed and no successful completion detected - marking as truly crashed',
      );
      // Only a crash without any completion signal is recorded as crashed.
      await this.repository.update(processId, { status: 'crashed' });
      return;
    }

    log.info(
      { ...identity, signal },
      'agent marked as crashed but completed successfully - completion already handled by polling',
    );
    // handleCompletion() is intentionally not invoked here: the polling handler
    // (handleDetachedAgentCompletion) already processes completions, and the
    // mutex in OutputHandler.handleCompletion() prevents duplicate processing.
    log.info({ ...identity }, 'completion detection confirmed - deferring to polling handler');
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    log.error({ processId, exitCode, signal, err: reason }, 'failed to check agent completion after crash');
  }
}
/**
 * Decide from signal.json whether an agent reached a completion state.
 *
 * The signal file is looked up at `.cw/output/signal.json` under the agent's
 * workdir. When the workdir has no `.cw/output` but its `workspace/` subdirectory
 * contains `.cw`, the subdirectory is used instead (standalone agents keep .cw
 * inside workspace/).
 *
 * @param worktreeId - Worktree identifier used to resolve the agent's workdir.
 * @returns true when signal.json exists and its status is `done`, `questions`
 *   or `error`; false when the file is missing, has another status, or cannot
 *   be read or parsed (the latter is logged as a warning).
 */
private async checkAgentCompletionResult(worktreeId: string): Promise<boolean> {
  try {
    const rootDir = this.processManager.getAgentWorkdir(worktreeId);
    const workspaceDir = join(rootDir, 'workspace');
    const outputMissingAtRoot = !existsSync(join(rootDir, '.cw', 'output'));
    const agentWorkdir = outputMissingAtRoot && existsSync(join(workspaceDir, '.cw'))
      ? workspaceDir
      : rootDir;

    const signalPath = join(agentWorkdir, '.cw/output/signal.json');
    if (!existsSync(signalPath)) {
      log.debug({ worktreeId, signalPath }, 'no signal.json found - agent not completed');
      return false;
    }

    const signal = JSON.parse(await readFile(signalPath, 'utf-8'));

    // Statuses that count as "the agent completed": done, questions, or error.
    const completionStatuses = ['done', 'questions', 'error'];
    const completed = completionStatuses.includes(signal.status);
    log.debug(
      { worktreeId, signal },
      completed
        ? 'agent completion detected via signal.json'
        : 'signal.json found but status indicates incomplete',
    );
    return completed;
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    log.warn({ worktreeId, err: reason }, 'failed to read or parse signal.json');
    return false;
  }
}
/**
 * Decide whether preview instructions should be injected for an initiative.
 *
 * @param initiativeId - Initiative whose linked projects are inspected.
 * @returns true only when exactly one project is linked to the initiative and
 *   its clone directory contains a `.cw-preview.yml`; false otherwise, including
 *   when the lookup throws.
 */
private async shouldInjectPreviewInstructions(initiativeId: string): Promise<boolean> {
  try {
    const linkedProjects = await this.projectRepository.findProjectsByInitiativeId(initiativeId);
    if (linkedProjects.length === 1) {
      const soleProject = linkedProjects[0];
      const cloneDir = join(this.workspaceRoot, getProjectCloneDir(soleProject.name, soleProject.id));
      const previewConfigPath = join(cloneDir, '.cw-preview.yml');
      return existsSync(previewConfigPath);
    }
    return false;
  } catch {
    // Any failure while resolving projects is treated as "do not inject".
    return false;
  }
}
/**
 * Map a database agent record onto the AgentInfo shape.
 *
 * Normalisation applied on the way:
 *  - a null `taskId` becomes an empty string,
 *  - `status` and `mode` are asserted to their AgentStatus / AgentMode types,
 *  - a missing `exitCode` or `prompt` becomes null.
 * `userDismissedAt` is passed through as-is (it may be undefined).
 *
 * @param agent - Agent row as stored in the database.
 * @returns The AgentInfo built from that row.
 */
private toAgentInfo(agent: {
  id: string;
  name: string;
  taskId: string | null;
  initiativeId: string | null;
  sessionId: string | null;
  worktreeId: string;
  status: string;
  mode: string;
  provider: string;
  accountId: string | null;
  createdAt: Date;
  updatedAt: Date;
  userDismissedAt?: Date | null;
  exitCode?: number | null;
  prompt?: string | null;
}): AgentInfo {
  // Fields that need coercion are resolved first; the rest are copied verbatim.
  const taskId = agent.taskId ?? '';
  const status = agent.status as AgentStatus;
  const mode = agent.mode as AgentMode;
  const exitCode = agent.exitCode ?? null;
  const prompt = agent.prompt ?? null;

  const info: AgentInfo = {
    id: agent.id,
    name: agent.name,
    taskId,
    initiativeId: agent.initiativeId,
    sessionId: agent.sessionId,
    worktreeId: agent.worktreeId,
    status,
    mode,
    provider: agent.provider,
    accountId: agent.accountId,
    createdAt: agent.createdAt,
    updatedAt: agent.updatedAt,
    userDismissedAt: agent.userDismissedAt,
    exitCode,
    prompt,
  };
  return info;
}
}