feat(20): add SSE streaming support and subscription procedures
Fix tRPC HTTP adapter to stream ReadableStream responses instead of buffering (required for SSE). Create subscriptions module that bridges EventBus domain events into tRPC async generator subscriptions using a queue-based pattern. Add three subscription procedures: onEvent (all events), onAgentUpdate (agent lifecycle), onTaskUpdate (task/phase).
This commit is contained in:
@@ -122,13 +122,28 @@ export function createTrpcHandler(options: TrpcAdapterOptions) {
|
||||
// Send response
|
||||
res.statusCode = fetchResponse.status;
|
||||
|
||||
// Set response headers
|
||||
// Set response headers BEFORE streaming body
|
||||
fetchResponse.headers.forEach((value, key) => {
|
||||
res.setHeader(key, value);
|
||||
});
|
||||
|
||||
// Send body
|
||||
const responseBody = await fetchResponse.text();
|
||||
res.end(responseBody);
|
||||
// Stream body if it's a ReadableStream (SSE subscriptions), otherwise buffer
|
||||
if (fetchResponse.body) {
|
||||
const reader = fetchResponse.body.getReader();
|
||||
const pump = async () => {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) {
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
res.write(value);
|
||||
}
|
||||
};
|
||||
pump().catch(() => res.end());
|
||||
} else {
|
||||
const responseBody = await fetchResponse.text();
|
||||
res.end(responseBody);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -18,6 +18,12 @@ import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js
|
||||
import type { CoordinationManager } from '../coordination/types.js';
|
||||
import type { Phase, Task } from '../db/schema.js';
|
||||
import { buildDiscussPrompt, buildBreakdownPrompt, buildDecomposePrompt } from '../agent/prompts.js';
|
||||
import {
|
||||
eventBusIterable,
|
||||
ALL_EVENT_TYPES,
|
||||
AGENT_EVENT_TYPES,
|
||||
TASK_EVENT_TYPES,
|
||||
} from './subscriptions.js';
|
||||
|
||||
/**
|
||||
* Initialize tRPC with our context type.
|
||||
@@ -1218,6 +1224,44 @@ export const appRouter = router({
|
||||
mode: 'decompose',
|
||||
});
|
||||
}),
|
||||
|
||||
// ===========================================================================
|
||||
// Subscription Procedures (SSE)
|
||||
// ===========================================================================
|
||||
|
||||
/**
|
||||
* Subscribe to ALL domain events.
|
||||
* General-purpose firehose for the frontend.
|
||||
* Yields tracked events for client-side reconnection support.
|
||||
*/
|
||||
onEvent: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal);
|
||||
}),
|
||||
|
||||
/**
|
||||
* Subscribe to agent-specific events only.
|
||||
* Targeted stream for the inbox page (agent:spawned, agent:stopped, etc.).
|
||||
*/
|
||||
onAgentUpdate: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal);
|
||||
}),
|
||||
|
||||
/**
|
||||
* Subscribe to task and phase events.
|
||||
* For the initiative detail page (task:queued, phase:started, etc.).
|
||||
*/
|
||||
onTaskUpdate: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal);
|
||||
}),
|
||||
});
|
||||
|
||||
/**
|
||||
|
||||
162
src/trpc/subscriptions.ts
Normal file
162
src/trpc/subscriptions.ts
Normal file
@@ -0,0 +1,162 @@
|
||||
/**
|
||||
* tRPC Subscription Helpers
|
||||
*
|
||||
* Bridges EventBus domain events into tRPC async generator subscriptions.
|
||||
* Uses a queue-based approach: EventBus handlers push events into a queue,
|
||||
* the async generator yields from the queue, and waits on a promise when empty.
|
||||
*/
|
||||
|
||||
import { tracked, type TrackedEnvelope } from '@trpc/server';
|
||||
import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js';
|
||||
|
||||
/**
|
||||
* Shape of events yielded by subscription procedures.
|
||||
*/
|
||||
export interface SubscriptionEventData {
|
||||
id: string;
|
||||
type: string;
|
||||
payload: unknown;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* All domain event types in the system.
|
||||
*/
|
||||
export const ALL_EVENT_TYPES: DomainEventType[] = [
|
||||
'process:spawned',
|
||||
'process:stopped',
|
||||
'process:crashed',
|
||||
'server:started',
|
||||
'server:stopped',
|
||||
'log:entry',
|
||||
'worktree:created',
|
||||
'worktree:removed',
|
||||
'worktree:merged',
|
||||
'worktree:conflict',
|
||||
'agent:spawned',
|
||||
'agent:stopped',
|
||||
'agent:crashed',
|
||||
'agent:resumed',
|
||||
'agent:waiting',
|
||||
'task:queued',
|
||||
'task:dispatched',
|
||||
'task:completed',
|
||||
'task:blocked',
|
||||
'phase:queued',
|
||||
'phase:started',
|
||||
'phase:completed',
|
||||
'phase:blocked',
|
||||
'merge:queued',
|
||||
'merge:started',
|
||||
'merge:completed',
|
||||
'merge:conflicted',
|
||||
];
|
||||
|
||||
/**
|
||||
* Agent-specific event types.
|
||||
*/
|
||||
export const AGENT_EVENT_TYPES: DomainEventType[] = [
|
||||
'agent:spawned',
|
||||
'agent:stopped',
|
||||
'agent:crashed',
|
||||
'agent:resumed',
|
||||
'agent:waiting',
|
||||
];
|
||||
|
||||
/**
|
||||
* Task and phase event types.
|
||||
*/
|
||||
export const TASK_EVENT_TYPES: DomainEventType[] = [
|
||||
'task:queued',
|
||||
'task:dispatched',
|
||||
'task:completed',
|
||||
'task:blocked',
|
||||
'phase:queued',
|
||||
'phase:started',
|
||||
'phase:completed',
|
||||
'phase:blocked',
|
||||
];
|
||||
|
||||
/** Counter for generating unique event IDs */
|
||||
let eventCounter = 0;
|
||||
|
||||
/**
|
||||
* Creates an async generator that bridges EventBus events into a pull-based stream.
|
||||
*
|
||||
* Uses a queue + deferred promise pattern:
|
||||
* - EventBus handlers push events into a queue array
|
||||
* - The async generator yields from the queue
|
||||
* - When the queue is empty, it waits on a promise that resolves when the next event arrives
|
||||
* - Cleans up handlers on AbortSignal abort
|
||||
*
|
||||
* Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection.
|
||||
*
|
||||
* @param eventBus - The EventBus to subscribe to
|
||||
* @param eventTypes - Array of event type strings to listen for
|
||||
* @param signal - AbortSignal to cancel the subscription
|
||||
*/
|
||||
export async function* eventBusIterable(
|
||||
eventBus: EventBus,
|
||||
eventTypes: DomainEventType[],
|
||||
signal: AbortSignal,
|
||||
): AsyncGenerator<TrackedEnvelope<SubscriptionEventData>> {
|
||||
const queue: DomainEvent[] = [];
|
||||
let resolve: (() => void) | null = null;
|
||||
|
||||
// Handler that pushes events into the queue and resolves the waiter
|
||||
const handler = (event: DomainEvent) => {
|
||||
queue.push(event);
|
||||
if (resolve) {
|
||||
const r = resolve;
|
||||
resolve = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
// Subscribe to all requested event types
|
||||
for (const eventType of eventTypes) {
|
||||
eventBus.on(eventType as DomainEvent['type'], handler);
|
||||
}
|
||||
|
||||
// Cleanup function
|
||||
const cleanup = () => {
|
||||
for (const eventType of eventTypes) {
|
||||
eventBus.off(eventType as DomainEvent['type'], handler);
|
||||
}
|
||||
// Resolve any pending waiter so the generator can exit
|
||||
if (resolve) {
|
||||
const r = resolve;
|
||||
resolve = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
// Clean up when the client disconnects
|
||||
signal.addEventListener('abort', cleanup, { once: true });
|
||||
|
||||
try {
|
||||
while (!signal.aborted) {
|
||||
// Drain the queue
|
||||
while (queue.length > 0) {
|
||||
const event = queue.shift()!;
|
||||
const id = `${Date.now()}-${eventCounter++}`;
|
||||
const data: SubscriptionEventData = {
|
||||
id,
|
||||
type: event.type,
|
||||
payload: event.payload,
|
||||
timestamp: event.timestamp.toISOString(),
|
||||
};
|
||||
yield tracked(id, data);
|
||||
}
|
||||
|
||||
// Wait for the next event or abort
|
||||
if (!signal.aborted) {
|
||||
await new Promise<void>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user