/**
 * tRPC Subscription Helpers
 *
 * Bridges EventBus domain events into tRPC async generator subscriptions.
 * Uses a queue-based approach: EventBus handlers push events into a queue,
 * the async generator yields from the queue, and waits on a promise when empty.
 */

import { tracked, type TrackedEnvelope } from '@trpc/server';
import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js';

/**
 * Shape of events yielded by subscription procedures.
 */
export interface SubscriptionEventData {
  id: string;
  type: string;
  payload: unknown;
  timestamp: string;
}

/**
 * All domain event types in the system.
 */
export const ALL_EVENT_TYPES: DomainEventType[] = [
  'process:spawned',
  'process:stopped',
  'process:crashed',
  'server:started',
  'server:stopped',
  'log:entry',
  'worktree:created',
  'worktree:removed',
  'worktree:merged',
  'worktree:conflict',
  'agent:spawned',
  'agent:stopped',
  'agent:crashed',
  'agent:resumed',
  'agent:account_switched',
  'agent:deleted',
  'agent:waiting',
  'agent:output',
  'task:queued',
  'task:dispatched',
  'task:completed',
  'task:blocked',
  'phase:queued',
  'phase:started',
  'phase:completed',
  'phase:blocked',
  'phase:pending_review',
  'phase:merged',
  'task:merged',
  'merge:queued',
  'merge:started',
  'merge:completed',
  'merge:conflicted',
  'page:created',
  'page:updated',
  'page:deleted',
  'changeset:created',
  'changeset:reverted',
  'preview:building',
  'preview:ready',
  'preview:stopped',
  'preview:failed',
  'conversation:created',
  'conversation:answered',
  'chat:message_created',
  'chat:session_closed',
  'initiative:pending_review',
  'initiative:review_approved',
];

/**
 * Agent-specific event types.
 */
export const AGENT_EVENT_TYPES: DomainEventType[] = [
  'agent:spawned',
  'agent:stopped',
  'agent:crashed',
  'agent:resumed',
  'agent:account_switched',
  'agent:deleted',
  'agent:waiting',
  'agent:output',
];

/**
 * Task and phase event types.
 */
export const TASK_EVENT_TYPES: DomainEventType[] = [
  'task:queued',
  'task:dispatched',
  'task:completed',
  'task:blocked',
  'task:merged',
  'phase:queued',
  'phase:started',
  'phase:completed',
  'phase:blocked',
  'phase:pending_review', // Audited 2026-03-05: phase:pending_review is present — gap 3 verified
  'phase:merged',
  'initiative:pending_review',
  'initiative:review_approved',
];

/**
 * Page event types.
 */
export const PAGE_EVENT_TYPES: DomainEventType[] = [
  'page:created',
  'page:updated',
  'page:deleted',
];

/**
 * Preview deployment event types.
 */
export const PREVIEW_EVENT_TYPES: DomainEventType[] = [
  'preview:building',
  'preview:ready',
  'preview:stopped',
  'preview:failed',
];

/**
 * Inter-agent conversation event types.
 */
export const CONVERSATION_EVENT_TYPES: DomainEventType[] = [
  'conversation:created',
  'conversation:answered',
];

/** Counter for generating unique event IDs */
let eventCounter = 0;

/** Drop oldest events when the queue exceeds this size */
const MAX_QUEUE_SIZE = 1000;

/** Yield a synthetic heartbeat after this many ms of silence */
const HEARTBEAT_INTERVAL_MS = 30_000;

/**
 * Creates an async generator that bridges EventBus events into a pull-based stream.
 *
 * Uses a queue + deferred promise pattern:
 * - EventBus handlers push events into a queue array
 * - The async generator yields from the queue
 * - When the queue is empty, it waits on a promise that resolves when the next event arrives
 * - Cleans up handlers on AbortSignal abort, and whenever the generator finishes
 *
 * Bounded queue: events beyond MAX_QUEUE_SIZE are dropped (oldest first).
 * Heartbeat: a synthetic `__heartbeat__` event is yielded when no real events
 * arrive within HEARTBEAT_INTERVAL_MS, allowing clients to detect silent disconnects.
 * The heartbeat timer is cleared as soon as each wait ends, so no timers outlive
 * the wait that created them.
 *
 * Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection.
 *
 * @param eventBus - The EventBus to subscribe to
 * @param eventTypes - Array of event type strings to listen for
 * @param signal - AbortSignal to cancel the subscription
 * @returns An async generator of tracked envelopes carrying `SubscriptionEventData`
 */
export async function* eventBusIterable(
  eventBus: EventBus,
  eventTypes: DomainEventType[],
  signal: AbortSignal,
): AsyncGenerator<TrackedEnvelope<SubscriptionEventData>> {
  const queue: DomainEvent[] = [];

  // Resolver of the promise the generator is currently parked on (if any).
  let wake: (() => void) | null = null;

  // Wake the parked generator at most once per wait.
  const wakeWaiter = (): void => {
    if (wake) {
      const pending = wake;
      wake = null;
      pending();
    }
  };

  // Handler that pushes events into the queue and resolves the waiter
  const handler = (event: DomainEvent): void => {
    queue.push(event);
    // Drop oldest when queue overflows
    while (queue.length > MAX_QUEUE_SIZE) {
      queue.shift();
    }
    wakeWaiter();
  };

  // Subscribe to all requested event types
  for (const eventType of eventTypes) {
    eventBus.on(eventType as DomainEvent['type'], handler);
  }

  // Unsubscribes every handler and releases a parked generator so it can exit.
  // Called both from the abort listener and from `finally`, so it may run twice.
  const cleanup = (): void => {
    for (const eventType of eventTypes) {
      eventBus.off(eventType as DomainEvent['type'], handler);
    }
    wakeWaiter();
  };

  // Clean up when the client disconnects
  signal.addEventListener('abort', cleanup, { once: true });

  try {
    while (!signal.aborted) {
      // Drain the queue
      let event: DomainEvent | undefined;
      while ((event = queue.shift()) !== undefined) {
        const id = `${Date.now()}-${eventCounter++}`;
        const data: SubscriptionEventData = {
          id,
          type: event.type,
          payload: event.payload,
          timestamp: event.timestamp.toISOString(),
        };
        yield tracked(id, data);
      }

      // The consumer may have aborted while we were suspended at a yield.
      if (signal.aborted) {
        break;
      }

      // Wait for the next event, abort, or heartbeat timeout
      let heartbeatTimer: ReturnType<typeof setTimeout> | undefined;
      let gotEvent: boolean;
      try {
        gotEvent = await Promise.race([
          new Promise<boolean>((res) => {
            wake = () => res(true);
          }),
          new Promise<boolean>((res) => {
            heartbeatTimer = setTimeout(() => res(false), HEARTBEAT_INTERVAL_MS);
          }),
        ]);
      } finally {
        // Without this, every wait that ends early leaves a live timer behind
        // for up to HEARTBEAT_INTERVAL_MS.
        clearTimeout(heartbeatTimer);
        // Drop the resolver of the settled wait; it is a no-op from here on.
        wake = null;
      }

      if (!gotEvent && !signal.aborted) {
        // No real event arrived — yield heartbeat
        const id = `${Date.now()}-${eventCounter++}`;
        const heartbeat: SubscriptionEventData = {
          id,
          type: '__heartbeat__',
          payload: null,
          timestamp: new Date().toISOString(),
        };
        yield tracked(id, heartbeat);
      }
    }
  } finally {
    // If the generator ended without an abort (consumer returned or threw),
    // the `once` listener would otherwise stay attached to the signal.
    signal.removeEventListener('abort', cleanup);
    cleanup();
  }
}