From f19aac0a76205b511de7e329d7ce5a17d3d26616 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Thu, 5 Mar 2026 20:51:58 +0100 Subject: [PATCH] refactor: Remove dead lastEventId from subscription schemas and document at-most-once delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Strip the unused .input(z.object({ lastEventId })) from all 6 subscription procedures — the parameter was never consumed by eventBusIterable. Remove the now-unused zod import. Add at-most-once delivery JSDoc to the EventBus interface to make the real guarantee explicit. Add compliance comment above onConversationUpdate noting what to wire when a conversation view is built. Co-Authored-By: Claude Sonnet 4.6 --- apps/server/events/types.ts | 8 ++++++++ apps/server/trpc/routers/subscription.ts | 11 ++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/server/events/types.ts b/apps/server/events/types.ts index fb6a8a6..2c2009f 100644 --- a/apps/server/events/types.ts +++ b/apps/server/events/types.ts @@ -684,6 +684,14 @@ export type DomainEventType = DomainEventMap['type']; * * All modules communicate through this interface. * Can be swapped for external systems (RabbitMQ, WebSocket forwarding) later. + * + * **Delivery guarantee: at-most-once.** + * + * Events emitted while a client is disconnected are permanently lost. + * Reconnecting clients receive only events emitted after reconnection. + * React Query's `refetchOnWindowFocus` and `refetchOnReconnect` compensate + * for missed mutations since the system uses query invalidation rather + * than incremental state. */ export interface EventBus { /** diff --git a/apps/server/trpc/routers/subscription.ts b/apps/server/trpc/routers/subscription.ts index 949a011..43dfd79 100644 --- a/apps/server/trpc/routers/subscription.ts +++ b/apps/server/trpc/routers/subscription.ts @@ -2,7 +2,6 @@ * Subscription Router — SSE event streams */ -import { z } from 'zod'; import type { ProcedureBuilder } from '../trpc.js'; import { eventBusIterable, @@ -17,42 +16,40 @@ import { export function subscriptionProcedures(publicProcedure: ProcedureBuilder) { return { onEvent: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal); }), onAgentUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal); }), onTaskUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal); }), onPageUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, PAGE_EVENT_TYPES, signal); }), onPreviewUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, PREVIEW_EVENT_TYPES, signal); }), + // NOTE: No frontend view currently displays inter-agent conversation data. + // When a conversation view is added, add to its useLiveUpdates call: + // { prefix: 'conversation:', invalidate: [''] } + // and add the relevant mutation(s) to INVALIDATION_MAP in apps/web/src/lib/invalidation.ts. onConversationUpdate: publicProcedure - .input(z.object({ lastEventId: z.string().nullish() }).optional()) .subscription(async function* (opts) { const signal = opts.signal ?? new AbortController().signal; yield* eventBusIterable(opts.ctx.eventBus, CONVERSATION_EVENT_TYPES, signal);