Files
Codewalkers/apps/server/trpc/subscriptions.ts
Lukas May ebef093d3f fix: Add missing event routing for initiative status real-time refresh
7 of 12 initiative activity state transitions were broken due to missing
event routing at three layers: SSE event arrays, live-update prefix rules,
and mutation invalidation map.

- Add initiative:changes_requested to ALL_EVENT_TYPES and TASK_EVENT_TYPES
- Add initiative:/agent: prefix rules to initiatives list and detail pages
- Add approveInitiativeReview, requestInitiativeChanges, requestPhaseChanges
  to INVALIDATION_MAP; add listInitiatives to approvePhase
- Extract INITIATIVE_LIST_RULES constant for reuse
2026-03-06 13:25:31 +01:00

247 lines
6.2 KiB
TypeScript

/**
* tRPC Subscription Helpers
*
* Bridges EventBus domain events into tRPC async generator subscriptions.
* Uses a queue-based approach: EventBus handlers push events into a queue,
* the async generator yields from the queue, and waits on a promise when empty.
*/
import { tracked, type TrackedEnvelope } from '@trpc/server';
import type { EventBus, DomainEvent, DomainEventType } from '../events/types.js';
/**
* Shape of events yielded by subscription procedures.
*/
export interface SubscriptionEventData {
// Identifier for this yielded event. eventBusIterable builds it as
// `${Date.now()}-${counter}` and passes the same value to tracked().
id: string;
// Domain event type, or the literal '__heartbeat__' for synthetic heartbeats.
type: string;
// The domain event's payload, passed through as-is; null for heartbeats.
payload: unknown;
// ISO-8601 string produced with Date.prototype.toISOString().
timestamp: string;
}
/**
* All domain event types in the system.
*/
export const ALL_EVENT_TYPES: DomainEventType[] = [
  // process:*
  'process:spawned', 'process:stopped', 'process:crashed',
  // server:*
  'server:started', 'server:stopped',
  // log:*
  'log:entry',
  // worktree:*
  'worktree:created', 'worktree:removed', 'worktree:merged', 'worktree:conflict',
  // agent:*
  'agent:spawned', 'agent:stopped', 'agent:crashed', 'agent:resumed',
  'agent:account_switched', 'agent:deleted', 'agent:waiting',
  // task:* (task:merged is listed after the phase:* group below)
  'task:queued', 'task:dispatched', 'task:completed', 'task:blocked',
  // phase:*
  'phase:queued', 'phase:started', 'phase:completed', 'phase:blocked',
  'phase:pending_review', 'phase:merged',
  'task:merged',
  // merge:*
  'merge:queued', 'merge:started', 'merge:completed', 'merge:conflicted',
  // page:*
  'page:created', 'page:updated', 'page:deleted',
  // changeset:*
  'changeset:created', 'changeset:reverted',
  // preview:*
  'preview:building', 'preview:ready', 'preview:stopped', 'preview:failed',
  // conversation:*
  'conversation:created', 'conversation:answered',
  // chat:*
  'chat:message_created', 'chat:session_closed',
  // initiative:*
  'initiative:pending_review', 'initiative:review_approved', 'initiative:changes_requested',
];
/**
* Agent-specific event types.
*/
export const AGENT_EVENT_TYPES: DomainEventType[] = [
  // Every entry carries the `agent:` prefix.
  'agent:spawned', 'agent:stopped', 'agent:crashed', 'agent:resumed',
  'agent:account_switched', 'agent:deleted', 'agent:waiting',
];
/**
* Task, phase, and initiative review event types.
*/
export const TASK_EVENT_TYPES: DomainEventType[] = [
  // task:*
  'task:queued', 'task:dispatched', 'task:completed', 'task:blocked', 'task:merged',
  // phase:*
  'phase:queued', 'phase:started', 'phase:completed', 'phase:blocked',
  'phase:pending_review', // Audited 2026-03-05: phase:pending_review is present — gap 3 verified
  'phase:merged',
  // initiative:* review events
  'initiative:pending_review', 'initiative:review_approved', 'initiative:changes_requested',
];
/**
* Page event types.
*/
export const PAGE_EVENT_TYPES: DomainEventType[] = [
  // Every entry carries the `page:` prefix.
  'page:created', 'page:updated', 'page:deleted',
];
/**
* Preview deployment event types.
*/
export const PREVIEW_EVENT_TYPES: DomainEventType[] = [
  // Every entry carries the `preview:` prefix.
  'preview:building', 'preview:ready', 'preview:stopped', 'preview:failed',
];
/**
* Inter-agent conversation event types.
*/
export const CONVERSATION_EVENT_TYPES: DomainEventType[] = [
  // Every entry carries the `conversation:` prefix.
  'conversation:created', 'conversation:answered',
];
/** Counter for generating unique event IDs */
let eventCounter = 0;

/** Upper bound on buffered events per subscription; the oldest are dropped beyond it. */
const MAX_QUEUE_SIZE = 1_000;

/** Milliseconds of silence after which a synthetic heartbeat is yielded (30 seconds). */
const HEARTBEAT_INTERVAL_MS = 30 * 1_000;
/**
 * Creates an async generator that bridges EventBus events into a pull-based stream.
 *
 * Uses a queue + deferred promise pattern:
 * - EventBus handlers push events into a queue array
 * - The async generator yields from the queue
 * - When the queue is empty, it waits on a promise that settles when the next
 *   event arrives, the signal aborts, or the heartbeat interval elapses
 *
 * Bounded queue: events beyond MAX_QUEUE_SIZE are dropped (oldest first).
 * Heartbeat: a synthetic `__heartbeat__` event is yielded when no real events
 * arrive within HEARTBEAT_INTERVAL_MS, allowing clients to detect silent disconnects.
 *
 * Each yielded event is wrapped with `tracked(id, data)` to enable client-side reconnection.
 *
 * Resource handling: the heartbeat timer is cleared as soon as each wait ends,
 * and the EventBus handlers and the `abort` listener are removed when the
 * generator finishes for any reason (abort, consumer `return()`, or a throw).
 *
 * @param eventBus - The EventBus to subscribe to
 * @param eventTypes - Array of event type strings to listen for
 * @param signal - AbortSignal to cancel the subscription
 * @returns An async generator of tracked subscription events
 */
export async function* eventBusIterable(
  eventBus: EventBus,
  eventTypes: DomainEventType[],
  signal: AbortSignal,
): AsyncGenerator<TrackedEnvelope<SubscriptionEventData>> {
  const queue: DomainEvent[] = [];
  // Resolver of the promise the generator is currently parked on, if any.
  let wake: (() => void) | null = null;
  let disposed = false;

  // Releases the parked generator (if there is one) exactly once.
  const wakeWaiter = () => {
    const pending = wake;
    wake = null;
    pending?.();
  };

  // Buffers the event, enforces the queue bound, and wakes the generator.
  const handler = (event: DomainEvent) => {
    queue.push(event);
    // Drop oldest when queue overflows
    while (queue.length > MAX_QUEUE_SIZE) {
      queue.shift();
    }
    wakeWaiter();
  };

  // Subscribe to all requested event types
  for (const eventType of eventTypes) {
    eventBus.on(eventType as DomainEvent['type'], handler);
  }

  // Unsubscribes from the bus and releases a parked generator so it can exit.
  // Idempotent: it runs both from the abort listener and from `finally`.
  const cleanup = () => {
    if (disposed) {
      return;
    }
    disposed = true;
    for (const eventType of eventTypes) {
      eventBus.off(eventType as DomainEvent['type'], handler);
    }
    wakeWaiter();
  };

  // Clean up when the client disconnects
  signal.addEventListener('abort', cleanup, { once: true });

  try {
    while (!signal.aborted) {
      // Drain the queue
      for (let event = queue.shift(); event !== undefined; event = queue.shift()) {
        const id = `${Date.now()}-${eventCounter++}`;
        const data: SubscriptionEventData = {
          id,
          type: event.type,
          payload: event.payload,
          timestamp: event.timestamp.toISOString(),
        };
        yield tracked(id, data);
      }

      if (signal.aborted) {
        break;
      }

      // Wait for the next event, abort, or heartbeat timeout. A single promise
      // is settled by whichever happens first; later settle calls are no-ops.
      let heartbeatTimer: ReturnType<typeof setTimeout> | undefined;
      const gotEvent = await new Promise<boolean>((settle) => {
        wake = () => settle(true);
        heartbeatTimer = setTimeout(() => settle(false), HEARTBEAT_INTERVAL_MS);
      });
      // Always cancel the timer: without this, every wait ended by an event
      // would leave a pending timer behind for HEARTBEAT_INTERVAL_MS.
      clearTimeout(heartbeatTimer);
      // Drop the stale resolver left behind when the timer won.
      wake = null;

      if (!gotEvent && !signal.aborted) {
        // No real event arrived — yield heartbeat
        const id = `${Date.now()}-${eventCounter++}`;
        const heartbeat: SubscriptionEventData = {
          id,
          type: '__heartbeat__',
          payload: null,
          timestamp: new Date().toISOString(),
        };
        yield tracked(id, heartbeat);
      }
    }
  } finally {
    // `{ once: true }` only detaches the listener after it fires; remove it
    // explicitly so a generator that ends without an abort does not leave it
    // attached to the signal.
    signal.removeEventListener('abort', cleanup);
    cleanup();
  }
}