Files
Codewalkers/src/trpc/subscriptions.ts
Lukas May e5d8dbb583 feat(20): add SSE streaming support and subscription procedures
Fix tRPC HTTP adapter to stream ReadableStream responses instead of
buffering (required for SSE). Create subscriptions module that bridges
EventBus domain events into tRPC async generator subscriptions using a
queue-based pattern. Add three subscription procedures: onEvent (all
events), onAgentUpdate (agent lifecycle), onTaskUpdate (task/phase).
2026-02-04 22:16:14 +01:00

163 lines
4.0 KiB
TypeScript

/**
* tRPC Subscription Helpers
*
* Bridges EventBus domain events into tRPC async generator subscriptions.
* Uses a queue-based approach: EventBus handlers push events into a queue,
* the async generator yields from the queue, and waits on a promise when empty.
*/
import { tracked, type TrackedEnvelope } from '@trpc/server';
import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js';
/**
 * Payload carried by every item a subscription procedure yields.
 *
 * This is a serialisation-friendly projection of a domain event: the
 * timestamp is a string rather than a `Date`.
 */
export interface SubscriptionEventData {
  /** Identifier for this yielded item; also used as the tracked envelope id. */
  id: string;

  /** The originating domain event's `type` (e.g. `'agent:spawned'`). */
  type: string;

  /** The originating domain event's payload, passed through untouched. */
  payload: unknown;

  /** The originating domain event's timestamp as an ISO-8601 string. */
  timestamp: string;
}
/**
 * The complete list of event types a "subscribe to everything" procedure
 * listens for, grouped by domain prefix.
 *
 * NOTE(review): intended to mirror every member of `DomainEventType`; keep it
 * in sync when new event types are added to the events module.
 */
export const ALL_EVENT_TYPES: DomainEventType[] = [
  // Process lifecycle
  'process:spawned', 'process:stopped', 'process:crashed',
  // Server lifecycle
  'server:started', 'server:stopped',
  // Logging
  'log:entry',
  // Git worktrees
  'worktree:created', 'worktree:removed', 'worktree:merged', 'worktree:conflict',
  // Agents
  'agent:spawned', 'agent:stopped', 'agent:crashed', 'agent:resumed', 'agent:waiting',
  // Tasks
  'task:queued', 'task:dispatched', 'task:completed', 'task:blocked',
  // Phases
  'phase:queued', 'phase:started', 'phase:completed', 'phase:blocked',
  // Merge queue
  'merge:queued', 'merge:started', 'merge:completed', 'merge:conflicted',
];
/**
 * Event types describing the agent lifecycle (every `agent:*` event listed
 * in this module).
 */
export const AGENT_EVENT_TYPES: DomainEventType[] = [
  'agent:spawned', 'agent:stopped', 'agent:crashed',
  'agent:resumed', 'agent:waiting',
];
/**
 * Event types describing work progress: the `task:*` events followed by the
 * `phase:*` events.
 */
export const TASK_EVENT_TYPES: DomainEventType[] = [
  // Tasks
  'task:queued', 'task:dispatched', 'task:completed', 'task:blocked',
  // Phases
  'phase:queued', 'phase:started', 'phase:completed', 'phase:blocked',
];
/**
 * Counter for generating unique event IDs.
 *
 * Module-level, so it is shared by every `eventBusIterable` generator created
 * from this module. It is post-incremented once per yielded event and appended
 * to `Date.now()` to form the event id, which keeps ids distinct even when
 * several events are yielded within the same millisecond.
 */
let eventCounter = 0;
/**
 * Creates an async generator that bridges push-based EventBus events into a
 * pull-based stream.
 *
 * Uses a queue + deferred promise pattern:
 * - EventBus handlers push events into a queue array
 * - The generator drains the queue, yielding one item per event
 * - When the queue is empty, it waits on a promise that is resolved by the
 *   next incoming event or by the abort signal
 * - On abort, or when the consumer stops iterating, the EventBus handlers and
 *   the abort listener are removed
 *
 * Each yielded event is wrapped with `tracked(id, data)` to enable client-side
 * reconnection. Events that were already queued when the signal aborts are
 * still yielded before the generator finishes.
 *
 * @param eventBus - The EventBus to subscribe to
 * @param eventTypes - Event types to listen for
 * @param signal - AbortSignal that cancels the subscription
 * @returns An async generator of tracked {@link SubscriptionEventData} items
 */
export async function* eventBusIterable(
  eventBus: EventBus,
  eventTypes: DomainEventType[],
  signal: AbortSignal,
): AsyncGenerator<TrackedEnvelope<SubscriptionEventData>> {
  const queue: DomainEvent[] = [];
  let resolve: (() => void) | null = null;
  let cleanedUp = false;

  // Resolve the pending waiter (if any) so the generator loop can continue.
  const wake = () => {
    if (resolve) {
      const r = resolve;
      // Clear before calling so a re-entrant wake cannot resolve twice.
      resolve = null;
      r();
    }
  };

  // Handler that pushes events into the queue and wakes the waiter
  const handler = (event: DomainEvent) => {
    queue.push(event);
    wake();
  };

  // Subscribe to all requested event types
  for (const eventType of eventTypes) {
    eventBus.on(eventType as DomainEvent['type'], handler);
  }

  // Idempotent teardown: it runs from the abort listener and again from
  // `finally`, so guard against unsubscribing twice.
  const cleanup = () => {
    if (cleanedUp) {
      return;
    }
    cleanedUp = true;
    for (const eventType of eventTypes) {
      eventBus.off(eventType as DomainEvent['type'], handler);
    }
    // Resolve any pending waiter so the generator can exit
    wake();
  };

  // Clean up when the client disconnects
  signal.addEventListener('abort', cleanup, { once: true });

  try {
    while (!signal.aborted) {
      // Drain the queue
      while (queue.length > 0) {
        const event = queue.shift()!;
        const id = `${Date.now()}-${eventCounter++}`;
        const data: SubscriptionEventData = {
          id,
          type: event.type,
          payload: event.payload,
          timestamp: event.timestamp.toISOString(),
        };
        yield tracked(id, data);
      }

      // Wait for the next event or abort
      if (!signal.aborted) {
        await new Promise<void>((r) => {
          resolve = r;
        });
      }
    }
  } finally {
    // `{ once: true }` only detaches the listener when the signal actually
    // fires. If the generator ends for any other reason (consumer `return()`
    // or throw), remove it explicitly so the closure does not stay attached
    // to the signal.
    signal.removeEventListener('abort', cleanup);
    cleanup();
  }
}