Replaces the in-memory filter (agentManager.list() + filter) with a direct repository query that LEFT JOINs tasks, phases, and initiatives to return taskName, phaseName, initiativeName, and taskDescription alongside agent fields. - Adds AgentWithContext interface and findWaitingWithContext() to AgentRepository port - Implements findWaitingWithContext() in DrizzleAgentRepository using getTableColumns - Wires agentRepository into TRPCContext, CreateContextOptions, and TrpcAdapterOptions - Adds requireAgentRepository() helper following existing pattern - Updates listWaitingAgents to use repository query instead of agentManager - Adds 5 unit tests for findWaitingWithContext() covering all FK join edge cases - Updates existing AgentRepository mocks to satisfy updated interface Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
306 lines
17 KiB
Markdown
306 lines
17 KiB
Markdown
# Server & API Module
|
|
|
|
`apps/server/server/` — HTTP server, `apps/server/trpc/` — tRPC procedures, `apps/server/coordination/` — merge queue.
|
|
|
|
## HTTP Server
|
|
|
|
**Framework**: Native `node:http` (no Express/Fastify)
|
|
**Default**: `127.0.0.1:3847`
|
|
**PID file**: `~/.cw/server.pid`
|
|
|
|
### Routes
|
|
|
|
| Route | Method | Purpose |
|
|
|-------|--------|---------|
|
|
| `/health` | GET | Health check (`{ status, uptime, processCount }`) |
|
|
| `/status` | GET | Full server status with process list |
|
|
| `/trpc/*` | POST | All tRPC procedure calls |
|
|
|
|
### Lifecycle
|
|
- `CoordinationServer.start()` — checks PID file, creates HTTP server, emits `server:started`
|
|
- `CoordinationServer.stop()` — emits `server:stopped`, closes server, removes PID file
|
|
- `GracefulShutdown` handles SIGTERM/SIGINT/SIGHUP with 10s timeout
|
|
|
|
### tRPC Adapter
|
|
`trpc-adapter.ts` converts `node:http` IncomingMessage/ServerResponse to fetch Request/Response for tRPC. Subscriptions stream via ReadableStream bodies (SSE).
|
|
|
|
## tRPC Context
|
|
|
|
All procedures share a context with optional dependencies:
|
|
|
|
```typescript
|
|
interface TRPCContext {
|
|
eventBus: EventBus // always present
|
|
serverStartedAt: Date | null
|
|
processCount: number
|
|
agentManager?: AgentManager // optional
|
|
taskRepository?: TaskRepository
|
|
// ... all 10 repositories, 3 managers, credentialManager, workspaceRoot
|
|
}
|
|
```
|
|
|
|
Each procedure uses `require*Repository(ctx)` helpers that throw `TRPCError(INTERNAL_SERVER_ERROR)` if a dependency is missing.
|
|
|
|
## Procedure Reference
|
|
|
|
### System
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| health | query | Health check with uptime |
|
|
| status | query | Server status with process list |
|
|
| systemHealthCheck | query | Account, agent, project health |
|
|
|
|
### Agents
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| spawnAgent | mutation | Spawn new agent (taskId, prompt, provider, mode) |
|
|
| stopAgent | mutation | Stop agent by name or ID |
|
|
| deleteAgent | mutation | Delete agent and clean up worktree |
|
|
| dismissAgent | mutation | Dismiss agent (set userDismissedAt) |
|
|
| resumeAgent | mutation | Resume with answers |
|
|
| listAgents | query | All agents |
|
|
| getAgent | query | Single agent by name or ID; also returns `taskName`, `initiativeName`, `exitCode` |
|
|
| getAgentResult | query | Execution result |
|
|
| getAgentQuestions | query | Pending questions |
|
|
| getAgentOutput | query | Timestamped log chunks from DB (`{ content, createdAt }[]`) |
|
|
| getTaskAgent | query | Most recent agent assigned to a task (by taskId) |
|
|
| getAgentInputFiles | query | Files written to agent's `.cw/input/` dir (text only, sorted, 500 KB cap) |
|
|
| getAgentPrompt | query | Assembled prompt — reads from DB (`agents.prompt`) first; falls back to `.cw/agent-logs/<name>/PROMPT.md` for pre-persistence agents (1 MB cap) |
|
|
| getActiveRefineAgent | query | Active refine agent for initiative |
|
|
| getActiveConflictAgent | query | Active conflict resolution agent for initiative (name starts with `conflict-`) |
|
|
| listWaitingAgents | query | Agents waiting for input — returns `AgentWithContext[]` enriched with `taskName`, `phaseName`, `initiativeName`, `taskDescription` via SQL LEFT JOINs |
|
|
| listForRadar | query | Radar page: per-agent metrics (questionsCount, messagesCount, subagentsCount, compactionsCount) with time/status/mode/initiative filters |
|
|
| getCompactionEvents | query | Compaction events for one agent: `{agentId}` → `{timestamp, sessionNumber}[]` (cap 200) |
|
|
| getSubagentSpawns | query | Subagent spawn events for one agent: `{agentId}` → `{timestamp, description, promptPreview, fullPrompt}[]` (cap 200) |
|
|
| getQuestionsAsked | query | AskUserQuestion tool calls for one agent: `{agentId}` → `{timestamp, questions[]}[]` (cap 200) |
|
|
| onAgentOutput | subscription | Live raw JSONL output stream via EventBus |
|
|
|
|
### Tasks
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| listTasks | query | Child tasks of parent |
|
|
| getTask | query | Single task |
|
|
| updateTaskStatus | mutation | Change task status |
|
|
| updateTask | mutation | Update task fields (name, description) |
|
|
| createInitiativeTask | mutation | Create task on initiative |
|
|
| createPhaseTask | mutation | Create task on phase |
|
|
| listInitiativeTasks | query | All tasks for initiative |
|
|
| listPhaseTasks | query | All tasks for phase |
|
|
| deleteTask | mutation | Delete a task by ID |
|
|
| listPhaseTaskDependencies | query | All task dependency edges for tasks in a phase |
|
|
| listInitiativeTaskDependencies | query | All task dependency edges for tasks in an initiative |
|
|
|
|
### Initiatives
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| createInitiative | mutation | Create with optional branch/projectIds/description, auto-creates root page (seeded with description); if description provided, auto-spawns refine agent |
|
|
| listInitiatives | query | Filter by status and/or projectId; returns `activity` (state, activePhase, phase counts) computed from phases |
|
|
| getInitiative | query | With projects array |
|
|
| updateInitiative | mutation | Name, status |
|
|
| deleteInitiative | mutation | Cascade delete initiative and all children |
|
|
| updateInitiativeConfig | mutation | executionMode, branch, qualityReview |
|
|
| getInitiativeReviewDiff | query | Full diff of initiative branch vs project default branch |
|
|
| getInitiativeReviewCommits | query | Commits on initiative branch not on default branch |
|
|
| getInitiativeCommitDiff | query | Single commit diff for initiative review |
|
|
| approveInitiativeReview | mutation | Approve initiative review: `{initiativeId, strategy: 'push_branch' \| 'merge_and_push'}` |
|
|
| requestInitiativeChanges | mutation | Request changes on initiative: `{initiativeId, summary}` → creates review task in Finalization phase, resets initiative to active |
|
|
| checkInitiativeMergeability | query | Dry-run merge check: `{initiativeId}` → `{mergeable, conflictFiles[], targetBranch}` |
|
|
| spawnConflictResolutionAgent | mutation | Spawn agent to resolve merge conflicts: `{initiativeId, provider?}` → auto-dismisses stale conflict agents, creates merge task |
|
|
|
|
### Phases
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| createPhase | mutation | Create in initiative |
|
|
| listPhases | query | By initiative |
|
|
| getPhase | query | Single phase |
|
|
| updatePhase | mutation | Name, content, status |
|
|
| approvePhase | mutation | Validate and approve |
|
|
| deletePhase | mutation | Cascade delete |
|
|
| createPhasesFromPlan | mutation | Bulk create from agent output |
|
|
| createPhaseDependency | mutation | Add dependency edge |
|
|
| removePhaseDependency | mutation | Remove dependency edge |
|
|
| listInitiativePhaseDependencies | query | All dependency edges |
|
|
| getPhaseDependencies | query | What this phase depends on |
|
|
| getPhaseDependents | query | What depends on this phase |
|
|
| getPhaseReviewDiff | query | File-level metadata for pending_review phase: `{phaseName, sourceBranch, targetBranch, files: FileStatEntry[], totalAdditions, totalDeletions}` — no hunk content. Results are cached in-memory by `phaseId:headHash` (TTL: `REVIEW_DIFF_CACHE_TTL_MS`, default 5 min). Cache is invalidated when a task merges into the phase branch. |
|
|
| getFileDiff | query | Per-file unified diff on demand: `{phaseId, filePath, projectId?}` → `{binary: boolean, rawDiff: string}`; `filePath` must be URL-encoded; binary files return `{binary: true, rawDiff: ''}` |
|
|
| getPhaseReviewCommits | query | List commits between initiative and phase branch |
|
|
| getCommitDiff | query | Diff for a single commit (by hash) in a phase |
|
|
| approvePhaseReview | mutation | Approve and merge phase branch |
|
|
| requestPhaseChanges | mutation | Request changes: creates revision task from unresolved threaded comments (with `[comment:ID]` tags and reply threads), resets phase to in_progress |
|
|
| listReviewComments | query | List review comments by phaseId (flat list including replies, frontend groups by parentCommentId) |
|
|
| createReviewComment | mutation | Create inline review comment on diff |
|
|
| resolveReviewComment | mutation | Mark review comment as resolved |
|
|
| unresolveReviewComment | mutation | Mark review comment as unresolved |
|
|
| replyToReviewComment | mutation | Create a threaded reply to an existing review comment (copies parent's phaseId/filePath/lineNumber) |
|
|
|
|
### Phase Dispatch
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| queuePhase | mutation | Queue approved phase |
|
|
| queueAllPhases | mutation | Queue all approved phases for initiative |
|
|
| dispatchNextPhase | mutation | Start next ready phase |
|
|
| getPhaseQueueState | query | Queue state |
|
|
| createChildTasks | mutation | Create tasks from detail parent |
|
|
|
|
### Architect (High-Level Agent Spawning)
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| spawnArchitectDiscuss | mutation | Discussion agent |
|
|
| spawnArchitectPlan | mutation | Plan agent (generates phases). Passes initiative context (phases, execution tasks only, pages) |
|
|
| spawnArchitectRefine | mutation | Refine agent (generates proposals) |
|
|
| spawnArchitectDetail | mutation | Detail agent (generates tasks). Passes initiative context (phases, execution tasks only, pages) |
|
|
|
|
### Dispatch
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| queueTask | mutation | Add task to dispatch queue |
|
|
| dispatchNext | mutation | Dispatch next ready task |
|
|
| getQueueState | query | Queue state |
|
|
| completeTask | mutation | Complete task |
|
|
|
|
### Coordination (Merge Queue)
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| queueMerge | mutation | Queue task for merge |
|
|
| processMerges | mutation | Process merge queue |
|
|
| getMergeQueueStatus | query | Queue state |
|
|
| getNextMergeable | query | Next ready-to-merge task |
|
|
|
|
### Projects
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| registerProject | mutation | Clone git repo, create record. Validates defaultBranch exists in repo |
|
|
| listProjects | query | All projects |
|
|
| getProject | query | Single project |
|
|
| updateProject | mutation | Update project settings (defaultBranch). Validates branch exists in repo |
|
|
| deleteProject | mutation | Delete clone and record |
|
|
| getInitiativeProjects | query | Projects for initiative |
|
|
| updateInitiativeProjects | mutation | Sync junction table |
|
|
| syncProject | mutation | `git fetch` + ff-only merge of defaultBranch, updates `lastFetchedAt` |
|
|
| syncAllProjects | mutation | Sync all registered projects |
|
|
| getProjectSyncStatus | query | Returns `{ ahead, behind, lastFetchedAt }` for a project |
|
|
|
|
### Pages
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| getRootPage | query | Auto-creates if missing |
|
|
| getPage | query | Single page |
|
|
| getPageUpdatedAtMap | query | Bulk updatedAt check |
|
|
| listPages | query | By initiative |
|
|
| listChildPages | query | By parent page |
|
|
| createPage | mutation | Create, emit page:created |
|
|
| updatePage | mutation | Title/content/sortOrder, emit page:updated |
|
|
| deletePage | mutation | Delete, emit page:deleted |
|
|
|
|
### Accounts
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| listAccounts | query | All accounts |
|
|
| addAccount | mutation | Create account |
|
|
| removeAccount | mutation | Delete account |
|
|
| refreshAccounts | mutation | Clear expired exhaustion |
|
|
| updateAccountAuth | mutation | Update credentials |
|
|
| markAccountExhausted | mutation | Set exhaustion timer |
|
|
| listProviderNames | query | Available provider names |
|
|
| addAccountByToken | mutation | Upsert account from OAuth token; returns `{ upserted, account }` |
|
|
|
|
### Proposals
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| listProposals | query | By agent or initiative |
|
|
| acceptProposal | mutation | Apply side effects, auto-dismiss agent |
|
|
| dismissProposal | mutation | Dismiss, auto-dismiss agent |
|
|
| acceptAllProposals | mutation | Bulk accept with error collection |
|
|
| dismissAllProposals | mutation | Bulk dismiss |
|
|
|
|
### Subscriptions (SSE)
|
|
| Procedure | Type | Events |
|
|
|-----------|------|--------|
|
|
| onEvent | subscription | All event types |
|
|
| onAgentUpdate | subscription | agent:* events (7 types, excludes agent:output) |
|
|
| onTaskUpdate | subscription | task:* + phase:* events (8 types) |
|
|
| onPageUpdate | subscription | page:created/updated/deleted |
|
|
| onPreviewUpdate | subscription | preview:building/ready/stopped/failed |
|
|
| onConversationUpdate | subscription | conversation:created/answered |
|
|
|
|
Subscriptions use `eventBusIterable()` — queue-based async generator, max 1000 events, 30s heartbeat. `agent:output` is excluded from all general subscriptions (it's high-frequency streaming data); use the dedicated `onAgentOutput` subscription instead.
|
|
|
|
## Coordination Module
|
|
|
|
`apps/server/coordination/` manages merge queue:
|
|
|
|
- **CoordinationManager** port: `queueMerge`, `getNextMergeable`, `processMerges`, `handleConflict`, `getQueueState`
|
|
- **DefaultCoordinationManager** adapter: in-memory queue, dependency-ordered processing
|
|
- **ConflictResolutionService**: creates resolution tasks for merge conflicts
|
|
- Merge flow: queue → check deps → merge via WorktreeManager → handle conflicts
|
|
- Events: `merge:queued`, `merge:started`, `merge:completed`, `merge:conflicted`
|
|
|
|
## Preview Procedures
|
|
|
|
Docker-based preview deployments. No database table — Docker is the source of truth.
|
|
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `startPreview` | mutation | Start preview: `{initiativeId, phaseId?, projectId, branch}` → PreviewStatus |
|
|
| `stopPreview` | mutation | Stop preview: `{previewId}` |
|
|
| `listPreviews` | query | List active previews: `{initiativeId?}` → PreviewStatus[] |
|
|
| `getPreviewStatus` | query | Get preview status: `{previewId}` → PreviewStatus |
|
|
|
|
Context dependency: `requirePreviewManager(ctx)` — requires `PreviewManager` from container.
|
|
|
|
## Conversation Procedures
|
|
|
|
Inter-agent communication for parallel agents.
|
|
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `createConversation` | mutation | Ask a question: `{fromAgentId, toAgentId?, phaseId?, taskId?, question}` → Conversation |
|
|
| `getPendingConversations` | query | Poll for incoming questions: `{agentId}` → Conversation[] |
|
|
| `getConversation` | query | Get conversation by ID: `{id}` → Conversation |
|
|
| `answerConversation` | mutation | Answer a conversation: `{id, answer}` → Conversation |
|
|
| `getByFromAgent` | query | Radar drilldown: all conversations sent by agent: `{agentId}` → `{id, timestamp, toAgentName, toAgentId, question, answer, status, taskId, phaseId}[]` (cap 200) |
|
|
|
|
Target resolution: `toAgentId` → direct; `taskId` → find running agent by task; `phaseId` → find running agent by any task in phase.
|
|
|
|
Context dependency: `requireConversationRepository(ctx)`, `requireAgentManager(ctx)`.
|
|
|
|
## Chat Session Procedures
|
|
|
|
Persistent chat loop for iterative phase/task refinement via agent.
|
|
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `sendChatMessage` | mutation | Send message: `{targetType, targetId, initiativeId, message, provider?}` → `{sessionId, agentId, action}` |
|
|
| `getChatSession` | query | Get active session with messages: `{targetType, targetId}` → ChatSession \| null |
|
|
| `closeChatSession` | mutation | Close session and dismiss agent: `{sessionId}` → `{success}` |
|
|
|
|
`sendChatMessage` finds or creates an active session, stores the user message, then either resumes the existing agent (if `waiting_for_input`) or spawns a fresh one with full chat history + initiative context. Agent runs in `'chat'` mode and signals `"questions"` after applying changes, staying alive for the next message.
|
|
|
|
Context dependency: `requireChatSessionRepository(ctx)`, `requireAgentManager(ctx)`, `requireInitiativeRepository(ctx)`, `requireTaskRepository(ctx)`.
|
|
|
|
## Headquarters Procedures
|
|
|
|
Composite dashboard query aggregating all action items that require user intervention.
|
|
|
|
| Procedure | Type | Description |
|
|
|-----------|------|-------------|
|
|
| `getHeadquartersDashboard` | query | Returns 6 typed arrays of action items (no input required) |
|
|
|
|
### Return Shape
|
|
|
|
```typescript
|
|
{
|
|
waitingForInput: Array<{ agentId, agentName, initiativeId, initiativeName, questionText, waitingSince }>;
|
|
pendingReviewInitiatives: Array<{ initiativeId, initiativeName, since }>;
|
|
pendingReviewPhases: Array<{ initiativeId, initiativeName, phaseId, phaseName, since }>;
|
|
planningInitiatives: Array<{ initiativeId, initiativeName, pendingPhaseCount, since }>;
|
|
resolvingConflicts: Array<{ initiativeId, initiativeName, agentId, agentName, agentStatus, since }>;
|
|
blockedPhases: Array<{ initiativeId, initiativeName, phaseId, phaseName, lastMessage, since }>;
|
|
}
|
|
```
|
|
|
|
Each array is sorted ascending by timestamp (oldest-first). All timestamps are ISO 8601 strings. `lastMessage` is truncated to 160 chars and is `null` when no messages exist or the message repository is not wired.
|
|
|
|
Context dependency: `requireInitiativeRepository(ctx)`, `requirePhaseRepository(ctx)`, `requireAgentManager(ctx)`. Task/message repos are accessed via optional `ctx` fields for `blockedPhases.lastMessage`.
|