diff --git a/apps/server/events/types.ts b/apps/server/events/types.ts index 2f4b84c..04162cf 100644 --- a/apps/server/events/types.ts +++ b/apps/server/events/types.ts @@ -694,6 +694,14 @@ export type DomainEventType = DomainEventMap['type']; * * All modules communicate through this interface. * Can be swapped for external systems (RabbitMQ, WebSocket forwarding) later. + * + * **Delivery guarantee: at-most-once.** + * + * Events emitted while a client is disconnected are permanently lost. + * Reconnecting clients receive only events emitted after reconnection. + * React Query's `refetchOnWindowFocus` and `refetchOnReconnect` compensate + * for missed mutations since the system uses query invalidation rather + * than incremental state. */ export interface EventBus { /** diff --git a/apps/server/trpc/routers/subscription.ts b/apps/server/trpc/routers/subscription.ts index 949a011..43dfd79 100644 --- a/apps/server/trpc/routers/subscription.ts +++ b/apps/server/trpc/routers/subscription.ts @@ -2,7 +2,6 @@ * Subscription Router — SSE event streams */ -import { z } from 'zod'; import type { ProcedureBuilder } from '../trpc.js'; import { eventBusIterable, @@ -17,42 +16,40 @@ import { export function subscriptionProcedures(publicProcedure: ProcedureBuilder) { return { onEvent: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal); }), onAgentUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal); }), onTaskUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal); }), onPageUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, PAGE_EVENT_TYPES, signal); }), onPreviewUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, PREVIEW_EVENT_TYPES, signal); }), + // NOTE: No frontend view currently displays inter-agent conversation data. + // When a conversation view is added, add to its useLiveUpdates call: + // { prefix: 'conversation:', invalidate: [''] } + // and add the relevant mutation(s) to INVALIDATION_MAP in apps/web/src/lib/invalidation.ts. onConversationUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, CONVERSATION_EVENT_TYPES, signal); diff --git a/apps/web/src/lib/invalidation.ts b/apps/web/src/lib/invalidation.ts index d0c767f..3cd6e1f 100644 --- a/apps/web/src/lib/invalidation.ts +++ b/apps/web/src/lib/invalidation.ts @@ -56,8 +56,8 @@ const INVALIDATION_MAP: Partial> = { updatePhase: ["listPhases", "getPhase"], approvePhase: ["listPhases", "listInitiativeTasks"], queuePhase: ["listPhases"], - createPhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies"], - removePhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies"], + createPhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies", "listPhaseTaskDependencies"], + removePhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies", "listPhaseTaskDependencies"], // --- Tasks --- createPhaseTask: ["listPhaseTasks", "listInitiativeTasks", "listTasks"], @@ -68,7 +68,7 @@ const INVALIDATION_MAP: Partial> = { deleteTask: ["listTasks", "listInitiativeTasks", "listPhaseTasks", "listChangeSets"], // --- Change Sets --- - revertChangeSet: ["listPhases", "listPhaseTasks", "listInitiativeTasks", "listPages", "getPage", "listChangeSets", "getRootPage"], + revertChangeSet: ["listPhases", "listPhaseTasks", "listInitiativeTasks", "listPages", "getPage", "listChangeSets", "getRootPage", "getChangeSet"], // --- Pages --- updatePage: ["listPages", "getPage", "getRootPage"], diff --git a/apps/web/src/routes/initiatives/$id.tsx b/apps/web/src/routes/initiatives/$id.tsx index 6662f18..678100c 100644 --- a/apps/web/src/routes/initiatives/$id.tsx +++ b/apps/web/src/routes/initiatives/$id.tsx @@ -29,11 +29,12 @@ function InitiativeDetailPage() { // Single SSE stream for all live updates useLiveUpdates([ - { prefix: 'task:', invalidate: ['listPhases', 'listTasks', 'listInitiativeTasks'] }, - { prefix: 'phase:', invalidate: ['listPhases', 'listTasks', 'listInitiativePhaseDependencies'] }, - { prefix: 'agent:', invalidate: ['listAgents', 'getActiveRefineAgent'] }, - { prefix: 'page:', invalidate: ['listPages', 'getPage', 'getRootPage'] }, - { prefix: 'preview:', invalidate: ['listPreviews', 'getPreviewStatus'] }, + { prefix: 'task:', invalidate: ['listPhases', 'listTasks', 'listInitiativeTasks', 'getPhaseDependencies', 'listPhaseTaskDependencies'] }, + { prefix: 'phase:', invalidate: ['listPhases', 'listTasks', 'listInitiativePhaseDependencies', 'getPhaseDependencies'] }, + { prefix: 'agent:', invalidate: ['listAgents', 'getActiveRefineAgent'] }, + { prefix: 'page:', invalidate: ['listPages', 'getPage', 'getRootPage'] }, + { prefix: 'changeset:', invalidate: ['getChangeSet', 'listChangeSets'] }, + { prefix: 'preview:', invalidate: ['listPreviews', 'getPreviewStatus'] }, ]); // tRPC queries