Files
Codewalkers/apps/server/trpc/subscriptions.ts
Lukas May 34578d39c6 refactor: Restructure monorepo to apps/server/ and apps/web/ layout
Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt
standard monorepo conventions (apps/ for runnable apps, packages/
for reusable libraries). Update all config files, shared package
imports, test fixtures, and documentation to reflect new paths.

Key fixes:
- Update workspace config to ["apps/*", "packages/*"]
- Update tsconfig.json rootDir/include for apps/server/
- Add apps/web/** to vitest exclude list
- Update drizzle.config.ts schema path
- Fix ensure-schema.ts migration path detection (3 levels up in dev,
  2 levels up in dist)
- Fix tests/integration/cli-server.test.ts import paths
- Update packages/shared imports to apps/server/ paths
- Update all docs/ files with new paths
2026-03-03 11:22:53 +01:00

219 lines
5.5 KiB
TypeScript

/**
* tRPC Subscription Helpers
*
* Bridges EventBus domain events into tRPC async generator subscriptions.
* Uses a queue-based approach: EventBus handlers push events into a queue,
* the async generator yields from the queue, and waits on a promise when empty.
*/
import { tracked, type TrackedEnvelope } from '@trpc/server';
import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js';
/**
* Shape of events yielded by subscription procedures.
*/
export interface SubscriptionEventData {
id: string;
type: string;
payload: unknown;
timestamp: string;
}
/**
* All domain event types in the system.
*/
export const ALL_EVENT_TYPES: DomainEventType[] = [
'process:spawned',
'process:stopped',
'process:crashed',
'server:started',
'server:stopped',
'log:entry',
'worktree:created',
'worktree:removed',
'worktree:merged',
'worktree:conflict',
'agent:spawned',
'agent:stopped',
'agent:crashed',
'agent:resumed',
'agent:account_switched',
'agent:deleted',
'agent:waiting',
'agent:output',
'task:queued',
'task:dispatched',
'task:completed',
'task:blocked',
'phase:queued',
'phase:started',
'phase:completed',
'phase:blocked',
'phase:pending_review',
'phase:merged',
'task:merged',
'merge:queued',
'merge:started',
'merge:completed',
'merge:conflicted',
'page:created',
'page:updated',
'page:deleted',
'changeset:created',
'changeset:reverted',
'conversation:created',
'conversation:answered',
];
/**
* Agent-specific event types.
*/
export const AGENT_EVENT_TYPES: DomainEventType[] = [
'agent:spawned',
'agent:stopped',
'agent:crashed',
'agent:resumed',
'agent:account_switched',
'agent:deleted',
'agent:waiting',
'agent:output',
];
/**
* Task and phase event types.
*/
export const TASK_EVENT_TYPES: DomainEventType[] = [
'task:queued',
'task:dispatched',
'task:completed',
'task:blocked',
'task:merged',
'phase:queued',
'phase:started',
'phase:completed',
'phase:blocked',
'phase:pending_review',
'phase:merged',
];
/**
* Page event types.
*/
export const PAGE_EVENT_TYPES: DomainEventType[] = [
'page:created',
'page:updated',
'page:deleted',
];
/** Counter for generating unique event IDs */
let eventCounter = 0;
/** Drop oldest events when the queue exceeds this size */
const MAX_QUEUE_SIZE = 1000;
/** Yield a synthetic heartbeat after this many ms of silence */
const HEARTBEAT_INTERVAL_MS = 30_000;
/**
* Creates an async generator that bridges EventBus events into a pull-based stream.
*
* Uses a queue + deferred promise pattern:
* - EventBus handlers push events into a queue array
* - The async generator yields from the queue
* - When the queue is empty, it waits on a promise that resolves when the next event arrives
* - Cleans up handlers on AbortSignal abort
*
* Bounded queue: events beyond MAX_QUEUE_SIZE are dropped (oldest first).
* Heartbeat: a synthetic `__heartbeat__` event is yielded when no real events
* arrive within HEARTBEAT_INTERVAL_MS, allowing clients to detect silent disconnects.
*
* Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection.
*
* @param eventBus - The EventBus to subscribe to
* @param eventTypes - Array of event type strings to listen for
* @param signal - AbortSignal to cancel the subscription
*/
export async function* eventBusIterable(
eventBus: EventBus,
eventTypes: DomainEventType[],
signal: AbortSignal,
): AsyncGenerator<TrackedEnvelope<SubscriptionEventData>> {
const queue: DomainEvent[] = [];
let resolve: (() => void) | null = null;
// Handler that pushes events into the queue and resolves the waiter
const handler = (event: DomainEvent) => {
queue.push(event);
// Drop oldest when queue overflows
while (queue.length > MAX_QUEUE_SIZE) {
queue.shift();
}
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
// Subscribe to all requested event types
for (const eventType of eventTypes) {
eventBus.on(eventType as DomainEvent['type'], handler);
}
// Cleanup function
const cleanup = () => {
for (const eventType of eventTypes) {
eventBus.off(eventType as DomainEvent['type'], handler);
}
// Resolve any pending waiter so the generator can exit
if (resolve) {
const r = resolve;
resolve = null;
r();
}
};
// Clean up when the client disconnects
signal.addEventListener('abort', cleanup, { once: true });
try {
while (!signal.aborted) {
// Drain the queue
while (queue.length > 0) {
const event = queue.shift()!;
const id = `${Date.now()}-${eventCounter++}`;
const data: SubscriptionEventData = {
id,
type: event.type,
payload: event.payload,
timestamp: event.timestamp.toISOString(),
};
yield tracked(id, data);
}
// Wait for the next event, abort, or heartbeat timeout
if (!signal.aborted) {
const gotEvent = await Promise.race([
new Promise<true>((r) => {
resolve = () => r(true);
}),
new Promise<false>((r) => setTimeout(() => r(false), HEARTBEAT_INTERVAL_MS)),
]);
if (!gotEvent && !signal.aborted) {
// No real event arrived — yield heartbeat
const id = `${Date.now()}-${eventCounter++}`;
yield tracked(id, {
id,
type: '__heartbeat__',
payload: null,
timestamp: new Date().toISOString(),
});
}
}
}
} finally {
cleanup();
}
}