From 06f443ebc837f1dda95aed2cc2945c8117fd5f7d Mon Sep 17 00:00:00 2001 From: Lukas May Date: Tue, 10 Feb 2026 11:47:36 +0100 Subject: [PATCH] refactor: DB-driven agent output events with single emission point MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DB log chunk insertion is now the sole trigger for agent:output events. Eliminates triple emission (FileTailer, handleStreamEvent, output buffer) in favor of: FileTailer.onRawContent → DB insert → EventBus emit. - createLogChunkCallback emits agent:output after successful DB insert - spawnInternal now wires onRawContent callback (fixes session 1 gap) - Remove eventBus from FileTailer (no longer touches EventBus) - Remove eventBus from ProcessManager constructor (dead parameter) - Remove agent:output emission from handleStreamEvent text_delta - Remove outputBuffers map and all buffer helpers from manager/handler - Remove getOutputBuffer from AgentManager interface and implementations - getAgentOutput tRPC: DB-only, no file fallback - onAgentOutput subscription: no initial buffer yield, events only - AgentOutputViewer: accumulates raw JSONL chunks, parses uniformly --- docs/agent.md | 32 ++++++++------- docs/server-api.md | 4 +- .../web/src/components/AgentOutputViewer.tsx | 9 ++++- src/agent/cleanup-manager.ts | 1 - src/agent/file-tailer.ts | 24 +----------- src/agent/manager.ts | 31 ++++++++------- src/agent/mock-manager.ts | 8 ---- src/agent/output-handler.ts | 39 +------------------ src/agent/process-manager.test.ts | 12 +----- src/agent/process-manager.ts | 3 -- src/agent/types.ts | 11 ------ src/dispatch/manager.test.ts | 1 - .../real-providers/claude-manager.test.ts | 8 +--- .../real-providers/codex-manager.test.ts | 4 -- src/trpc/routers/agent.ts | 28 +------------ 15 files changed, 50 insertions(+), 165 deletions(-) diff --git a/docs/agent.md b/docs/agent.md index 17e8cba..b11544f 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -10,7 +10,7 @@ | `manager.ts` | `MultiProviderAgentManager` — main orchestrator class | | `process-manager.ts` | `AgentProcessManager` — worktree creation, command building, detached spawn | | `output-handler.ts` | `OutputHandler` — JSONL stream parsing, completion detection, proposal creation, task dedup | -| `file-tailer.ts` | `FileTailer` — watches output files, emits line events | +| `file-tailer.ts` | `FileTailer` — watches output files, fires parser + raw content callbacks | | `file-io.ts` | Input/output file I/O: frontmatter writing, signal.json reading, tiptap conversion | | `markdown-to-tiptap.ts` | Markdown to Tiptap JSON conversion using MarkdownManager | | `index.ts` | Public exports, `ClaudeAgentManager` deprecated alias | @@ -36,14 +36,15 @@ 4. `file-io.writeInputFiles()` — writes `.cw/input/` with assignment files (initiative, pages, phase, task) and read-only context dirs (`context/phases/`, `context/tasks/`) 5. Provider config builds spawn command via `buildSpawnCommand()` 6. `spawnDetached()` — launches detached child process with file output redirection -7. `FileTailer` watches output file, fires `onEvent` and `onRawContent` callbacks -8. `OutputHandler.handleStreamEvent()` processes each JSONL line -9. DB record updated with PID, output file path, session ID -10. `agent:spawned` event emitted +7. `FileTailer` watches output file, fires `onEvent` (parsed stream events) and `onRawContent` (raw JSONL chunks) callbacks +8. `onRawContent` → DB insert via `createLogChunkCallback()` → `agent:output` event emitted (single emission point) +9. `OutputHandler.handleStreamEvent()` processes parsed events (session tracking, result capture — no event emission) +10. DB record updated with PID, output file path, session ID +11. `agent:spawned` event emitted ### Completion Detection -1. `FileTailer` detects process exit +1. Polling detects process exit, `FileTailer.stop()` flushes remaining output 2. `OutputHandler.handleCompletion()` triggered 3. **Primary path**: Reads `.cw/output/signal.json` from agent worktree 4. Signal contains `{ status: "done"|"questions"|"error", result?, questions?, error? }` @@ -87,11 +88,13 @@ Each provider config specifies: `command`, `args`, `resumeStyle`, `promptMode`, The `OutputHandler` processes JSONL streams from Claude CLI: -- `text_delta` events → accumulated as text output, emitted via `agent:output` -- `init` event → session ID extracted -- `result` event → final result with structured data +- `init` event → session ID extracted and persisted +- `text_delta` events → no-op in handler (output streaming handled by DB log chunks) +- `result` event → final result with structured data captured on `ActiveAgent` - Signal file (`signal.json`) → authoritative completion status +**Output event flow**: `FileTailer.onRawContent()` → DB `insertChunk()` → `EventBus.emit('agent:output')`. This is the single emission point — no events from `handleStreamEvent()` or `processLine()`. + For providers without structured output, the generic line parser accumulates raw text. ## Credential Management @@ -113,8 +116,11 @@ Stored as `credentials: {"claudeAiOauth":{"accessToken":""}}` and `config ## Log Chunks -Agent output is persisted to `agent_log_chunks` table: -- `onRawContent` callback fires for every output chunk -- Fire-and-forget DB insert (no FK to agents — survives deletion) +Agent output is persisted to `agent_log_chunks` table and drives all live streaming: +- `onRawContent` callback fires for every raw JSONL chunk from `FileTailer` +- DB insert → `agent:output` event emission (single source of truth for UI) +- No FK to agents — survives agent deletion - Session tracking: spawn=1, resume=previousMax+1 -- Read path concatenates all sessions for full output history +- Read path (`getAgentOutput` tRPC): concatenates all DB chunks (no file fallback) +- Live path (`onAgentOutput` subscription): listens for `agent:output` events +- Frontend: initial query loads from DB, subscription accumulates raw JSONL, both parsed via `parseAgentOutput()` diff --git a/docs/server-api.md b/docs/server-api.md index d2c7e14..a27daa3 100644 --- a/docs/server-api.md +++ b/docs/server-api.md @@ -62,10 +62,10 @@ Each procedure uses `require*Repository(ctx)` helpers that throw `TRPCError(INTE | getAgent | query | Single agent by name or ID | | getAgentResult | query | Execution result | | getAgentQuestions | query | Pending questions | -| getAgentOutput | query | Full output (DB chunks or file fallback) | +| getAgentOutput | query | Full output from DB log chunks | | getActiveRefineAgent | query | Active refine agent for initiative | | listWaitingAgents | query | Agents waiting for input | -| onAgentOutput | subscription | Live output stream + buffered history | +| onAgentOutput | subscription | Live raw JSONL output stream via EventBus | ### Tasks | Procedure | Type | Description | diff --git a/packages/web/src/components/AgentOutputViewer.tsx b/packages/web/src/components/AgentOutputViewer.tsx index e9e9532..82c47a2 100644 --- a/packages/web/src/components/AgentOutputViewer.tsx +++ b/packages/web/src/components/AgentOutputViewer.tsx @@ -21,6 +21,8 @@ export function AgentOutputViewer({ agentId, agentName, status, onStop }: AgentO const [messages, setMessages] = useState([]); const [follow, setFollow] = useState(true); const containerRef = useRef(null); + // Accumulate raw JSONL: initial query data + live subscription chunks + const rawBufferRef = useRef(''); // Load initial/historical output const outputQuery = trpc.getAgentOutput.useQuery( @@ -37,8 +39,9 @@ export function AgentOutputViewer({ agentId, agentName, status, onStop }: AgentO onData: (event: any) => { // TrackedEnvelope shape: { id, data: { agentId, data: string } } const raw = event?.data?.data ?? event?.data; - const data = typeof raw === 'string' ? raw : JSON.stringify(raw); - setMessages((prev) => [...prev, { type: 'text', content: data }]); + const chunk = typeof raw === 'string' ? raw : JSON.stringify(raw); + rawBufferRef.current += chunk; + setMessages(parseAgentOutput(rawBufferRef.current)); }, onError: (error) => { console.error('Agent output subscription error:', error); @@ -51,12 +54,14 @@ export function AgentOutputViewer({ agentId, agentName, status, onStop }: AgentO // Set initial output when query loads useEffect(() => { if (outputQuery.data) { + rawBufferRef.current = outputQuery.data; setMessages(parseAgentOutput(outputQuery.data)); } }, [outputQuery.data]); // Reset output when agent changes useEffect(() => { + rawBufferRef.current = ''; setMessages([]); setFollow(true); }, [agentId]); diff --git a/src/agent/cleanup-manager.ts b/src/agent/cleanup-manager.ts index ea07469..eb0dea9 100644 --- a/src/agent/cleanup-manager.ts +++ b/src/agent/cleanup-manager.ts @@ -355,7 +355,6 @@ export class CleanupManager { filePath: agent.outputFilePath, agentId: agent.id, parser, - eventBus: this.eventBus, onEvent: (event) => onStreamEvent(agent.id, event), startFromBeginning: false, onRawContent: onRawContent diff --git a/src/agent/file-tailer.ts b/src/agent/file-tailer.ts index ca0e34f..4e882a5 100644 --- a/src/agent/file-tailer.ts +++ b/src/agent/file-tailer.ts @@ -13,7 +13,6 @@ import { watch, type FSWatcher } from 'node:fs'; import { open, stat } from 'node:fs/promises'; import type { FileHandle } from 'node:fs/promises'; import type { StreamParser, StreamEvent } from './providers/stream-types.js'; -import type { EventBus, AgentOutputEvent } from '../events/index.js'; import { createModuleLogger } from '../logger/index.js'; const log = createModuleLogger('file-tailer'); @@ -27,17 +26,15 @@ const READ_BUFFER_SIZE = 64 * 1024; export interface FileTailerOptions { /** Path to the output file to watch */ filePath: string; - /** Agent ID for event emission */ + /** Agent ID for logging */ agentId: string; /** Parser to convert lines to stream events */ parser: StreamParser; - /** Optional event bus for emitting agent:output events */ - eventBus?: EventBus; /** Optional callback for each stream event */ onEvent?: (event: StreamEvent) => void; /** If true, read from beginning of file; otherwise tail only new content (default: false) */ startFromBeginning?: boolean; - /** Callback for raw file content chunks (for DB persistence) */ + /** Callback for raw file content chunks (for DB persistence + event emission) */ onRawContent?: (content: string) => void; } @@ -63,7 +60,6 @@ export class FileTailer { private readonly filePath: string; private readonly agentId: string; private readonly parser: StreamParser; - private readonly eventBus?: EventBus; private readonly onEvent?: (event: StreamEvent) => void; private readonly startFromBeginning: boolean; private readonly onRawContent?: (content: string) => void; @@ -72,7 +68,6 @@ export class FileTailer { this.filePath = options.filePath; this.agentId = options.agentId; this.parser = options.parser; - this.eventBus = options.eventBus; this.onEvent = options.onEvent; this.startFromBeginning = options.startFromBeginning ?? false; this.onRawContent = options.onRawContent; @@ -193,24 +188,9 @@ export class FileTailer { const events = this.parser.parseLine(line); for (const event of events) { - // Call user callback if provided if (this.onEvent) { this.onEvent(event); } - - // Emit agent:output for text_delta events - if (event.type === 'text_delta' && this.eventBus) { - const outputEvent: AgentOutputEvent = { - type: 'agent:output', - timestamp: new Date(), - payload: { - agentId: this.agentId, - stream: 'stdout', - data: event.text, - }, - }; - this.eventBus.emit(outputEvent); - } } } diff --git a/src/agent/manager.ts b/src/agent/manager.ts index ad28e64..cb2fad7 100644 --- a/src/agent/manager.ts +++ b/src/agent/manager.ts @@ -59,7 +59,6 @@ export class MultiProviderAgentManager implements AgentManager { private static readonly MAX_COMMIT_RETRIES = 1; private activeAgents: Map = new Map(); - private outputBuffers: Map = new Map(); private commitRetryCount: Map = new Map(); private processManager: ProcessManager; private credentialHandler: CredentialHandler; @@ -83,7 +82,7 @@ export class MultiProviderAgentManager implements AgentManager { private debug: boolean = false, ) { this.signalManager = new FileSystemSignalManager(); - this.processManager = new ProcessManager(workspaceRoot, projectRepository, eventBus); + this.processManager = new ProcessManager(workspaceRoot, projectRepository); this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager); this.outputHandler = new OutputHandler(repository, eventBus, changeSetRepository, phaseRepository, taskRepository, pageRepository, this.signalManager); this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug, this.signalManager); @@ -105,13 +104,12 @@ export class MultiProviderAgentManager implements AgentManager { /** * Centralized cleanup of all in-memory state for an agent. - * Cancels polling timer, removes from activeAgents, outputBuffers, and commitRetryCount. + * Cancels polling timer, removes from activeAgents and commitRetryCount. */ private cleanupAgentState(agentId: string): void { const active = this.activeAgents.get(agentId); if (active?.cancelPoll) active.cancelPoll(); this.activeAgents.delete(agentId); - this.outputBuffers.delete(agentId); this.commitRetryCount.delete(agentId); } @@ -129,6 +127,15 @@ export class MultiProviderAgentManager implements AgentManager { return (content) => { repo.insertChunk({ agentId, agentName, sessionNumber, content }) + .then(() => { + if (this.eventBus) { + this.eventBus.emit({ + type: 'agent:output' as const, + timestamp: new Date(), + payload: { agentId, stream: 'stdout', data: content }, + }); + } + }) .catch(err => log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to persist log chunk')); }; } @@ -301,7 +308,8 @@ export class MultiProviderAgentManager implements AgentManager { // 6. Spawn detached subprocess const { pid, outputFilePath, tailer } = this.processManager.spawnDetached( agentId, alias, command, args, cwd ?? agentCwd, processEnv, providerName, prompt, - (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers), + (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)), + this.createLogChunkCallback(agentId, alias, 1), ); await this.repository.update(agentId, { pid, outputFilePath }); @@ -452,7 +460,7 @@ export class MultiProviderAgentManager implements AgentManager { const { pid, outputFilePath, tailer } = this.processManager.spawnDetached( agentId, agent.name, command, args, agentCwd, processEnv, provider.name, commitPrompt, - (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers), + (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)), this.createLogChunkCallback(agentId, agent.name, commitSessionNumber), ); @@ -625,7 +633,7 @@ export class MultiProviderAgentManager implements AgentManager { const { pid, outputFilePath, tailer } = this.processManager.spawnDetached( agentId, agent.name, command, args, agentCwd, processEnv, provider.name, prompt, - (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers), + (event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)), this.createLogChunkCallback(agentId, agent.name, resumeSessionNumber), ); @@ -666,13 +674,6 @@ export class MultiProviderAgentManager implements AgentManager { return this.outputHandler.getPendingQuestions(agentId, this.activeAgents.get(agentId)); } - /** - * Get the buffered output for an agent. - */ - getOutputBuffer(agentId: string): string[] { - return this.outputHandler.getOutputBufferCopy(this.outputBuffers, agentId); - } - /** * Delete an agent and clean up all associated resources. */ @@ -759,7 +760,7 @@ export class MultiProviderAgentManager implements AgentManager { const reconcileLogChunkRepo = this.logChunkRepository; await this.cleanupManager.reconcileAfterRestart( this.activeAgents, - (agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers), + (agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)), (agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)), (agentId, pid) => { const { cancel } = this.processManager.pollForCompletion( diff --git a/src/agent/mock-manager.ts b/src/agent/mock-manager.ts index f213ce1..0c59498 100644 --- a/src/agent/mock-manager.ts +++ b/src/agent/mock-manager.ts @@ -457,14 +457,6 @@ export class MockAgentManager implements AgentManager { return record?.pendingQuestions ?? null; } - /** - * Get the buffered output for an agent. - * Mock implementation returns empty array. - */ - getOutputBuffer(_agentId: string): string[] { - return []; - } - /** * Dismiss an agent. * Mock implementation just marks the agent as dismissed. diff --git a/src/agent/output-handler.ts b/src/agent/output-handler.ts index d6444d9..9f49aa7 100644 --- a/src/agent/output-handler.ts +++ b/src/agent/output-handler.ts @@ -19,7 +19,6 @@ import type { AgentStoppedEvent, AgentCrashedEvent, AgentWaitingEvent, - AgentOutputEvent, } from '../events/index.js'; import type { AgentResult, @@ -45,9 +44,6 @@ import { createModuleLogger } from '../logger/index.js'; const log = createModuleLogger('output-handler'); -/** Max number of output chunks to buffer per agent */ -const MAX_OUTPUT_BUFFER_SIZE = 1000; - /** * Tracks an active agent with its PID and file tailer. */ @@ -159,7 +155,6 @@ export class OutputHandler { agentId: string, event: StreamEvent, active: ActiveAgent | undefined, - outputBuffers: Map, ): void { switch (event.type) { case 'init': @@ -172,15 +167,7 @@ export class OutputHandler { break; case 'text_delta': - this.pushToOutputBuffer(outputBuffers, agentId, event.text); - if (this.eventBus) { - const outputEvent: AgentOutputEvent = { - type: 'agent:output', - timestamp: new Date(), - payload: { agentId, stream: 'stdout', data: event.text }, - }; - this.eventBus.emit(outputEvent); - } + // Text deltas are now streamed via DB log chunks + EventBus in manager.createLogChunkCallback break; case 'tool_use_start': @@ -887,30 +874,6 @@ export class OutputHandler { return agent?.pendingQuestions ? JSON.parse(agent.pendingQuestions) : null; } - // ========================================================================= - // Output Buffer Management - // ========================================================================= - - pushToOutputBuffer(buffers: Map, agentId: string, chunk: string): void { - let buffer = buffers.get(agentId); - if (!buffer) { - buffer = []; - buffers.set(agentId, buffer); - } - buffer.push(chunk); - while (buffer.length > MAX_OUTPUT_BUFFER_SIZE) { - buffer.shift(); - } - } - - clearOutputBuffer(buffers: Map, agentId: string): void { - buffers.delete(agentId); - } - - getOutputBufferCopy(buffers: Map, agentId: string): string[] { - return [...(buffers.get(agentId) ?? [])]; - } - // ========================================================================= // Private Helpers // ========================================================================= diff --git a/src/agent/process-manager.test.ts b/src/agent/process-manager.test.ts index ba8e826..5a3c681 100644 --- a/src/agent/process-manager.test.ts +++ b/src/agent/process-manager.test.ts @@ -8,7 +8,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import { ProcessManager } from './process-manager.js'; import type { ProjectRepository } from '../db/repositories/project-repository.js'; -import type { EventBus } from '../events/index.js'; // Mock child_process.spawn vi.mock('node:child_process', () => ({ @@ -70,7 +69,6 @@ const mockCloseSync = vi.mocked(closeSync); describe('ProcessManager', () => { let processManager: ProcessManager; let mockProjectRepository: ProjectRepository; - let mockEventBus: EventBus; const workspaceRoot = '/test/workspace'; @@ -100,15 +98,7 @@ describe('ProcessManager', () => { removeProjectFromInitiative: vi.fn(), }; - // Mock event bus - mockEventBus = { - emit: vi.fn(), - on: vi.fn(), - off: vi.fn(), - once: vi.fn(), - }; - - processManager = new ProcessManager(workspaceRoot, mockProjectRepository, mockEventBus); + processManager = new ProcessManager(workspaceRoot, mockProjectRepository); }); afterEach(() => { diff --git a/src/agent/process-manager.ts b/src/agent/process-manager.ts index e0b0707..5117a02 100644 --- a/src/agent/process-manager.ts +++ b/src/agent/process-manager.ts @@ -10,7 +10,6 @@ import { spawn } from 'node:child_process'; import { writeFileSync, mkdirSync, openSync, closeSync, existsSync } from 'node:fs'; import { join } from 'node:path'; import type { ProjectRepository } from '../db/repositories/project-repository.js'; -import type { EventBus } from '../events/index.js'; import type { AgentProviderConfig } from './providers/types.js'; import type { StreamEvent } from './providers/parsers/index.js'; import { getStreamParser } from './providers/parsers/index.js'; @@ -37,7 +36,6 @@ export class ProcessManager { constructor( private workspaceRoot: string, private projectRepository: ProjectRepository, - private eventBus?: EventBus, ) {} /** @@ -312,7 +310,6 @@ export class ProcessManager { filePath: outputFilePath, agentId, parser, - eventBus: this.eventBus, onEvent: onEvent ?? (() => {}), startFromBeginning: true, onRawContent, diff --git a/src/agent/types.ts b/src/agent/types.ts index 70aeb07..eff15cc 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -214,17 +214,6 @@ export interface AgentManager { */ getPendingQuestions(agentId: string): Promise; - /** - * Get the buffered output for an agent. - * - * Returns recent output chunks from the agent's stdout stream. - * Buffer is limited to MAX_OUTPUT_BUFFER_SIZE chunks (ring buffer). - * - * @param agentId - Agent ID - * @returns Array of output chunks (newest last) - */ - getOutputBuffer(agentId: string): string[]; - /** * Delete an agent and clean up all associated resources. * diff --git a/src/dispatch/manager.test.ts b/src/dispatch/manager.test.ts index ffb6b38..2823d99 100644 --- a/src/dispatch/manager.test.ts +++ b/src/dispatch/manager.test.ts @@ -80,7 +80,6 @@ function createMockAgentManager( resume: vi.fn().mockResolvedValue(undefined), getResult: vi.fn().mockResolvedValue(null), getPendingQuestions: vi.fn().mockResolvedValue(null), - getOutputBuffer: vi.fn().mockReturnValue([]), }; } diff --git a/src/test/integration/real-providers/claude-manager.test.ts b/src/test/integration/real-providers/claude-manager.test.ts index 9ae1bb4..aac987c 100644 --- a/src/test/integration/real-providers/claude-manager.test.ts +++ b/src/test/integration/real-providers/claude-manager.test.ts @@ -64,18 +64,12 @@ describeRealClaude('Real Claude Manager Integration', () => { // Wait for completion const result = await harness.waitForAgentCompletion(agent.id, REAL_TEST_TIMEOUT); - // Verify we got output (either via events or buffer) + // Verify we got output events const outputEvents = harness.getEventsByType('agent:output'); - const outputBuffer = harness.agentManager.getOutputBuffer(agent.id); - - // At least one should have content (output may be emitted as event or buffered) - const hasOutput = outputEvents.length > 0 || outputBuffer.length > 0; console.log(' Output events:', outputEvents.length); - console.log(' Output buffer:', outputBuffer.length); // Verify completion expect(result).toBeTruthy(); - console.log(' Output chunks:', outputBuffer.length); console.log(' Result:', result?.message); }, REAL_TEST_TIMEOUT diff --git a/src/test/integration/real-providers/codex-manager.test.ts b/src/test/integration/real-providers/codex-manager.test.ts index 622b84a..59030de 100644 --- a/src/test/integration/real-providers/codex-manager.test.ts +++ b/src/test/integration/real-providers/codex-manager.test.ts @@ -105,10 +105,6 @@ describeRealCodex('Real Codex Manager Integration', () => { const outputEvents = harness.getEventsByType('agent:output'); console.log(' Output events:', outputEvents.length); - // Verify output buffer - const outputBuffer = harness.agentManager.getOutputBuffer(agent.id); - console.log(' Output buffer chunks:', outputBuffer.length); - // For generic provider, result should be captured const dbAgent = await harness.agentRepository.findById(agent.id); console.log(' Status:', dbAgent?.status); diff --git a/src/trpc/routers/agent.ts b/src/trpc/routers/agent.ts index 6de93b1..5be795d 100644 --- a/src/trpc/routers/agent.ts +++ b/src/trpc/routers/agent.ts @@ -4,8 +4,6 @@ import { TRPCError } from '@trpc/server'; import { z } from 'zod'; -import { join } from 'node:path'; -import { readFile } from 'node:fs/promises'; import { tracked, type TrackedEnvelope } from '@trpc/server'; import type { ProcedureBuilder } from '../trpc.js'; import type { TRPCContext } from '../context.js'; @@ -191,24 +189,7 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) { const logChunkRepo = requireLogChunkRepository(ctx); const chunks = await logChunkRepo.findByAgentId(agent.id); - if (chunks.length > 0) { - return chunks.map(c => c.content).join(''); - } - - // Fallback to file for agents that predate DB persistence - const workspaceRoot = ctx.workspaceRoot; - if (!workspaceRoot) { - return ''; - } - - const outputFilePath = join(workspaceRoot, '.cw', 'agent-logs', agent.name, 'output.jsonl'); - - try { - const content = await readFile(outputFilePath, 'utf-8'); - return content; - } catch { - return ''; - } + return chunks.map(c => c.content).join(''); }), onAgentOutput: publicProcedure @@ -216,16 +197,9 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) { .subscription(async function* (opts): AsyncGenerator> { const { agentId } = opts.input; const signal = opts.signal ?? new AbortController().signal; - const agentManager = requireAgentManager(opts.ctx); const eventBus = opts.ctx.eventBus; - const buffer = agentManager.getOutputBuffer(agentId); let eventCounter = 0; - for (const chunk of buffer) { - const id = `${agentId}-buf-${eventCounter++}`; - yield tracked(id, { agentId, data: chunk }); - } - const queue: string[] = []; let resolve: (() => void) | null = null;