diff --git a/src/trpc/context.ts b/src/trpc/context.ts index ccde186..8a42c24 100644 --- a/src/trpc/context.ts +++ b/src/trpc/context.ts @@ -8,6 +8,8 @@ import type { EventBus, DomainEvent } from '../events/types.js'; import type { AgentManager } from '../agent/types.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; +import type { MessageRepository } from '../db/repositories/message-repository.js'; +import type { DispatchManager } from '../dispatch/types.js'; // Re-export for convenience export type { EventBus, DomainEvent }; @@ -26,6 +28,10 @@ export interface TRPCContext { agentManager?: AgentManager; /** Task repository for task CRUD operations (optional until server wiring complete) */ taskRepository?: TaskRepository; + /** Message repository for agent-user communication (optional until server wiring complete) */ + messageRepository?: MessageRepository; + /** Dispatch manager for task queue operations (optional until server wiring complete) */ + dispatchManager?: DispatchManager; } /** @@ -37,6 +43,8 @@ export interface CreateContextOptions { processCount: number; agentManager?: AgentManager; taskRepository?: TaskRepository; + messageRepository?: MessageRepository; + dispatchManager?: DispatchManager; } /** @@ -52,5 +60,7 @@ export function createContext(options: CreateContextOptions): TRPCContext { processCount: options.processCount, agentManager: options.agentManager, taskRepository: options.taskRepository, + messageRepository: options.messageRepository, + dispatchManager: options.dispatchManager, }; } diff --git a/src/trpc/router.ts b/src/trpc/router.ts index b1dd2ee..69ae66b 100644 --- a/src/trpc/router.ts +++ b/src/trpc/router.ts @@ -10,6 +10,8 @@ import { z } from 'zod'; import type { TRPCContext } from './context.js'; import type { AgentInfo, AgentResult } from '../agent/types.js'; import type { TaskRepository } from '../db/repositories/task-repository.js'; +import type { MessageRepository } from '../db/repositories/message-repository.js'; +import type { DispatchManager } from '../dispatch/types.js'; /** * Initialize tRPC with our context type. @@ -190,6 +192,32 @@ function requireTaskRepository(ctx: TRPCContext): TaskRepository { return ctx.taskRepository; } +/** + * Helper to ensure messageRepository is available in context. + */ +function requireMessageRepository(ctx: TRPCContext): MessageRepository { + if (!ctx.messageRepository) { + throw new TRPCError({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Message repository not available', + }); + } + return ctx.messageRepository; +} + +/** + * Helper to ensure dispatchManager is available in context. + */ +function requireDispatchManager(ctx: TRPCContext): DispatchManager { + if (!ctx.dispatchManager) { + throw new TRPCError({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Dispatch manager not available', + }); + } + return ctx.dispatchManager; +} + // ============================================================================= // Application Router with Procedures // ============================================================================= @@ -210,6 +238,13 @@ function requireTaskRepository(ctx: TRPCContext): TaskRepository { * - listTasks: List tasks for a plan * - getTask: Get task by ID * - updateTaskStatus: Update task status + * - listMessages: List messages with optional filtering + * - getMessage: Get message by ID + * - respondToMessage: Respond to a message + * - queueTask: Queue a task for dispatch + * - dispatchNext: Dispatch next available task + * - getQueueState: Get dispatch queue state + * - completeTask: Mark a task as complete */ export const appRouter = router({ /** @@ -395,6 +430,141 @@ export const appRouter = router({ } return taskRepository.update(input.id, { status: input.status }); }), + + // =========================================================================== + // Message Procedures + // =========================================================================== + + /** + * List messages with optional filtering. + * Filter by agent ID or status. + */ + listMessages: publicProcedure + .input(z.object({ + agentId: z.string().optional(), + status: z.enum(['pending', 'read', 'responded']).optional(), + })) + .query(async ({ ctx, input }) => { + const messageRepository = requireMessageRepository(ctx); + + // Get all messages for user (recipientType='user') + let messages = await messageRepository.findByRecipient('user'); + + // Filter by agentId if provided (sender is the agent) + if (input.agentId) { + messages = messages.filter(m => m.senderId === input.agentId); + } + + // Filter by status if provided + if (input.status) { + messages = messages.filter(m => m.status === input.status); + } + + return messages; + }), + + /** + * Get a single message by ID. + * Throws NOT_FOUND if message doesn't exist. + */ + getMessage: publicProcedure + .input(z.object({ id: z.string().min(1) })) + .query(async ({ ctx, input }) => { + const messageRepository = requireMessageRepository(ctx); + const message = await messageRepository.findById(input.id); + if (!message) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Message '${input.id}' not found`, + }); + } + return message; + }), + + /** + * Respond to a message. + * Updates message with response and sets status to 'responded'. + */ + respondToMessage: publicProcedure + .input(z.object({ + id: z.string().min(1), + response: z.string().min(1), + })) + .mutation(async ({ ctx, input }) => { + const messageRepository = requireMessageRepository(ctx); + + // Check message exists + const existing = await messageRepository.findById(input.id); + if (!existing) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: `Message '${input.id}' not found`, + }); + } + + // Create a response message linked to the original + const responseMessage = await messageRepository.create({ + senderType: 'user', + recipientType: 'agent', + recipientId: existing.senderId, + type: 'response', + content: input.response, + parentMessageId: input.id, + }); + + // Update original message status to 'responded' + await messageRepository.update(input.id, { status: 'responded' }); + + return responseMessage; + }), + + // =========================================================================== + // Dispatch Procedures + // =========================================================================== + + /** + * Queue a task for dispatch. + * Task will be dispatched when all dependencies complete. + */ + queueTask: publicProcedure + .input(z.object({ taskId: z.string().min(1) })) + .mutation(async ({ ctx, input }) => { + const dispatchManager = requireDispatchManager(ctx); + await dispatchManager.queue(input.taskId); + return { success: true }; + }), + + /** + * Dispatch next available task to an agent. + * Returns dispatch result with task and agent info. + */ + dispatchNext: publicProcedure + .mutation(async ({ ctx }) => { + const dispatchManager = requireDispatchManager(ctx); + return dispatchManager.dispatchNext(); + }), + + /** + * Get current queue state. + * Returns queued, ready, and blocked task counts. + */ + getQueueState: publicProcedure + .query(async ({ ctx }) => { + const dispatchManager = requireDispatchManager(ctx); + return dispatchManager.getQueueState(); + }), + + /** + * Mark a task as complete. + * Removes from queue and triggers dependent task re-evaluation. + */ + completeTask: publicProcedure + .input(z.object({ taskId: z.string().min(1) })) + .mutation(async ({ ctx, input }) => { + const dispatchManager = requireDispatchManager(ctx); + await dispatchManager.completeTask(input.taskId); + return { success: true }; + }), }); /**