refactor: DB-driven agent output events with single emission point

DB log chunk insertion is now the sole trigger for agent:output events.
Eliminates triple emission (FileTailer, handleStreamEvent, output buffer)
in favor of: FileTailer.onRawContent → DB insert → EventBus emit.

- createLogChunkCallback emits agent:output after successful DB insert
- spawnInternal now wires onRawContent callback (fixes session 1 gap)
- Remove eventBus from FileTailer (no longer touches EventBus)
- Remove eventBus from ProcessManager constructor (dead parameter)
- Remove agent:output emission from handleStreamEvent text_delta
- Remove outputBuffers map and all buffer helpers from manager/handler
- Remove getOutputBuffer from AgentManager interface and implementations
- getAgentOutput tRPC: DB-only, no file fallback
- onAgentOutput subscription: no initial buffer yield, events only
- AgentOutputViewer: accumulates raw JSONL chunks, parses uniformly
This commit is contained in:
Lukas May
2026-02-10 11:47:36 +01:00
parent 771cd71c1e
commit 06f443ebc8
15 changed files with 50 additions and 165 deletions

View File

@@ -10,7 +10,7 @@
| `manager.ts` | `MultiProviderAgentManager` — main orchestrator class |
| `process-manager.ts` | `AgentProcessManager` — worktree creation, command building, detached spawn |
| `output-handler.ts` | `OutputHandler` — JSONL stream parsing, completion detection, proposal creation, task dedup |
| `file-tailer.ts` | `FileTailer` — watches output files, emits line events |
| `file-tailer.ts` | `FileTailer` — watches output files, fires parser + raw content callbacks |
| `file-io.ts` | Input/output file I/O: frontmatter writing, signal.json reading, tiptap conversion |
| `markdown-to-tiptap.ts` | Markdown to Tiptap JSON conversion using MarkdownManager |
| `index.ts` | Public exports, `ClaudeAgentManager` deprecated alias |
@@ -36,14 +36,15 @@
4. `file-io.writeInputFiles()` — writes `.cw/input/` with assignment files (initiative, pages, phase, task) and read-only context dirs (`context/phases/`, `context/tasks/`)
5. Provider config builds spawn command via `buildSpawnCommand()`
6. `spawnDetached()` — launches detached child process with file output redirection
7. `FileTailer` watches output file, fires `onEvent` and `onRawContent` callbacks
8. `OutputHandler.handleStreamEvent()` processes each JSONL line
9. DB record updated with PID, output file path, session ID
10. `agent:spawned` event emitted
7. `FileTailer` watches output file, fires `onEvent` (parsed stream events) and `onRawContent` (raw JSONL chunks) callbacks
8. `onRawContent` → DB insert via `createLogChunkCallback()``agent:output` event emitted (single emission point)
9. `OutputHandler.handleStreamEvent()` processes parsed events (session tracking, result capture — no event emission)
10. DB record updated with PID, output file path, session ID
11. `agent:spawned` event emitted
### Completion Detection
1. `FileTailer` detects process exit
1. Polling detects process exit, `FileTailer.stop()` flushes remaining output
2. `OutputHandler.handleCompletion()` triggered
3. **Primary path**: Reads `.cw/output/signal.json` from agent worktree
4. Signal contains `{ status: "done"|"questions"|"error", result?, questions?, error? }`
@@ -87,11 +88,13 @@ Each provider config specifies: `command`, `args`, `resumeStyle`, `promptMode`,
The `OutputHandler` processes JSONL streams from Claude CLI:
- `text_delta` eventsaccumulated as text output, emitted via `agent:output`
- `init` event → session ID extracted
- `result` event → final result with structured data
- `init` event → session ID extracted and persisted
- `text_delta` eventsno-op in handler (output streaming handled by DB log chunks)
- `result` event → final result with structured data captured on `ActiveAgent`
- Signal file (`signal.json`) → authoritative completion status
**Output event flow**: `FileTailer.onRawContent()` → DB `insertChunk()``EventBus.emit('agent:output')`. This is the single emission point — no events from `handleStreamEvent()` or `processLine()`.
For providers without structured output, the generic line parser accumulates raw text.
## Credential Management
@@ -113,8 +116,11 @@ Stored as `credentials: {"claudeAiOauth":{"accessToken":"<token>"}}` and `config
## Log Chunks
Agent output is persisted to `agent_log_chunks` table:
- `onRawContent` callback fires for every output chunk
- Fire-and-forget DB insert (no FK to agents — survives deletion)
Agent output is persisted to `agent_log_chunks` table and drives all live streaming:
- `onRawContent` callback fires for every raw JSONL chunk from `FileTailer`
- DB insert → `agent:output` event emission (single source of truth for UI)
- No FK to agents — survives agent deletion
- Session tracking: spawn=1, resume=previousMax+1
- Read path concatenates all sessions for full output history
- Read path (`getAgentOutput` tRPC): concatenates all DB chunks (no file fallback)
- Live path (`onAgentOutput` subscription): listens for `agent:output` events
- Frontend: initial query loads from DB, subscription accumulates raw JSONL, both parsed via `parseAgentOutput()`

View File

@@ -62,10 +62,10 @@ Each procedure uses `require*Repository(ctx)` helpers that throw `TRPCError(INTE
| getAgent | query | Single agent by name or ID |
| getAgentResult | query | Execution result |
| getAgentQuestions | query | Pending questions |
| getAgentOutput | query | Full output (DB chunks or file fallback) |
| getAgentOutput | query | Full output from DB log chunks |
| getActiveRefineAgent | query | Active refine agent for initiative |
| listWaitingAgents | query | Agents waiting for input |
| onAgentOutput | subscription | Live output stream + buffered history |
| onAgentOutput | subscription | Live raw JSONL output stream via EventBus |
### Tasks
| Procedure | Type | Description |

View File

@@ -21,6 +21,8 @@ export function AgentOutputViewer({ agentId, agentName, status, onStop }: AgentO
const [messages, setMessages] = useState<ParsedMessage[]>([]);
const [follow, setFollow] = useState(true);
const containerRef = useRef<HTMLDivElement>(null);
// Accumulate raw JSONL: initial query data + live subscription chunks
const rawBufferRef = useRef<string>('');
// Load initial/historical output
const outputQuery = trpc.getAgentOutput.useQuery(
@@ -37,8 +39,9 @@ export function AgentOutputViewer({ agentId, agentName, status, onStop }: AgentO
onData: (event: any) => {
// TrackedEnvelope shape: { id, data: { agentId, data: string } }
const raw = event?.data?.data ?? event?.data;
const data = typeof raw === 'string' ? raw : JSON.stringify(raw);
setMessages((prev) => [...prev, { type: 'text', content: data }]);
const chunk = typeof raw === 'string' ? raw : JSON.stringify(raw);
rawBufferRef.current += chunk;
setMessages(parseAgentOutput(rawBufferRef.current));
},
onError: (error) => {
console.error('Agent output subscription error:', error);
@@ -51,12 +54,14 @@ export function AgentOutputViewer({ agentId, agentName, status, onStop }: AgentO
// Set initial output when query loads
useEffect(() => {
if (outputQuery.data) {
rawBufferRef.current = outputQuery.data;
setMessages(parseAgentOutput(outputQuery.data));
}
}, [outputQuery.data]);
// Reset output when agent changes
useEffect(() => {
rawBufferRef.current = '';
setMessages([]);
setFollow(true);
}, [agentId]);

View File

@@ -355,7 +355,6 @@ export class CleanupManager {
filePath: agent.outputFilePath,
agentId: agent.id,
parser,
eventBus: this.eventBus,
onEvent: (event) => onStreamEvent(agent.id, event),
startFromBeginning: false,
onRawContent: onRawContent

View File

@@ -13,7 +13,6 @@ import { watch, type FSWatcher } from 'node:fs';
import { open, stat } from 'node:fs/promises';
import type { FileHandle } from 'node:fs/promises';
import type { StreamParser, StreamEvent } from './providers/stream-types.js';
import type { EventBus, AgentOutputEvent } from '../events/index.js';
import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('file-tailer');
@@ -27,17 +26,15 @@ const READ_BUFFER_SIZE = 64 * 1024;
export interface FileTailerOptions {
/** Path to the output file to watch */
filePath: string;
/** Agent ID for event emission */
/** Agent ID for logging */
agentId: string;
/** Parser to convert lines to stream events */
parser: StreamParser;
/** Optional event bus for emitting agent:output events */
eventBus?: EventBus;
/** Optional callback for each stream event */
onEvent?: (event: StreamEvent) => void;
/** If true, read from beginning of file; otherwise tail only new content (default: false) */
startFromBeginning?: boolean;
/** Callback for raw file content chunks (for DB persistence) */
/** Callback for raw file content chunks (for DB persistence + event emission) */
onRawContent?: (content: string) => void;
}
@@ -63,7 +60,6 @@ export class FileTailer {
private readonly filePath: string;
private readonly agentId: string;
private readonly parser: StreamParser;
private readonly eventBus?: EventBus;
private readonly onEvent?: (event: StreamEvent) => void;
private readonly startFromBeginning: boolean;
private readonly onRawContent?: (content: string) => void;
@@ -72,7 +68,6 @@ export class FileTailer {
this.filePath = options.filePath;
this.agentId = options.agentId;
this.parser = options.parser;
this.eventBus = options.eventBus;
this.onEvent = options.onEvent;
this.startFromBeginning = options.startFromBeginning ?? false;
this.onRawContent = options.onRawContent;
@@ -193,24 +188,9 @@ export class FileTailer {
const events = this.parser.parseLine(line);
for (const event of events) {
// Call user callback if provided
if (this.onEvent) {
this.onEvent(event);
}
// Emit agent:output for text_delta events
if (event.type === 'text_delta' && this.eventBus) {
const outputEvent: AgentOutputEvent = {
type: 'agent:output',
timestamp: new Date(),
payload: {
agentId: this.agentId,
stream: 'stdout',
data: event.text,
},
};
this.eventBus.emit(outputEvent);
}
}
}

View File

@@ -59,7 +59,6 @@ export class MultiProviderAgentManager implements AgentManager {
private static readonly MAX_COMMIT_RETRIES = 1;
private activeAgents: Map<string, ActiveAgent> = new Map();
private outputBuffers: Map<string, string[]> = new Map();
private commitRetryCount: Map<string, number> = new Map();
private processManager: ProcessManager;
private credentialHandler: CredentialHandler;
@@ -83,7 +82,7 @@ export class MultiProviderAgentManager implements AgentManager {
private debug: boolean = false,
) {
this.signalManager = new FileSystemSignalManager();
this.processManager = new ProcessManager(workspaceRoot, projectRepository, eventBus);
this.processManager = new ProcessManager(workspaceRoot, projectRepository);
this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
this.outputHandler = new OutputHandler(repository, eventBus, changeSetRepository, phaseRepository, taskRepository, pageRepository, this.signalManager);
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug, this.signalManager);
@@ -105,13 +104,12 @@ export class MultiProviderAgentManager implements AgentManager {
/**
* Centralized cleanup of all in-memory state for an agent.
* Cancels polling timer, removes from activeAgents, outputBuffers, and commitRetryCount.
* Cancels polling timer, removes from activeAgents and commitRetryCount.
*/
private cleanupAgentState(agentId: string): void {
const active = this.activeAgents.get(agentId);
if (active?.cancelPoll) active.cancelPoll();
this.activeAgents.delete(agentId);
this.outputBuffers.delete(agentId);
this.commitRetryCount.delete(agentId);
}
@@ -129,6 +127,15 @@ export class MultiProviderAgentManager implements AgentManager {
return (content) => {
repo.insertChunk({ agentId, agentName, sessionNumber, content })
.then(() => {
if (this.eventBus) {
this.eventBus.emit({
type: 'agent:output' as const,
timestamp: new Date(),
payload: { agentId, stream: 'stdout', data: content },
});
}
})
.catch(err => log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to persist log chunk'));
};
}
@@ -301,7 +308,8 @@ export class MultiProviderAgentManager implements AgentManager {
// 6. Spawn detached subprocess
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
agentId, alias, command, args, cwd ?? agentCwd, processEnv, providerName, prompt,
(event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
(event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
this.createLogChunkCallback(agentId, alias, 1),
);
await this.repository.update(agentId, { pid, outputFilePath });
@@ -452,7 +460,7 @@ export class MultiProviderAgentManager implements AgentManager {
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
agentId, agent.name, command, args, agentCwd, processEnv, provider.name, commitPrompt,
(event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
(event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
this.createLogChunkCallback(agentId, agent.name, commitSessionNumber),
);
@@ -625,7 +633,7 @@ export class MultiProviderAgentManager implements AgentManager {
const { pid, outputFilePath, tailer } = this.processManager.spawnDetached(
agentId, agent.name, command, args, agentCwd, processEnv, provider.name, prompt,
(event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
(event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
this.createLogChunkCallback(agentId, agent.name, resumeSessionNumber),
);
@@ -666,13 +674,6 @@ export class MultiProviderAgentManager implements AgentManager {
return this.outputHandler.getPendingQuestions(agentId, this.activeAgents.get(agentId));
}
/**
* Get the buffered output for an agent.
*/
getOutputBuffer(agentId: string): string[] {
return this.outputHandler.getOutputBufferCopy(this.outputBuffers, agentId);
}
/**
* Delete an agent and clean up all associated resources.
*/
@@ -759,7 +760,7 @@ export class MultiProviderAgentManager implements AgentManager {
const reconcileLogChunkRepo = this.logChunkRepository;
await this.cleanupManager.reconcileAfterRestart(
this.activeAgents,
(agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId), this.outputBuffers),
(agentId, event) => this.outputHandler.handleStreamEvent(agentId, event, this.activeAgents.get(agentId)),
(agentId, rawOutput, provider) => this.outputHandler.processAgentOutput(agentId, rawOutput, provider, (alias) => this.processManager.getAgentWorkdir(alias)),
(agentId, pid) => {
const { cancel } = this.processManager.pollForCompletion(

View File

@@ -457,14 +457,6 @@ export class MockAgentManager implements AgentManager {
return record?.pendingQuestions ?? null;
}
/**
* Get the buffered output for an agent.
* Mock implementation returns empty array.
*/
getOutputBuffer(_agentId: string): string[] {
return [];
}
/**
* Dismiss an agent.
* Mock implementation just marks the agent as dismissed.

View File

@@ -19,7 +19,6 @@ import type {
AgentStoppedEvent,
AgentCrashedEvent,
AgentWaitingEvent,
AgentOutputEvent,
} from '../events/index.js';
import type {
AgentResult,
@@ -45,9 +44,6 @@ import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('output-handler');
/** Max number of output chunks to buffer per agent */
const MAX_OUTPUT_BUFFER_SIZE = 1000;
/**
* Tracks an active agent with its PID and file tailer.
*/
@@ -159,7 +155,6 @@ export class OutputHandler {
agentId: string,
event: StreamEvent,
active: ActiveAgent | undefined,
outputBuffers: Map<string, string[]>,
): void {
switch (event.type) {
case 'init':
@@ -172,15 +167,7 @@ export class OutputHandler {
break;
case 'text_delta':
this.pushToOutputBuffer(outputBuffers, agentId, event.text);
if (this.eventBus) {
const outputEvent: AgentOutputEvent = {
type: 'agent:output',
timestamp: new Date(),
payload: { agentId, stream: 'stdout', data: event.text },
};
this.eventBus.emit(outputEvent);
}
// Text deltas are now streamed via DB log chunks + EventBus in manager.createLogChunkCallback
break;
case 'tool_use_start':
@@ -887,30 +874,6 @@ export class OutputHandler {
return agent?.pendingQuestions ? JSON.parse(agent.pendingQuestions) : null;
}
// =========================================================================
// Output Buffer Management
// =========================================================================
pushToOutputBuffer(buffers: Map<string, string[]>, agentId: string, chunk: string): void {
let buffer = buffers.get(agentId);
if (!buffer) {
buffer = [];
buffers.set(agentId, buffer);
}
buffer.push(chunk);
while (buffer.length > MAX_OUTPUT_BUFFER_SIZE) {
buffer.shift();
}
}
clearOutputBuffer(buffers: Map<string, string[]>, agentId: string): void {
buffers.delete(agentId);
}
getOutputBufferCopy(buffers: Map<string, string[]>, agentId: string): string[] {
return [...(buffers.get(agentId) ?? [])];
}
// =========================================================================
// Private Helpers
// =========================================================================

View File

@@ -8,7 +8,6 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { ProcessManager } from './process-manager.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { EventBus } from '../events/index.js';
// Mock child_process.spawn
vi.mock('node:child_process', () => ({
@@ -70,7 +69,6 @@ const mockCloseSync = vi.mocked(closeSync);
describe('ProcessManager', () => {
let processManager: ProcessManager;
let mockProjectRepository: ProjectRepository;
let mockEventBus: EventBus;
const workspaceRoot = '/test/workspace';
@@ -100,15 +98,7 @@ describe('ProcessManager', () => {
removeProjectFromInitiative: vi.fn(),
};
// Mock event bus
mockEventBus = {
emit: vi.fn(),
on: vi.fn(),
off: vi.fn(),
once: vi.fn(),
};
processManager = new ProcessManager(workspaceRoot, mockProjectRepository, mockEventBus);
processManager = new ProcessManager(workspaceRoot, mockProjectRepository);
});
afterEach(() => {

View File

@@ -10,7 +10,6 @@ import { spawn } from 'node:child_process';
import { writeFileSync, mkdirSync, openSync, closeSync, existsSync } from 'node:fs';
import { join } from 'node:path';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { EventBus } from '../events/index.js';
import type { AgentProviderConfig } from './providers/types.js';
import type { StreamEvent } from './providers/parsers/index.js';
import { getStreamParser } from './providers/parsers/index.js';
@@ -37,7 +36,6 @@ export class ProcessManager {
constructor(
private workspaceRoot: string,
private projectRepository: ProjectRepository,
private eventBus?: EventBus,
) {}
/**
@@ -312,7 +310,6 @@ export class ProcessManager {
filePath: outputFilePath,
agentId,
parser,
eventBus: this.eventBus,
onEvent: onEvent ?? (() => {}),
startFromBeginning: true,
onRawContent,

View File

@@ -214,17 +214,6 @@ export interface AgentManager {
*/
getPendingQuestions(agentId: string): Promise<PendingQuestions | null>;
/**
* Get the buffered output for an agent.
*
* Returns recent output chunks from the agent's stdout stream.
* Buffer is limited to MAX_OUTPUT_BUFFER_SIZE chunks (ring buffer).
*
* @param agentId - Agent ID
* @returns Array of output chunks (newest last)
*/
getOutputBuffer(agentId: string): string[];
/**
* Delete an agent and clean up all associated resources.
*

View File

@@ -80,7 +80,6 @@ function createMockAgentManager(
resume: vi.fn().mockResolvedValue(undefined),
getResult: vi.fn().mockResolvedValue(null),
getPendingQuestions: vi.fn().mockResolvedValue(null),
getOutputBuffer: vi.fn().mockReturnValue([]),
};
}

View File

@@ -64,18 +64,12 @@ describeRealClaude('Real Claude Manager Integration', () => {
// Wait for completion
const result = await harness.waitForAgentCompletion(agent.id, REAL_TEST_TIMEOUT);
// Verify we got output (either via events or buffer)
// Verify we got output events
const outputEvents = harness.getEventsByType<AgentOutputEvent>('agent:output');
const outputBuffer = harness.agentManager.getOutputBuffer(agent.id);
// At least one should have content (output may be emitted as event or buffered)
const hasOutput = outputEvents.length > 0 || outputBuffer.length > 0;
console.log(' Output events:', outputEvents.length);
console.log(' Output buffer:', outputBuffer.length);
// Verify completion
expect(result).toBeTruthy();
console.log(' Output chunks:', outputBuffer.length);
console.log(' Result:', result?.message);
},
REAL_TEST_TIMEOUT

View File

@@ -105,10 +105,6 @@ describeRealCodex('Real Codex Manager Integration', () => {
const outputEvents = harness.getEventsByType<AgentOutputEvent>('agent:output');
console.log(' Output events:', outputEvents.length);
// Verify output buffer
const outputBuffer = harness.agentManager.getOutputBuffer(agent.id);
console.log(' Output buffer chunks:', outputBuffer.length);
// For generic provider, result should be captured
const dbAgent = await harness.agentRepository.findById(agent.id);
console.log(' Status:', dbAgent?.status);

View File

@@ -4,8 +4,6 @@
import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import { join } from 'node:path';
import { readFile } from 'node:fs/promises';
import { tracked, type TrackedEnvelope } from '@trpc/server';
import type { ProcedureBuilder } from '../trpc.js';
import type { TRPCContext } from '../context.js';
@@ -191,24 +189,7 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) {
const logChunkRepo = requireLogChunkRepository(ctx);
const chunks = await logChunkRepo.findByAgentId(agent.id);
if (chunks.length > 0) {
return chunks.map(c => c.content).join('');
}
// Fallback to file for agents that predate DB persistence
const workspaceRoot = ctx.workspaceRoot;
if (!workspaceRoot) {
return '';
}
const outputFilePath = join(workspaceRoot, '.cw', 'agent-logs', agent.name, 'output.jsonl');
try {
const content = await readFile(outputFilePath, 'utf-8');
return content;
} catch {
return '';
}
return chunks.map(c => c.content).join('');
}),
onAgentOutput: publicProcedure
@@ -216,16 +197,9 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) {
.subscription(async function* (opts): AsyncGenerator<TrackedEnvelope<{ agentId: string; data: string }>> {
const { agentId } = opts.input;
const signal = opts.signal ?? new AbortController().signal;
const agentManager = requireAgentManager(opts.ctx);
const eventBus = opts.ctx.eventBus;
const buffer = agentManager.getOutputBuffer(agentId);
let eventCounter = 0;
for (const chunk of buffer) {
const id = `${agentId}-buf-${eventCounter++}`;
yield tracked(id, { agentId, data: chunk });
}
const queue: string[] = [];
let resolve: (() => void) | null = null;