feat(05-05): add message and dispatch tRPC procedures

- Add messageRepository and dispatchManager to TRPCContext
- Add requireMessageRepository and requireDispatchManager helpers
- Add message procedures: listMessages, getMessage, respondToMessage
- Add dispatch procedures: queueTask, dispatchNext, getQueueState, completeTask
This commit is contained in:
Lukas May
2026-01-30 20:47:07 +01:00
parent 099f9b6353
commit e0e03eef86
2 changed files with 180 additions and 0 deletions

View File

@@ -8,6 +8,8 @@
import type { EventBus, DomainEvent } from '../events/types.js'; import type { EventBus, DomainEvent } from '../events/types.js';
import type { AgentManager } from '../agent/types.js'; import type { AgentManager } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { DispatchManager } from '../dispatch/types.js';
// Re-export for convenience // Re-export for convenience
export type { EventBus, DomainEvent }; export type { EventBus, DomainEvent };
@@ -26,6 +28,10 @@ export interface TRPCContext {
agentManager?: AgentManager; agentManager?: AgentManager;
/** Task repository for task CRUD operations (optional until server wiring complete) */ /** Task repository for task CRUD operations (optional until server wiring complete) */
taskRepository?: TaskRepository; taskRepository?: TaskRepository;
/** Message repository for agent-user communication (optional until server wiring complete) */
messageRepository?: MessageRepository;
/** Dispatch manager for task queue operations (optional until server wiring complete) */
dispatchManager?: DispatchManager;
} }
/** /**
@@ -37,6 +43,8 @@ export interface CreateContextOptions {
processCount: number; processCount: number;
agentManager?: AgentManager; agentManager?: AgentManager;
taskRepository?: TaskRepository; taskRepository?: TaskRepository;
messageRepository?: MessageRepository;
dispatchManager?: DispatchManager;
} }
/** /**
@@ -52,5 +60,7 @@ export function createContext(options: CreateContextOptions): TRPCContext {
processCount: options.processCount, processCount: options.processCount,
agentManager: options.agentManager, agentManager: options.agentManager,
taskRepository: options.taskRepository, taskRepository: options.taskRepository,
messageRepository: options.messageRepository,
dispatchManager: options.dispatchManager,
}; };
} }

View File

@@ -10,6 +10,8 @@ import { z } from 'zod';
import type { TRPCContext } from './context.js'; import type { TRPCContext } from './context.js';
import type { AgentInfo, AgentResult } from '../agent/types.js'; import type { AgentInfo, AgentResult } from '../agent/types.js';
import type { TaskRepository } from '../db/repositories/task-repository.js'; import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { MessageRepository } from '../db/repositories/message-repository.js';
import type { DispatchManager } from '../dispatch/types.js';
/** /**
* Initialize tRPC with our context type. * Initialize tRPC with our context type.
@@ -190,6 +192,32 @@ function requireTaskRepository(ctx: TRPCContext): TaskRepository {
return ctx.taskRepository; return ctx.taskRepository;
} }
/**
* Helper to ensure messageRepository is available in context.
*/
function requireMessageRepository(ctx: TRPCContext): MessageRepository {
if (!ctx.messageRepository) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Message repository not available',
});
}
return ctx.messageRepository;
}
/**
* Helper to ensure dispatchManager is available in context.
*/
function requireDispatchManager(ctx: TRPCContext): DispatchManager {
if (!ctx.dispatchManager) {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Dispatch manager not available',
});
}
return ctx.dispatchManager;
}
// ============================================================================= // =============================================================================
// Application Router with Procedures // Application Router with Procedures
// ============================================================================= // =============================================================================
@@ -210,6 +238,13 @@ function requireTaskRepository(ctx: TRPCContext): TaskRepository {
* - listTasks: List tasks for a plan * - listTasks: List tasks for a plan
* - getTask: Get task by ID * - getTask: Get task by ID
* - updateTaskStatus: Update task status * - updateTaskStatus: Update task status
* - listMessages: List messages with optional filtering
* - getMessage: Get message by ID
* - respondToMessage: Respond to a message
* - queueTask: Queue a task for dispatch
* - dispatchNext: Dispatch next available task
* - getQueueState: Get dispatch queue state
* - completeTask: Mark a task as complete
*/ */
export const appRouter = router({ export const appRouter = router({
/** /**
@@ -395,6 +430,141 @@ export const appRouter = router({
} }
return taskRepository.update(input.id, { status: input.status }); return taskRepository.update(input.id, { status: input.status });
}), }),
// ===========================================================================
// Message Procedures
// ===========================================================================
/**
* List messages with optional filtering.
* Filter by agent ID or status.
*/
listMessages: publicProcedure
.input(z.object({
agentId: z.string().optional(),
status: z.enum(['pending', 'read', 'responded']).optional(),
}))
.query(async ({ ctx, input }) => {
const messageRepository = requireMessageRepository(ctx);
// Get all messages for user (recipientType='user')
let messages = await messageRepository.findByRecipient('user');
// Filter by agentId if provided (sender is the agent)
if (input.agentId) {
messages = messages.filter(m => m.senderId === input.agentId);
}
// Filter by status if provided
if (input.status) {
messages = messages.filter(m => m.status === input.status);
}
return messages;
}),
/**
* Get a single message by ID.
* Throws NOT_FOUND if message doesn't exist.
*/
getMessage: publicProcedure
.input(z.object({ id: z.string().min(1) }))
.query(async ({ ctx, input }) => {
const messageRepository = requireMessageRepository(ctx);
const message = await messageRepository.findById(input.id);
if (!message) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Message '${input.id}' not found`,
});
}
return message;
}),
/**
* Respond to a message.
* Updates message with response and sets status to 'responded'.
*/
respondToMessage: publicProcedure
.input(z.object({
id: z.string().min(1),
response: z.string().min(1),
}))
.mutation(async ({ ctx, input }) => {
const messageRepository = requireMessageRepository(ctx);
// Check message exists
const existing = await messageRepository.findById(input.id);
if (!existing) {
throw new TRPCError({
code: 'NOT_FOUND',
message: `Message '${input.id}' not found`,
});
}
// Create a response message linked to the original
const responseMessage = await messageRepository.create({
senderType: 'user',
recipientType: 'agent',
recipientId: existing.senderId,
type: 'response',
content: input.response,
parentMessageId: input.id,
});
// Update original message status to 'responded'
await messageRepository.update(input.id, { status: 'responded' });
return responseMessage;
}),
// ===========================================================================
// Dispatch Procedures
// ===========================================================================
/**
* Queue a task for dispatch.
* Task will be dispatched when all dependencies complete.
*/
queueTask: publicProcedure
.input(z.object({ taskId: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
const dispatchManager = requireDispatchManager(ctx);
await dispatchManager.queue(input.taskId);
return { success: true };
}),
/**
* Dispatch next available task to an agent.
* Returns dispatch result with task and agent info.
*/
dispatchNext: publicProcedure
.mutation(async ({ ctx }) => {
const dispatchManager = requireDispatchManager(ctx);
return dispatchManager.dispatchNext();
}),
/**
* Get current queue state.
* Returns queued, ready, and blocked task counts.
*/
getQueueState: publicProcedure
.query(async ({ ctx }) => {
const dispatchManager = requireDispatchManager(ctx);
return dispatchManager.getQueueState();
}),
/**
* Mark a task as complete.
* Removes from queue and triggers dependent task re-evaluation.
*/
completeTask: publicProcedure
.input(z.object({ taskId: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
const dispatchManager = requireDispatchManager(ctx);
await dispatchManager.completeTask(input.taskId);
return { success: true };
}),
}); });
/** /**