feat(05-01): add MessageRepository port and adapter

- Create MessageRepository port interface with CRUD and query methods
- Implement DrizzleMessageRepository adapter with nanoid generation
- Add findBySender/findByRecipient for participant-based queries
- Add findPendingForUser for unread user notifications
- Add findRequiringResponse for messages awaiting reply
- Add findReplies for message threading support
- Add 23 tests covering all operations and edge cases
- Update test-helpers with messages table schema
- Re-export from index files following AgentRepository pattern
This commit is contained in:
Lukas May
2026-01-30 20:35:09 +01:00
parent 586f7caa7a
commit 19dc75c3f4
6 changed files with 751 additions and 0 deletions

View File

@@ -10,3 +10,4 @@ export { DrizzlePhaseRepository } from './phase.js';
export { DrizzlePlanRepository } from './plan.js';
export { DrizzleTaskRepository } from './task.js';
export { DrizzleAgentRepository } from './agent.js';
export { DrizzleMessageRepository } from './message.js';

View File

@@ -0,0 +1,464 @@
/**
* DrizzleMessageRepository Tests
*
* Tests for the Message repository adapter.
*/
import { describe, it, expect, beforeEach } from 'vitest';
import { DrizzleMessageRepository } from './message.js';
import { DrizzleAgentRepository } from './agent.js';
import { DrizzleTaskRepository } from './task.js';
import { DrizzlePlanRepository } from './plan.js';
import { DrizzlePhaseRepository } from './phase.js';
import { DrizzleInitiativeRepository } from './initiative.js';
import { createTestDatabase } from './test-helpers.js';
import type { DrizzleDatabase } from '../../index.js';
describe('DrizzleMessageRepository', () => {
let db: DrizzleDatabase;
let messageRepo: DrizzleMessageRepository;
let agentRepo: DrizzleAgentRepository;
let testAgentId: string;
beforeEach(async () => {
db = createTestDatabase();
messageRepo = new DrizzleMessageRepository(db);
agentRepo = new DrizzleAgentRepository(db);
// Create required hierarchy for agent FK
const taskRepo = new DrizzleTaskRepository(db);
const planRepo = new DrizzlePlanRepository(db);
const phaseRepo = new DrizzlePhaseRepository(db);
const initiativeRepo = new DrizzleInitiativeRepository(db);
const initiative = await initiativeRepo.create({
name: 'Test Initiative',
});
const phase = await phaseRepo.create({
initiativeId: initiative.id,
number: 1,
name: 'Test Phase',
});
const plan = await planRepo.create({
phaseId: phase.id,
number: 1,
name: 'Test Plan',
});
const task = await taskRepo.create({
planId: plan.id,
name: 'Test Task',
order: 1,
});
// Create a test agent
const agent = await agentRepo.create({
name: 'test-agent',
worktreeId: 'worktree-123',
taskId: task.id,
});
testAgentId = agent.id;
});
describe('create', () => {
it('should create agent→user message (question)', async () => {
const message = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'question',
content: 'Do you want to proceed with deployment?',
requiresResponse: true,
});
expect(message.id).toBeDefined();
expect(message.id.length).toBeGreaterThan(0);
expect(message.senderType).toBe('agent');
expect(message.senderId).toBe(testAgentId);
expect(message.recipientType).toBe('user');
expect(message.recipientId).toBeNull();
expect(message.type).toBe('question');
expect(message.content).toBe('Do you want to proceed with deployment?');
expect(message.requiresResponse).toBe(true);
expect(message.status).toBe('pending');
expect(message.parentMessageId).toBeNull();
expect(message.createdAt).toBeInstanceOf(Date);
expect(message.updatedAt).toBeInstanceOf(Date);
});
it('should create agent→user message (notification, requiresResponse=false)', async () => {
const message = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'info',
content: 'Build completed successfully.',
requiresResponse: false,
});
expect(message.type).toBe('info');
expect(message.requiresResponse).toBe(false);
expect(message.status).toBe('pending');
});
it('should create user→agent response', async () => {
// First create the question
const question = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'question',
content: 'Which database?',
requiresResponse: true,
});
// Then create user response
const response = await messageRepo.create({
senderType: 'user',
recipientType: 'agent',
recipientId: testAgentId,
type: 'response',
content: 'Use PostgreSQL',
parentMessageId: question.id,
});
expect(response.senderType).toBe('user');
expect(response.senderId).toBeNull();
expect(response.recipientType).toBe('agent');
expect(response.recipientId).toBe(testAgentId);
expect(response.parentMessageId).toBe(question.id);
});
it('should default type to info', async () => {
const message = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Status update',
});
expect(message.type).toBe('info');
});
});
describe('findById', () => {
it('should return null for non-existent message', async () => {
const result = await messageRepo.findById('non-existent-id');
expect(result).toBeNull();
});
it('should find an existing message', async () => {
const created = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Test message',
});
const found = await messageRepo.findById(created.id);
expect(found).not.toBeNull();
expect(found!.id).toBe(created.id);
expect(found!.content).toBe('Test message');
});
});
describe('findBySender', () => {
it('should find messages by agent sender', async () => {
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Message 1',
});
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Message 2',
});
const messages = await messageRepo.findBySender('agent', testAgentId);
expect(messages.length).toBe(2);
});
it('should find messages by user sender', async () => {
await messageRepo.create({
senderType: 'user',
recipientType: 'agent',
recipientId: testAgentId,
content: 'User message 1',
});
await messageRepo.create({
senderType: 'user',
recipientType: 'agent',
recipientId: testAgentId,
content: 'User message 2',
});
const messages = await messageRepo.findBySender('user');
expect(messages.length).toBe(2);
});
it('should return empty array when no messages from sender', async () => {
const messages = await messageRepo.findBySender('agent', 'nonexistent-id');
expect(messages).toEqual([]);
});
});
describe('findByRecipient', () => {
it('should find messages by user recipient', async () => {
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'For user 1',
});
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'For user 2',
});
const messages = await messageRepo.findByRecipient('user');
expect(messages.length).toBe(2);
});
it('should find messages by agent recipient', async () => {
await messageRepo.create({
senderType: 'user',
recipientType: 'agent',
recipientId: testAgentId,
content: 'For agent',
});
const messages = await messageRepo.findByRecipient('agent', testAgentId);
expect(messages.length).toBe(1);
});
it('should return empty array when no messages for recipient', async () => {
const messages = await messageRepo.findByRecipient('agent', 'nonexistent-id');
expect(messages).toEqual([]);
});
});
describe('findPendingForUser', () => {
it('should return only user-targeted pending messages', async () => {
// Create pending message for user
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Pending for user',
status: 'pending',
});
// Create read message for user (should not be returned)
const readMessage = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Already read',
});
await messageRepo.update(readMessage.id, { status: 'read' });
// Create pending message for agent (should not be returned)
await messageRepo.create({
senderType: 'user',
recipientType: 'agent',
recipientId: testAgentId,
content: 'For agent not user',
});
const pending = await messageRepo.findPendingForUser();
expect(pending.length).toBe(1);
expect(pending[0].content).toBe('Pending for user');
});
it('should return empty array when no pending messages for user', async () => {
const pending = await messageRepo.findPendingForUser();
expect(pending).toEqual([]);
});
});
describe('findRequiringResponse', () => {
it('should return only messages needing response', async () => {
// Create message requiring response
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'question',
content: 'Requires answer',
requiresResponse: true,
});
// Create message not requiring response
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'info',
content: 'Just info',
requiresResponse: false,
});
// Create message that required response but was responded
const responded = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'question',
content: 'Already answered',
requiresResponse: true,
});
await messageRepo.update(responded.id, { status: 'responded' });
const requiring = await messageRepo.findRequiringResponse();
expect(requiring.length).toBe(1);
expect(requiring[0].content).toBe('Requires answer');
});
it('should return empty array when no messages require response', async () => {
const requiring = await messageRepo.findRequiringResponse();
expect(requiring).toEqual([]);
});
});
describe('findReplies (message threading)', () => {
it('should find all replies to a parent message', async () => {
// Create original question
const question = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'question',
content: 'Original question',
requiresResponse: true,
});
// Create two replies
await messageRepo.create({
senderType: 'user',
recipientType: 'agent',
recipientId: testAgentId,
type: 'response',
content: 'First reply',
parentMessageId: question.id,
});
await messageRepo.create({
senderType: 'user',
recipientType: 'agent',
recipientId: testAgentId,
type: 'response',
content: 'Second reply',
parentMessageId: question.id,
});
// Create unrelated message (should not be in replies)
await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Unrelated message',
});
const replies = await messageRepo.findReplies(question.id);
expect(replies.length).toBe(2);
expect(replies.every((r) => r.parentMessageId === question.id)).toBe(true);
});
it('should return empty array when message has no replies', async () => {
const message = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'No replies',
});
const replies = await messageRepo.findReplies(message.id);
expect(replies).toEqual([]);
});
});
describe('update status flow', () => {
it('should update status: pending → read → responded', async () => {
const message = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
type: 'question',
content: 'Status flow test',
requiresResponse: true,
});
expect(message.status).toBe('pending');
// Wait to ensure different timestamps
await new Promise((resolve) => setTimeout(resolve, 10));
// Update to read
const readMessage = await messageRepo.update(message.id, { status: 'read' });
expect(readMessage.status).toBe('read');
expect(readMessage.updatedAt.getTime()).toBeGreaterThan(message.updatedAt.getTime());
// Wait again
await new Promise((resolve) => setTimeout(resolve, 10));
// Update to responded
const respondedMessage = await messageRepo.update(readMessage.id, {
status: 'responded',
});
expect(respondedMessage.status).toBe('responded');
expect(respondedMessage.updatedAt.getTime()).toBeGreaterThan(
readMessage.updatedAt.getTime()
);
});
});
describe('update', () => {
it('should throw for non-existent message', async () => {
await expect(
messageRepo.update('non-existent-id', { status: 'read' })
).rejects.toThrow('Message not found');
});
it('should update content and updatedAt', async () => {
const created = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'Original content',
});
await new Promise((resolve) => setTimeout(resolve, 10));
const updated = await messageRepo.update(created.id, {
content: 'Updated content',
});
expect(updated.content).toBe('Updated content');
expect(updated.updatedAt.getTime()).toBeGreaterThan(created.updatedAt.getTime());
});
});
describe('delete', () => {
it('should delete an existing message', async () => {
const created = await messageRepo.create({
senderType: 'agent',
senderId: testAgentId,
recipientType: 'user',
content: 'To delete',
});
await messageRepo.delete(created.id);
const found = await messageRepo.findById(created.id);
expect(found).toBeNull();
});
it('should throw for non-existent message', async () => {
await expect(messageRepo.delete('non-existent-id')).rejects.toThrow(
'Message not found'
);
});
});
});

View File

@@ -0,0 +1,143 @@
/**
* Drizzle Message Repository Adapter
*
* Implements MessageRepository interface using Drizzle ORM.
*/
import { eq, and, desc } from 'drizzle-orm';
import { nanoid } from 'nanoid';
import type { DrizzleDatabase } from '../../index.js';
import { messages, type Message } from '../../schema.js';
import type {
MessageRepository,
CreateMessageData,
UpdateMessageData,
MessageParticipantType,
} from '../message-repository.js';
/**
* Drizzle adapter for MessageRepository.
*
* Uses dependency injection for database instance,
* enabling isolated test databases.
*/
export class DrizzleMessageRepository implements MessageRepository {
constructor(private db: DrizzleDatabase) {}
async create(data: CreateMessageData): Promise<Message> {
const id = nanoid();
const now = new Date();
await this.db.insert(messages).values({
id,
senderType: data.senderType,
senderId: data.senderId ?? null,
recipientType: data.recipientType,
recipientId: data.recipientId ?? null,
type: data.type ?? 'info',
content: data.content,
requiresResponse: data.requiresResponse ?? false,
status: data.status ?? 'pending',
parentMessageId: data.parentMessageId ?? null,
createdAt: now,
updatedAt: now,
});
// Fetch to get the complete record with all defaults applied
const created = await this.findById(id);
return created!;
}
async findById(id: string): Promise<Message | null> {
const result = await this.db
.select()
.from(messages)
.where(eq(messages.id, id))
.limit(1);
return result[0] ?? null;
}
async findBySender(type: MessageParticipantType, id?: string): Promise<Message[]> {
if (id) {
return this.db
.select()
.from(messages)
.where(and(eq(messages.senderType, type), eq(messages.senderId, id)))
.orderBy(desc(messages.createdAt));
}
// For user sender (no id), find where senderType='user' and senderId is null
return this.db
.select()
.from(messages)
.where(eq(messages.senderType, type))
.orderBy(desc(messages.createdAt));
}
async findByRecipient(type: MessageParticipantType, id?: string): Promise<Message[]> {
if (id) {
return this.db
.select()
.from(messages)
.where(and(eq(messages.recipientType, type), eq(messages.recipientId, id)))
.orderBy(desc(messages.createdAt));
}
// For user recipient (no id), find where recipientType='user' and recipientId is null
return this.db
.select()
.from(messages)
.where(eq(messages.recipientType, type))
.orderBy(desc(messages.createdAt));
}
async findPendingForUser(): Promise<Message[]> {
return this.db
.select()
.from(messages)
.where(and(eq(messages.recipientType, 'user'), eq(messages.status, 'pending')))
.orderBy(desc(messages.createdAt));
}
async findRequiringResponse(): Promise<Message[]> {
return this.db
.select()
.from(messages)
.where(and(eq(messages.requiresResponse, true), eq(messages.status, 'pending')))
.orderBy(desc(messages.createdAt));
}
async findReplies(parentMessageId: string): Promise<Message[]> {
return this.db
.select()
.from(messages)
.where(eq(messages.parentMessageId, parentMessageId))
.orderBy(desc(messages.createdAt));
}
async update(id: string, data: UpdateMessageData): Promise<Message> {
const existing = await this.findById(id);
if (!existing) {
throw new Error(`Message not found: ${id}`);
}
const updated = {
...data,
updatedAt: new Date(),
};
await this.db.update(messages).set(updated).where(eq(messages.id, id));
return { ...existing, ...updated } as Message;
}
async delete(id: string): Promise<void> {
const existing = await this.findById(id);
if (!existing) {
throw new Error(`Message not found: ${id}`);
}
await this.db.delete(messages).where(eq(messages.id, id));
}
}

View File

@@ -82,6 +82,22 @@ CREATE TABLE IF NOT EXISTS agents (
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
-- Messages table
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY NOT NULL,
sender_type TEXT NOT NULL,
sender_id TEXT REFERENCES agents(id) ON DELETE SET NULL,
recipient_type TEXT NOT NULL,
recipient_id TEXT REFERENCES agents(id) ON DELETE SET NULL,
type TEXT NOT NULL DEFAULT 'info',
content TEXT NOT NULL,
requires_response INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending',
parent_message_id TEXT REFERENCES messages(id) ON DELETE SET NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
`;
/**

View File

@@ -35,3 +35,12 @@ export type {
AgentStatus,
CreateAgentData,
} from './agent-repository.js';
export type {
MessageRepository,
MessageParticipantType,
MessageType,
MessageStatus,
CreateMessageData,
UpdateMessageData,
} from './message-repository.js';

View File

@@ -0,0 +1,118 @@
/**
* Message Repository Port Interface
*
* Port for Message aggregate operations.
* Implementations (Drizzle, etc.) are adapters.
*
* Messages persist agent questions for users to query and respond later.
* Supports threading via parentMessageId for response linking.
*/
import type { Message } from '../schema.js';
/**
* Message sender/recipient type.
*/
export type MessageParticipantType = 'agent' | 'user';
/**
* Message type.
*/
export type MessageType = 'question' | 'info' | 'error' | 'response';
/**
* Message status.
*/
export type MessageStatus = 'pending' | 'read' | 'responded';
/**
* Data for creating a new message.
* Omits system-managed fields (id, createdAt, updatedAt).
*/
export interface CreateMessageData {
senderType: MessageParticipantType;
senderId?: string | null;
recipientType: MessageParticipantType;
recipientId?: string | null;
type?: MessageType;
content: string;
requiresResponse?: boolean;
status?: MessageStatus;
parentMessageId?: string | null;
}
/**
* Data for updating a message.
* Partial of create data - all fields optional.
*/
export type UpdateMessageData = Partial<CreateMessageData>;
/**
* Message Repository Port
*
* Defines operations for the Message aggregate.
* Enables message persistence for agent-user communication.
*/
export interface MessageRepository {
/**
* Create a new message.
* Generates id and sets timestamps automatically.
*/
create(data: CreateMessageData): Promise<Message>;
/**
* Find a message by its ID.
* Returns null if not found.
*/
findById(id: string): Promise<Message | null>;
/**
* Find messages by sender.
* @param type - 'agent' or 'user'
* @param id - Optional sender ID (agent ID if type='agent', omit for user)
* Returns messages ordered by createdAt DESC.
*/
findBySender(type: MessageParticipantType, id?: string): Promise<Message[]>;
/**
* Find messages by recipient.
* @param type - 'agent' or 'user'
* @param id - Optional recipient ID (agent ID if type='agent', omit for user)
* Returns messages ordered by createdAt DESC.
*/
findByRecipient(type: MessageParticipantType, id?: string): Promise<Message[]>;
/**
* Find all pending messages for user.
* Returns messages where recipientType='user' and status='pending'.
* Ordered by createdAt DESC.
*/
findPendingForUser(): Promise<Message[]>;
/**
* Find all messages requiring a response.
* Returns messages where requiresResponse=true and status='pending'.
* Ordered by createdAt DESC.
*/
findRequiringResponse(): Promise<Message[]>;
/**
* Find all replies to a message.
* @param parentMessageId - The ID of the parent message
* Returns messages ordered by createdAt DESC.
*/
findReplies(parentMessageId: string): Promise<Message[]>;
/**
* Update a message.
* Throws if message not found.
* Updates updatedAt timestamp automatically.
*/
update(id: string, data: UpdateMessageData): Promise<Message>;
/**
* Delete a message.
* Throws if message not found.
*/
delete(id: string): Promise<void>;
}