diff --git a/apps/server/trpc/subscriptions.test.ts b/apps/server/trpc/subscriptions.test.ts new file mode 100644 index 0000000..69ca751 --- /dev/null +++ b/apps/server/trpc/subscriptions.test.ts @@ -0,0 +1,148 @@ +/** + * Subscription Helpers Tests + * + * Tests for subscription constants and eventBusIterable filtering behavior. + */ + +import { describe, it, expect, vi } from 'vitest'; +import { + ALL_EVENT_TYPES, + TASK_EVENT_TYPES, + PREVIEW_EVENT_TYPES, + CONVERSATION_EVENT_TYPES, + eventBusIterable, +} from './subscriptions.js'; +import { EventEmitterBus } from '../events/bus.js'; +import { subscriptionProcedures } from './routers/subscription.js'; + +describe('subscription constants', () => { + it('PREVIEW_EVENT_TYPES contains exactly the four preview event types', () => { + expect(PREVIEW_EVENT_TYPES).toEqual([ + 'preview:building', + 'preview:ready', + 'preview:stopped', + 'preview:failed', + ]); + }); + + it('CONVERSATION_EVENT_TYPES contains exactly conversation:created and conversation:answered', () => { + expect(CONVERSATION_EVENT_TYPES).toEqual([ + 'conversation:created', + 'conversation:answered', + ]); + }); + + it('ALL_EVENT_TYPES includes all four preview event types', () => { + expect(ALL_EVENT_TYPES).toContain('preview:building'); + expect(ALL_EVENT_TYPES).toContain('preview:ready'); + expect(ALL_EVENT_TYPES).toContain('preview:stopped'); + expect(ALL_EVENT_TYPES).toContain('preview:failed'); + }); + + it('TASK_EVENT_TYPES includes phase:pending_review', () => { + expect(TASK_EVENT_TYPES).toContain('phase:pending_review'); + }); +}); + +describe('eventBusIterable filtering', () => { + it('yields preview events and ignores non-preview events', async () => { + const eventBus = new EventEmitterBus(); + const ac = new AbortController(); + const gen = eventBusIterable(eventBus, PREVIEW_EVENT_TYPES, ac.signal); + + const results: string[] = []; + + // Start consuming in background so the generator registers its listeners + const consumePromise = (async () => { + for await (const envelope of gen) { + results.push((envelope as any)[1]?.type as string); + } + })(); + + // Yield control so the generator can register its EventBus listeners + await new Promise((resolve) => setImmediate(resolve)); + + // Emit a non-preview event — should not appear + eventBus.emit({ + type: 'agent:spawned', + timestamp: new Date(), + payload: { agentId: 'a', name: 'n', taskId: null, worktreeId: 'w', provider: 'p' }, + }); + + // Emit a preview event + eventBus.emit({ + type: 'preview:ready', + timestamp: new Date(), + payload: { + previewId: 'p1', + initiativeId: 'i1', + branch: 'main', + gatewayPort: 3000, + url: 'http://localhost:3000', + mode: 'preview', + }, + }); + + // Yield control so the generator can drain its queue + await new Promise((resolve) => setImmediate(resolve)); + + ac.abort(); + await consumePromise; + + expect(results).toHaveLength(1); + expect(results[0]).toBe('preview:ready'); + }); + + it('yields conversation events and ignores non-conversation events', async () => { + const eventBus = new EventEmitterBus(); + const ac = new AbortController(); + const gen = eventBusIterable(eventBus, CONVERSATION_EVENT_TYPES, ac.signal); + + const results: string[] = []; + + // Start consuming in background so the generator registers its listeners + const consumePromise = (async () => { + for await (const envelope of gen) { + results.push((envelope as any)[1]?.type as string); + } + })(); + + // Yield control so the generator can register its EventBus listeners + await new Promise((resolve) => setImmediate(resolve)); + + // Emit a non-conversation event — should not appear + eventBus.emit({ + type: 'agent:spawned', + timestamp: new Date(), + payload: { agentId: 'a', name: 'n', taskId: null, worktreeId: 'w', provider: 'p' }, + }); + + // Emit a conversation event + eventBus.emit({ + type: 'conversation:created', + timestamp: new Date(), + payload: { conversationId: 'c1', fromAgentId: 'a1', toAgentId: 'a2' }, + }); + + // Yield control so the generator can drain its queue + await new Promise((resolve) => setImmediate(resolve)); + + ac.abort(); + await consumePromise; + + expect(results).toHaveLength(1); + expect(results[0]).toBe('conversation:created'); + }); +}); + +describe('subscriptionProcedures shape', () => { + it('subscriptionProcedures includes onPreviewUpdate and onConversationUpdate', () => { + const mockProcedure = { + input: vi.fn().mockReturnThis(), + subscription: vi.fn().mockReturnThis(), + }; + const procs = subscriptionProcedures(mockProcedure as any); + expect(procs).toHaveProperty('onPreviewUpdate'); + expect(procs).toHaveProperty('onConversationUpdate'); + }); +}); diff --git a/apps/server/trpc/subscriptions.ts b/apps/server/trpc/subscriptions.ts index a5f9795..97cf38b 100644 --- a/apps/server/trpc/subscriptions.ts +++ b/apps/server/trpc/subscriptions.ts @@ -100,7 +100,7 @@ export const TASK_EVENT_TYPES: DomainEventType[] = [ 'phase:started', 'phase:completed', 'phase:blocked', - 'phase:pending_review', + 'phase:pending_review', // Audited 2026-03-05: phase:pending_review is present — gap 3 verified 'phase:merged', 'initiative:pending_review', 'initiative:review_approved', diff --git a/docs/server-api.md b/docs/server-api.md index fcfe641..64eca42 100644 --- a/docs/server-api.md +++ b/docs/server-api.md @@ -205,10 +205,12 @@ Each procedure uses `require*Repository(ctx)` helpers that throw `TRPCError(INTE ### Subscriptions (SSE) | Procedure | Type | Events | |-----------|------|--------| -| onEvent | subscription | All 30+ event types | +| onEvent | subscription | All event types | | onAgentUpdate | subscription | agent:* events (8 types) | | onTaskUpdate | subscription | task:* + phase:* events (8 types) | | onPageUpdate | subscription | page:created/updated/deleted | +| onPreviewUpdate | subscription | preview:building/ready/stopped/failed | +| onConversationUpdate | subscription | conversation:created/answered | Subscriptions use `eventBusIterable()` — queue-based async generator, max 1000 events, 30s heartbeat.