Files
Codewalkers/src/agent/manager.ts
2026-02-07 00:33:12 +01:00

476 lines
17 KiB
TypeScript

/**
* Multi-Provider Agent Manager — Orchestrator
*
* Implementation of AgentManager port supporting multiple CLI providers.
* Delegates to extracted helpers:
* - ProcessManager: subprocess spawn/kill/poll, worktree creation, command building
* - CredentialHandler: account selection, credential write/refresh, exhaustion handling
* - OutputHandler: stream events, signal parsing, file reading, result capture
* - CleanupManager: worktree/branch/log removal, orphan cleanup, reconciliation
*/
import type {
AgentManager,
AgentInfo,
SpawnAgentOptions,
AgentResult,
AgentStatus,
AgentMode,
PendingQuestions,
} from './types.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import { generateUniqueAlias } from './alias.js';
import type {
EventBus,
AgentSpawnedEvent,
AgentStoppedEvent,
AgentResumedEvent,
AgentDeletedEvent,
} from '../events/index.js';
import { writeInputFiles } from './file-io.js';
import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
import type { AccountCredentialManager } from './credentials/types.js';
import { ProcessManager } from './process-manager.js';
import { CredentialHandler } from './credential-handler.js';
import { OutputHandler, type ActiveAgent } from './output-handler.js';
import { CleanupManager } from './cleanup-manager.js';
// Module-level logger shared by everything in this file, created under the 'agent-manager' name.
const log = createModuleLogger('agent-manager');
/**
 * AgentManager implementation that runs agents as detached CLI subprocesses
 * for any registered provider.
 *
 * The class itself only sequences the steps; the actual work is delegated to
 * ProcessManager, CredentialHandler, OutputHandler and CleanupManager, which
 * are constructed once in the constructor.
 *
 * In-memory state:
 * - `activeAgents`: agentId -> tracking entry (pid, tailer, output file) for
 *   subprocesses started or re-attached by this instance.
 * - `outputBuffers`: agentId -> buffered output lines, filled through
 *   OutputHandler.handleStreamEvent.
 */
export class MultiProviderAgentManager implements AgentManager {
  private activeAgents: Map<string, ActiveAgent> = new Map();
  private outputBuffers: Map<string, string[]> = new Map();
  private processManager: ProcessManager;
  private credentialHandler: CredentialHandler;
  private outputHandler: OutputHandler;
  private cleanupManager: CleanupManager;

  constructor(
    private repository: AgentRepository,
    private workspaceRoot: string,
    private projectRepository: ProjectRepository,
    private accountRepository?: AccountRepository,
    private eventBus?: EventBus,
    private credentialManager?: AccountCredentialManager,
  ) {
    this.processManager = new ProcessManager(workspaceRoot, projectRepository, eventBus);
    this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
    this.outputHandler = new OutputHandler(repository, eventBus);
    this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus);
  }

  /**
   * Spawn a new agent to work on a task.
   *
   * Steps: resolve provider and name, select an account (if any), create the
   * worktree(s), write input files, create the DB record, start the detached
   * subprocess, emit `agent:spawned`, and start polling for completion.
   *
   * @param options Spawn options; `mode` defaults to 'execute' and `provider` to 'claude'.
   * @returns The newly created agent as AgentInfo.
   * @throws Error when the provider is unknown or the requested name is already taken.
   */
  async spawn(options: SpawnAgentOptions): Promise<AgentInfo> {
    const { taskId, prompt, cwd, mode = 'execute', provider: providerName = 'claude', initiativeId } = options;
    log.info({ taskId, provider: providerName, initiativeId, mode }, 'spawn requested');

    const provider = getProvider(providerName);
    if (!provider) {
      throw new Error(`Unknown provider: '${providerName}'. Available: claude, codex, gemini, cursor, auggie, amp, opencode`);
    }

    // Generate or validate name
    const alias = await this.resolveAlias(options.name);
    log.debug({ alias }, 'alias generated');

    // 1. Account selection
    let accountId: string | null = null;
    let accountConfigDir: string | null = null;
    const accountResult = await this.credentialHandler.selectAccount(providerName);
    if (accountResult) {
      accountId = accountResult.accountId;
      accountConfigDir = accountResult.configDir;
      this.credentialHandler.writeCredentialsToDisk(accountResult.account, accountConfigDir);
      const { valid, refreshed } = await this.credentialHandler.ensureCredentials(accountConfigDir, accountId);
      if (!valid) {
        // Deliberately non-fatal: the spawn continues with whatever is on disk.
        log.warn({ alias, accountId }, 'failed to refresh account credentials, proceeding anyway');
      }
      if (refreshed) {
        await this.credentialHandler.persistRefreshedCredentials(accountId, accountConfigDir);
      }
    }
    if (accountId) {
      log.info({ alias, accountId }, 'account selected');
    } else {
      log.debug('no accounts available, spawning without account');
    }

    // 2. Create isolated worktrees
    const agentCwd = initiativeId
      ? await this.processManager.createProjectWorktrees(alias, initiativeId)
      : await this.processManager.createStandaloneWorktree(alias);
    log.debug({ alias, agentCwd }, 'worktrees created');

    // 2b. Write input files
    if (options.inputContext) {
      writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext });
      log.debug({ alias }, 'input files written');
    }

    // 3. Create agent record
    // NOTE(review): if any later step throws, this record stays in 'running'
    // with no subprocess; presumably reconcileAfterRestart deals with it — confirm.
    const agent = await this.repository.create({
      name: alias,
      taskId: taskId ?? null,
      initiativeId: initiativeId ?? null,
      sessionId: null,
      worktreeId: alias,
      status: 'running',
      mode,
      provider: providerName,
      accountId,
    });
    const agentId = agent.id;

    // 4. Build spawn command
    const { command, args, env: providerEnv } = this.processManager.buildSpawnCommand(provider, prompt);
    // An explicit `cwd` option takes precedence over the created worktree.
    const workdir = cwd ?? agentCwd;
    log.debug({ command, args: args.join(' '), cwd: workdir }, 'spawn command built');

    // 5. Set config dir env var if account selected
    const processEnv: Record<string, string> = { ...providerEnv };
    if (accountConfigDir && provider.configDirEnv) {
      processEnv[provider.configDirEnv] = accountConfigDir;
    }

    // 6. Spawn detached subprocess
    const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
      agentId, command, args, workdir, processEnv, providerName, prompt,
      // Looked up lazily: the tracking entry is registered just below.
      (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
    );
    await this.repository.update(agentId, { pid, outputFilePath });
    this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });
    log.info({ agentId, alias, pid }, 'detached subprocess started');

    // Emit spawned event
    if (this.eventBus) {
      const event: AgentSpawnedEvent = {
        type: 'agent:spawned',
        timestamp: new Date(),
        payload: { agentId, name: alias, taskId: taskId ?? null, worktreeId: alias, provider: providerName },
      };
      this.eventBus.emit(event);
    }

    // Start polling for completion
    this.processManager.pollForCompletion(
      agentId, pid,
      () => this.handleDetachedAgentCompletion(agentId),
      () => this.activeAgents.get(agentId)?.tailer,
    );

    return this.toAgentInfo(agent);
  }

  /**
   * Return the caller-supplied name after checking it is unused, or generate
   * a unique alias when no name was supplied.
   *
   * @throws Error when an agent with the requested name already exists.
   */
  private async resolveAlias(requestedName?: string | null): Promise<string> {
    if (!requestedName) {
      return generateUniqueAlias(this.repository);
    }
    const existing = await this.repository.findByName(requestedName);
    if (existing) {
      throw new Error(`Agent with name '${requestedName}' already exists`);
    }
    return requestedName;
  }

  /**
   * Handle completion of a detached agent.
   *
   * No-op when the agent is no longer tracked (e.g. it was stopped or deleted).
   * The tracking entry is removed even if completion handling throws, but only
   * when the map still holds the entry this call started with: resume() may
   * register a new subprocess while completion handling is awaited, and that
   * newer entry must not be evicted by this stale callback.
   */
  private async handleDetachedAgentCompletion(agentId: string): Promise<void> {
    const active = this.activeAgents.get(agentId);
    if (!active) return;
    try {
      await this.outputHandler.handleCompletion(
        agentId,
        active,
        (alias) => this.processManager.getAgentWorkdir(alias),
      );
    } finally {
      if (this.activeAgents.get(agentId) === active) {
        this.activeAgents.delete(agentId);
      }
    }
  }

  /**
   * Send SIGTERM to a tracked agent's subprocess, stop its tailer and drop the
   * tracking entry. No-op when the agent is not tracked.
   *
   * The entry is dropped even if stopping the tailer throws (the error still
   * propagates), so a dead pid is never left tracked.
   */
  private async terminateActive(agentId: string): Promise<void> {
    const active = this.activeAgents.get(agentId);
    if (!active) return;
    try {
      try { process.kill(active.pid, 'SIGTERM'); } catch { /* already exited */ }
      await active.tailer.stop();
    } finally {
      if (this.activeAgents.get(agentId) === active) {
        this.activeAgents.delete(agentId);
      }
    }
  }

  /**
   * Stop a running agent.
   *
   * Terminates the subprocess if tracked, marks the record 'stopped' and
   * emits `agent:stopped` with reason 'user_requested'.
   *
   * @throws Error when the agent does not exist.
   */
  async stop(agentId: string): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) throw new Error(`Agent '${agentId}' not found`);
    log.info({ agentId, name: agent.name }, 'stopping agent');

    await this.terminateActive(agentId);
    await this.repository.update(agentId, { status: 'stopped' });

    if (this.eventBus) {
      const event: AgentStoppedEvent = {
        type: 'agent:stopped',
        timestamp: new Date(),
        payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', reason: 'user_requested' },
      };
      this.eventBus.emit(event);
    }
  }

  /**
   * List all agents with their current status.
   */
  async list(): Promise<AgentInfo[]> {
    const agents = await this.repository.findAll();
    return agents.map((a) => this.toAgentInfo(a));
  }

  /**
   * Get a specific agent by ID, or null when it does not exist.
   */
  async get(agentId: string): Promise<AgentInfo | null> {
    const agent = await this.repository.findById(agentId);
    return agent ? this.toAgentInfo(agent) : null;
  }

  /**
   * Get a specific agent by name, or null when it does not exist.
   */
  async getByName(name: string): Promise<AgentInfo | null> {
    const agent = await this.repository.findByName(name);
    return agent ? this.toAgentInfo(agent) : null;
  }

  /**
   * Resume an agent that's waiting for input.
   *
   * Formats the answers as a prompt, resets the record to 'running', starts a
   * new detached subprocess on the stored session, emits `agent:resumed` and
   * starts polling for completion.
   *
   * @param agentId ID of the agent to resume.
   * @param answers Answers to format into the resume prompt.
   * @throws Error when the agent is missing, is not in 'waiting_for_input',
   *   has no session, or its provider is unknown or does not support resume.
   */
  async resume(agentId: string, answers: Record<string, string>): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) throw new Error(`Agent '${agentId}' not found`);
    if (agent.status !== 'waiting_for_input') {
      throw new Error(`Agent '${agent.name}' is not waiting for input (status: ${agent.status})`);
    }
    if (!agent.sessionId) {
      throw new Error(`Agent '${agent.name}' has no session to resume`);
    }
    log.info({ agentId, sessionId: agent.sessionId, provider: agent.provider }, 'resuming agent');

    const provider = getProvider(agent.provider);
    if (!provider) throw new Error(`Unknown provider: '${agent.provider}'`);
    if (provider.resumeStyle === 'none') {
      throw new Error(`Provider '${provider.name}' does not support resume`);
    }

    const agentCwd = this.processManager.getAgentWorkdir(agent.worktreeId);
    const prompt = this.outputHandler.formatAnswersAsPrompt(answers);

    // Single write so the record is never left half-reset if an update fails.
    await this.repository.update(agentId, { status: 'running', pendingQuestions: null, result: null });

    const { command, args, env: providerEnv } = this.processManager.buildResumeCommand(provider, agent.sessionId, prompt);
    log.debug({ command, args: args.join(' ') }, 'resume command built');

    // Set config dir if account is assigned
    const processEnv: Record<string, string> = { ...providerEnv };
    if (agent.accountId && provider.configDirEnv && this.accountRepository) {
      const { getAccountConfigDir } = await import('./accounts/paths.js');
      const resumeAccountConfigDir = getAccountConfigDir(this.workspaceRoot, agent.accountId);
      const resumeAccount = await this.accountRepository.findById(agent.accountId);
      if (resumeAccount) {
        this.credentialHandler.writeCredentialsToDisk(resumeAccount, resumeAccountConfigDir);
      }
      processEnv[provider.configDirEnv] = resumeAccountConfigDir;
      const { valid, refreshed } = await this.credentialHandler.ensureCredentials(resumeAccountConfigDir, agent.accountId);
      if (!valid) {
        // Non-fatal, same as in spawn(): the resume continues regardless.
        log.warn({ agentId, accountId: agent.accountId }, 'failed to refresh credentials before resume');
      }
      if (refreshed) {
        await this.credentialHandler.persistRefreshedCredentials(agent.accountId, resumeAccountConfigDir);
      }
    }

    // Stop previous tailer
    const prevActive = this.activeAgents.get(agentId);
    if (prevActive?.tailer) {
      await prevActive.tailer.stop();
    }

    const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
      agentId, command, args, agentCwd, processEnv, provider.name, prompt,
      (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
    );
    await this.repository.update(agentId, { pid, outputFilePath });
    this.activeAgents.set(agentId, { agentId, pid, tailer, outputFilePath });
    log.info({ agentId, pid }, 'resume detached subprocess started');

    if (this.eventBus) {
      const event: AgentResumedEvent = {
        type: 'agent:resumed',
        timestamp: new Date(),
        payload: { agentId, name: agent.name, taskId: agent.taskId ?? '', sessionId: agent.sessionId },
      };
      this.eventBus.emit(event);
    }

    this.processManager.pollForCompletion(
      agentId, pid,
      () => this.handleDetachedAgentCompletion(agentId),
      () => this.activeAgents.get(agentId)?.tailer,
    );
  }

  /**
   * Get the result of an agent's work (delegates to OutputHandler).
   */
  async getResult(agentId: string): Promise<AgentResult | null> {
    return this.outputHandler.getResult(agentId, this.activeAgents.get(agentId));
  }

  /**
   * Get pending questions for an agent waiting for input (delegates to OutputHandler).
   */
  async getPendingQuestions(agentId: string): Promise<PendingQuestions | null> {
    return this.outputHandler.getPendingQuestions(agentId, this.activeAgents.get(agentId));
  }

  /**
   * Get the buffered output for an agent (a copy obtained via OutputHandler.getOutputBufferCopy).
   */
  getOutputBuffer(agentId: string): string[] {
    return this.outputHandler.getOutputBufferCopy(this.outputBuffers, agentId);
  }

  /**
   * Delete an agent and clean up all associated resources.
   *
   * Worktree, branch and log removal are best-effort: a failure is logged as
   * a warning and deletion continues. The DB record is removed last, followed
   * by the `agent:deleted` event.
   *
   * @throws Error when the agent does not exist.
   */
  async delete(agentId: string): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) throw new Error(`Agent '${agentId}' not found`);
    log.info({ agentId, name: agent.name }, 'deleting agent');

    // 1. Kill process and stop tailer
    await this.terminateActive(agentId);

    // 2. Best-effort cleanup
    try { await this.cleanupManager.removeAgentWorktrees(agent.name, agent.initiativeId); }
    catch (err) { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to remove worktrees'); }
    try { await this.cleanupManager.removeAgentBranches(agent.name, agent.initiativeId); }
    catch (err) { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to remove branches'); }
    try { await this.cleanupManager.removeAgentLogs(agentId); }
    catch (err) { log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to remove logs'); }

    // 3. Clear output buffer
    this.outputHandler.clearOutputBuffer(this.outputBuffers, agentId);

    // 4. Delete DB record
    await this.repository.delete(agentId);

    // 5. Emit deleted event
    if (this.eventBus) {
      const event: AgentDeletedEvent = {
        type: 'agent:deleted',
        timestamp: new Date(),
        payload: { agentId, name: agent.name },
      };
      this.eventBus.emit(event);
    }
    log.info({ agentId, name: agent.name }, 'agent deleted');
  }

  /**
   * Dismiss an agent by stamping `userDismissedAt` (and `updatedAt`) on its record.
   *
   * @throws Error when the agent does not exist.
   */
  async dismiss(agentId: string): Promise<void> {
    const agent = await this.repository.findById(agentId);
    if (!agent) throw new Error(`Agent '${agentId}' not found`);
    log.info({ agentId, name: agent.name }, 'dismissing agent');

    // One timestamp for both columns so they can never differ.
    const now = new Date();
    await this.repository.update(agentId, {
      userDismissedAt: now,
      updatedAt: now,
    });
    log.info({ agentId, name: agent.name }, 'agent dismissed');
  }

  /**
   * Clean up orphaned agent workdirs (delegates to CleanupManager).
   */
  async cleanupOrphanedWorkdirs(): Promise<void> {
    return this.cleanupManager.cleanupOrphanedWorkdirs();
  }

  /**
   * Clean up orphaned agent log directories (delegates to CleanupManager).
   */
  async cleanupOrphanedLogs(): Promise<void> {
    return this.cleanupManager.cleanupOrphanedLogs();
  }

  /**
   * Reconcile agent state after server restart.
   *
   * Hands CleanupManager the tracking map plus three callbacks: stream-event
   * handling, raw-output processing, and completion polling, all wired the
   * same way as in spawn() and resume().
   */
  async reconcileAfterRestart(): Promise<void> {
    await this.cleanupManager.reconcileAfterRestart(
      this.activeAgents,
      (agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
      (agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
      (agentId, pid) => this.processManager.pollForCompletion(
        agentId, pid,
        () => this.handleDetachedAgentCompletion(agentId),
        () => this.activeAgents.get(agentId)?.tailer,
      ),
    );
  }

  /**
   * Convert database agent record to AgentInfo.
   *
   * A null `taskId` becomes the empty string; `status` and `mode` are cast
   * from their stored string form without validation.
   */
  private toAgentInfo(agent: {
    id: string;
    name: string;
    taskId: string | null;
    initiativeId: string | null;
    sessionId: string | null;
    worktreeId: string;
    status: string;
    mode: string;
    provider: string;
    accountId: string | null;
    createdAt: Date;
    updatedAt: Date;
  }): AgentInfo {
    return {
      id: agent.id,
      name: agent.name,
      taskId: agent.taskId ?? '',
      initiativeId: agent.initiativeId,
      sessionId: agent.sessionId,
      worktreeId: agent.worktreeId,
      status: agent.status as AgentStatus,
      mode: agent.mode as AgentMode,
      provider: agent.provider,
      accountId: agent.accountId,
      createdAt: agent.createdAt,
      updatedAt: agent.updatedAt,
    };
  }
}