docs(20): create phase plan
Phase 20: Real-time Subscriptions - 2 plans in 2 waves - 1 parallel (Wave 1: backend SSE), 1 sequential (Wave 2: frontend hooks) - SSE via tRPC httpSubscriptionLink (no WebSocket dependency) - Ready for execution
This commit is contained in:
210
.planning/phases/20-real-time-subscriptions/20-01-PLAN.md
Normal file
210
.planning/phases/20-real-time-subscriptions/20-01-PLAN.md
Normal file
@@ -0,0 +1,210 @@
|
||||
---
|
||||
phase: 20-real-time-subscriptions
|
||||
plan: 01
|
||||
type: execute
|
||||
wave: 1
|
||||
depends_on: []
|
||||
files_modified:
|
||||
- src/trpc/router.ts
|
||||
- src/trpc/context.ts
|
||||
- src/server/trpc-adapter.ts
|
||||
- src/trpc/subscriptions.ts
|
||||
autonomous: true
|
||||
---
|
||||
|
||||
<objective>
|
||||
Add tRPC subscription procedures and enable SSE streaming on the backend server.
|
||||
|
||||
Purpose: The existing tRPC HTTP adapter buffers entire responses before sending (`fetchResponse.text()` + `res.end()`), which kills streaming. This plan fixes the adapter to support SSE and adds subscription procedures that bridge EventBus domain events into tRPC async generator subscriptions. Uses tRPC v11's native SSE support via `fetchRequestHandler` — no WebSocket dependency needed.
|
||||
|
||||
Output: Backend ready to stream real-time events via SSE to any tRPC client using `httpSubscriptionLink`.
|
||||
</objective>
|
||||
|
||||
<execution_context>
|
||||
@~/.claude/get-shit-done/workflows/execute-plan.md
|
||||
@~/.claude/get-shit-done/templates/summary.md
|
||||
</execution_context>
|
||||
|
||||
<context>
|
||||
@.planning/PROJECT.md
|
||||
@.planning/ROADMAP.md
|
||||
@.planning/STATE.md
|
||||
|
||||
# Key source files:
|
||||
@src/trpc/router.ts
|
||||
@src/trpc/context.ts
|
||||
@src/server/trpc-adapter.ts
|
||||
@src/events/types.ts
|
||||
@src/events/bus.ts
|
||||
</context>
|
||||
|
||||
<tasks>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 1: Fix HTTP adapter for SSE streaming and add subscription procedures</name>
|
||||
<files>src/server/trpc-adapter.ts, src/trpc/subscriptions.ts, src/trpc/router.ts, src/trpc/context.ts</files>
|
||||
<action>
|
||||
**1. Fix the tRPC HTTP adapter to support streaming responses (`src/server/trpc-adapter.ts`):**
|
||||
|
||||
The current adapter does `const responseBody = await fetchResponse.text(); res.end(responseBody)` which buffers the entire response and breaks SSE. Replace this with stream piping:
|
||||
|
||||
```
|
||||
// Instead of: const responseBody = await fetchResponse.text(); res.end(responseBody);
|
||||
// Do: Check if response body is a ReadableStream and pipe it
|
||||
if (fetchResponse.body) {
|
||||
// For SSE, the response body is a ReadableStream that stays open
|
||||
const reader = fetchResponse.body.getReader();
|
||||
const pump = async () => {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) { res.end(); return; }
|
||||
res.write(value);
|
||||
}
|
||||
};
|
||||
pump().catch(() => res.end());
|
||||
} else {
|
||||
const responseBody = await fetchResponse.text();
|
||||
res.end(responseBody);
|
||||
}
|
||||
```
|
||||
|
||||
Also set response headers before streaming (move the `fetchResponse.headers.forEach` BEFORE the body handling).
|
||||
|
||||
**2. Create subscription procedures module (`src/trpc/subscriptions.ts`):**
|
||||
|
||||
Create a new file with helper functions for bridging EventBus events to tRPC async generators. The pattern:
|
||||
|
||||
```typescript
|
||||
import { tracked } from '@trpc/server';
|
||||
import type { EventBus, DomainEventMap } from '../events/types.js';
|
||||
|
||||
// Helper: Create an async iterable from EventBus events
|
||||
export function eventBusIterable<T extends DomainEventMap>(
|
||||
eventBus: EventBus,
|
||||
eventTypes: T['type'][],
|
||||
signal: AbortSignal,
|
||||
): AsyncGenerator<{ id: string; type: string; payload: unknown; timestamp: Date }> {
|
||||
// Use a queue + promise pattern to bridge push (EventBus) to pull (async generator)
|
||||
}
|
||||
```
|
||||
|
||||
Use a simple queue-based approach: EventBus handlers push events into a queue array, the async generator yields from the queue, and waits on a promise when the queue is empty. Clean up handlers on AbortSignal abort.
|
||||
|
||||
Generate event IDs using a counter (e.g., `${Date.now()}-${counter++}`). Use `tracked(id, data)` for each yielded event to enable client-side reconnection.
|
||||
|
||||
**3. Add subscription procedures to the router (`src/trpc/router.ts`):**
|
||||
|
||||
Add these subscription procedures to `appRouter`:
|
||||
|
||||
- `onEvent` — Subscribe to ALL domain events. Input: optional `lastEventId` for reconnection. Yields `{ id, type, payload, timestamp }` for every event emitted on the EventBus. This is the general-purpose firehose for the frontend.
|
||||
|
||||
- `onAgentUpdate` — Subscribe to agent-specific events only (`agent:spawned`, `agent:stopped`, `agent:crashed`, `agent:resumed`, `agent:waiting`). Input: optional `lastEventId`. More targeted than `onEvent` for the inbox page.
|
||||
|
||||
- `onTaskUpdate` — Subscribe to task and phase events (`task:queued`, `task:dispatched`, `task:completed`, `task:blocked`, `phase:queued`, `phase:started`, `phase:completed`, `phase:blocked`). Input: optional `lastEventId`. For the initiative detail page.
|
||||
|
||||
Each subscription procedure follows this pattern:
|
||||
```typescript
|
||||
onEvent: publicProcedure
|
||||
.input(z.object({ lastEventId: z.string().nullish() }).optional())
|
||||
.subscription(async function* (opts) {
|
||||
const eventBus = opts.ctx.eventBus;
|
||||
// Use eventBusIterable helper
|
||||
yield* eventBusIterable(eventBus, ALL_EVENT_TYPES, opts.signal);
|
||||
}),
|
||||
```
|
||||
|
||||
Define event type arrays as constants:
|
||||
- `AGENT_EVENT_TYPES = ['agent:spawned', 'agent:stopped', 'agent:crashed', 'agent:resumed', 'agent:waiting']`
|
||||
- `TASK_EVENT_TYPES = ['task:queued', 'task:dispatched', 'task:completed', 'task:blocked', 'phase:queued', 'phase:started', 'phase:completed', 'phase:blocked']`
|
||||
- `ALL_EVENT_TYPES` = union of all DomainEventType values
|
||||
|
||||
Do NOT add output validation with `zAsyncIterable` — keep it simple, trust the EventBus types. The subscription output shape is `{ id: string, type: string, payload: unknown, timestamp: string }`.
|
||||
|
||||
**Important:** The `tracked` import is from `@trpc/server`. No new npm packages needed — tRPC v11.9.0 already includes async generator subscription support and SSE handling in `fetchRequestHandler`.
|
||||
</action>
|
||||
<verify>
|
||||
1. `npm run build` succeeds (TypeScript compiles)
|
||||
2. `npm run test` passes (existing tests not broken)
|
||||
3. Start server with `npx tsx src/bin/cw.ts --server`, then `curl -N http://127.0.0.1:3847/trpc/onEvent.subscribe` returns SSE-formatted response with `content-type: text/event-stream` header (connection stays open)
|
||||
</verify>
|
||||
<done>
|
||||
- tRPC HTTP adapter streams responses instead of buffering
|
||||
- Three subscription procedures (onEvent, onAgentUpdate, onTaskUpdate) available on the router
|
||||
- EventBus events bridge into SSE streams via async generators
|
||||
- Existing HTTP queries/mutations still work (no regression)
|
||||
</done>
|
||||
</task>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 2: Update shared types and verify SSE integration</name>
|
||||
<files>packages/shared/src/types.ts, packages/shared/src/index.ts</files>
|
||||
<action>
|
||||
**1. Export subscription event type from shared package:**
|
||||
|
||||
Add a `SubscriptionEvent` type to `packages/shared/src/types.ts`:
|
||||
|
||||
```typescript
|
||||
export interface SubscriptionEvent {
|
||||
id: string;
|
||||
type: string;
|
||||
payload: unknown;
|
||||
timestamp: string;
|
||||
}
|
||||
```
|
||||
|
||||
Export it from `packages/shared/src/index.ts`.
|
||||
|
||||
This type is what the frontend will use when consuming subscription data in `onData` callbacks.
|
||||
|
||||
**2. Verify the full SSE flow works end-to-end:**
|
||||
|
||||
Write a quick integration check (not a persistent test file — just verify manually or via curl):
|
||||
- Start the server
|
||||
- Open an SSE connection to `onEvent.subscribe`
|
||||
- In another terminal, trigger a health check or any mutation
|
||||
- Verify the SSE stream receives the event
|
||||
|
||||
If the `fetchRequestHandler` does not natively produce SSE for subscription procedures over HTTP, check if the `@trpc/server` package needs the response to include specific headers. The tRPC fetch handler should set `Content-Type: text/event-stream` automatically for subscription responses.
|
||||
|
||||
**3. Verify Vite dev proxy passes SSE correctly:**
|
||||
|
||||
The Vite proxy at `/trpc` must not buffer SSE responses. Check that the existing Vite proxy config in `packages/web/vite.config.ts` works for SSE. If it buffers, add `configure: (proxy) => { proxy.on('proxyRes', (proxyRes) => { proxyRes.headers['cache-control'] = 'no-cache'; }); }` or set `ws: false` explicitly (SSE is NOT WebSocket — it's plain HTTP with `Transfer-Encoding: chunked`).
|
||||
|
||||
Verify by running `npm run dev` in `packages/web/` and curling `http://localhost:5173/trpc/onEvent.subscribe` — should stream SSE events through the Vite proxy.
|
||||
</action>
|
||||
<verify>
|
||||
1. `npm run build` succeeds across all packages (root + web + shared)
|
||||
2. `SubscriptionEvent` type is importable from `@codewalk-district/shared`
|
||||
3. SSE stream works through both direct backend (port 3847) and Vite proxy (port 5173)
|
||||
</verify>
|
||||
<done>
|
||||
- SubscriptionEvent type exported from shared package
|
||||
- SSE streaming verified end-to-end through both direct and proxied connections
|
||||
- Vite dev proxy confirmed to pass SSE without buffering
|
||||
</done>
|
||||
</task>
|
||||
|
||||
</tasks>
|
||||
|
||||
<verification>
|
||||
Before declaring plan complete:
|
||||
- [ ] `npm run build` succeeds without errors
|
||||
- [ ] `npm run test` passes all existing tests
|
||||
- [ ] SSE subscription endpoint responds with `text/event-stream` content type
|
||||
- [ ] EventBus events appear in SSE stream when triggered
|
||||
- [ ] Existing tRPC queries/mutations still work (no regression from adapter changes)
|
||||
</verification>
|
||||
|
||||
<success_criteria>
|
||||
|
||||
- All tasks completed
|
||||
- All verification checks pass
|
||||
- No errors or warnings introduced
|
||||
- tRPC router exposes 3 subscription procedures (onEvent, onAgentUpdate, onTaskUpdate)
|
||||
- HTTP adapter supports streaming responses for SSE
|
||||
- Shared SubscriptionEvent type available for frontend
|
||||
</success_criteria>
|
||||
|
||||
<output>
|
||||
After completion, create `.planning/phases/20-real-time-subscriptions/20-01-SUMMARY.md`
|
||||
</output>
|
||||
188
.planning/phases/20-real-time-subscriptions/20-02-PLAN.md
Normal file
188
.planning/phases/20-real-time-subscriptions/20-02-PLAN.md
Normal file
@@ -0,0 +1,188 @@
|
||||
---
|
||||
phase: 20-real-time-subscriptions
|
||||
plan: 02
|
||||
type: execute
|
||||
wave: 2
|
||||
depends_on: ["20-01"]
|
||||
files_modified:
|
||||
- packages/web/src/lib/trpc.ts
|
||||
- packages/web/src/routes/initiatives.index.tsx
|
||||
- packages/web/src/routes/initiatives.$id.tsx
|
||||
- packages/web/src/routes/inbox.tsx
|
||||
- packages/web/package.json
|
||||
autonomous: true
|
||||
---
|
||||
|
||||
<objective>
|
||||
Wire frontend tRPC client with SSE subscriptions and add live update hooks to all three UI pages.
|
||||
|
||||
Purpose: Currently, the dashboard, initiative detail, and inbox pages fetch data once and only refresh on manual user action. This plan adds `httpSubscriptionLink` via `splitLink` so subscription procedures route to SSE, then wires `useSubscription` hooks into each page to auto-refresh data when domain events arrive.
|
||||
|
||||
Output: All three UI pages update in real-time when agents spawn/stop, tasks complete, or questions arrive.
|
||||
</objective>
|
||||
|
||||
<execution_context>
|
||||
@~/.claude/get-shit-done/workflows/execute-plan.md
|
||||
@~/.claude/get-shit-done/templates/summary.md
|
||||
</execution_context>
|
||||
|
||||
<context>
|
||||
@.planning/PROJECT.md
|
||||
@.planning/ROADMAP.md
|
||||
@.planning/STATE.md
|
||||
|
||||
# Prior plan output (genuinely needed — subscription procedures defined there):
|
||||
@.planning/phases/20-real-time-subscriptions/20-01-SUMMARY.md
|
||||
|
||||
# Key source files:
|
||||
@packages/web/src/lib/trpc.ts
|
||||
@packages/web/src/main.tsx
|
||||
@packages/web/src/routes/initiatives.index.tsx
|
||||
@packages/web/src/routes/initiatives.$id.tsx
|
||||
@packages/web/src/routes/inbox.tsx
|
||||
@packages/web/vite.config.ts
|
||||
</context>
|
||||
|
||||
<tasks>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 1: Update tRPC client with splitLink for SSE subscriptions</name>
|
||||
<files>packages/web/src/lib/trpc.ts, packages/web/package.json</files>
|
||||
<action>
|
||||
**1. Update the tRPC client configuration (`packages/web/src/lib/trpc.ts`):**
|
||||
|
||||
Replace the single `httpBatchLink` with `splitLink` that routes subscriptions to `httpSubscriptionLink` and everything else to `httpBatchLink`:
|
||||
|
||||
```typescript
|
||||
import { createTRPCReact } from '@trpc/react-query';
|
||||
import { httpBatchLink, splitLink } from '@trpc/client';
|
||||
import { httpSubscriptionLink } from '@trpc/client';
|
||||
import type { AppRouter } from '@codewalk-district/shared';
|
||||
|
||||
export const trpc = createTRPCReact<AppRouter>();
|
||||
|
||||
export function createTRPCClient() {
|
||||
return trpc.createClient({
|
||||
links: [
|
||||
splitLink({
|
||||
condition: (op) => op.type === 'subscription',
|
||||
true: httpSubscriptionLink({ url: '/trpc' }),
|
||||
false: httpBatchLink({ url: '/trpc' }),
|
||||
}),
|
||||
],
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
Key points:
|
||||
- `httpSubscriptionLink` uses SSE (Server-Sent Events) over HTTP — NOT WebSocket. It reuses the same `/trpc` URL.
|
||||
- The Vite dev proxy at `/trpc` already forwards to the backend. SSE works over standard HTTP, so no WebSocket proxy config needed.
|
||||
- No `createWSClient` or `wsLink` needed — SSE is simpler and sufficient for this use case (server-to-client push only).
|
||||
- `httpSubscriptionLink` is already included in `@trpc/client@^11.9.0` — no new packages needed.
|
||||
|
||||
**2. Verify no new packages needed:**
|
||||
|
||||
Check that `@trpc/client@^11.9.0` already exports `httpSubscriptionLink` and `splitLink`. These are part of the core `@trpc/client` package in v11. Do NOT install any additional packages unless the import fails.
|
||||
</action>
|
||||
<verify>
|
||||
1. `npm run build` in `packages/web/` succeeds
|
||||
2. TypeScript can resolve `httpSubscriptionLink` and `splitLink` from `@trpc/client`
|
||||
3. No type errors
|
||||
</verify>
|
||||
<done>
|
||||
- tRPC client routes subscriptions to SSE via httpSubscriptionLink
|
||||
- Queries and mutations still use httpBatchLink (no regression)
|
||||
- splitLink correctly discriminates by operation type
|
||||
</done>
|
||||
</task>
|
||||
|
||||
<task type="auto">
|
||||
<name>Task 2: Add live update hooks to all three UI pages</name>
|
||||
<files>packages/web/src/routes/initiatives.index.tsx, packages/web/src/routes/initiatives.$id.tsx, packages/web/src/routes/inbox.tsx</files>
|
||||
<action>
|
||||
**Pattern for all pages:** Use `trpc.{subscription}.useSubscription()` with an `onData` callback that invalidates the relevant React Query cache, causing the existing `useQuery` hooks to refetch. This is the simplest approach — no local state management for events, just invalidate-on-event.
|
||||
|
||||
```typescript
|
||||
const utils = trpc.useUtils();
|
||||
|
||||
trpc.onTaskUpdate.useSubscription(undefined, {
|
||||
onData: () => {
|
||||
// Invalidate relevant queries to trigger refetch
|
||||
utils.listPhases.invalidate();
|
||||
utils.listTasks.invalidate();
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
**1. Initiative Dashboard (`packages/web/src/routes/initiatives.index.tsx`):**
|
||||
|
||||
Add `onTaskUpdate` subscription. On any task/phase event:
|
||||
- Invalidate `listInitiatives` (status may have changed)
|
||||
- Invalidate `listPhases` (progress bars use phase counts)
|
||||
|
||||
This makes the dashboard phase progress bars and status badges update live.
|
||||
|
||||
**2. Initiative Detail (`packages/web/src/routes/initiatives.$id.tsx`):**
|
||||
|
||||
Add `onTaskUpdate` subscription. On any task/phase event:
|
||||
- Invalidate `listPhases` for the current initiative
|
||||
- Invalidate `listTasks` for visible plans
|
||||
- Invalidate `listPlans` for visible phases
|
||||
|
||||
This makes the phase accordion and task rows update live.
|
||||
|
||||
Also add `onAgentUpdate` subscription to update agent assignment badges:
|
||||
- Invalidate `listAgents` on agent events
|
||||
|
||||
**3. Agent Inbox (`packages/web/src/routes/inbox.tsx`):**
|
||||
|
||||
Add `onAgentUpdate` subscription. On any agent event:
|
||||
- Invalidate `listWaitingAgents` (new question arrived or agent resumed)
|
||||
- Invalidate `listMessages` (message status may have changed)
|
||||
|
||||
This is the most impactful — when an agent asks a question, the inbox lights up without manual refresh.
|
||||
|
||||
**Important implementation notes:**
|
||||
- Use `undefined` as subscription input (no `lastEventId` filtering needed for v1 — just invalidate on any event)
|
||||
- The `useSubscription` hook auto-reconnects on connection loss — tRPC v11's `httpSubscriptionLink` handles this
|
||||
- Don't accumulate events in state — just invalidate queries. Keep it simple.
|
||||
- If the subscription connection fails (server not running), it should fail silently — the pages still work via polling/manual refresh. Do NOT show error UI for subscription failures.
|
||||
- Add `onError: () => {}` to suppress subscription error logs in development (server may not be running during frontend-only development)
|
||||
</action>
|
||||
<verify>
|
||||
1. `npm run build` in `packages/web/` succeeds
|
||||
2. No TypeScript errors on subscription hook usage
|
||||
3. Start backend (`npx tsx src/bin/cw.ts --server`), start frontend (`npm run dev` in packages/web/), open dashboard — SSE connection established (check Network tab in browser DevTools for `onTaskUpdate.subscribe` request with `text/event-stream` response)
|
||||
</verify>
|
||||
<done>
|
||||
- Dashboard auto-refreshes initiative status and phase progress on events
|
||||
- Initiative detail auto-refreshes task/phase status on events
|
||||
- Inbox auto-refreshes agent questions on agent events
|
||||
- Subscription failures are silent — pages degrade to manual refresh
|
||||
- No new state management — subscriptions just invalidate React Query caches
|
||||
</done>
|
||||
</task>
|
||||
|
||||
</tasks>
|
||||
|
||||
<verification>
|
||||
Before declaring plan complete:
|
||||
- [ ] `npm run build` succeeds for all packages
|
||||
- [ ] Frontend connects SSE subscription on page load (visible in DevTools Network tab)
|
||||
- [ ] Triggering a backend event (e.g., spawning agent via CLI) causes frontend data to refresh without manual action
|
||||
- [ ] Closing/reopening the SSE connection auto-reconnects
|
||||
- [ ] Pages still work when backend is not running (graceful degradation)
|
||||
</verification>
|
||||
|
||||
<success_criteria>
|
||||
|
||||
- All tasks completed
|
||||
- All verification checks pass
|
||||
- No errors or warnings introduced
|
||||
- All three UI pages (dashboard, detail, inbox) receive live updates via SSE
|
||||
- Subscription failures degrade gracefully (no error UI, pages still work)
|
||||
</success_criteria>
|
||||
|
||||
<output>
|
||||
After completion, create `.planning/phases/20-real-time-subscriptions/20-02-SUMMARY.md`
|
||||
</output>
|
||||
Reference in New Issue
Block a user