feat: Add PREVIEW_EVENT_TYPES, CONVERSATION_EVENT_TYPES, and subscription procedures
- Add preview:building/ready/stopped/failed to ALL_EVENT_TYPES - Export PREVIEW_EVENT_TYPES and CONVERSATION_EVENT_TYPES constants - Add onPreviewUpdate and onConversationUpdate subscription procedures - Add audit comment confirming phase:pending_review presence (gap 3 verified) - Add unit tests covering constants, filtering behavior, and procedure shape - Update server-api.md to document new subscription procedures Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
148
apps/server/trpc/subscriptions.test.ts
Normal file
148
apps/server/trpc/subscriptions.test.ts
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
/**
|
||||||
|
* Subscription Helpers Tests
|
||||||
|
*
|
||||||
|
* Tests for subscription constants and eventBusIterable filtering behavior.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { describe, it, expect, vi } from 'vitest';
|
||||||
|
import {
|
||||||
|
ALL_EVENT_TYPES,
|
||||||
|
TASK_EVENT_TYPES,
|
||||||
|
PREVIEW_EVENT_TYPES,
|
||||||
|
CONVERSATION_EVENT_TYPES,
|
||||||
|
eventBusIterable,
|
||||||
|
} from './subscriptions.js';
|
||||||
|
import { EventEmitterBus } from '../events/bus.js';
|
||||||
|
import { subscriptionProcedures } from './routers/subscription.js';
|
||||||
|
|
||||||
|
describe('subscription constants', () => {
|
||||||
|
it('PREVIEW_EVENT_TYPES contains exactly the four preview event types', () => {
|
||||||
|
expect(PREVIEW_EVENT_TYPES).toEqual([
|
||||||
|
'preview:building',
|
||||||
|
'preview:ready',
|
||||||
|
'preview:stopped',
|
||||||
|
'preview:failed',
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('CONVERSATION_EVENT_TYPES contains exactly conversation:created and conversation:answered', () => {
|
||||||
|
expect(CONVERSATION_EVENT_TYPES).toEqual([
|
||||||
|
'conversation:created',
|
||||||
|
'conversation:answered',
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('ALL_EVENT_TYPES includes all four preview event types', () => {
|
||||||
|
expect(ALL_EVENT_TYPES).toContain('preview:building');
|
||||||
|
expect(ALL_EVENT_TYPES).toContain('preview:ready');
|
||||||
|
expect(ALL_EVENT_TYPES).toContain('preview:stopped');
|
||||||
|
expect(ALL_EVENT_TYPES).toContain('preview:failed');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('TASK_EVENT_TYPES includes phase:pending_review', () => {
|
||||||
|
expect(TASK_EVENT_TYPES).toContain('phase:pending_review');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('eventBusIterable filtering', () => {
|
||||||
|
it('yields preview events and ignores non-preview events', async () => {
|
||||||
|
const eventBus = new EventEmitterBus();
|
||||||
|
const ac = new AbortController();
|
||||||
|
const gen = eventBusIterable(eventBus, PREVIEW_EVENT_TYPES, ac.signal);
|
||||||
|
|
||||||
|
const results: string[] = [];
|
||||||
|
|
||||||
|
// Start consuming in background so the generator registers its listeners
|
||||||
|
const consumePromise = (async () => {
|
||||||
|
for await (const envelope of gen) {
|
||||||
|
results.push((envelope as any)[1]?.type as string);
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
// Yield control so the generator can register its EventBus listeners
|
||||||
|
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||||
|
|
||||||
|
// Emit a non-preview event — should not appear
|
||||||
|
eventBus.emit({
|
||||||
|
type: 'agent:spawned',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: { agentId: 'a', name: 'n', taskId: null, worktreeId: 'w', provider: 'p' },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Emit a preview event
|
||||||
|
eventBus.emit({
|
||||||
|
type: 'preview:ready',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: {
|
||||||
|
previewId: 'p1',
|
||||||
|
initiativeId: 'i1',
|
||||||
|
branch: 'main',
|
||||||
|
gatewayPort: 3000,
|
||||||
|
url: 'http://localhost:3000',
|
||||||
|
mode: 'preview',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Yield control so the generator can drain its queue
|
||||||
|
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||||
|
|
||||||
|
ac.abort();
|
||||||
|
await consumePromise;
|
||||||
|
|
||||||
|
expect(results).toHaveLength(1);
|
||||||
|
expect(results[0]).toBe('preview:ready');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('yields conversation events and ignores non-conversation events', async () => {
|
||||||
|
const eventBus = new EventEmitterBus();
|
||||||
|
const ac = new AbortController();
|
||||||
|
const gen = eventBusIterable(eventBus, CONVERSATION_EVENT_TYPES, ac.signal);
|
||||||
|
|
||||||
|
const results: string[] = [];
|
||||||
|
|
||||||
|
// Start consuming in background so the generator registers its listeners
|
||||||
|
const consumePromise = (async () => {
|
||||||
|
for await (const envelope of gen) {
|
||||||
|
results.push((envelope as any)[1]?.type as string);
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
// Yield control so the generator can register its EventBus listeners
|
||||||
|
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||||
|
|
||||||
|
// Emit a non-conversation event — should not appear
|
||||||
|
eventBus.emit({
|
||||||
|
type: 'agent:spawned',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: { agentId: 'a', name: 'n', taskId: null, worktreeId: 'w', provider: 'p' },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Emit a conversation event
|
||||||
|
eventBus.emit({
|
||||||
|
type: 'conversation:created',
|
||||||
|
timestamp: new Date(),
|
||||||
|
payload: { conversationId: 'c1', fromAgentId: 'a1', toAgentId: 'a2' },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Yield control so the generator can drain its queue
|
||||||
|
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||||
|
|
||||||
|
ac.abort();
|
||||||
|
await consumePromise;
|
||||||
|
|
||||||
|
expect(results).toHaveLength(1);
|
||||||
|
expect(results[0]).toBe('conversation:created');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('subscriptionProcedures shape', () => {
|
||||||
|
it('subscriptionProcedures includes onPreviewUpdate and onConversationUpdate', () => {
|
||||||
|
const mockProcedure = {
|
||||||
|
input: vi.fn().mockReturnThis(),
|
||||||
|
subscription: vi.fn().mockReturnThis(),
|
||||||
|
};
|
||||||
|
const procs = subscriptionProcedures(mockProcedure as any);
|
||||||
|
expect(procs).toHaveProperty('onPreviewUpdate');
|
||||||
|
expect(procs).toHaveProperty('onConversationUpdate');
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -100,7 +100,7 @@ export const TASK_EVENT_TYPES: DomainEventType[] = [
|
|||||||
'phase:started',
|
'phase:started',
|
||||||
'phase:completed',
|
'phase:completed',
|
||||||
'phase:blocked',
|
'phase:blocked',
|
||||||
'phase:pending_review',
|
'phase:pending_review', // Audited 2026-03-05: phase:pending_review is present — gap 3 verified
|
||||||
'phase:merged',
|
'phase:merged',
|
||||||
'initiative:pending_review',
|
'initiative:pending_review',
|
||||||
'initiative:review_approved',
|
'initiative:review_approved',
|
||||||
|
|||||||
@@ -205,10 +205,12 @@ Each procedure uses `require*Repository(ctx)` helpers that throw `TRPCError(INTE
|
|||||||
### Subscriptions (SSE)
|
### Subscriptions (SSE)
|
||||||
| Procedure | Type | Events |
|
| Procedure | Type | Events |
|
||||||
|-----------|------|--------|
|
|-----------|------|--------|
|
||||||
| onEvent | subscription | All 30+ event types |
|
| onEvent | subscription | All event types |
|
||||||
| onAgentUpdate | subscription | agent:* events (8 types) |
|
| onAgentUpdate | subscription | agent:* events (8 types) |
|
||||||
| onTaskUpdate | subscription | task:* + phase:* events (8 types) |
|
| onTaskUpdate | subscription | task:* + phase:* events (8 types) |
|
||||||
| onPageUpdate | subscription | page:created/updated/deleted |
|
| onPageUpdate | subscription | page:created/updated/deleted |
|
||||||
|
| onPreviewUpdate | subscription | preview:building/ready/stopped/failed |
|
||||||
|
| onConversationUpdate | subscription | conversation:created/answered |
|
||||||
|
|
||||||
Subscriptions use `eventBusIterable()` — queue-based async generator, max 1000 events, 30s heartbeat.
|
Subscriptions use `eventBusIterable()` — queue-based async generator, max 1000 events, 30s heartbeat.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user