feat(20): add SSE streaming support and subscription procedures

Fix tRPC HTTP adapter to stream ReadableStream responses instead of
buffering (required for SSE). Create subscriptions module that bridges
EventBus domain events into tRPC async generator subscriptions using a
queue-based pattern. Add three subscription procedures: onEvent (all
events), onAgentUpdate (agent lifecycle), onTaskUpdate (task/phase).
This commit is contained in:
Lukas May
2026-02-04 22:16:14 +01:00
parent 4b25ba0ab2
commit e5d8dbb583
3 changed files with 225 additions and 4 deletions

View File

@@ -122,13 +122,28 @@ export function createTrpcHandler(options: TrpcAdapterOptions) {
// Send response
res.statusCode = fetchResponse.status;
// Set response headers
// Set response headers BEFORE streaming body
fetchResponse.headers.forEach((value, key) => {
res.setHeader(key, value);
});
// Send body
const responseBody = await fetchResponse.text();
res.end(responseBody);
// Stream body if it's a ReadableStream (SSE subscriptions), otherwise buffer
if (fetchResponse.body) {
const reader = fetchResponse.body.getReader();
const pump = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
res.end();
return;
}
res.write(value);
}
};
pump().catch(() => res.end());
} else {
const responseBody = await fetchResponse.text();
res.end(responseBody);
}
};
}

View File

@@ -18,6 +18,12 @@ import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js
import type { CoordinationManager } from '../coordination/types.js';
import type { Phase, Task } from '../db/schema.js';
import { buildDiscussPrompt, buildBreakdownPrompt, buildDecomposePrompt } from '../agent/prompts.js';
import {
eventBusIterable,
ALL_EVENT_TYPES,
AGENT_EVENT_TYPES,
TASK_EVENT_TYPES,
} from './subscriptions.js';
/**
* Initialize tRPC with our context type.
@@ -1218,6 +1224,44 @@ export const appRouter = router({
mode: 'decompose',
});
}),
// ===========================================================================
// Subscription Procedures (SSE)
// ===========================================================================
/**
* Subscribe to ALL domain events.
* General-purpose firehose for the frontend.
* Yields tracked events for client-side reconnection support.
*/
onEvent: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal);
}),
/**
* Subscribe to agent-specific events only.
* Targeted stream for the inbox page (agent:spawned, agent:stopped, etc.).
*/
onAgentUpdate: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal);
}),
/**
* Subscribe to task and phase events.
* For the initiative detail page (task:queued, phase:started, etc.).
*/
onTaskUpdate: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal);
}),
});
/**

162
src/trpc/subscriptions.ts Normal file
View File

@@ -0,0 +1,162 @@
/**
* tRPC Subscription Helpers
*
* Bridges EventBus domain events into tRPC async generator subscriptions.
* Uses a queue-based approach: EventBus handlers push events into a queue,
* the async generator yields from the queue, and waits on a promise when empty.
*/
import { tracked, type TrackedEnvelope } from '@trpc/server';
import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js';
/**
* Shape of events yielded by subscription procedures.
*/
export interface SubscriptionEventData {
id: string;
type: string;
payload: unknown;
timestamp: string;
}
/**
* All domain event types in the system.
*/
export const ALL_EVENT_TYPES: DomainEventType[] = [
'process:spawned',
'process:stopped',
'process:crashed',
'server:started',
'server:stopped',
'log:entry',
'worktree:created',
'worktree:removed',
'worktree:merged',
'worktree:conflict',
'agent:spawned',
'agent:stopped',
'agent:crashed',
'agent:resumed',
'agent:waiting',
'task:queued',
'task:dispatched',
'task:completed',
'task:blocked',
'phase:queued',
'phase:started',
'phase:completed',
'phase:blocked',
'merge:queued',
'merge:started',
'merge:completed',
'merge:conflicted',
];
/**
* Agent-specific event types.
*/
export const AGENT_EVENT_TYPES: DomainEventType[] = [
'agent:spawned',
'agent:stopped',
'agent:crashed',
'agent:resumed',
'agent:waiting',
];
/**
* Task and phase event types.
*/
export const TASK_EVENT_TYPES: DomainEventType[] = [
'task:queued',
'task:dispatched',
'task:completed',
'task:blocked',
'phase:queued',
'phase:started',
'phase:completed',
'phase:blocked',
];
/** Counter for generating unique event IDs */
let eventCounter = 0;
/**
* Creates an async generator that bridges EventBus events into a pull-based stream.
*
* Uses a queue + deferred promise pattern:
* - EventBus handlers push events into a queue array
* - The async generator yields from the queue
* - When the queue is empty, it waits on a promise that resolves when the next event arrives
* - Cleans up handlers on AbortSignal abort
*
* Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection.
*
* @param eventBus - The EventBus to subscribe to
* @param eventTypes - Array of event type strings to listen for
* @param signal - AbortSignal to cancel the subscription
*/
export async function* eventBusIterable(
eventBus: EventBus,
eventTypes: DomainEventType[],
signal: AbortSignal,
): AsyncGenerator<TrackedEnvelope<SubscriptionEventData>> {
const queue: DomainEvent[] = [];
let resolve: (() => void) | null = null;
// Handler that pushes events into the queue and resolves the waiter
const handler = (event: DomainEvent) => {
queue.push(event);
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
// Subscribe to all requested event types
for (const eventType of eventTypes) {
eventBus.on(eventType as DomainEvent['type'], handler);
}
// Cleanup function
const cleanup = () => {
for (const eventType of eventTypes) {
eventBus.off(eventType as DomainEvent['type'], handler);
}
// Resolve any pending waiter so the generator can exit
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
// Clean up when the client disconnects
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted) {
// Drain the queue
while (queue.length > 0) {
const event = queue.shift()!;
const id = `${Date.now()}-${eventCounter++}`;
const data: SubscriptionEventData = {
id,
type: event.type,
payload: event.payload,
timestamp: event.timestamp.toISOString(),
};
yield tracked(id, data);
}
// Wait for the next event or abort
if (!signal.aborted) {
await new Promise<void>((r) => {
resolve = r;
});
}
}
} finally {
cleanup();
}
}