From e5d8dbb58386b9fb13a7e2c52d7b03eca4044b71 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Wed, 4 Feb 2026 22:16:14 +0100 Subject: [PATCH] feat(20): add SSE streaming support and subscription procedures Fix tRPC HTTP adapter to stream ReadableStream responses instead of buffering (required for SSE). Create subscriptions module that bridges EventBus domain events into tRPC async generator subscriptions using a queue-based pattern. Add three subscription procedures: onEvent (all events), onAgentUpdate (agent lifecycle), onTaskUpdate (task/phase). --- src/server/trpc-adapter.ts | 23 +++++- src/trpc/router.ts | 44 ++++++++++ src/trpc/subscriptions.ts | 162 +++++++++++++++++++++++++++++++++++++ 3 files changed, 225 insertions(+), 4 deletions(-) create mode 100644 src/trpc/subscriptions.ts diff --git a/src/server/trpc-adapter.ts b/src/server/trpc-adapter.ts index bbcaa9f..1d1ee6a 100644 --- a/src/server/trpc-adapter.ts +++ b/src/server/trpc-adapter.ts @@ -122,13 +122,28 @@ export function createTrpcHandler(options: TrpcAdapterOptions) { // Send response res.statusCode = fetchResponse.status; - // Set response headers + // Set response headers BEFORE streaming body fetchResponse.headers.forEach((value, key) => { res.setHeader(key, value); }); - // Send body - const responseBody = await fetchResponse.text(); - res.end(responseBody); + // Stream body if it's a ReadableStream (SSE subscriptions), otherwise buffer + if (fetchResponse.body) { + const reader = fetchResponse.body.getReader(); + const pump = async () => { + while (true) { + const { done, value } = await reader.read(); + if (done) { + res.end(); + return; + } + res.write(value); + } + }; + pump().catch(() => res.end()); + } else { + const responseBody = await fetchResponse.text(); + res.end(responseBody); + } }; } diff --git a/src/trpc/router.ts b/src/trpc/router.ts index 699cfd0..b0135da 100644 --- a/src/trpc/router.ts +++ b/src/trpc/router.ts @@ -18,6 +18,12 @@ import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js import type { CoordinationManager } from '../coordination/types.js'; import type { Phase, Task } from '../db/schema.js'; import { buildDiscussPrompt, buildBreakdownPrompt, buildDecomposePrompt } from '../agent/prompts.js'; +import { + eventBusIterable, + ALL_EVENT_TYPES, + AGENT_EVENT_TYPES, + TASK_EVENT_TYPES, +} from './subscriptions.js'; /** * Initialize tRPC with our context type. @@ -1218,6 +1224,44 @@ export const appRouter = router({ mode: 'decompose', }); }), + + // =========================================================================== + // Subscription Procedures (SSE) + // =========================================================================== + + /** + * Subscribe to ALL domain events. + * General-purpose firehose for the frontend. + * Yields tracked events for client-side reconnection support. + */ + onEvent: publicProcedure + .input(z.object({ lastEventId: z.string().nullish() }).optional()) + .subscription(async function* (opts) { + const signal = opts.signal ?? new AbortController().signal; + yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal); + }), + + /** + * Subscribe to agent-specific events only. + * Targeted stream for the inbox page (agent:spawned, agent:stopped, etc.). + */ + onAgentUpdate: publicProcedure + .input(z.object({ lastEventId: z.string().nullish() }).optional()) + .subscription(async function* (opts) { + const signal = opts.signal ?? new AbortController().signal; + yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal); + }), + + /** + * Subscribe to task and phase events. + * For the initiative detail page (task:queued, phase:started, etc.). + */ + onTaskUpdate: publicProcedure + .input(z.object({ lastEventId: z.string().nullish() }).optional()) + .subscription(async function* (opts) { + const signal = opts.signal ?? new AbortController().signal; + yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal); + }), }); /** diff --git a/src/trpc/subscriptions.ts b/src/trpc/subscriptions.ts new file mode 100644 index 0000000..a7ccec4 --- /dev/null +++ b/src/trpc/subscriptions.ts @@ -0,0 +1,162 @@ +/** + * tRPC Subscription Helpers + * + * Bridges EventBus domain events into tRPC async generator subscriptions. + * Uses a queue-based approach: EventBus handlers push events into a queue, + * the async generator yields from the queue, and waits on a promise when empty. + */ + +import { tracked, type TrackedEnvelope } from '@trpc/server'; +import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js'; + +/** + * Shape of events yielded by subscription procedures. + */ +export interface SubscriptionEventData { + id: string; + type: string; + payload: unknown; + timestamp: string; +} + +/** + * All domain event types in the system. + */ +export const ALL_EVENT_TYPES: DomainEventType[] = [ + 'process:spawned', + 'process:stopped', + 'process:crashed', + 'server:started', + 'server:stopped', + 'log:entry', + 'worktree:created', + 'worktree:removed', + 'worktree:merged', + 'worktree:conflict', + 'agent:spawned', + 'agent:stopped', + 'agent:crashed', + 'agent:resumed', + 'agent:waiting', + 'task:queued', + 'task:dispatched', + 'task:completed', + 'task:blocked', + 'phase:queued', + 'phase:started', + 'phase:completed', + 'phase:blocked', + 'merge:queued', + 'merge:started', + 'merge:completed', + 'merge:conflicted', +]; + +/** + * Agent-specific event types. + */ +export const AGENT_EVENT_TYPES: DomainEventType[] = [ + 'agent:spawned', + 'agent:stopped', + 'agent:crashed', + 'agent:resumed', + 'agent:waiting', +]; + +/** + * Task and phase event types. + */ +export const TASK_EVENT_TYPES: DomainEventType[] = [ + 'task:queued', + 'task:dispatched', + 'task:completed', + 'task:blocked', + 'phase:queued', + 'phase:started', + 'phase:completed', + 'phase:blocked', +]; + +/** Counter for generating unique event IDs */ +let eventCounter = 0; + +/** + * Creates an async generator that bridges EventBus events into a pull-based stream. + * + * Uses a queue + deferred promise pattern: + * - EventBus handlers push events into a queue array + * - The async generator yields from the queue + * - When the queue is empty, it waits on a promise that resolves when the next event arrives + * - Cleans up handlers on AbortSignal abort + * + * Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection. + * + * @param eventBus - The EventBus to subscribe to + * @param eventTypes - Array of event type strings to listen for + * @param signal - AbortSignal to cancel the subscription + */ +export async function* eventBusIterable( + eventBus: EventBus, + eventTypes: DomainEventType[], + signal: AbortSignal, +): AsyncGenerator> { + const queue: DomainEvent[] = []; + let resolve: (() => void) | null = null; + + // Handler that pushes events into the queue and resolves the waiter + const handler = (event: DomainEvent) => { + queue.push(event); + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + // Subscribe to all requested event types + for (const eventType of eventTypes) { + eventBus.on(eventType as DomainEvent['type'], handler); + } + + // Cleanup function + const cleanup = () => { + for (const eventType of eventTypes) { + eventBus.off(eventType as DomainEvent['type'], handler); + } + // Resolve any pending waiter so the generator can exit + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + // Clean up when the client disconnects + signal.addEventListener('abort', cleanup, { once: true }); + + try { + while (!signal.aborted) { + // Drain the queue + while (queue.length > 0) { + const event = queue.shift()!; + const id = `${Date.now()}-${eventCounter++}`; + const data: SubscriptionEventData = { + id, + type: event.type, + payload: event.payload, + timestamp: event.timestamp.toISOString(), + }; + yield tracked(id, data); + } + + // Wait for the next event or abort + if (!signal.aborted) { + await new Promise((r) => { + resolve = r; + }); + } + } + } finally { + cleanup(); + } +}