Move src/ → apps/server/ and packages/web/ → apps/web/ to adopt standard monorepo conventions (apps/ for runnable apps, packages/ for reusable libraries). Update all config files, shared package imports, test fixtures, and documentation to reflect new paths. Key fixes: - Update workspace config to ["apps/*", "packages/*"] - Update tsconfig.json rootDir/include for apps/server/ - Add apps/web/** to vitest exclude list - Update drizzle.config.ts schema path - Fix ensure-schema.ts migration path detection (3 levels up in dev, 2 levels up in dist) - Fix tests/integration/cli-server.test.ts import paths - Update packages/shared imports to apps/server/ paths - Update all docs/ files with new paths
219 lines
5.5 KiB
TypeScript
219 lines
5.5 KiB
TypeScript
/**
|
|
* tRPC Subscription Helpers
|
|
*
|
|
* Bridges EventBus domain events into tRPC async generator subscriptions.
|
|
* Uses a queue-based approach: EventBus handlers push events into a queue,
|
|
* the async generator yields from the queue, and waits on a promise when empty.
|
|
*/
|
|
|
|
import { tracked, type TrackedEnvelope } from '@trpc/server';
|
|
import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js';
|
|
|
|
/**
|
|
* Shape of events yielded by subscription procedures.
|
|
*/
|
|
export interface SubscriptionEventData {
|
|
id: string;
|
|
type: string;
|
|
payload: unknown;
|
|
timestamp: string;
|
|
}
|
|
|
|
/**
|
|
* All domain event types in the system.
|
|
*/
|
|
export const ALL_EVENT_TYPES: DomainEventType[] = [
|
|
'process:spawned',
|
|
'process:stopped',
|
|
'process:crashed',
|
|
'server:started',
|
|
'server:stopped',
|
|
'log:entry',
|
|
'worktree:created',
|
|
'worktree:removed',
|
|
'worktree:merged',
|
|
'worktree:conflict',
|
|
'agent:spawned',
|
|
'agent:stopped',
|
|
'agent:crashed',
|
|
'agent:resumed',
|
|
'agent:account_switched',
|
|
'agent:deleted',
|
|
'agent:waiting',
|
|
'agent:output',
|
|
'task:queued',
|
|
'task:dispatched',
|
|
'task:completed',
|
|
'task:blocked',
|
|
'phase:queued',
|
|
'phase:started',
|
|
'phase:completed',
|
|
'phase:blocked',
|
|
'phase:pending_review',
|
|
'phase:merged',
|
|
'task:merged',
|
|
'merge:queued',
|
|
'merge:started',
|
|
'merge:completed',
|
|
'merge:conflicted',
|
|
'page:created',
|
|
'page:updated',
|
|
'page:deleted',
|
|
'changeset:created',
|
|
'changeset:reverted',
|
|
'conversation:created',
|
|
'conversation:answered',
|
|
];
|
|
|
|
/**
|
|
* Agent-specific event types.
|
|
*/
|
|
export const AGENT_EVENT_TYPES: DomainEventType[] = [
|
|
'agent:spawned',
|
|
'agent:stopped',
|
|
'agent:crashed',
|
|
'agent:resumed',
|
|
'agent:account_switched',
|
|
'agent:deleted',
|
|
'agent:waiting',
|
|
'agent:output',
|
|
];
|
|
|
|
/**
|
|
* Task and phase event types.
|
|
*/
|
|
export const TASK_EVENT_TYPES: DomainEventType[] = [
|
|
'task:queued',
|
|
'task:dispatched',
|
|
'task:completed',
|
|
'task:blocked',
|
|
'task:merged',
|
|
'phase:queued',
|
|
'phase:started',
|
|
'phase:completed',
|
|
'phase:blocked',
|
|
'phase:pending_review',
|
|
'phase:merged',
|
|
];
|
|
|
|
/**
|
|
* Page event types.
|
|
*/
|
|
export const PAGE_EVENT_TYPES: DomainEventType[] = [
|
|
'page:created',
|
|
'page:updated',
|
|
'page:deleted',
|
|
];
|
|
|
|
/** Counter for generating unique event IDs */
|
|
let eventCounter = 0;
|
|
|
|
/** Drop oldest events when the queue exceeds this size */
|
|
const MAX_QUEUE_SIZE = 1000;
|
|
|
|
/** Yield a synthetic heartbeat after this many ms of silence */
|
|
const HEARTBEAT_INTERVAL_MS = 30_000;
|
|
|
|
/**
|
|
* Creates an async generator that bridges EventBus events into a pull-based stream.
|
|
*
|
|
* Uses a queue + deferred promise pattern:
|
|
* - EventBus handlers push events into a queue array
|
|
* - The async generator yields from the queue
|
|
* - When the queue is empty, it waits on a promise that resolves when the next event arrives
|
|
* - Cleans up handlers on AbortSignal abort
|
|
*
|
|
* Bounded queue: events beyond MAX_QUEUE_SIZE are dropped (oldest first).
|
|
* Heartbeat: a synthetic `__heartbeat__` event is yielded when no real events
|
|
* arrive within HEARTBEAT_INTERVAL_MS, allowing clients to detect silent disconnects.
|
|
*
|
|
* Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection.
|
|
*
|
|
* @param eventBus - The EventBus to subscribe to
|
|
* @param eventTypes - Array of event type strings to listen for
|
|
* @param signal - AbortSignal to cancel the subscription
|
|
*/
|
|
export async function* eventBusIterable(
|
|
eventBus: EventBus,
|
|
eventTypes: DomainEventType[],
|
|
signal: AbortSignal,
|
|
): AsyncGenerator<TrackedEnvelope<SubscriptionEventData>> {
|
|
const queue: DomainEvent[] = [];
|
|
let resolve: (() => void) | null = null;
|
|
|
|
// Handler that pushes events into the queue and resolves the waiter
|
|
const handler = (event: DomainEvent) => {
|
|
queue.push(event);
|
|
// Drop oldest when queue overflows
|
|
while (queue.length > MAX_QUEUE_SIZE) {
|
|
queue.shift();
|
|
}
|
|
if (resolve) {
|
|
const r = resolve;
|
|
resolve = null;
|
|
r();
|
|
}
|
|
};
|
|
|
|
// Subscribe to all requested event types
|
|
for (const eventType of eventTypes) {
|
|
eventBus.on(eventType as DomainEvent['type'], handler);
|
|
}
|
|
|
|
// Cleanup function
|
|
const cleanup = () => {
|
|
for (const eventType of eventTypes) {
|
|
eventBus.off(eventType as DomainEvent['type'], handler);
|
|
}
|
|
// Resolve any pending waiter so the generator can exit
|
|
if (resolve) {
|
|
const r = resolve;
|
|
resolve = null;
|
|
r();
|
|
}
|
|
};
|
|
|
|
// Clean up when the client disconnects
|
|
signal.addEventListener('abort', cleanup, { once: true });
|
|
|
|
try {
|
|
while (!signal.aborted) {
|
|
// Drain the queue
|
|
while (queue.length > 0) {
|
|
const event = queue.shift()!;
|
|
const id = `${Date.now()}-${eventCounter++}`;
|
|
const data: SubscriptionEventData = {
|
|
id,
|
|
type: event.type,
|
|
payload: event.payload,
|
|
timestamp: event.timestamp.toISOString(),
|
|
};
|
|
yield tracked(id, data);
|
|
}
|
|
|
|
// Wait for the next event, abort, or heartbeat timeout
|
|
if (!signal.aborted) {
|
|
const gotEvent = await Promise.race([
|
|
new Promise<true>((r) => {
|
|
resolve = () => r(true);
|
|
}),
|
|
new Promise<false>((r) => setTimeout(() => r(false), HEARTBEAT_INTERVAL_MS)),
|
|
]);
|
|
|
|
if (!gotEvent && !signal.aborted) {
|
|
// No real event arrived — yield heartbeat
|
|
const id = `${Date.now()}-${eventCounter++}`;
|
|
yield tracked(id, {
|
|
id,
|
|
type: '__heartbeat__',
|
|
payload: null,
|
|
timestamp: new Date().toISOString(),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
cleanup();
|
|
}
|
|
}
|