Merge branch 'main' into cw/unified-event-flow-conflict-1772795597661

# Conflicts:
#	apps/web/src/components/review/ReviewTab.tsx
#	apps/web/src/routes/initiatives/$id.tsx
This commit is contained in:
Lukas May
2026-03-06 12:16:07 +01:00
80 changed files with 4771 additions and 519 deletions

View File

@@ -14,6 +14,7 @@ import {
} from './index.js';
import type { TRPCContext } from './context.js';
import type { EventBus } from '../events/types.js';
import type { AccountRepository } from '../db/repositories/account-repository.js';
// Create caller factory for the app router
const createCaller = createCallerFactory(appRouter);
@@ -161,6 +162,79 @@ describe('tRPC Router', () => {
});
});
describe('addAccountByToken procedure', () => {
let mockRepo: AccountRepository;
beforeEach(() => {
mockRepo = {
findByEmail: vi.fn(),
updateAccountAuth: vi.fn(),
create: vi.fn(),
} as unknown as AccountRepository;
vi.clearAllMocks();
});
it('creates a new account and returns { upserted: false, account }', async () => {
const stubAccount = { id: 'new-id', email: 'user@example.com', provider: 'claude' };
(mockRepo.findByEmail as ReturnType<typeof vi.fn>).mockResolvedValue(null);
(mockRepo.create as ReturnType<typeof vi.fn>).mockResolvedValue(stubAccount);
const testCtx = createTestContext({ accountRepository: mockRepo });
const testCaller = createCaller(testCtx);
const result = await testCaller.addAccountByToken({ email: 'user@example.com', token: 'my-token' });
expect(result).toEqual({ upserted: false, account: stubAccount });
expect(mockRepo.create).toHaveBeenCalledWith({
email: 'user@example.com',
provider: 'claude',
configJson: '{"hasCompletedOnboarding":true}',
credentials: '{"claudeAiOauth":{"accessToken":"my-token"}}',
});
expect(mockRepo.updateAccountAuth).not.toHaveBeenCalled();
});
it('updates existing account and returns { upserted: true, account }', async () => {
const existingAccount = { id: 'existing-id', email: 'user@example.com', provider: 'claude' };
const updatedAccount = { id: 'existing-id', email: 'user@example.com', provider: 'claude', updated: true };
(mockRepo.findByEmail as ReturnType<typeof vi.fn>).mockResolvedValue(existingAccount);
(mockRepo.updateAccountAuth as ReturnType<typeof vi.fn>).mockResolvedValue(updatedAccount);
const testCtx = createTestContext({ accountRepository: mockRepo });
const testCaller = createCaller(testCtx);
const result = await testCaller.addAccountByToken({ email: 'user@example.com', token: 'my-token' });
expect(result).toEqual({ upserted: true, account: updatedAccount });
expect(mockRepo.updateAccountAuth).toHaveBeenCalledWith(
'existing-id',
'{"hasCompletedOnboarding":true}',
'{"claudeAiOauth":{"accessToken":"my-token"}}',
);
expect(mockRepo.create).not.toHaveBeenCalled();
});
it('throws BAD_REQUEST when email is empty', async () => {
const testCtx = createTestContext({ accountRepository: mockRepo });
const testCaller = createCaller(testCtx);
await expect(testCaller.addAccountByToken({ email: '', provider: 'claude', token: 'tok' }))
.rejects.toMatchObject({ code: 'BAD_REQUEST' });
expect(mockRepo.findByEmail).not.toHaveBeenCalled();
expect(mockRepo.create).not.toHaveBeenCalled();
expect(mockRepo.updateAccountAuth).not.toHaveBeenCalled();
});
it('throws BAD_REQUEST when token is empty', async () => {
const testCtx = createTestContext({ accountRepository: mockRepo });
const testCaller = createCaller(testCtx);
await expect(testCaller.addAccountByToken({ email: 'user@example.com', provider: 'claude', token: '' }))
.rejects.toMatchObject({ code: 'BAD_REQUEST' });
expect(mockRepo.findByEmail).not.toHaveBeenCalled();
expect(mockRepo.create).not.toHaveBeenCalled();
expect(mockRepo.updateAccountAuth).not.toHaveBeenCalled();
});
});
describe('Zod schema validation', () => {
it('healthResponseSchema should reject invalid status', () => {
const invalid = {

View File

@@ -72,5 +72,29 @@ export function accountProcedures(publicProcedure: ProcedureBuilder) {
.query(() => {
return listProviderNames();
}),
addAccountByToken: publicProcedure
.input(z.object({
email: z.string().min(1),
provider: z.string().default('claude'),
token: z.string().min(1),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireAccountRepository(ctx);
const credentials = JSON.stringify({ claudeAiOauth: { accessToken: input.token } });
const configJson = JSON.stringify({ hasCompletedOnboarding: true });
const existing = await repo.findByEmail(input.email);
if (existing) {
const account = await repo.updateAccountAuth(existing.id, configJson, credentials);
return { upserted: true, account };
}
const account = await repo.create({
email: input.email,
provider: input.provider,
configJson,
credentials,
});
return { upserted: false, account };
}),
};
}

View File

@@ -184,6 +184,27 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) {
return candidates[0] ?? null;
}),
getActiveConflictAgent: publicProcedure
.input(z.object({ initiativeId: z.string().min(1) }))
.query(async ({ ctx, input }): Promise<AgentInfo | null> => {
const agentManager = requireAgentManager(ctx);
const allAgents = await agentManager.list();
const candidates = allAgents
.filter(
(a) =>
a.mode === 'execute' &&
a.initiativeId === input.initiativeId &&
a.name?.startsWith('conflict-') &&
['running', 'waiting_for_input', 'idle', 'crashed'].includes(a.status) &&
!a.userDismissedAt,
)
.sort(
(a, b) =>
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(),
);
return candidates[0] ?? null;
}),
getAgentOutput: publicProcedure
.input(agentIdentifierSchema)
.query(async ({ ctx, input }): Promise<string> => {

View File

@@ -91,6 +91,9 @@ export function changeSetProcedures(publicProcedure: ProcedureBuilder) {
}
}
// Mark reverted FIRST to avoid ghost state if entity deletion fails partway
await repo.markReverted(input.id);
// Apply reverts in reverse entry order
const reversedEntries = [...cs.entries].reverse();
for (const entry of reversedEntries) {
@@ -159,8 +162,6 @@ export function changeSetProcedures(publicProcedure: ProcedureBuilder) {
}
}
await repo.markReverted(input.id);
ctx.eventBus.emit({
type: 'changeset:reverted' as const,
timestamp: new Date(),

View File

@@ -35,5 +35,15 @@ export function dispatchProcedures(publicProcedure: ProcedureBuilder) {
await dispatchManager.completeTask(input.taskId);
return { success: true };
}),
retryBlockedTask: publicProcedure
.input(z.object({ taskId: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
const dispatchManager = requireDispatchManager(ctx);
await dispatchManager.retryBlockedTask(input.taskId);
// Kick dispatch loop to pick up the re-queued task
await dispatchManager.dispatchNext();
return { success: true };
}),
};
}

View File

@@ -7,7 +7,7 @@ import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import { requireAgentManager, requireInitiativeRepository, requireProjectRepository, requireTaskRepository, requireBranchManager, requireExecutionOrchestrator } from './_helpers.js';
import { deriveInitiativeActivity } from './initiative-activity.js';
import { buildRefinePrompt } from '../../agent/prompts/index.js';
import { buildRefinePrompt, buildConflictResolutionPrompt, buildConflictResolutionDescription } from '../../agent/prompts/index.js';
import type { PageForSerialization } from '../../agent/content-serializer.js';
import { ensureProjectClone } from '../../git/project-clones.js';
@@ -335,5 +335,145 @@ export function initiativeProcedures(publicProcedure: ProcedureBuilder) {
await orchestrator.approveInitiative(input.initiativeId, input.strategy);
return { success: true };
}),
requestInitiativeChanges: publicProcedure
.input(z.object({
initiativeId: z.string().min(1),
summary: z.string().trim().min(1),
}))
.mutation(async ({ ctx, input }) => {
const orchestrator = requireExecutionOrchestrator(ctx);
const result = await orchestrator.requestChangesOnInitiative(
input.initiativeId,
input.summary,
);
return { success: true, taskId: result.taskId };
}),
checkInitiativeMergeability: publicProcedure
.input(z.object({ initiativeId: z.string().min(1) }))
.query(async ({ ctx, input }) => {
const initiativeRepo = requireInitiativeRepository(ctx);
const projectRepo = requireProjectRepository(ctx);
const branchManager = requireBranchManager(ctx);
const initiative = await initiativeRepo.findById(input.initiativeId);
if (!initiative) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Initiative '${input.initiativeId}' not found` });
}
if (!initiative.branch) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'Initiative has no branch configured' });
}
const projects = await projectRepo.findProjectsByInitiativeId(input.initiativeId);
const allConflicts: string[] = [];
let mergeable = true;
for (const project of projects) {
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const result = await branchManager.checkMergeability(clonePath, initiative.branch, project.defaultBranch);
if (!result.mergeable) {
mergeable = false;
if (result.conflicts) allConflicts.push(...result.conflicts);
}
}
return {
mergeable,
conflictFiles: allConflicts,
targetBranch: projects[0]?.defaultBranch ?? 'main',
};
}),
spawnConflictResolutionAgent: publicProcedure
.input(z.object({
initiativeId: z.string().min(1),
provider: z.string().optional(),
}))
.mutation(async ({ ctx, input }) => {
const agentManager = requireAgentManager(ctx);
const initiativeRepo = requireInitiativeRepository(ctx);
const projectRepo = requireProjectRepository(ctx);
const taskRepo = requireTaskRepository(ctx);
const branchManager = requireBranchManager(ctx);
const initiative = await initiativeRepo.findById(input.initiativeId);
if (!initiative) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Initiative '${input.initiativeId}' not found` });
}
if (!initiative.branch) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'Initiative has no branch configured' });
}
const projects = await projectRepo.findProjectsByInitiativeId(input.initiativeId);
if (projects.length === 0) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'Initiative has no linked projects' });
}
// Auto-dismiss stale conflict agents
const allAgents = await agentManager.list();
const staleAgents = allAgents.filter(
(a) =>
a.mode === 'execute' &&
a.initiativeId === input.initiativeId &&
a.name?.startsWith('conflict-') &&
['crashed', 'idle'].includes(a.status) &&
!a.userDismissedAt,
);
for (const stale of staleAgents) {
await agentManager.dismiss(stale.id);
}
// Reject if active conflict agent already running
const activeConflictAgents = allAgents.filter(
(a) =>
a.mode === 'execute' &&
a.initiativeId === input.initiativeId &&
a.name?.startsWith('conflict-') &&
['running', 'waiting_for_input'].includes(a.status),
);
if (activeConflictAgents.length > 0) {
throw new TRPCError({
code: 'CONFLICT',
message: 'A conflict resolution agent is already running for this initiative',
});
}
// Re-check mergeability to get current conflict list
const project = projects[0];
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const mergeCheck = await branchManager.checkMergeability(clonePath, initiative.branch, project.defaultBranch);
if (mergeCheck.mergeable) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'No merge conflicts detected — merge is clean' });
}
const conflicts = mergeCheck.conflicts ?? [];
const targetBranch = project.defaultBranch;
// Create task
const task = await taskRepo.create({
initiativeId: input.initiativeId,
name: `Resolve conflicts: ${initiative.name}`,
description: buildConflictResolutionDescription(initiative.branch, targetBranch, conflicts),
category: 'merge',
status: 'in_progress',
});
// Spawn agent on a unique temp branch based off the initiative branch.
// Using initiative.branch directly as branchName would cause SimpleGitWorktreeManager.create()
// to run `git branch -f <branch> <base>`, force-resetting the initiative branch.
const tempBranch = `${initiative.branch}-conflict-${Date.now()}`;
const prompt = buildConflictResolutionPrompt(initiative.branch, targetBranch, conflicts);
return agentManager.spawn({
name: `conflict-${Date.now()}`,
taskId: task.id,
prompt,
mode: 'execute',
provider: input.provider,
initiativeId: input.initiativeId,
baseBranch: initiative.branch,
branchName: tempBranch,
});
}),
};
}

View File

@@ -53,7 +53,7 @@ export function phaseDispatchProcedures(publicProcedure: ProcedureBuilder) {
number: z.number().int().positive(),
name: z.string().min(1),
description: z.string(),
type: z.enum(['auto', 'checkpoint:human-verify', 'checkpoint:decision', 'checkpoint:human-action']).default('auto'),
type: z.enum(['auto']).default('auto'),
dependencies: z.array(z.number().int().positive()).optional(),
})),
}))

View File

@@ -6,7 +6,7 @@ import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import type { Phase } from '../../db/schema.js';
import type { ProcedureBuilder } from '../trpc.js';
import { requirePhaseRepository, requireTaskRepository, requireBranchManager, requireInitiativeRepository, requireProjectRepository, requireExecutionOrchestrator, requireReviewCommentRepository } from './_helpers.js';
import { requirePhaseRepository, requireTaskRepository, requireBranchManager, requireInitiativeRepository, requireProjectRepository, requireExecutionOrchestrator, requireReviewCommentRepository, requireChangeSetRepository } from './_helpers.js';
import { phaseBranchName } from '../../git/branch-naming.js';
import { ensureProjectClone } from '../../git/project-clones.js';
@@ -98,6 +98,29 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
.mutation(async ({ ctx, input }) => {
const repo = requirePhaseRepository(ctx);
await repo.delete(input.id);
// Reconcile any applied changesets that created this phase.
// If all created phases in a changeset are now deleted, mark it reverted.
if (ctx.changeSetRepository) {
try {
const csRepo = requireChangeSetRepository(ctx);
const affectedChangeSets = await csRepo.findAppliedByCreatedEntity('phase', input.id);
for (const cs of affectedChangeSets) {
const createdPhaseIds = cs.entries
.filter(e => e.entityType === 'phase' && e.action === 'create')
.map(e => e.entityId);
const survivingPhases = await Promise.all(
createdPhaseIds.map(id => repo.findById(id)),
);
if (survivingPhases.every(p => p === null)) {
await csRepo.markReverted(cs.id);
}
}
} catch {
// Best-effort reconciliation — don't fail the delete
}
}
return { success: true };
}),
@@ -196,8 +219,8 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
if (!phase) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Phase '${input.phaseId}' not found` });
}
if (phase.status !== 'pending_review') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not pending review (status: ${phase.status})` });
if (phase.status !== 'pending_review' && phase.status !== 'completed') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not reviewable (status: ${phase.status})` });
}
const initiative = await initiativeRepo.findById(phase.initiativeId);
@@ -207,13 +230,15 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
const initBranch = initiative.branch;
const phBranch = phaseBranchName(initBranch, phase.name);
// For completed phases, use stored merge base; for pending_review, use initiative branch
const diffBase = (phase.status === 'completed' && phase.mergeBase) ? phase.mergeBase : initBranch;
const projects = await projectRepo.findProjectsByInitiativeId(phase.initiativeId);
let rawDiff = '';
for (const project of projects) {
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const diff = await branchManager.diffBranches(clonePath, initBranch, phBranch);
const diff = await branchManager.diffBranches(clonePath, diffBase, phBranch);
if (diff) {
rawDiff += diff + '\n';
}
@@ -247,8 +272,8 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
if (!phase) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Phase '${input.phaseId}' not found` });
}
if (phase.status !== 'pending_review') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not pending review (status: ${phase.status})` });
if (phase.status !== 'pending_review' && phase.status !== 'completed') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not reviewable (status: ${phase.status})` });
}
const initiative = await initiativeRepo.findById(phase.initiativeId);
@@ -258,13 +283,14 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
const initBranch = initiative.branch;
const phBranch = phaseBranchName(initBranch, phase.name);
const diffBase = (phase.status === 'completed' && phase.mergeBase) ? phase.mergeBase : initBranch;
const projects = await projectRepo.findProjectsByInitiativeId(phase.initiativeId);
const allCommits: Array<{ hash: string; shortHash: string; message: string; author: string; date: string; filesChanged: number; insertions: number; deletions: number }> = [];
for (const project of projects) {
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const commits = await branchManager.listCommits(clonePath, initBranch, phBranch);
const commits = await branchManager.listCommits(clonePath, diffBase, phBranch);
allCommits.push(...commits);
}
@@ -320,6 +346,20 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
return repo.create(input);
}),
updateReviewComment: publicProcedure
.input(z.object({
id: z.string().min(1),
body: z.string().trim().min(1),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireReviewCommentRepository(ctx);
const comment = await repo.update(input.id, input.body);
if (!comment) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Review comment '${input.id}' not found` });
}
return comment;
}),
resolveReviewComment: publicProcedure
.input(z.object({ id: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
@@ -342,25 +382,54 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
return comment;
}),
replyToReviewComment: publicProcedure
.input(z.object({
parentCommentId: z.string().min(1),
body: z.string().trim().min(1),
author: z.string().optional(),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireReviewCommentRepository(ctx);
return repo.createReply(input.parentCommentId, input.body, input.author);
}),
requestPhaseChanges: publicProcedure
.input(z.object({
phaseId: z.string().min(1),
summary: z.string().optional(),
summary: z.string().trim().min(1).optional(),
}))
.mutation(async ({ ctx, input }) => {
const orchestrator = requireExecutionOrchestrator(ctx);
const reviewCommentRepo = requireReviewCommentRepository(ctx);
const allComments = await reviewCommentRepo.findByPhaseId(input.phaseId);
const unresolved = allComments
.filter((c: { resolved: boolean }) => !c.resolved)
.map((c: { filePath: string; lineNumber: number; body: string }) => ({
// Build threaded structure: unresolved root comments with their replies
const rootComments = allComments.filter((c) => !c.parentCommentId);
const repliesByParent = new Map<string, typeof allComments>();
for (const c of allComments) {
if (c.parentCommentId) {
const arr = repliesByParent.get(c.parentCommentId) ?? [];
arr.push(c);
repliesByParent.set(c.parentCommentId, arr);
}
}
const unresolvedThreads = rootComments
.filter((c) => !c.resolved)
.map((c) => ({
id: c.id,
filePath: c.filePath,
lineNumber: c.lineNumber,
body: c.body,
author: c.author,
replies: (repliesByParent.get(c.id) ?? []).map((r) => ({
id: r.id,
body: r.body,
author: r.author,
})),
}));
if (unresolved.length === 0 && !input.summary) {
if (unresolvedThreads.length === 0 && !input.summary) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: 'Add comments or a summary before requesting changes',
@@ -369,7 +438,7 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
const result = await orchestrator.requestChangesOnPhase(
input.phaseId,
unresolved,
unresolvedThreads,
input.summary,
);
return { success: true, taskId: result.taskId };

View File

@@ -10,6 +10,7 @@ import {
requireInitiativeRepository,
requirePhaseRepository,
requireDispatchManager,
requireChangeSetRepository,
} from './_helpers.js';
export function taskProcedures(publicProcedure: ProcedureBuilder) {
@@ -49,6 +50,14 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
message: `Task '${input.id}' not found`,
});
}
// Route through dispatchManager when completing — emits task:completed
// event so the orchestrator can check phase completion and merge branches
if (input.status === 'completed' && ctx.dispatchManager) {
await ctx.dispatchManager.completeTask(input.id);
return (await taskRepository.findById(input.id))!;
}
return taskRepository.update(input.id, { status: input.status });
}),
@@ -58,7 +67,7 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
name: z.string().min(1),
description: z.string().optional(),
category: z.enum(['execute', 'research', 'discuss', 'plan', 'detail', 'refine', 'verify', 'merge', 'review']).optional(),
type: z.enum(['auto', 'checkpoint:human-verify', 'checkpoint:decision', 'checkpoint:human-action']).optional(),
type: z.enum(['auto']).optional(),
}))
.mutation(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
@@ -88,7 +97,7 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
name: z.string().min(1),
description: z.string().optional(),
category: z.enum(['execute', 'research', 'discuss', 'plan', 'detail', 'refine', 'verify', 'merge', 'review']).optional(),
type: z.enum(['auto', 'checkpoint:human-verify', 'checkpoint:decision', 'checkpoint:human-action']).optional(),
type: z.enum(['auto']).optional(),
}))
.mutation(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
@@ -152,6 +161,29 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
.mutation(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
await taskRepository.delete(input.id);
// Reconcile any applied changesets that created this task.
// If all created tasks in a changeset are now deleted, mark it reverted.
if (ctx.changeSetRepository) {
try {
const csRepo = requireChangeSetRepository(ctx);
const affectedChangeSets = await csRepo.findAppliedByCreatedEntity('task', input.id);
for (const cs of affectedChangeSets) {
const createdTaskIds = cs.entries
.filter(e => e.entityType === 'task' && e.action === 'create')
.map(e => e.entityId);
const survivingTasks = await Promise.all(
createdTaskIds.map(id => taskRepository.findById(id)),
);
if (survivingTasks.every(t => t === null)) {
await csRepo.markReverted(cs.id);
}
}
} catch {
// Best-effort reconciliation — don't fail the delete
}
}
return { success: true };
}),

View File

@@ -40,7 +40,6 @@ export const ALL_EVENT_TYPES: DomainEventType[] = [
'agent:account_switched',
'agent:deleted',
'agent:waiting',
'agent:output',
'task:queued',
'task:dispatched',
'task:completed',
@@ -84,7 +83,6 @@ export const AGENT_EVENT_TYPES: DomainEventType[] = [
'agent:account_switched',
'agent:deleted',
'agent:waiting',
'agent:output',
];
/**