Files
Codewalkers/apps/server/trpc/routers/agent.ts
Lukas May b2f4004191 feat: Persist agent prompt in DB so getAgentPrompt survives log cleanup
The `getAgentPrompt` tRPC procedure previously read exclusively from
`.cw/agent-logs/<name>/PROMPT.md`. Once the cleanup-manager removes
that directory, the prompt is gone forever.

Adds a `prompt` text column to the `agents` table and writes the fully
assembled prompt (including workspace layout, inter-agent comms, and
preview sections) to the DB in the same `repository.update()` call
that saves pid/outputFilePath after spawn.

`getAgentPrompt` now reads from DB first (`agent.prompt`) and falls
back to the filesystem only for agents spawned before this change.

Addresses review comment [MMcmVlEK16bBfkJuXvG6h].

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-06 13:13:01 +01:00

380 lines
13 KiB
TypeScript

/**
* Agent Router — spawn, stop, delete, dismiss, list, get, resume, result, questions,
* output (stored and live), input files, prompt
*/
import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import { tracked, type TrackedEnvelope } from '@trpc/server';
import path from 'path';
import fs from 'fs/promises';
import type { ProcedureBuilder } from '../trpc.js';
import type { TRPCContext } from '../context.js';
import type { AgentInfo, AgentResult, PendingQuestions } from '../../agent/types.js';
import type { AgentOutputEvent } from '../../events/types.js';
import { requireAgentManager, requireLogChunkRepository, requireTaskRepository, requireInitiativeRepository } from './_helpers.js';
/**
 * Input accepted by the `spawnAgent` mutation.
 *
 * `taskId` and `prompt` are required, non-empty strings. All other fields are
 * optional; `name` and `initiativeId` must be non-empty when supplied, and
 * `mode` is restricted to the listed literals.
 */
export const spawnAgentInputSchema = z.object({
  name: z.string().min(1).optional(),
  taskId: z.string().min(1),
  prompt: z.string().min(1),
  cwd: z.string().optional(),
  mode: z
    .enum(['execute', 'discuss', 'plan', 'detail', 'refine'])
    .optional(),
  provider: z.string().optional(),
  initiativeId: z.string().min(1).optional(),
});

/** Static type inferred from `spawnAgentInputSchema`. */
export type SpawnAgentInput = z.infer<typeof spawnAgentInputSchema>;
/**
 * Identifies an agent by `name` or by `id`.
 *
 * Both fields are optional on their own, but validation fails unless at least
 * one of them is a non-empty string.
 */
export const agentIdentifierSchema = z
  .object({
    name: z.string().optional(),
    id: z.string().optional(),
  })
  .refine(({ name, id }) => Boolean(name || id), {
    message: 'Either name or id must be provided',
  });

/** Static type inferred from `agentIdentifierSchema`. */
export type AgentIdentifier = z.infer<typeof agentIdentifierSchema>;
/**
 * Input accepted by the `resumeAgent` mutation: an agent identifier (`name`
 * or `id`, at least one non-empty) plus a string-to-string map of `answers`.
 */
export const resumeAgentInputSchema = z
  .object({
    name: z.string().optional(),
    id: z.string().optional(),
    answers: z.record(z.string(), z.string()),
  })
  .refine(({ name, id }) => Boolean(name || id), {
    message: 'Either name or id must be provided',
  });

/** Static type inferred from `resumeAgentInputSchema`. */
export type ResumeAgentInput = z.infer<typeof resumeAgentInputSchema>;
/**
 * Look up an agent by name (preferred) or by id.
 *
 * A non-empty `name` wins over `id`; an empty or missing `name` falls back to
 * `id`. The identifier that was actually used for the lookup is the one quoted
 * in the NOT_FOUND message.
 *
 * @param ctx - Request context; must carry an `agentManager`.
 * @param input - Agent identifier; at least one of `name` / `id` must be non-empty.
 * @returns The agent returned by the manager.
 * @throws TRPCError INTERNAL_SERVER_ERROR when `ctx.agentManager` is missing.
 * @throws TRPCError BAD_REQUEST when neither `name` nor `id` is usable.
 * @throws TRPCError NOT_FOUND when the manager returns no agent.
 */
async function resolveAgent(
  ctx: TRPCContext,
  input: { name?: string; id?: string }
): Promise<AgentInfo> {
  const manager = ctx.agentManager;
  if (!manager) {
    throw new TRPCError({
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Agent manager not available',
    });
  }

  const { name, id } = input;
  // Truthiness (not nullishness) decides which identifier is used, so an empty
  // `name` never shadows a valid `id` — neither in the lookup nor in the message.
  const identifier = name || id;
  if (!identifier) {
    // The zod schemas already reject this; guard direct callers as well
    // instead of passing `undefined` to the manager.
    throw new TRPCError({
      code: 'BAD_REQUEST',
      message: 'Either name or id must be provided',
    });
  }

  const agent = name
    ? await manager.getByName(name)
    : await manager.get(identifier);
  if (!agent) {
    throw new TRPCError({
      code: 'NOT_FOUND',
      message: `Agent '${identifier}' not found`,
    });
  }
  return agent;
}
export function agentProcedures(publicProcedure: ProcedureBuilder) {
return {
/** Spawn a new agent from the validated input and return the manager's result. */
spawnAgent: publicProcedure
  .input(spawnAgentInputSchema)
  .mutation(async ({ ctx, input }) => {
    const manager = requireAgentManager(ctx);
    const { name, taskId, prompt, cwd, mode, provider, initiativeId } = input;
    // Fields are forwarded one by one so only the known options reach the manager.
    return manager.spawn({
      name,
      taskId,
      prompt,
      cwd,
      mode,
      provider,
      initiativeId,
    });
  }),
/** Stop the agent identified by name or id; echoes the agent's name on success. */
stopAgent: publicProcedure
  .input(agentIdentifierSchema)
  .mutation(async ({ ctx, input }) => {
    const manager = requireAgentManager(ctx);
    const { id, name } = await resolveAgent(ctx, input);
    await manager.stop(id);
    return { success: true, name };
  }),
/** Delete the agent identified by name or id; echoes the agent's name on success. */
deleteAgent: publicProcedure
  .input(agentIdentifierSchema)
  .mutation(async ({ ctx, input }) => {
    const manager = requireAgentManager(ctx);
    const { id, name } = await resolveAgent(ctx, input);
    await manager.delete(id);
    return { success: true, name };
  }),
/** Dismiss the agent identified by name or id; echoes the agent's name on success. */
dismissAgent: publicProcedure
  .input(agentIdentifierSchema)
  .mutation(async ({ ctx, input }) => {
    const manager = requireAgentManager(ctx);
    const { id, name } = await resolveAgent(ctx, input);
    await manager.dismiss(id);
    return { success: true, name };
  }),
/** Return every agent the manager reports. */
listAgents: publicProcedure
  .query(async ({ ctx }) => requireAgentManager(ctx).list()),
/**
 * Fetch one agent and enrich it with the names of its task and initiative.
 * `taskName` / `initiativeName` are `null` when the agent has no such link or
 * when the linked record has no name / cannot be found.
 */
getAgent: publicProcedure
  .input(agentIdentifierSchema)
  .query(async ({ ctx, input }) => {
    const agent = await resolveAgent(ctx, input);

    // Repositories are only required when the agent actually references them.
    const taskName: string | null = agent.taskId
      ? ((await requireTaskRepository(ctx).findById(agent.taskId))?.name ?? null)
      : null;
    const initiativeName: string | null = agent.initiativeId
      ? ((await requireInitiativeRepository(ctx).findById(agent.initiativeId))?.name ?? null)
      : null;

    return { ...agent, taskName, initiativeName };
  }),
/** Look an agent up by its non-empty name; returns whatever the manager yields. */
getAgentByName: publicProcedure
  .input(z.object({ name: z.string().min(1) }))
  .query(async ({ ctx, input: { name } }) => requireAgentManager(ctx).getByName(name)),
/** Resume an agent with the supplied answers; echoes the agent's name on success. */
resumeAgent: publicProcedure
  .input(resumeAgentInputSchema)
  .mutation(async ({ ctx, input }) => {
    const manager = requireAgentManager(ctx);
    const { id, name } = await resolveAgent(ctx, input);
    await manager.resume(id, input.answers);
    return { success: true, name };
  }),
/** Return the manager's stored result for the identified agent, or `null`. */
getAgentResult: publicProcedure
  .input(agentIdentifierSchema)
  .query(async ({ ctx, input }): Promise<AgentResult | null> => {
    const manager = requireAgentManager(ctx);
    const { id } = await resolveAgent(ctx, input);
    return manager.getResult(id);
  }),
/** Return the pending questions of the identified agent, or `null`. */
getAgentQuestions: publicProcedure
  .input(agentIdentifierSchema)
  .query(async ({ ctx, input }): Promise<PendingQuestions | null> => {
    const manager = requireAgentManager(ctx);
    const { id } = await resolveAgent(ctx, input);
    return manager.getPendingQuestions(id);
  }),
/** List only the agents whose status is `waiting_for_input`. */
listWaitingAgents: publicProcedure
  .query(async ({ ctx }) => {
    const agents = await requireAgentManager(ctx).list();
    return agents.filter(({ status }) => status === 'waiting_for_input');
  }),
/**
 * Return the most recently created, non-dismissed discuss/refine agent of an
 * initiative whose status is running, waiting_for_input, idle or crashed —
 * or `null` when there is none.
 */
getActiveRefineAgent: publicProcedure
  .input(z.object({ initiativeId: z.string().min(1) }))
  .query(async ({ ctx, input }): Promise<AgentInfo | null> => {
    const { initiativeId } = input;
    // Discuss and refine agents both work on initiative content, so both are surfaced.
    const contentModes = ['discuss', 'refine'];
    const surfacedStatuses = ['running', 'waiting_for_input', 'idle', 'crashed'];

    const agents = await requireAgentManager(ctx).list();
    const matching = agents.filter((agent) => {
      if (!contentModes.includes(agent.mode)) return false;
      if (agent.initiativeId !== initiativeId) return false;
      if (!surfacedStatuses.includes(agent.status)) return false;
      return !agent.userDismissedAt;
    });

    // Newest first; `filter` produced a fresh array, so sorting in place is safe.
    matching.sort(
      (left, right) =>
        new Date(right.createdAt).getTime() - new Date(left.createdAt).getTime(),
    );
    return matching[0] ?? null;
  }),
/** Return the agent's stored log chunks concatenated into one string. */
getAgentOutput: publicProcedure
  .input(agentIdentifierSchema)
  .query(async ({ ctx, input }): Promise<string> => {
    // Resolve the agent first so an unknown agent is reported before a missing repository.
    const { id } = await resolveAgent(ctx, input);
    const storedChunks = await requireLogChunkRepository(ctx).findByAgentId(id);
    return storedChunks.map(({ content }) => content).join('');
  }),
/**
 * Live subscription: streams `agent:output` events for a single agent.
 *
 * Each event whose payload matches `agentId` is yielded as a tracked envelope
 * with id `<agentId>-live-<n>`, where `n` counts up from 0 for this
 * subscription. The generator ends once the abort signal fires; the event-bus
 * listener is removed in every exit path.
 */
onAgentOutput: publicProcedure
.input(z.object({ agentId: z.string().min(1) }))
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ agentId: string; data: string }>> {
const { agentId } = opts.input;
// Without a caller-provided signal, fall back to a fresh one; its controller is
// discarded here, so nothing in this block ever aborts it.
const signal = opts.signal ?? new AbortController().signal;
const eventBus = opts.ctx.eventBus;
let eventCounter = 0;
// Output received from the bus but not yet yielded to the subscriber.
const queue: string[] = [];
// Resolver of the promise the loop below is currently parked on, if any.
let resolve: (() => void) | null = null;
// Bus listener: buffer matching output and wake the loop if it is waiting.
const handler = (event: AgentOutputEvent) => {
if (event.payload.agentId !== agentId) return;
queue.push(event.payload.data);
if (resolve) {
// Clear the slot before calling so the same resolver is never invoked twice.
const r = resolve;
resolve = null;
r();
}
};
eventBus.on('agent:output', handler);
// Detach from the bus and release a parked loop so it can observe the abort.
const cleanup = () => {
eventBus.off('agent:output', handler);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted) {
// Drain everything buffered so far, in arrival order.
while (queue.length > 0) {
const data = queue.shift()!;
const id = `${agentId}-live-${eventCounter++}`;
yield tracked(id, { agentId, data });
}
// Park until the handler or cleanup resolves the promise.
if (!signal.aborted) {
await new Promise<void>((r) => {
resolve = r;
});
}
}
} finally {
// Also runs when the consumer ends the generator early.
cleanup();
}
}),
/**
 * Return the text files found under
 * `<workspaceRoot>/agent-workdirs/<worktreeId>/.cw/input` for an agent.
 *
 * - When the worktree or the input directory cannot be stat'ed, an empty list
 *   is returned together with a `reason`.
 * - Entries that are not regular files, files with a NUL byte in their first
 *   512 bytes (treated as binary) and unreadable files are skipped.
 * - Content larger than 500 KB is cut at a UTF-8 character boundary and
 *   suffixed with a truncation marker; `sizeBytes` always reports the full
 *   size that was read.
 * - Files are sorted by their path relative to the input directory.
 *
 * @throws TRPCError INTERNAL_SERVER_ERROR when the context has no workspace root.
 */
getAgentInputFiles: publicProcedure
  .input(z.object({ id: z.string().min(1) }))
  .output(z.object({
    files: z.array(z.object({
      name: z.string(),
      content: z.string(),
      sizeBytes: z.number(),
    })),
    reason: z.enum(['worktree_missing', 'input_dir_missing']).optional(),
  }))
  .query(async ({ ctx, input }) => {
    const agent = await resolveAgent(ctx, { id: input.id });
    const workspaceRoot = ctx.workspaceRoot;
    if (workspaceRoot == null) {
      // Fail with a clear message instead of a TypeError from path.join.
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Workspace root not available',
      });
    }
    const worktreeRoot = path.join(workspaceRoot, 'agent-workdirs', agent.worktreeId);
    const inputDir = path.join(worktreeRoot, '.cw', 'input');

    // Check worktree root exists
    try {
      await fs.stat(worktreeRoot);
    } catch {
      return { files: [], reason: 'worktree_missing' as const };
    }
    // Check input dir exists
    try {
      await fs.stat(inputDir);
    } catch {
      return { files: [], reason: 'input_dir_missing' as const };
    }

    const MAX_SIZE = 500 * 1024;
    const PROBE_BYTES = 512;

    // Binary detection: a NUL byte within the first PROBE_BYTES bytes.
    // The handle is closed in `finally` so a failing read cannot leak it.
    const looksBinary = async (filePath: string): Promise<boolean> => {
      const handle = await fs.open(filePath, 'r');
      try {
        const probe = Buffer.alloc(PROBE_BYTES);
        const { bytesRead } = await handle.read(probe, 0, PROBE_BYTES, 0);
        return probe.subarray(0, bytesRead).includes(0);
      } finally {
        await handle.close();
      }
    };

    // Largest cut position <= limit that does not split a UTF-8 sequence.
    // A sequence has at most 3 continuation bytes, so never back up further.
    const utf8Boundary = (buf: Buffer, limit: number): number => {
      const floor = Math.max(0, limit - 3);
      let cut = limit;
      while (cut > floor && (buf.readUInt8(cut) & 0xc0) === 0x80) cut--;
      return cut;
    };

    // Walk inputDir recursively
    const entries = await fs.readdir(inputDir, { recursive: true, withFileTypes: true });
    const results: Array<{ name: string; content: string; sizeBytes: number }> = [];
    for (const entry of entries) {
      if (!entry.isFile()) continue;
      // The directory of an entry is exposed as `parentPath` on newer Node
      // versions and as `path` on older ones; neither is guaranteed by the typings.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const dir: unknown = (entry as any).parentPath ?? (entry as any).path;
      if (typeof dir !== 'string') continue; // cannot locate the file — skip it
      const fullPath = path.join(dir, entry.name);
      const relativeName = path.relative(inputDir, fullPath);
      try {
        if (await looksBinary(fullPath)) continue; // skip binary
        const raw = await fs.readFile(fullPath);
        const sizeBytes = raw.length;
        let content: string;
        if (sizeBytes > MAX_SIZE) {
          const cut = utf8Boundary(raw, MAX_SIZE);
          content = raw.subarray(0, cut).toString('utf-8') + '\n\n[truncated — file exceeds 500 KB]';
        } else {
          content = raw.toString('utf-8');
        }
        results.push({ name: relativeName, content, sizeBytes });
      } catch {
        continue; // skip unreadable files (deliberately best-effort)
      }
    }
    results.sort((a, b) => a.name.localeCompare(b.name));
    return { files: results };
  }),
/**
 * Return the prompt an agent was spawned with.
 *
 * The prompt stored on the agent record is preferred. When the record has no
 * prompt, `<workspaceRoot>/.cw/agent-logs/<agent name>/PROMPT.md` is read
 * instead. A missing file yields `{ content: null, reason: 'prompt_not_written' }`.
 * Prompts larger than 1 MB (UTF-8 encoded) are cut at a character boundary and
 * suffixed with a truncation marker.
 *
 * @throws TRPCError INTERNAL_SERVER_ERROR when the fallback is needed but the
 *   context has no workspace root, or when the file exists but cannot be read.
 */
getAgentPrompt: publicProcedure
  .input(z.object({ id: z.string().min(1) }))
  .output(z.object({
    content: z.string().nullable(),
    reason: z.enum(['prompt_not_written']).optional(),
  }))
  .query(async ({ ctx, input }) => {
    const agent = await resolveAgent(ctx, { id: input.id });
    const MAX_BYTES = 1024 * 1024; // 1 MB

    const truncateIfNeeded = (text: string): string => {
      // Cheap size check first: no buffer is allocated for prompts within the limit.
      if (Buffer.byteLength(text, 'utf-8') <= MAX_BYTES) return text;
      const encoded = Buffer.from(text, 'utf-8');
      // Back up over continuation bytes (10xxxxxx) so the cut never lands inside
      // a multi-byte character; a sequence has at most 3 of them.
      const floor = MAX_BYTES - 3;
      let cut = MAX_BYTES;
      while (cut > floor && (encoded.readUInt8(cut) & 0xc0) === 0x80) cut--;
      return encoded.subarray(0, cut).toString('utf-8') + '\n\n[truncated — prompt exceeds 1 MB]';
    };

    // Prefer DB-persisted prompt (durable even after log file cleanup).
    // `!= null` also covers `undefined`, which would otherwise crash byteLength.
    if (agent.prompt != null) {
      return { content: truncateIfNeeded(agent.prompt) };
    }

    // Fall back to filesystem for agents spawned before DB persistence was added
    const workspaceRoot = ctx.workspaceRoot;
    if (workspaceRoot == null) {
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Workspace root not available',
      });
    }
    const promptPath = path.join(workspaceRoot, '.cw', 'agent-logs', agent.name, 'PROMPT.md');
    let raw: string;
    try {
      raw = await fs.readFile(promptPath, 'utf-8');
    } catch (err: unknown) {
      const code =
        typeof err === 'object' && err !== null && 'code' in err
          ? (err as { code?: unknown }).code
          : undefined;
      if (code === 'ENOENT') {
        return { content: null, reason: 'prompt_not_written' as const };
      }
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: `Failed to read prompt file: ${String(err)}`,
      });
    }
    return { content: truncateIfNeeded(raw) };
  }),
};
}