feat: Add PREVIEW_EVENT_TYPES, CONVERSATION_EVENT_TYPES, and subscription procedures
- Add preview:building/ready/stopped/failed to ALL_EVENT_TYPES - Export PREVIEW_EVENT_TYPES and CONVERSATION_EVENT_TYPES constants - Add onPreviewUpdate and onConversationUpdate subscription procedures - Add audit comment confirming phase:pending_review presence (gap 3 verified) - Add unit tests covering constants, filtering behavior, and procedure shape - Update server-api.md to document new subscription procedures Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
148
apps/server/trpc/subscriptions.test.ts
Normal file
148
apps/server/trpc/subscriptions.test.ts
Normal file
@@ -0,0 +1,148 @@
|
||||
/**
|
||||
* Subscription Helpers Tests
|
||||
*
|
||||
* Tests for subscription constants and eventBusIterable filtering behavior.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, vi } from 'vitest';
|
||||
import {
|
||||
ALL_EVENT_TYPES,
|
||||
TASK_EVENT_TYPES,
|
||||
PREVIEW_EVENT_TYPES,
|
||||
CONVERSATION_EVENT_TYPES,
|
||||
eventBusIterable,
|
||||
} from './subscriptions.js';
|
||||
import { EventEmitterBus } from '../events/bus.js';
|
||||
import { subscriptionProcedures } from './routers/subscription.js';
|
||||
|
||||
describe('subscription constants', () => {
|
||||
it('PREVIEW_EVENT_TYPES contains exactly the four preview event types', () => {
|
||||
expect(PREVIEW_EVENT_TYPES).toEqual([
|
||||
'preview:building',
|
||||
'preview:ready',
|
||||
'preview:stopped',
|
||||
'preview:failed',
|
||||
]);
|
||||
});
|
||||
|
||||
it('CONVERSATION_EVENT_TYPES contains exactly conversation:created and conversation:answered', () => {
|
||||
expect(CONVERSATION_EVENT_TYPES).toEqual([
|
||||
'conversation:created',
|
||||
'conversation:answered',
|
||||
]);
|
||||
});
|
||||
|
||||
it('ALL_EVENT_TYPES includes all four preview event types', () => {
|
||||
expect(ALL_EVENT_TYPES).toContain('preview:building');
|
||||
expect(ALL_EVENT_TYPES).toContain('preview:ready');
|
||||
expect(ALL_EVENT_TYPES).toContain('preview:stopped');
|
||||
expect(ALL_EVENT_TYPES).toContain('preview:failed');
|
||||
});
|
||||
|
||||
it('TASK_EVENT_TYPES includes phase:pending_review', () => {
|
||||
expect(TASK_EVENT_TYPES).toContain('phase:pending_review');
|
||||
});
|
||||
});
|
||||
|
||||
describe('eventBusIterable filtering', () => {
|
||||
it('yields preview events and ignores non-preview events', async () => {
|
||||
const eventBus = new EventEmitterBus();
|
||||
const ac = new AbortController();
|
||||
const gen = eventBusIterable(eventBus, PREVIEW_EVENT_TYPES, ac.signal);
|
||||
|
||||
const results: string[] = [];
|
||||
|
||||
// Start consuming in background so the generator registers its listeners
|
||||
const consumePromise = (async () => {
|
||||
for await (const envelope of gen) {
|
||||
results.push((envelope as any)[1]?.type as string);
|
||||
}
|
||||
})();
|
||||
|
||||
// Yield control so the generator can register its EventBus listeners
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
// Emit a non-preview event — should not appear
|
||||
eventBus.emit({
|
||||
type: 'agent:spawned',
|
||||
timestamp: new Date(),
|
||||
payload: { agentId: 'a', name: 'n', taskId: null, worktreeId: 'w', provider: 'p' },
|
||||
});
|
||||
|
||||
// Emit a preview event
|
||||
eventBus.emit({
|
||||
type: 'preview:ready',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
previewId: 'p1',
|
||||
initiativeId: 'i1',
|
||||
branch: 'main',
|
||||
gatewayPort: 3000,
|
||||
url: 'http://localhost:3000',
|
||||
mode: 'preview',
|
||||
},
|
||||
});
|
||||
|
||||
// Yield control so the generator can drain its queue
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
ac.abort();
|
||||
await consumePromise;
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0]).toBe('preview:ready');
|
||||
});
|
||||
|
||||
it('yields conversation events and ignores non-conversation events', async () => {
|
||||
const eventBus = new EventEmitterBus();
|
||||
const ac = new AbortController();
|
||||
const gen = eventBusIterable(eventBus, CONVERSATION_EVENT_TYPES, ac.signal);
|
||||
|
||||
const results: string[] = [];
|
||||
|
||||
// Start consuming in background so the generator registers its listeners
|
||||
const consumePromise = (async () => {
|
||||
for await (const envelope of gen) {
|
||||
results.push((envelope as any)[1]?.type as string);
|
||||
}
|
||||
})();
|
||||
|
||||
// Yield control so the generator can register its EventBus listeners
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
// Emit a non-conversation event — should not appear
|
||||
eventBus.emit({
|
||||
type: 'agent:spawned',
|
||||
timestamp: new Date(),
|
||||
payload: { agentId: 'a', name: 'n', taskId: null, worktreeId: 'w', provider: 'p' },
|
||||
});
|
||||
|
||||
// Emit a conversation event
|
||||
eventBus.emit({
|
||||
type: 'conversation:created',
|
||||
timestamp: new Date(),
|
||||
payload: { conversationId: 'c1', fromAgentId: 'a1', toAgentId: 'a2' },
|
||||
});
|
||||
|
||||
// Yield control so the generator can drain its queue
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
|
||||
ac.abort();
|
||||
await consumePromise;
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0]).toBe('conversation:created');
|
||||
});
|
||||
});
|
||||
|
||||
describe('subscriptionProcedures shape', () => {
|
||||
it('subscriptionProcedures includes onPreviewUpdate and onConversationUpdate', () => {
|
||||
const mockProcedure = {
|
||||
input: vi.fn().mockReturnThis(),
|
||||
subscription: vi.fn().mockReturnThis(),
|
||||
};
|
||||
const procs = subscriptionProcedures(mockProcedure as any);
|
||||
expect(procs).toHaveProperty('onPreviewUpdate');
|
||||
expect(procs).toHaveProperty('onConversationUpdate');
|
||||
});
|
||||
});
|
||||
@@ -100,7 +100,7 @@ export const TASK_EVENT_TYPES: DomainEventType[] = [
|
||||
'phase:started',
|
||||
'phase:completed',
|
||||
'phase:blocked',
|
||||
'phase:pending_review',
|
||||
'phase:pending_review', // Audited 2026-03-05: phase:pending_review is present — gap 3 verified
|
||||
'phase:merged',
|
||||
'initiative:pending_review',
|
||||
'initiative:review_approved',
|
||||
|
||||
@@ -205,10 +205,12 @@ Each procedure uses `require*Repository(ctx)` helpers that throw `TRPCError(INTE
|
||||
### Subscriptions (SSE)
|
||||
| Procedure | Type | Events |
|
||||
|-----------|------|--------|
|
||||
| onEvent | subscription | All 30+ event types |
|
||||
| onEvent | subscription | All event types |
|
||||
| onAgentUpdate | subscription | agent:* events (8 types) |
|
||||
| onTaskUpdate | subscription | task:* + phase:* events (8 types) |
|
||||
| onPageUpdate | subscription | page:created/updated/deleted |
|
||||
| onPreviewUpdate | subscription | preview:building/ready/stopped/failed |
|
||||
| onConversationUpdate | subscription | conversation:created/answered |
|
||||
|
||||
Subscriptions use `eventBusIterable()` — queue-based async generator, max 1000 events, 30s heartbeat.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user