diff --git a/src/server/trpc-adapter.ts b/src/server/trpc-adapter.ts index bbcaa9f..1d1ee6a 100644 --- a/src/server/trpc-adapter.ts +++ b/src/server/trpc-adapter.ts @@ -122,13 +122,28 @@ export function createTrpcHandler(options: TrpcAdapterOptions) { // Send response res.statusCode = fetchResponse.status; - // Set response headers + // Set response headers BEFORE streaming body fetchResponse.headers.forEach((value, key) => { res.setHeader(key, value); }); - // Send body - const responseBody = await fetchResponse.text(); - res.end(responseBody); + // Stream body if it's a ReadableStream (SSE subscriptions), otherwise buffer + if (fetchResponse.body) { + const reader = fetchResponse.body.getReader(); + const pump = async () => { + while (true) { + const { done, value } = await reader.read(); + if (done) { + res.end(); + return; + } + res.write(value); + } + }; + pump().catch(() => res.end()); + } else { + const responseBody = await fetchResponse.text(); + res.end(responseBody); + } }; } diff --git a/src/trpc/router.ts b/src/trpc/router.ts index 699cfd0..b0135da 100644 --- a/src/trpc/router.ts +++ b/src/trpc/router.ts @@ -18,6 +18,12 @@ import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js import type { CoordinationManager } from '../coordination/types.js'; import type { Phase, Task } from '../db/schema.js'; import { buildDiscussPrompt, buildBreakdownPrompt, buildDecomposePrompt } from '../agent/prompts.js'; +import { + eventBusIterable, + ALL_EVENT_TYPES, + AGENT_EVENT_TYPES, + TASK_EVENT_TYPES, +} from './subscriptions.js'; /** * Initialize tRPC with our context type. @@ -1218,6 +1224,44 @@ export const appRouter = router({ mode: 'decompose', }); }), + + // =========================================================================== + // Subscription Procedures (SSE) + // =========================================================================== + + /** + * Subscribe to ALL domain events. + * General-purpose firehose for the frontend. + * Yields tracked events for client-side reconnection support. + */ + onEvent: publicProcedure + .input(z.object({ lastEventId: z.string().nullish() }).optional()) + .subscription(async function* (opts) { + const signal = opts.signal ?? new AbortController().signal; + yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal); + }), + + /** + * Subscribe to agent-specific events only. + * Targeted stream for the inbox page (agent:spawned, agent:stopped, etc.). + */ + onAgentUpdate: publicProcedure + .input(z.object({ lastEventId: z.string().nullish() }).optional()) + .subscription(async function* (opts) { + const signal = opts.signal ?? new AbortController().signal; + yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal); + }), + + /** + * Subscribe to task and phase events. + * For the initiative detail page (task:queued, phase:started, etc.). + */ + onTaskUpdate: publicProcedure + .input(z.object({ lastEventId: z.string().nullish() }).optional()) + .subscription(async function* (opts) { + const signal = opts.signal ?? new AbortController().signal; + yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal); + }), }); /** diff --git a/src/trpc/subscriptions.ts b/src/trpc/subscriptions.ts new file mode 100644 index 0000000..a7ccec4 --- /dev/null +++ b/src/trpc/subscriptions.ts @@ -0,0 +1,162 @@ +/** + * tRPC Subscription Helpers + * + * Bridges EventBus domain events into tRPC async generator subscriptions. + * Uses a queue-based approach: EventBus handlers push events into a queue, + * the async generator yields from the queue, and waits on a promise when empty. + */ + +import { tracked, type TrackedEnvelope } from '@trpc/server'; +import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js'; + +/** + * Shape of events yielded by subscription procedures. + */ +export interface SubscriptionEventData { + id: string; + type: string; + payload: unknown; + timestamp: string; +} + +/** + * All domain event types in the system. + */ +export const ALL_EVENT_TYPES: DomainEventType[] = [ + 'process:spawned', + 'process:stopped', + 'process:crashed', + 'server:started', + 'server:stopped', + 'log:entry', + 'worktree:created', + 'worktree:removed', + 'worktree:merged', + 'worktree:conflict', + 'agent:spawned', + 'agent:stopped', + 'agent:crashed', + 'agent:resumed', + 'agent:waiting', + 'task:queued', + 'task:dispatched', + 'task:completed', + 'task:blocked', + 'phase:queued', + 'phase:started', + 'phase:completed', + 'phase:blocked', + 'merge:queued', + 'merge:started', + 'merge:completed', + 'merge:conflicted', +]; + +/** + * Agent-specific event types. + */ +export const AGENT_EVENT_TYPES: DomainEventType[] = [ + 'agent:spawned', + 'agent:stopped', + 'agent:crashed', + 'agent:resumed', + 'agent:waiting', +]; + +/** + * Task and phase event types. + */ +export const TASK_EVENT_TYPES: DomainEventType[] = [ + 'task:queued', + 'task:dispatched', + 'task:completed', + 'task:blocked', + 'phase:queued', + 'phase:started', + 'phase:completed', + 'phase:blocked', +]; + +/** Counter for generating unique event IDs */ +let eventCounter = 0; + +/** + * Creates an async generator that bridges EventBus events into a pull-based stream. + * + * Uses a queue + deferred promise pattern: + * - EventBus handlers push events into a queue array + * - The async generator yields from the queue + * - When the queue is empty, it waits on a promise that resolves when the next event arrives + * - Cleans up handlers on AbortSignal abort + * + * Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection. + * + * @param eventBus - The EventBus to subscribe to + * @param eventTypes - Array of event type strings to listen for + * @param signal - AbortSignal to cancel the subscription + */ +export async function* eventBusIterable( + eventBus: EventBus, + eventTypes: DomainEventType[], + signal: AbortSignal, +): AsyncGenerator> { + const queue: DomainEvent[] = []; + let resolve: (() => void) | null = null; + + // Handler that pushes events into the queue and resolves the waiter + const handler = (event: DomainEvent) => { + queue.push(event); + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + // Subscribe to all requested event types + for (const eventType of eventTypes) { + eventBus.on(eventType as DomainEvent['type'], handler); + } + + // Cleanup function + const cleanup = () => { + for (const eventType of eventTypes) { + eventBus.off(eventType as DomainEvent['type'], handler); + } + // Resolve any pending waiter so the generator can exit + if (resolve) { + const r = resolve; + resolve = null; + r(); + } + }; + + // Clean up when the client disconnects + signal.addEventListener('abort', cleanup, { once: true }); + + try { + while (!signal.aborted) { + // Drain the queue + while (queue.length > 0) { + const event = queue.shift()!; + const id = `${Date.now()}-${eventCounter++}`; + const data: SubscriptionEventData = { + id, + type: event.type, + payload: event.payload, + timestamp: event.timestamp.toISOString(), + }; + yield tracked(id, data); + } + + // Wait for the next event or abort + if (!signal.aborted) { + await new Promise((r) => { + resolve = r; + }); + } + } + } finally { + cleanup(); + } +}