From fff4ce2bb725d57a346b2635f6a8e2ff4460db22 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Thu, 5 Mar 2026 20:51:16 +0100 Subject: [PATCH 1/3] fix: close five frontend invalidation gaps for changeset, preview, and dependency events - Add changeset: SSE rule (Gap 1-A) to cover cross-agent changeset events - Add getChangeSet to revertChangeSet INVALIDATION_MAP (Gap 1-B) for same-client path - Add preview: SSE rule (Gap 2) covering listPreviews and getPreviewStatus - Extend task:/phase: SSE rules with getPhaseDependencies and listPhaseTaskDependencies (Gap 4-A) - Add listPhaseTaskDependencies to createPhaseDependency/removePhaseDependency (Gap 4-B) Co-Authored-By: Claude Sonnet 4.6 --- apps/web/src/lib/invalidation.ts | 6 +++--- apps/web/src/routes/initiatives/$id.tsx | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/web/src/lib/invalidation.ts b/apps/web/src/lib/invalidation.ts index f08105d..1ee739d 100644 --- a/apps/web/src/lib/invalidation.ts +++ b/apps/web/src/lib/invalidation.ts @@ -56,8 +56,8 @@ const INVALIDATION_MAP: Partial> = { updatePhase: ["listPhases", "getPhase"], approvePhase: ["listPhases", "listInitiativeTasks"], queuePhase: ["listPhases"], - createPhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies"], - removePhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies"], + createPhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies", "listPhaseTaskDependencies"], + removePhaseDependency: ["getPhaseDependencies", "listInitiativePhaseDependencies", "listPhaseTaskDependencies"], // --- Tasks --- createPhaseTask: ["listPhaseTasks", "listInitiativeTasks", "listTasks"], @@ -66,7 +66,7 @@ const INVALIDATION_MAP: Partial> = { queueTask: ["listTasks", "listInitiativeTasks", "listPhaseTasks"], // --- Change Sets --- - revertChangeSet: ["listPhases", "listPhaseTasks", "listInitiativeTasks", "listPages", "getPage", "listChangeSets", "getRootPage"], + revertChangeSet: ["listPhases", "listPhaseTasks", "listInitiativeTasks", "listPages", "getPage", "listChangeSets", "getRootPage", "getChangeSet"], // --- Pages --- updatePage: ["listPages", "getPage", "getRootPage"], diff --git a/apps/web/src/routes/initiatives/$id.tsx b/apps/web/src/routes/initiatives/$id.tsx index 7ff848e..678100c 100644 --- a/apps/web/src/routes/initiatives/$id.tsx +++ b/apps/web/src/routes/initiatives/$id.tsx @@ -29,10 +29,12 @@ function InitiativeDetailPage() { // Single SSE stream for all live updates useLiveUpdates([ - { prefix: 'task:', invalidate: ['listPhases', 'listTasks', 'listInitiativeTasks'] }, - { prefix: 'phase:', invalidate: ['listPhases', 'listTasks', 'listInitiativePhaseDependencies'] }, - { prefix: 'agent:', invalidate: ['listAgents', 'getActiveRefineAgent'] }, - { prefix: 'page:', invalidate: ['listPages', 'getPage', 'getRootPage'] }, + { prefix: 'task:', invalidate: ['listPhases', 'listTasks', 'listInitiativeTasks', 'getPhaseDependencies', 'listPhaseTaskDependencies'] }, + { prefix: 'phase:', invalidate: ['listPhases', 'listTasks', 'listInitiativePhaseDependencies', 'getPhaseDependencies'] }, + { prefix: 'agent:', invalidate: ['listAgents', 'getActiveRefineAgent'] }, + { prefix: 'page:', invalidate: ['listPages', 'getPage', 'getRootPage'] }, + { prefix: 'changeset:', invalidate: ['getChangeSet', 'listChangeSets'] }, + { prefix: 'preview:', invalidate: ['listPreviews', 'getPreviewStatus'] }, ]); // tRPC queries From f19aac0a76205b511de7e329d7ce5a17d3d26616 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Thu, 5 Mar 2026 20:51:58 +0100 Subject: [PATCH 2/3] refactor: Remove dead lastEventId from subscription schemas and document at-most-once delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Strip the unused .input(z.object({ lastEventId })) from all 6 subscription procedures — the parameter was never consumed by eventBusIterable. Remove the now-unused zod import. Add at-most-once delivery JSDoc to the EventBus interface to make the real guarantee explicit. Add compliance comment above onConversationUpdate noting what to wire when a conversation view is built. Co-Authored-By: Claude Sonnet 4.6 --- apps/server/events/types.ts | 8 ++++++++ apps/server/trpc/routers/subscription.ts | 11 ++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/server/events/types.ts b/apps/server/events/types.ts index fb6a8a6..2c2009f 100644 --- a/apps/server/events/types.ts +++ b/apps/server/events/types.ts @@ -684,6 +684,14 @@ export type DomainEventType = DomainEventMap['type']; * * All modules communicate through this interface. * Can be swapped for external systems (RabbitMQ, WebSocket forwarding) later. + * + * **Delivery guarantee: at-most-once.** + * + * Events emitted while a client is disconnected are permanently lost. + * Reconnecting clients receive only events emitted after reconnection. + * React Query's `refetchOnWindowFocus` and `refetchOnReconnect` compensate + * for missed mutations since the system uses query invalidation rather + * than incremental state. */ export interface EventBus { /** diff --git a/apps/server/trpc/routers/subscription.ts b/apps/server/trpc/routers/subscription.ts index 949a011..43dfd79 100644 --- a/apps/server/trpc/routers/subscription.ts +++ b/apps/server/trpc/routers/subscription.ts @@ -2,7 +2,6 @@ * Subscription Router — SSE event streams */ -import { z } from 'zod'; import type { ProcedureBuilder } from '../trpc.js'; import { eventBusIterable, @@ -17,42 +16,40 @@ import { export function subscriptionProcedures(publicProcedure: ProcedureBuilder) { return { onEvent: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal); }), onAgentUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal); }), onTaskUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal); }), onPageUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, PAGE_EVENT_TYPES, signal); }), onPreviewUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, PREVIEW_EVENT_TYPES, signal); }), + // NOTE: No frontend view currently displays inter-agent conversation data. + // When a conversation view is added, add to its useLiveUpdates call: + // { prefix: 'conversation:', invalidate: [''] } + // and add the relevant mutation(s) to INVALIDATION_MAP in apps/web/src/lib/invalidation.ts. onConversationUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, CONVERSATION_EVENT_TYPES, signal); From 7bc1e7f25b43d431dd9e0ee33ae662be4f6f96e9 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Thu, 5 Mar 2026 21:17:00 +0100 Subject: [PATCH 3/3] perf: Remove preview polling in favour of SSE-driven invalidation refetchInterval: 3000 on listPreviews and getPreviewStatus is now redundant. Phase oMHtTekCDgdnBG0kkk25A wired useLiveUpdates in $id.tsx to invalidate both queries on every preview:* SSE event, so queries refetch exactly once per state change instead of on a fixed 3-second timer. Co-Authored-By: Claude Sonnet 4.6 --- apps/web/src/components/review/ReviewTab.tsx | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/web/src/components/review/ReviewTab.tsx b/apps/web/src/components/review/ReviewTab.tsx index 9945224..6b6d85c 100644 --- a/apps/web/src/components/review/ReviewTab.tsx +++ b/apps/web/src/components/review/ReviewTab.tsx @@ -80,7 +80,6 @@ export function ReviewTab({ initiativeId }: ReviewTabProps) { // Preview state const previewsQuery = trpc.listPreviews.useQuery( { initiativeId }, - { refetchInterval: 3000 }, ); const existingPreview = previewsQuery.data?.find( (p) => p.phaseId === activePhaseId || p.initiativeId === initiativeId, @@ -90,7 +89,6 @@ export function ReviewTab({ initiativeId }: ReviewTabProps) { { previewId: activePreviewId ?? existingPreview?.id ?? "" }, { enabled: !!(activePreviewId ?? existingPreview?.id), - refetchInterval: 3000, }, ); const preview = previewStatusQuery.data ?? existingPreview;