Merge branch 'cw/unified-event-flow' into cw-merge-1772797070237
This commit is contained in:
@@ -694,6 +694,14 @@ export type DomainEventType = DomainEventMap['type'];
|
||||
*
|
||||
* All modules communicate through this interface.
|
||||
* Can be swapped for external systems (RabbitMQ, WebSocket forwarding) later.
|
||||
*
|
||||
* **Delivery guarantee: at-most-once.**
|
||||
*
|
||||
* Events emitted while a client is disconnected are permanently lost.
|
||||
* Reconnecting clients receive only events emitted after reconnection.
|
||||
* React Query's `refetchOnWindowFocus` and `refetchOnReconnect` compensate
|
||||
* for missed mutations since the system uses query invalidation rather
|
||||
* than incremental state.
|
||||
*/
|
||||
export interface EventBus {
|
||||
/**
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
* Subscription Router — SSE event streams
|
||||
*/
|
||||
|
||||
import { z } from 'zod';
|
||||
import type { ProcedureBuilder } from '../trpc.js';
|
||||
import {
|
||||
eventBusIterable,
|
||||
@@ -17,42 +16,40 @@ import {
|
||||
export function subscriptionProcedures(publicProcedure: ProcedureBuilder) {
|
||||
return {
|
||||
onEvent: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal);
|
||||
}),
|
||||
|
||||
onAgentUpdate: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal);
|
||||
}),
|
||||
|
||||
onTaskUpdate: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal);
|
||||
}),
|
||||
|
||||
onPageUpdate: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, PAGE_EVENT_TYPES, signal);
|
||||
}),
|
||||
|
||||
onPreviewUpdate: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, PREVIEW_EVENT_TYPES, signal);
|
||||
}),
|
||||
|
||||
// NOTE: No frontend view currently displays inter-agent conversation data.
|
||||
// When a conversation view is added, add to its useLiveUpdates call:
|
||||
// { prefix: 'conversation:', invalidate: ['<query-key>'] }
|
||||
// and add the relevant mutation(s) to INVALIDATION_MAP in apps/web/src/lib/invalidation.ts.
|
||||
onConversationUpdate: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const signal = opts.signal ?? new AbortController().signal;
|
||||
yield* eventBusIterable(opts.ctx.eventBus, CONVERSATION_EVENT_TYPES, signal);
|
||||
|
||||
@@ -56,8 +56,8 @@ const INVALIDATION_MAP: Partial<Record<MutationName, QueryName[]>> = {
|
||||
updatePhase: ["listPhases", "getPhase"],
|
||||
approvePhase: ["listPhases", "listInitiativeTasks"],
|
||||
queuePhase: ["listPhases"],
|
||||
createPhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies"],
|
||||
removePhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies"],
|
||||
createPhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies", "listPhaseTaskDependencies"],
|
||||
removePhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies", "listPhaseTaskDependencies"],
|
||||
|
||||
// --- Tasks ---
|
||||
createPhaseTask: ["listPhaseTasks", "listInitiativeTasks", "listTasks"],
|
||||
@@ -68,7 +68,7 @@ const INVALIDATION_MAP: Partial<Record<MutationName, QueryName[]>> = {
|
||||
deleteTask: ["listTasks", "listInitiativeTasks", "listPhaseTasks", "listChangeSets"],
|
||||
|
||||
// --- Change Sets ---
|
||||
revertChangeSet: ["listPhases", "listPhaseTasks", "listInitiativeTasks", "listPages", "getPage", "listChangeSets", "getRootPage"],
|
||||
revertChangeSet: ["listPhases", "listPhaseTasks", "listInitiativeTasks", "listPages", "getPage", "listChangeSets", "getRootPage", "getChangeSet"],
|
||||
|
||||
// --- Pages ---
|
||||
updatePage: ["listPages", "getPage", "getRootPage"],
|
||||
|
||||
@@ -29,11 +29,12 @@ function InitiativeDetailPage() {
|
||||
|
||||
// Single SSE stream for all live updates
|
||||
useLiveUpdates([
|
||||
{ prefix: 'task:', invalidate: ['listPhases', 'listTasks', 'listInitiativeTasks'] },
|
||||
{ prefix: 'phase:', invalidate: ['listPhases', 'listTasks', 'listInitiativePhaseDependencies'] },
|
||||
{ prefix: 'agent:', invalidate: ['listAgents', 'getActiveRefineAgent'] },
|
||||
{ prefix: 'page:', invalidate: ['listPages', 'getPage', 'getRootPage'] },
|
||||
{ prefix: 'preview:', invalidate: ['listPreviews', 'getPreviewStatus'] },
|
||||
{ prefix: 'task:', invalidate: ['listPhases', 'listTasks', 'listInitiativeTasks', 'getPhaseDependencies', 'listPhaseTaskDependencies'] },
|
||||
{ prefix: 'phase:', invalidate: ['listPhases', 'listTasks', 'listInitiativePhaseDependencies', 'getPhaseDependencies'] },
|
||||
{ prefix: 'agent:', invalidate: ['listAgents', 'getActiveRefineAgent'] },
|
||||
{ prefix: 'page:', invalidate: ['listPages', 'getPage', 'getRootPage'] },
|
||||
{ prefix: 'changeset:', invalidate: ['getChangeSet', 'listChangeSets'] },
|
||||
{ prefix: 'preview:', invalidate: ['listPreviews', 'getPreviewStatus'] },
|
||||
]);
|
||||
|
||||
// tRPC queries
|
||||
|
||||
Reference in New Issue
Block a user