Merge branch 'refs/heads/main' into cw/agent-details-conflict-1772799979862

# Conflicts:
#	apps/server/drizzle/meta/_journal.json
This commit is contained in:
Lukas May
2026-03-06 13:34:28 +01:00
82 changed files with 7164 additions and 537 deletions

View File

@@ -8,7 +8,7 @@
import { promisify } from 'node:util';
import { execFile } from 'node:child_process';
import { readFile, readdir, rm, cp, mkdir } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { existsSync, readdirSync } from 'node:fs';
import { join } from 'node:path';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
@@ -49,10 +49,35 @@ export class CleanupManager {
*/
private resolveAgentCwd(worktreeId: string): string {
  const base = this.getAgentWorkdir(worktreeId);
  // Fast path: .cw/output exists at the base level
  if (existsSync(join(base, '.cw', 'output'))) {
    return base;
  }
  // Standalone agents use a workspace/ subdirectory.
  // Fix: the original stacked two `if` conditions here (merge-conflict
  // residue) with only one closing brace, leaving the method's braces
  // unbalanced. The `!existsSync(base/.cw/output)` clause was also redundant
  // — the fast path above already returned in that case.
  const workspaceSub = join(base, 'workspace');
  if (existsSync(join(workspaceSub, '.cw'))) {
    return workspaceSub;
  }
  // Initiative-based agents may have written .cw/ inside a project
  // subdirectory (e.g. agent-workdirs/<name>/codewalk-district/.cw/).
  // Probe immediate children for a .cw/output directory.
  try {
    const entries = readdirSync(base, { withFileTypes: true });
    for (const entry of entries) {
      if (entry.isDirectory() && entry.name !== '.cw') {
        const projectSub = join(base, entry.name);
        if (existsSync(join(projectSub, '.cw', 'output'))) {
          return projectSub;
        }
      }
    }
  } catch {
    // base dir may not exist
  }
  // Fall back to the base dir when no .cw/output was found anywhere.
  return base;
}

View File

@@ -68,6 +68,7 @@ describe('writeInputFiles', () => {
name: 'Phase One',
content: 'First phase',
status: 'pending',
mergeBase: null,
createdAt: new Date(),
updatedAt: new Date(),
} as Phase;

View File

@@ -397,6 +397,34 @@ export async function readDecisionFiles(agentWorkdir: string): Promise<ParsedDec
});
}
/** A single agent-authored reply to a review comment. */
export interface ParsedCommentResponse {
  commentId: string;
  body: string;
  resolved?: boolean;
}

/**
 * Read and validate `.cw/output/comment-responses.json` from the agent
 * workdir. Malformed files, non-array payloads, and entries missing a
 * string `commentId`/`body` are silently dropped — a missing or broken
 * file yields an empty list, never an error.
 */
export async function readCommentResponses(agentWorkdir: string): Promise<ParsedCommentResponse[]> {
  const filePath = join(agentWorkdir, '.cw', 'output', 'comment-responses.json');
  let parsed: unknown;
  try {
    parsed = JSON.parse(await readFile(filePath, 'utf-8'));
  } catch {
    // File absent or not valid JSON — treat as "no responses".
    return [];
  }
  if (!Array.isArray(parsed)) return [];
  const responses: ParsedCommentResponse[] = [];
  for (const candidate of parsed) {
    if (typeof candidate !== 'object' || candidate === null) continue;
    const record = candidate as Record<string, unknown>;
    if (typeof record.commentId !== 'string' || typeof record.body !== 'string') continue;
    responses.push({
      commentId: String(record.commentId),
      body: String(record.body),
      // Only honor an explicit boolean; anything else means "unspecified".
      resolved: typeof record.resolved === 'boolean' ? record.resolved : undefined,
    });
  }
  return responses;
}
export async function readPageFiles(agentWorkdir: string): Promise<ParsedPageFile[]> {
const dirPath = join(agentWorkdir, '.cw', 'output', 'pages');
return readFrontmatterDir(dirPath, (data, body, filename) => {

View File

@@ -27,6 +27,7 @@ import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { LogChunkRepository } from '../db/repositories/log-chunk-repository.js';
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
import type { ReviewCommentRepository } from '../db/repositories/review-comment-repository.js';
import { generateUniqueAlias } from './alias.js';
import type {
EventBus,
@@ -42,7 +43,7 @@ import { getProvider } from './providers/registry.js';
import { createModuleLogger } from '../logger/index.js';
import { getProjectCloneDir } from '../git/project-clones.js';
import { join } from 'node:path';
import { unlink, readFile, writeFile as writeFileAsync } from 'node:fs/promises';
import { unlink, readFile, writeFile as writeFileAsync, mkdir } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import type { AccountCredentialManager } from './credentials/types.js';
import { ProcessManager } from './process-manager.js';
@@ -84,11 +85,12 @@ export class MultiProviderAgentManager implements AgentManager {
private debug: boolean = false,
processManagerOverride?: ProcessManager,
private chatSessionRepository?: ChatSessionRepository,
private reviewCommentRepository?: ReviewCommentRepository,
) {
this.signalManager = new FileSystemSignalManager();
this.processManager = processManagerOverride ?? new ProcessManager(workspaceRoot, projectRepository);
this.credentialHandler = new CredentialHandler(workspaceRoot, accountRepository, credentialManager);
this.outputHandler = new OutputHandler(repository, eventBus, changeSetRepository, phaseRepository, taskRepository, pageRepository, this.signalManager, chatSessionRepository);
this.outputHandler = new OutputHandler(repository, eventBus, changeSetRepository, phaseRepository, taskRepository, pageRepository, this.signalManager, chatSessionRepository, reviewCommentRepository);
this.cleanupManager = new CleanupManager(workspaceRoot, repository, projectRepository, eventBus, debug, this.signalManager);
this.lifecycleController = createLifecycleController({
repository,
@@ -295,6 +297,10 @@ export class MultiProviderAgentManager implements AgentManager {
if (options.inputContext) {
await writeInputFiles({ agentWorkdir: agentCwd, ...options.inputContext, agentId, agentName: alias });
log.debug({ alias }, 'input files written');
} else {
// Always create .cw/output/ at the agent workdir root so the agent
// writes signal.json here rather than in a project subdirectory.
await mkdir(join(agentCwd, '.cw', 'output'), { recursive: true });
}
// 4. Build spawn command
@@ -330,32 +336,10 @@ export class MultiProviderAgentManager implements AgentManager {
await this.repository.update(agentId, { pid, outputFilePath, prompt });
// Write spawn diagnostic file for post-execution verification
const diagnostic = {
timestamp: new Date().toISOString(),
agentId,
alias,
intendedCwd: finalCwd,
worktreeId: agent.worktreeId,
provider: providerName,
command,
args,
env: processEnv,
cwdExistsAtSpawn: existsSync(finalCwd),
initiativeId: initiativeId || null,
customCwdProvided: !!cwd,
accountId: accountId || null,
};
await writeFileAsync(
join(finalCwd, '.cw', 'spawn-diagnostic.json'),
JSON.stringify(diagnostic, null, 2),
'utf-8'
);
// Register agent and start polling BEFORE non-critical I/O so that a
// diagnostic-write failure can never orphan a running process.
const activeEntry: ActiveAgent = { agentId, pid, tailer, outputFilePath, agentCwd: finalCwd };
this.activeAgents.set(agentId, activeEntry);
log.info({ agentId, alias, pid, diagnosticWritten: true }, 'detached subprocess started with diagnostic');
// Emit spawned event
if (this.eventBus) {
@@ -375,6 +359,37 @@ export class MultiProviderAgentManager implements AgentManager {
);
activeEntry.cancelPoll = cancel;
// Write spawn diagnostic file (non-fatal — .cw/ may not exist yet for
// agents spawned without inputContext, e.g. conflict-resolution agents)
try {
const diagnosticDir = join(finalCwd, '.cw');
await mkdir(diagnosticDir, { recursive: true });
const diagnostic = {
timestamp: new Date().toISOString(),
agentId,
alias,
intendedCwd: finalCwd,
worktreeId: agent.worktreeId,
provider: providerName,
command,
args,
env: processEnv,
cwdExistsAtSpawn: existsSync(finalCwd),
initiativeId: initiativeId || null,
customCwdProvided: !!cwd,
accountId: accountId || null,
};
await writeFileAsync(
join(diagnosticDir, 'spawn-diagnostic.json'),
JSON.stringify(diagnostic, null, 2),
'utf-8'
);
} catch (err) {
log.warn({ agentId, alias, err: err instanceof Error ? err.message : String(err) }, 'failed to write spawn diagnostic');
}
log.info({ agentId, alias, pid }, 'detached subprocess started');
return this.toAgentInfo(agent);
}

View File

@@ -7,7 +7,7 @@
*/
import { readFile } from 'node:fs/promises';
import { existsSync } from 'node:fs';
import { existsSync, readdirSync } from 'node:fs';
import { join } from 'node:path';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { ChangeSetRepository, CreateChangeSetEntryData } from '../db/repositories/change-set-repository.js';
@@ -15,6 +15,7 @@ import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { PageRepository } from '../db/repositories/page-repository.js';
import type { ChatSessionRepository } from '../db/repositories/chat-session-repository.js';
import type { ReviewCommentRepository } from '../db/repositories/review-comment-repository.js';
import type {
EventBus,
AgentStoppedEvent,
@@ -37,6 +38,7 @@ import {
readDecisionFiles,
readPageFiles,
readFrontmatterFile,
readCommentResponses,
} from './file-io.js';
import { getProvider } from './providers/registry.js';
import { markdownToTiptapJson } from './markdown-to-tiptap.js';
@@ -92,6 +94,7 @@ export class OutputHandler {
private pageRepository?: PageRepository,
private signalManager?: SignalManager,
private chatSessionRepository?: ChatSessionRepository,
private reviewCommentRepository?: ReviewCommentRepository,
) {}
/**
@@ -230,10 +233,10 @@ export class OutputHandler {
log.debug({ agentId }, 'detached agent completed');
// Resolve actual agent working directory — standalone agents run in a
// "workspace/" subdirectory inside getAgentWorkdir, so prefer agentCwd
// recorded at spawn time when available.
const agentWorkdir = active?.agentCwd ?? getAgentWorkdir(agent.worktreeId);
// Resolve actual agent working directory.
// The recorded agentCwd may be the parent dir (agent-workdirs/<name>/) while
// the agent actually writes .cw/output/ inside a project subdirectory.
const agentWorkdir = this.resolveAgentWorkdir(active?.agentCwd ?? getAgentWorkdir(agent.worktreeId));
const outputDir = join(agentWorkdir, '.cw', 'output');
const expectedPwdFile = join(agentWorkdir, '.cw', 'expected-pwd.txt');
const diagnosticFile = join(agentWorkdir, '.cw', 'spawn-diagnostic.json');
@@ -851,6 +854,28 @@ export class OutputHandler {
}
}
// Process comment responses from agent (for review/execute tasks)
if (this.reviewCommentRepository) {
try {
const commentResponses = await readCommentResponses(agentWorkdir);
for (const resp of commentResponses) {
try {
await this.reviewCommentRepository.createReply(resp.commentId, resp.body, 'agent');
if (resp.resolved) {
await this.reviewCommentRepository.resolve(resp.commentId);
}
} catch (err) {
log.warn({ agentId, commentId: resp.commentId, err: err instanceof Error ? err.message : String(err) }, 'failed to process comment response');
}
}
if (commentResponses.length > 0) {
log.info({ agentId, count: commentResponses.length }, 'processed agent comment responses');
}
} catch (err) {
log.warn({ agentId, err: err instanceof Error ? err.message : String(err) }, 'failed to read comment responses');
}
}
const resultPayload: AgentResult = {
success: true,
message: resultMessage,
@@ -1133,6 +1158,31 @@ export class OutputHandler {
}
}
/**
 * Resolve the actual agent working directory. The recorded agentCwd may be
 * the parent (agent-workdirs/<name>/) but .cw/output/ could be inside a
 * project subdirectory (e.g. codewalk-district/.cw/output/).
 * Returns `base` unchanged when no better candidate is found.
 */
private resolveAgentWorkdir(base: string): string {
  const hasOutputDir = (dir: string): boolean => existsSync(join(dir, '.cw', 'output'));
  if (hasOutputDir(base)) return base;
  // Standalone agents: workspace/ subdirectory
  const workspaceDir = join(base, 'workspace');
  if (existsSync(join(workspaceDir, '.cw'))) return workspaceDir;
  // Initiative-based agents: probe project subdirectories
  try {
    const candidate = readdirSync(base, { withFileTypes: true })
      .filter((d) => d.isDirectory() && d.name !== '.cw')
      .map((d) => join(base, d.name))
      .find(hasOutputDir);
    if (candidate) return candidate;
  } catch {
    // base may not exist
  }
  return base;
}
private emitCrashed(agent: { id: string; name: string; taskId: string | null }, error: string): void {
if (this.eventBus) {
const event: AgentCrashedEvent = {

View File

@@ -0,0 +1,77 @@
/**
* Conflict resolution prompt — spawned when initiative branch has merge conflicts
* with the target branch.
*/
import {
SIGNAL_FORMAT,
SESSION_STARTUP,
GIT_WORKFLOW,
CONTEXT_MANAGEMENT,
} from './shared.js';
/**
 * Build the full prompt for a conflict-resolution agent.
 *
 * The agent merges `targetBranch` INTO `sourceBranch` (the initiative
 * branch) while running on a temporary branch, then advances the initiative
 * branch pointer with `git update-ref` — see the <resolution_protocol> and
 * <important> sections embedded below.
 *
 * @param sourceBranch initiative branch to bring up to date
 * @param targetBranch branch being merged in (typically the default branch)
 * @param conflicts    file paths that conflict, rendered as a bullet list
 */
export function buildConflictResolutionPrompt(
  sourceBranch: string,
  targetBranch: string,
  conflicts: string[],
): string {
  // Markdown bullet list of conflicting files, interpolated into the prompt.
  const conflictList = conflicts.map(f => `- \`${f}\``).join('\n');
  return `<role>
You are a Conflict Resolution agent. Your job is to merge \`${targetBranch}\` into the initiative branch \`${sourceBranch}\` and resolve all merge conflicts. You are working on a temporary branch created from \`${sourceBranch}\`. After resolving conflicts and committing, you must advance the initiative branch pointer using \`git update-ref\`.
</role>
<conflict_details>
**Source branch (initiative):** \`${sourceBranch}\`
**Target branch (default):** \`${targetBranch}\`
**Conflicting files:**
${conflictList}
</conflict_details>
${SIGNAL_FORMAT}
${SESSION_STARTUP}
<resolution_protocol>
Follow these steps in order:
1. **Inspect divergence**: Run \`git log --oneline ${targetBranch}..${sourceBranch}\` and \`git log --oneline ${sourceBranch}..${targetBranch}\` to understand what each side changed.
2. **Review conflicting files**: For each conflicting file, read both versions:
- \`git show ${sourceBranch}:<file>\`
- \`git show ${targetBranch}:<file>\`
3. **Merge**: Run \`git merge ${targetBranch} --no-edit\`. This will produce conflict markers.
4. **Resolve each file**: For each conflicting file:
- Read the file to see conflict markers (\`<<<<<<<\`, \`=======\`, \`>>>>>>>\`)
- Understand both sides' intent from step 1-2
- Choose the correct resolution — keep both changes when they don't overlap, prefer the more complete version when they do
- If you genuinely cannot determine the correct resolution, signal "questions" explaining the ambiguity
5. **Verify**: Run \`git diff --check\` to confirm no conflict markers remain. Run the test suite to confirm nothing is broken.
6. **Commit**: Stage resolved files with \`git add <file>\` (never \`git add .\`), then \`git commit --no-edit\` to complete the merge commit.
7. **Update initiative branch**: Run \`git update-ref refs/heads/${sourceBranch} HEAD\` to advance the initiative branch to include the merge result. This is necessary because you are working on a temporary branch — this command propagates the merge commit to the actual initiative branch.
8. **Signal done**: Write signal.json with status "done".
</resolution_protocol>
${GIT_WORKFLOW}
${CONTEXT_MANAGEMENT}
<important>
- You are on a temporary branch created from ${sourceBranch}. You are merging ${targetBranch} INTO this branch — bringing it up to date, NOT the other way around.
- After committing the merge, you MUST run \`git update-ref refs/heads/${sourceBranch} HEAD\` to advance the initiative branch pointer. Without this step, the initiative branch will not reflect the merge.
- Do NOT force-push or rebase. A merge commit is the correct approach.
- If tests fail after resolution, fix the code — don't skip tests.
- If a conflict is genuinely ambiguous (e.g., both sides rewrote the same function differently), signal "questions" with the specific ambiguity and your proposed resolution.
</important>`;
}
/**
 * One-line human-readable summary of a conflict-resolution task, e.g. for
 * task lists: count of conflicts, the two branches, and the file paths.
 */
export function buildConflictResolutionDescription(
  sourceBranch: string,
  targetBranch: string,
  conflicts: string[],
): string {
  const fileList = conflicts.join(', ');
  const count = conflicts.length;
  return `Resolve ${count} merge conflict(s) between ${sourceBranch} and ${targetBranch}: ${fileList}`;
}

View File

@@ -13,7 +13,7 @@ ${CODEBASE_EXPLORATION}
<output_format>
Write one file per task to \`.cw/output/tasks/{id}.md\`:
- Frontmatter: \`title\`, \`category\` (execute|research|discuss|plan|detail|refine|verify|merge|review), \`type\` (auto|checkpoint:human-verify|checkpoint:decision|checkpoint:human-action), \`dependencies\` (list of task IDs that must complete before this task can start)
- Frontmatter: \`title\`, \`category\` (execute|research|discuss|plan|detail|refine|verify|merge|review), \`dependencies\` (list of task IDs that must complete before this task can start)
- Body: Detailed task description
</output_format>
@@ -92,14 +92,6 @@ Each task is handled by a separate agent that must load the full codebase contex
Bundle related changes into one task. "Add user validation" + "Add user API route" + "Add user route tests" is ONE task ("Add user creation endpoint with validation and tests"), not three.
</task_sizing>
<checkpoint_tasks>
- \`checkpoint:human-verify\`: Visual changes, migrations, API contracts
- \`checkpoint:decision\`: Architecture choices affecting multiple phases
- \`checkpoint:human-action\`: External setup (DNS, credentials, third-party config)
~90% of tasks should be \`auto\`.
</checkpoint_tasks>
<existing_context>
- Read ALL \`context/tasks/\` files before generating output
- Only create tasks for THIS phase (\`phase.md\`)

View File

@@ -14,13 +14,26 @@ import {
} from './shared.js';
export function buildExecutePrompt(taskDescription?: string): string {
const hasReviewComments = taskDescription?.includes('[comment:');
const reviewCommentsSection = hasReviewComments
? `
<review_comments>
You are addressing review feedback. Each comment is tagged with [comment:ID].
For EACH comment you address:
1. Fix the issue in code, OR explain why no change is needed.
2. Write \`.cw/output/comment-responses.json\`:
[{"commentId": "abc123", "body": "Fixed: added try-catch around token validation", "resolved": true}]
Set resolved:true when you fixed it, false when you're explaining why you didn't.
</review_comments>`
: '';
const taskSection = taskDescription
? `
<task>
${taskDescription}
Read \`.cw/input/task.md\` for the full structured task with metadata, priority, and dependencies.
</task>`
</task>${reviewCommentsSection}`
: '';
return `<role>

View File

@@ -15,3 +15,4 @@ export { buildChatPrompt } from './chat.js';
export type { ChatHistoryEntry } from './chat.js';
export { buildWorkspaceLayout } from './workspace.js';
export { buildPreviewInstructions } from './preview.js';
export { buildConflictResolutionPrompt, buildConflictResolutionDescription } from './conflict-resolution.js';

View File

@@ -36,5 +36,7 @@ This is an isolated git worktree. Other agents may be working in parallel on sep
The following project directories contain the source code (git worktrees):
${lines.join('\n')}
**IMPORTANT**: All \`.cw/output/\` paths (signal.json, progress.md, etc.) are relative to this working directory (\`${agentCwd}\`), NOT to any project subdirectory. Always write to \`${join(agentCwd, '.cw/output/')}\` regardless of your current \`cd\` location.
</workspace>`;
}

View File

@@ -183,13 +183,10 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
options?.debug ?? false,
undefined, // processManagerOverride
repos.chatSessionRepository,
repos.reviewCommentRepository,
);
log.info('agent manager created');
// Reconcile agent state from any previous server session
await agentManager.reconcileAfterRestart();
log.info('agent reconciliation complete');
// Branch manager
const branchManager = new SimpleGitBranchManager();
log.info('branch manager created');
@@ -249,10 +246,17 @@ export async function createContainer(options?: ContainerOptions): Promise<Conta
conflictResolutionService,
eventBus,
workspaceRoot,
repos.agentRepository,
);
executionOrchestrator.start();
log.info('execution orchestrator started');
// Reconcile agent state from any previous server session.
// Must run AFTER orchestrator.start() so event listeners are registered
// and agent:stopped / agent:crashed events are not lost.
await agentManager.reconcileAfterRestart();
log.info('agent reconciliation complete');
// Preview manager
const previewManager = new PreviewManager(
repos.projectRepository,

View File

@@ -33,4 +33,10 @@ export interface ChangeSetRepository {
findByInitiativeId(initiativeId: string): Promise<ChangeSet[]>;
findByAgentId(agentId: string): Promise<ChangeSet[]>;
markReverted(id: string): Promise<ChangeSet>;
/**
* Find applied changesets that have a 'create' entry for the given entity.
* Used to reconcile changeset status when entities are manually deleted.
*/
findAppliedByCreatedEntity(entityType: string, entityId: string): Promise<ChangeSetWithEntries[]>;
}

View File

@@ -4,7 +4,7 @@
* Implements ChangeSetRepository interface using Drizzle ORM.
*/
import { eq, desc, asc } from 'drizzle-orm';
import { eq, desc, asc, and } from 'drizzle-orm';
import { nanoid } from 'nanoid';
import type { DrizzleDatabase } from '../../index.js';
import { changeSets, changeSetEntries, type ChangeSet } from '../../schema.js';
@@ -94,6 +94,32 @@ export class DrizzleChangeSetRepository implements ChangeSetRepository {
.orderBy(desc(changeSets.createdAt));
}
/**
 * Find applied changesets that have a 'create' entry for the given entity.
 * Used to reconcile changeset status when entities are manually deleted.
 *
 * @param entityType entity type recorded on the changeset entry
 * @param entityId   id of the created entity
 * @returns changesets (with entries) whose status is still 'applied'
 */
async findAppliedByCreatedEntity(entityType: string, entityId: string): Promise<ChangeSetWithEntries[]> {
  // Find changeset entries matching the entity.
  // NOTE(review): entityType is a plain string but the schema column is
  // presumably a typed enum — the `as any` cast papers over that mismatch;
  // consider narrowing the parameter type instead.
  const matchingEntries = await this.db
    .select({ changeSetId: changeSetEntries.changeSetId })
    .from(changeSetEntries)
    .where(
      and(
        eq(changeSetEntries.entityType, entityType as any),
        eq(changeSetEntries.entityId, entityId),
        eq(changeSetEntries.action, 'create'),
      ),
    );
  // Deduplicate changeset ids (preserving first-seen order), then load them
  // in parallel instead of issuing one awaited query per id.
  const uniqueIds = [...new Set(matchingEntries.map((e) => e.changeSetId))];
  const loaded = await Promise.all(uniqueIds.map((id) => this.findByIdWithEntries(id)));
  // Only changesets still in 'applied' status are relevant here.
  return loaded.filter(
    (cs): cs is ChangeSetWithEntries => cs != null && cs.status === 'applied',
  );
}
async markReverted(id: string): Promise<ChangeSet> {
const [updated] = await this.db
.update(changeSets)

View File

@@ -0,0 +1,336 @@
/**
* DrizzleErrandRepository Tests
*/
import { describe, it, expect, beforeEach } from 'vitest';
import { DrizzleErrandRepository } from './errand.js';
import { createTestDatabase } from './test-helpers.js';
import type { DrizzleDatabase } from '../../index.js';
import { projects, agents, errands } from '../../schema.js';
import { nanoid } from 'nanoid';
import { eq } from 'drizzle-orm';
// Exercises DrizzleErrandRepository against a fresh test database per test:
// create/find round-trips, filtered listing, the agent-alias left join,
// update/delete, and foreign-key behavior (cascade + SET NULL).
describe('DrizzleErrandRepository', () => {
  let db: DrizzleDatabase;
  let repo: DrizzleErrandRepository;

  // Fresh database and repository for every test — no cross-test state.
  beforeEach(() => {
    db = createTestDatabase();
    repo = new DrizzleErrandRepository(db);
  });

  // Helper: create a project record
  // (the generated id is appended to the name so names stay unique)
  async function createProject(name = 'Test Project', suffix = '') {
    const id = nanoid();
    const now = new Date();
    const [project] = await db.insert(projects).values({
      id,
      name: name + suffix + id,
      url: `https://github.com/test/${id}`,
      defaultBranch: 'main',
      createdAt: now,
      updatedAt: now,
    }).returning();
    return project;
  }

  // Helper: create an agent record
  async function createAgent(name?: string) {
    const id = nanoid();
    const now = new Date();
    const agentName = name ?? `agent-${id}`;
    const [agent] = await db.insert(agents).values({
      id,
      name: agentName,
      worktreeId: `agent-workdirs/${agentName}`,
      provider: 'claude',
      status: 'idle',
      mode: 'execute',
      createdAt: now,
      updatedAt: now,
    }).returning();
    return agent;
  }

  // Helper: create an errand via the repository; makes a fresh project
  // unless projectId is explicitly overridden (including with null).
  async function createErrand(overrides: Partial<{
    id: string;
    description: string;
    branch: string;
    baseBranch: string;
    agentId: string | null;
    projectId: string | null;
    status: 'active' | 'pending_review' | 'conflict' | 'merged' | 'abandoned';
    createdAt: Date;
  }> = {}) {
    const project = await createProject();
    const id = overrides.id ?? nanoid();
    return repo.create({
      id,
      description: overrides.description ?? 'Test errand',
      branch: overrides.branch ?? 'feature/test',
      baseBranch: overrides.baseBranch ?? 'main',
      // `!== undefined` (not `??`) so callers may deliberately pass null.
      agentId: overrides.agentId !== undefined ? overrides.agentId : null,
      projectId: overrides.projectId !== undefined ? overrides.projectId : project.id,
      status: overrides.status ?? 'active',
    });
  }

  describe('create + findById', () => {
    it('should create errand and find by id with all fields', async () => {
      const project = await createProject();
      const id = nanoid();
      await repo.create({
        id,
        description: 'Fix the bug',
        branch: 'fix/bug-123',
        baseBranch: 'main',
        agentId: null,
        projectId: project.id,
        status: 'active',
      });
      const found = await repo.findById(id);
      expect(found).toBeDefined();
      expect(found!.id).toBe(id);
      expect(found!.description).toBe('Fix the bug');
      expect(found!.branch).toBe('fix/bug-123');
      expect(found!.baseBranch).toBe('main');
      expect(found!.status).toBe('active');
      expect(found!.projectId).toBe(project.id);
      expect(found!.agentId).toBeNull();
      expect(found!.agentAlias).toBeNull();
    });
  });

  describe('findAll', () => {
    it('should return all errands ordered by createdAt desc', async () => {
      const project = await createProject();
      const t1 = new Date('2024-01-01T00:00:00Z');
      const t2 = new Date('2024-01-02T00:00:00Z');
      const t3 = new Date('2024-01-03T00:00:00Z');
      const id1 = nanoid();
      const id2 = nanoid();
      const id3 = nanoid();
      await db.insert(errands).values([
        { id: id1, description: 'Errand 1', branch: 'b1', baseBranch: 'main', agentId: null, projectId: project.id, status: 'active', createdAt: t1, updatedAt: t1 },
        { id: id2, description: 'Errand 2', branch: 'b2', baseBranch: 'main', agentId: null, projectId: project.id, status: 'active', createdAt: t2, updatedAt: t2 },
        { id: id3, description: 'Errand 3', branch: 'b3', baseBranch: 'main', agentId: null, projectId: project.id, status: 'active', createdAt: t3, updatedAt: t3 },
      ]);
      const result = await repo.findAll();
      expect(result.length).toBeGreaterThanOrEqual(3);
      // Find our three in the results — newest (t3) must precede t2 and t1.
      const ids = result.map((e) => e.id);
      expect(ids.indexOf(id3)).toBeLessThan(ids.indexOf(id2));
      expect(ids.indexOf(id2)).toBeLessThan(ids.indexOf(id1));
    });

    it('should filter by projectId', async () => {
      const projectA = await createProject('A');
      const projectB = await createProject('B');
      const now = new Date();
      const idA1 = nanoid();
      const idA2 = nanoid();
      const idB1 = nanoid();
      await db.insert(errands).values([
        { id: idA1, description: 'A1', branch: 'b-a1', baseBranch: 'main', agentId: null, projectId: projectA.id, status: 'active', createdAt: now, updatedAt: now },
        { id: idA2, description: 'A2', branch: 'b-a2', baseBranch: 'main', agentId: null, projectId: projectA.id, status: 'active', createdAt: now, updatedAt: now },
        { id: idB1, description: 'B1', branch: 'b-b1', baseBranch: 'main', agentId: null, projectId: projectB.id, status: 'active', createdAt: now, updatedAt: now },
      ]);
      const result = await repo.findAll({ projectId: projectA.id });
      expect(result).toHaveLength(2);
      expect(result.map((e) => e.id).sort()).toEqual([idA1, idA2].sort());
    });

    it('should filter by status', async () => {
      const project = await createProject();
      const now = new Date();
      const id1 = nanoid();
      const id2 = nanoid();
      const id3 = nanoid();
      await db.insert(errands).values([
        { id: id1, description: 'E1', branch: 'b1', baseBranch: 'main', agentId: null, projectId: project.id, status: 'active', createdAt: now, updatedAt: now },
        { id: id2, description: 'E2', branch: 'b2', baseBranch: 'main', agentId: null, projectId: project.id, status: 'pending_review', createdAt: now, updatedAt: now },
        { id: id3, description: 'E3', branch: 'b3', baseBranch: 'main', agentId: null, projectId: project.id, status: 'merged', createdAt: now, updatedAt: now },
      ]);
      const result = await repo.findAll({ status: 'pending_review' });
      expect(result).toHaveLength(1);
      expect(result[0].id).toBe(id2);
    });

    it('should filter by both projectId and status', async () => {
      const projectA = await createProject('PA');
      const projectB = await createProject('PB');
      const now = new Date();
      // Four errands covering every match/mismatch combination of the two filters.
      const idMatch = nanoid();
      const idOtherStatus = nanoid();
      const idOtherProject = nanoid();
      const idNeither = nanoid();
      await db.insert(errands).values([
        { id: idMatch, description: 'Match', branch: 'b1', baseBranch: 'main', agentId: null, projectId: projectA.id, status: 'pending_review', createdAt: now, updatedAt: now },
        { id: idOtherStatus, description: 'Wrong status', branch: 'b2', baseBranch: 'main', agentId: null, projectId: projectA.id, status: 'active', createdAt: now, updatedAt: now },
        { id: idOtherProject, description: 'Wrong project', branch: 'b3', baseBranch: 'main', agentId: null, projectId: projectB.id, status: 'pending_review', createdAt: now, updatedAt: now },
        { id: idNeither, description: 'Neither', branch: 'b4', baseBranch: 'main', agentId: null, projectId: projectB.id, status: 'active', createdAt: now, updatedAt: now },
      ]);
      const result = await repo.findAll({ projectId: projectA.id, status: 'pending_review' });
      expect(result).toHaveLength(1);
      expect(result[0].id).toBe(idMatch);
    });
  });

  describe('findById', () => {
    it('should return agentAlias when agentId is set', async () => {
      const agent = await createAgent('known-agent');
      const project = await createProject();
      const id = nanoid();
      const now = new Date();
      await db.insert(errands).values({
        id,
        description: 'With agent',
        branch: 'feature/x',
        baseBranch: 'main',
        agentId: agent.id,
        projectId: project.id,
        status: 'active',
        createdAt: now,
        updatedAt: now,
      });
      const found = await repo.findById(id);
      expect(found).toBeDefined();
      expect(found!.agentAlias).toBe(agent.name);
    });

    it('should return agentAlias as null when agentId is null', async () => {
      const project = await createProject();
      const id = nanoid();
      const now = new Date();
      await db.insert(errands).values({
        id,
        description: 'No agent',
        branch: 'feature/y',
        baseBranch: 'main',
        agentId: null,
        projectId: project.id,
        status: 'active',
        createdAt: now,
        updatedAt: now,
      });
      const found = await repo.findById(id);
      expect(found).toBeDefined();
      expect(found!.agentAlias).toBeNull();
    });

    it('should return undefined for unknown id', async () => {
      const found = await repo.findById('nonexistent');
      expect(found).toBeUndefined();
    });
  });

  describe('update', () => {
    it('should update status and advance updatedAt', async () => {
      const project = await createProject();
      const id = nanoid();
      // Insert with timestamps in the past so update() provably bumps updatedAt.
      const past = new Date('2024-01-01T00:00:00Z');
      await db.insert(errands).values({
        id,
        description: 'Errand',
        branch: 'feature/update',
        baseBranch: 'main',
        agentId: null,
        projectId: project.id,
        status: 'active',
        createdAt: past,
        updatedAt: past,
      });
      const updated = await repo.update(id, { status: 'pending_review' });
      expect(updated.status).toBe('pending_review');
      expect(updated.updatedAt.getTime()).toBeGreaterThan(past.getTime());
    });

    it('should throw on unknown id', async () => {
      await expect(
        repo.update('nonexistent', { status: 'merged' })
      ).rejects.toThrow('Errand not found');
    });
  });

  describe('delete', () => {
    it('should delete errand and findById returns undefined', async () => {
      const errand = await createErrand();
      await repo.delete(errand.id);
      const found = await repo.findById(errand.id);
      expect(found).toBeUndefined();
    });
  });

  describe('cascade and set null', () => {
    it('should cascade delete errands when project is deleted', async () => {
      const project = await createProject();
      const id = nanoid();
      const now = new Date();
      await db.insert(errands).values({
        id,
        description: 'Cascade test',
        branch: 'feature/cascade',
        baseBranch: 'main',
        agentId: null,
        projectId: project.id,
        status: 'active',
        createdAt: now,
        updatedAt: now,
      });
      // Delete project — should cascade delete errands
      await db.delete(projects).where(eq(projects.id, project.id));
      const found = await repo.findById(id);
      expect(found).toBeUndefined();
    });

    it('should set agentId to null when agent is deleted', async () => {
      const agent = await createAgent();
      const project = await createProject();
      const id = nanoid();
      const now = new Date();
      await db.insert(errands).values({
        id,
        description: 'Agent null test',
        branch: 'feature/agent-null',
        baseBranch: 'main',
        agentId: agent.id,
        projectId: project.id,
        status: 'active',
        createdAt: now,
        updatedAt: now,
      });
      // Delete agent — should set null
      await db.delete(agents).where(eq(agents.id, agent.id));
      // Query the table directly: the errand row must survive with agentId NULL.
      const [errand] = await db.select().from(errands).where(eq(errands.id, id));
      expect(errand).toBeDefined();
      expect(errand.agentId).toBeNull();
    });
  });
});

View File

@@ -0,0 +1,89 @@
/**
* Drizzle Errand Repository Adapter
*
* Implements ErrandRepository interface using Drizzle ORM.
*/
import { eq, desc, and } from 'drizzle-orm';
import type { DrizzleDatabase } from '../../index.js';
import { errands, agents } from '../../schema.js';
import type {
ErrandRepository,
ErrandWithAlias,
ErrandStatus,
CreateErrandData,
UpdateErrandData,
} from '../errand-repository.js';
import type { Errand } from '../../schema.js';
/**
 * Drizzle-backed implementation of {@link ErrandRepository}.
 *
 * Reads left-join the agents table so callers receive the owning agent's
 * display name as `agentAlias` (null when the errand is unassigned or the
 * agent was deleted).
 */
export class DrizzleErrandRepository implements ErrandRepository {
  constructor(private db: DrizzleDatabase) {}

  /**
   * Shared SELECT projection: every errand column plus the joined agent's
   * name exposed as `agentAlias`. Centralised so `findById` and `findAll`
   * cannot drift apart when the schema changes.
   */
  private errandWithAliasColumns() {
    return {
      id: errands.id,
      description: errands.description,
      branch: errands.branch,
      baseBranch: errands.baseBranch,
      agentId: errands.agentId,
      projectId: errands.projectId,
      status: errands.status,
      createdAt: errands.createdAt,
      updatedAt: errands.updatedAt,
      agentAlias: agents.name,
    };
  }

  /** Insert a new errand; createdAt/updatedAt are stamped here. */
  async create(data: CreateErrandData): Promise<Errand> {
    const now = new Date();
    const [created] = await this.db
      .insert(errands)
      .values({ ...data, createdAt: now, updatedAt: now })
      .returning();
    return created;
  }

  /** Look up one errand by id, with the agent alias joined in. */
  async findById(id: string): Promise<ErrandWithAlias | undefined> {
    const result = await this.db
      .select(this.errandWithAliasColumns())
      .from(errands)
      .leftJoin(agents, eq(errands.agentId, agents.id))
      .where(eq(errands.id, id))
      .limit(1);
    return result[0] ?? undefined;
  }

  /**
   * List errands, optionally filtered by project and/or status,
   * newest first.
   */
  async findAll(opts?: { projectId?: string; status?: ErrandStatus }): Promise<ErrandWithAlias[]> {
    const conditions = [];
    if (opts?.projectId) conditions.push(eq(errands.projectId, opts.projectId));
    if (opts?.status) conditions.push(eq(errands.status, opts.status));

    return this.db
      .select(this.errandWithAliasColumns())
      .from(errands)
      .leftJoin(agents, eq(errands.agentId, agents.id))
      // `undefined` where-clause means "no filter" in drizzle.
      .where(conditions.length > 0 ? and(...conditions) : undefined)
      .orderBy(desc(errands.createdAt));
  }

  /**
   * Partial update; always bumps updatedAt.
   * @throws Error when no errand with `id` exists.
   */
  async update(id: string, data: UpdateErrandData): Promise<Errand> {
    const [updated] = await this.db
      .update(errands)
      .set({ ...data, updatedAt: new Date() })
      .where(eq(errands.id, id))
      .returning();
    if (!updated) throw new Error(`Errand not found: ${id}`);
    return updated;
  }

  /** Hard-delete; a no-op when the id does not exist. */
  async delete(id: string): Promise<void> {
    await this.db.delete(errands).where(eq(errands.id, id));
  }
}

View File

@@ -18,3 +18,4 @@ export { DrizzleLogChunkRepository } from './log-chunk.js';
export { DrizzleConversationRepository } from './conversation.js';
export { DrizzleChatSessionRepository } from './chat-session.js';
export { DrizzleReviewCommentRepository } from './review-comment.js';
export { DrizzleErrandRepository } from './errand.js';

View File

@@ -23,7 +23,43 @@ export class DrizzleReviewCommentRepository implements ReviewCommentRepository {
lineNumber: data.lineNumber,
lineType: data.lineType,
body: data.body,
author: data.author ?? 'you',
author: data.author ?? 'user',
parentCommentId: data.parentCommentId ?? null,
resolved: false,
createdAt: now,
updatedAt: now,
});
const rows = await this.db
.select()
.from(reviewComments)
.where(eq(reviewComments.id, id))
.limit(1);
return rows[0]!;
}
async createReply(parentCommentId: string, body: string, author?: string): Promise<ReviewComment> {
// Fetch parent comment to copy context fields
const parentRows = await this.db
.select()
.from(reviewComments)
.where(eq(reviewComments.id, parentCommentId))
.limit(1);
const parent = parentRows[0];
if (!parent) {
throw new Error(`Parent comment not found: ${parentCommentId}`);
}
const now = new Date();
const id = nanoid();
await this.db.insert(reviewComments).values({
id,
phaseId: parent.phaseId,
filePath: parent.filePath,
lineNumber: parent.lineNumber,
lineType: parent.lineType,
body,
author: author ?? 'user',
parentCommentId,
resolved: false,
createdAt: now,
updatedAt: now,
@@ -44,6 +80,19 @@ export class DrizzleReviewCommentRepository implements ReviewCommentRepository {
.orderBy(asc(reviewComments.createdAt));
}
/**
 * Replace a review comment's body and bump updatedAt.
 *
 * Returns the freshly re-read row, or null when no comment with `id`
 * exists (the UPDATE then matched nothing and the follow-up SELECT is
 * empty). NOTE(review): the row is read back with a second query instead
 * of `.returning()` — presumably a driver-compatibility choice; confirm
 * before consolidating into one statement.
 */
async update(id: string, body: string): Promise<ReviewComment | null> {
  await this.db
    .update(reviewComments)
    .set({ body, updatedAt: new Date() })
    .where(eq(reviewComments.id, id));
  const rows = await this.db
    .select()
    .from(reviewComments)
    .where(eq(reviewComments.id, id))
    .limit(1);
  return rows[0] ?? null;
}
async resolve(id: string): Promise<ReviewComment | null> {
await this.db
.update(reviewComments)

View File

@@ -71,13 +71,13 @@ describe('DrizzleTaskRepository', () => {
it('should accept custom type and priority', async () => {
const task = await taskRepo.create({
phaseId: testPhaseId,
name: 'Checkpoint Task',
type: 'checkpoint:human-verify',
name: 'High Priority Task',
type: 'auto',
priority: 'high',
order: 1,
});
expect(task.type).toBe('checkpoint:human-verify');
expect(task.type).toBe('auto');
expect(task.priority).toBe('high');
});
});

View File

@@ -0,0 +1,15 @@
import type { Errand, NewErrand } from '../schema.js';

/** Lifecycle states of an errand's branch. */
export type ErrandStatus = 'active' | 'pending_review' | 'conflict' | 'merged' | 'abandoned';

/** Errand row plus the joined agent's name (null when unassigned). */
export type ErrandWithAlias = Errand & { agentAlias: string | null };

/** Insert payload; timestamps are stamped by the repository. */
export type CreateErrandData = Omit<NewErrand, 'createdAt' | 'updatedAt'>;

/** Partial update payload; id and createdAt are immutable. */
export type UpdateErrandData = Partial<Omit<NewErrand, 'id' | 'createdAt'>>;

/**
 * Persistence port for errands.
 * Implemented by DrizzleErrandRepository.
 */
export interface ErrandRepository {
  create(data: CreateErrandData): Promise<Errand>;
  findById(id: string): Promise<ErrandWithAlias | undefined>;
  findAll(opts?: { projectId?: string; status?: ErrandStatus }): Promise<ErrandWithAlias[]>;
  update(id: string, data: UpdateErrandData): Promise<Errand>;
  delete(id: string): Promise<void>;
}

View File

@@ -82,3 +82,11 @@ export type {
ReviewCommentRepository,
CreateReviewCommentData,
} from './review-comment-repository.js';
export type {
ErrandRepository,
ErrandWithAlias,
ErrandStatus,
CreateErrandData,
UpdateErrandData,
} from './errand-repository.js';

View File

@@ -13,11 +13,14 @@ export interface CreateReviewCommentData {
lineType: 'added' | 'removed' | 'context';
body: string;
author?: string;
parentCommentId?: string; // for replies
}
export interface ReviewCommentRepository {
create(data: CreateReviewCommentData): Promise<ReviewComment>;
createReply(parentCommentId: string, body: string, author?: string): Promise<ReviewComment>;
findByPhaseId(phaseId: string): Promise<ReviewComment[]>;
update(id: string, body: string): Promise<ReviewComment | null>;
resolve(id: string): Promise<ReviewComment | null>;
unresolve(id: string): Promise<ReviewComment | null>;
delete(id: string): Promise<void>;

View File

@@ -55,6 +55,7 @@ export const phases = sqliteTable('phases', {
status: text('status', { enum: ['pending', 'approved', 'in_progress', 'completed', 'blocked', 'pending_review'] })
.notNull()
.default('pending'),
mergeBase: text('merge_base'),
createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
});
@@ -137,7 +138,7 @@ export const tasks = sqliteTable('tasks', {
name: text('name').notNull(),
description: text('description'),
type: text('type', {
enum: ['auto', 'checkpoint:human-verify', 'checkpoint:decision', 'checkpoint:human-action'],
enum: ['auto'],
})
.notNull()
.default('auto'),
@@ -156,6 +157,7 @@ export const tasks = sqliteTable('tasks', {
.default('pending'),
order: integer('order').notNull().default(0),
summary: text('summary'), // Agent result summary — propagated to dependent tasks as context
retryCount: integer('retry_count').notNull().default(0),
createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
});
@@ -260,7 +262,7 @@ export const agents = sqliteTable('agents', {
})
.notNull()
.default('idle'),
mode: text('mode', { enum: ['execute', 'discuss', 'plan', 'detail', 'refine', 'chat'] })
mode: text('mode', { enum: ['execute', 'discuss', 'plan', 'detail', 'refine', 'chat', 'errand'] })
.notNull()
.default('execute'),
pid: integer('pid'),
@@ -617,12 +619,46 @@ export const reviewComments = sqliteTable('review_comments', {
lineType: text('line_type', { enum: ['added', 'removed', 'context'] }).notNull(),
body: text('body').notNull(),
author: text('author').notNull().default('you'),
parentCommentId: text('parent_comment_id').references((): ReturnType<typeof text> => reviewComments.id, { onDelete: 'cascade' }),
resolved: integer('resolved', { mode: 'boolean' }).notNull().default(false),
createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
}, (table) => [
index('review_comments_phase_id_idx').on(table.phaseId),
index('review_comments_parent_id_idx').on(table.parentCommentId),
]);
export type ReviewComment = InferSelectModel<typeof reviewComments>;
export type NewReviewComment = InferInsertModel<typeof reviewComments>;
// ============================================================================
// ERRANDS
// ============================================================================

// Branch-scoped work items. FK actions: agent_id is nulled when the agent
// is deleted (SET NULL); rows are removed with their project (CASCADE).
export const errands = sqliteTable('errands', {
  id: text('id').primaryKey(),
  description: text('description').notNull(),
  branch: text('branch').notNull(),
  // Branch the errand was cut from; defaults to 'main'.
  baseBranch: text('base_branch').notNull().default('main'),
  agentId: text('agent_id').references(() => agents.id, { onDelete: 'set null' }),
  projectId: text('project_id').references(() => projects.id, { onDelete: 'cascade' }),
  status: text('status', {
    enum: ['active', 'pending_review', 'conflict', 'merged', 'abandoned'],
  }).notNull().default('active'),
  createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
  updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
});

// Relation metadata for drizzle's relational query API (no DB effect).
export const errandsRelations = relations(errands, ({ one }) => ({
  agent: one(agents, {
    fields: [errands.agentId],
    references: [agents.id],
  }),
  project: one(projects, {
    fields: [errands.projectId],
    references: [projects.id],
  }),
}));

export type Errand = InferSelectModel<typeof errands>;
export type NewErrand = InferInsertModel<typeof errands>;

View File

@@ -79,7 +79,6 @@ export class DefaultDispatchManager implements DispatchManager {
/**
* Queue a task for dispatch.
* Fetches task dependencies and adds to internal queue.
* Checkpoint tasks are queued but won't auto-dispatch.
*/
async queue(taskId: string): Promise<void> {
// Fetch task to verify it exists and get priority
@@ -100,7 +99,7 @@ export class DefaultDispatchManager implements DispatchManager {
this.taskQueue.set(taskId, queuedTask);
log.info({ taskId, priority: task.priority, isCheckpoint: this.isCheckpointTask(task) }, 'task queued');
log.info({ taskId, priority: task.priority }, 'task queued');
// Emit TaskQueuedEvent
const event: TaskQueuedEvent = {
@@ -118,7 +117,6 @@ export class DefaultDispatchManager implements DispatchManager {
/**
* Get next dispatchable task.
* Returns task with all dependencies complete, highest priority first.
* Checkpoint tasks are excluded (require human action).
*/
async getNextDispatchable(): Promise<QueuedTask | null> {
const queuedTasks = Array.from(this.taskQueue.values());
@@ -127,7 +125,7 @@ export class DefaultDispatchManager implements DispatchManager {
return null;
}
// Filter to only tasks with all dependencies complete and not checkpoint tasks
// Filter to only tasks with all dependencies complete
const readyTasks: QueuedTask[] = [];
log.debug({ queueSize: queuedTasks.length }, 'evaluating dispatchable tasks');
@@ -139,14 +137,8 @@ export class DefaultDispatchManager implements DispatchManager {
continue;
}
// Check if this is a checkpoint task (requires human action)
const task = await this.taskRepository.findById(qt.taskId);
if (task && this.isCheckpointTask(task)) {
log.debug({ taskId: qt.taskId, type: task.type }, 'skipping checkpoint task');
continue;
}
// Skip planning-category tasks (handled by architect flow)
const task = await this.taskRepository.findById(qt.taskId);
if (task && isPlanningCategory(task.category)) {
log.debug({ taskId: qt.taskId, category: task.category }, 'skipping planning-category task');
continue;
@@ -255,8 +247,8 @@ export class DefaultDispatchManager implements DispatchManager {
// Clear blocked state
this.blockedTasks.delete(taskId);
// Reset DB status to pending
await this.taskRepository.update(taskId, { status: 'pending' });
// Reset DB status to pending and clear retry count (manual retry = fresh start)
await this.taskRepository.update(taskId, { status: 'pending', retryCount: 0 });
log.info({ taskId }, 'retrying blocked task');
@@ -478,14 +470,6 @@ export class DefaultDispatchManager implements DispatchManager {
return true;
}
/**
* Check if a task is a checkpoint task.
* Checkpoint tasks require human action and don't auto-dispatch.
*/
private isCheckpointTask(task: Task): boolean {
return task.type.startsWith('checkpoint:');
}
/**
* Store the completing agent's result summary on the task record.
*/

View File

@@ -0,0 +1 @@
-- Nullable git merge-base ref for a phase (schema.ts `phases.mergeBase`).
-- Nullable so pre-existing phase rows remain valid.
ALTER TABLE phases ADD COLUMN merge_base TEXT;

View File

@@ -0,0 +1,2 @@
-- Threaded review comments: parent_comment_id self-references review_comments;
-- deleting a parent cascades to its replies. Index matches schema.ts.
ALTER TABLE review_comments ADD COLUMN parent_comment_id TEXT REFERENCES review_comments(id) ON DELETE CASCADE;--> statement-breakpoint
CREATE INDEX review_comments_parent_id_idx ON review_comments(parent_comment_id);

View File

@@ -0,0 +1,5 @@
-- Drop orphaned approval columns left behind by 0030_remove_task_approval.
-- These columns were removed from schema.ts but left in the DB because
-- 0030 assumed SQLite couldn't DROP COLUMN. SQLite 3.35+ supports it.
ALTER TABLE initiatives DROP COLUMN merge_requires_approval;--> statement-breakpoint
ALTER TABLE tasks DROP COLUMN requires_approval;

View File

@@ -0,0 +1 @@
-- Automatic-retry counter for crashed-agent tasks (see MAX_TASK_RETRIES
-- in the execution orchestrator). Defaults to 0 for existing rows.
ALTER TABLE tasks ADD COLUMN retry_count integer NOT NULL DEFAULT 0;

View File

@@ -0,0 +1,13 @@
-- Errands: branch-scoped work items (see schema.ts `errands`).
-- agent_id survives agent deletion as NULL; rows cascade-delete with project.
CREATE TABLE `errands` (
`id` text PRIMARY KEY NOT NULL,
`description` text NOT NULL,
`branch` text NOT NULL,
`base_branch` text DEFAULT 'main' NOT NULL,
`agent_id` text,
`project_id` text,
`status` text DEFAULT 'active' NOT NULL,
`created_at` integer NOT NULL,
`updated_at` integer NOT NULL,
FOREIGN KEY (`agent_id`) REFERENCES `agents`(`id`) ON UPDATE no action ON DELETE set null,
FOREIGN KEY (`project_id`) REFERENCES `projects`(`id`) ON UPDATE no action ON DELETE cascade
);

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -222,9 +222,44 @@
{
"idx": 31,
"version": "6",
"when": 1772236800000,
"tag": "0031_add_phase_merge_base",
"breakpoints": true
},
{
"idx": 32,
"version": "6",
"when": 1772323200000,
"tag": "0032_add_comment_threading",
"breakpoints": true
},
{
"idx": 33,
"version": "6",
"when": 1772409600000,
"tag": "0033_drop_approval_columns",
"breakpoints": true
},
{
"idx": 34,
"version": "6",
"when": 1772496000000,
"tag": "0034_add_task_retry_count",
"breakpoints": true
},
{
"idx": 35,
"version": "6",
"when": 1772796561474,
"tag": "0035_faulty_human_fly",
"breakpoints": true
},
{
"idx": 36,
"version": "6",
"when": 1772798869413,
"tag": "0031_icy_silvermane",
"tag": "0036_icy_silvermane",
"breakpoints": true
}
]
}
}

View File

@@ -52,6 +52,7 @@ export type {
AccountCredentialsValidatedEvent,
InitiativePendingReviewEvent,
InitiativeReviewApprovedEvent,
InitiativeChangesRequestedEvent,
DomainEventMap,
DomainEventType,
} from './types.js';

View File

@@ -591,6 +591,15 @@ export interface InitiativeReviewApprovedEvent extends DomainEvent {
};
}
export interface InitiativeChangesRequestedEvent extends DomainEvent {
type: 'initiative:changes_requested';
payload: {
initiativeId: string;
phaseId: string;
taskId: string;
};
}
/**
* Chat Session Events
*/
@@ -668,7 +677,8 @@ export type DomainEventMap =
| ChatMessageCreatedEvent
| ChatSessionClosedEvent
| InitiativePendingReviewEvent
| InitiativeReviewApprovedEvent;
| InitiativeReviewApprovedEvent
| InitiativeChangesRequestedEvent;
/**
* Event type literal union for type checking
@@ -684,6 +694,14 @@ export type DomainEventType = DomainEventMap['type'];
*
* All modules communicate through this interface.
* Can be swapped for external systems (RabbitMQ, WebSocket forwarding) later.
*
* **Delivery guarantee: at-most-once.**
*
* Events emitted while a client is disconnected are permanently lost.
* Reconnecting clients receive only events emitted after reconnection.
* React Query's `refetchOnWindowFocus` and `refetchOnReconnect` compensate
* for missed mutations since the system uses query invalidation rather
* than incremental state.
*/
export interface EventBus {
/**

View File

@@ -0,0 +1,369 @@
/**
* ExecutionOrchestrator Tests
*
* Tests phase completion transitions, especially when initiative has no branch.
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { ExecutionOrchestrator } from './orchestrator.js';
import { ensureProjectClone } from '../git/project-clones.js';
import type { BranchManager } from '../git/branch-manager.js';
vi.mock('../git/project-clones.js', () => ({
ensureProjectClone: vi.fn().mockResolvedValue('/tmp/test-workspace/clones/test'),
}));
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
import type { EventBus, TaskCompletedEvent, DomainEvent } from '../events/types.js';
/**
 * Build a synchronous in-memory EventBus double.
 *
 * Exposes two inspection hooks on top of the EventBus interface:
 * `handlers` (type → registered listener list) and `emitted` (every event
 * passed to emit, in order). emit() delivers synchronously to all
 * listeners registered for the event's type; off/once are inert spies.
 */
function createMockEventBus(): EventBus & { handlers: Map<string, Function[]>; emitted: DomainEvent[] } {
  const registry = new Map<string, Function[]>();
  const recorded: DomainEvent[] = [];

  const emit = vi.fn((event: DomainEvent) => {
    recorded.push(event);
    (registry.get(event.type) ?? []).forEach((listener) => listener(event));
  });

  const on = vi.fn((type: string, handler: Function) => {
    const listeners = registry.get(type);
    if (listeners) {
      listeners.push(handler);
    } else {
      registry.set(type, [handler]);
    }
  });

  return {
    handlers: registry,
    emitted: recorded,
    emit,
    on,
    off: vi.fn(),
    once: vi.fn(),
  };
}
/**
 * Build the full set of collaborator mocks the orchestrator needs.
 *
 * Defaults model the happy path: merges succeed (with a previousRef for
 * rollback assertions), branches exist locally but not remotely, queues
 * are empty, and mergeability checks pass. Individual tests override
 * specific mocks via vi.mocked(...).
 */
function createMocks() {
  // Fully-typed mock: BranchManager's surface is implemented explicitly.
  const branchManager: BranchManager = {
    ensureBranch: vi.fn(),
    mergeBranch: vi.fn().mockResolvedValue({ success: true, message: 'merged', previousRef: 'abc000' }),
    diffBranches: vi.fn().mockResolvedValue(''),
    deleteBranch: vi.fn(),
    branchExists: vi.fn().mockResolvedValue(true),
    remoteBranchExists: vi.fn().mockResolvedValue(false),
    listCommits: vi.fn().mockResolvedValue([]),
    diffCommit: vi.fn().mockResolvedValue(''),
    getMergeBase: vi.fn().mockResolvedValue('abc123'),
    pushBranch: vi.fn(),
    checkMergeability: vi.fn().mockResolvedValue({ mergeable: true }),
    fetchRemote: vi.fn(),
    fastForwardBranch: vi.fn(),
    updateRef: vi.fn(),
  };
  // Repository mocks are partial objects cast through `unknown` — only the
  // methods the orchestrator actually calls are provided.
  const phaseRepository = {
    findById: vi.fn(),
    findByInitiativeId: vi.fn().mockResolvedValue([]),
    // Echo back the patch so callers can assert on the "updated" row.
    update: vi.fn().mockImplementation(async (id: string, data: any) => ({ id, ...data })),
    create: vi.fn(),
  } as unknown as PhaseRepository;
  const taskRepository = {
    findById: vi.fn(),
    findByPhaseId: vi.fn().mockResolvedValue([]),
    findByInitiativeId: vi.fn().mockResolvedValue([]),
  } as unknown as TaskRepository;
  const initiativeRepository = {
    findById: vi.fn(),
    findByStatus: vi.fn().mockResolvedValue([]),
    update: vi.fn(),
  } as unknown as InitiativeRepository;
  const projectRepository = {
    findProjectsByInitiativeId: vi.fn().mockResolvedValue([]),
  } as unknown as ProjectRepository;
  // Dispatch layers default to "nothing dispatchable".
  const phaseDispatchManager: PhaseDispatchManager = {
    queuePhase: vi.fn(),
    getNextDispatchablePhase: vi.fn().mockResolvedValue(null),
    dispatchNextPhase: vi.fn().mockResolvedValue({ success: false, phaseId: '', reason: 'none' }),
    completePhase: vi.fn(),
    blockPhase: vi.fn(),
    getPhaseQueueState: vi.fn().mockResolvedValue({ queued: [], ready: [], blocked: [] }),
  };
  const dispatchManager = {
    queue: vi.fn(),
    getNextDispatchable: vi.fn().mockResolvedValue(null),
    dispatchNext: vi.fn().mockResolvedValue({ success: false, taskId: '' }),
    completeTask: vi.fn(),
    blockTask: vi.fn(),
    retryBlockedTask: vi.fn(),
    getQueueState: vi.fn().mockResolvedValue({ queued: [], ready: [], blocked: [] }),
  } as unknown as DispatchManager;
  const conflictResolutionService: ConflictResolutionService = {
    handleConflict: vi.fn(),
  };
  const eventBus = createMockEventBus();
  return {
    branchManager,
    phaseRepository,
    taskRepository,
    initiativeRepository,
    projectRepository,
    phaseDispatchManager,
    dispatchManager,
    conflictResolutionService,
    eventBus,
  };
}
/**
 * Construct an ExecutionOrchestrator wired to the given mocks and start it.
 * start() registers the orchestrator's event-bus subscriptions, so events
 * emitted afterwards reach its handlers.
 */
function createOrchestrator(mocks: ReturnType<typeof createMocks>) {
  const orchestrator = new ExecutionOrchestrator(
    mocks.branchManager,
    mocks.phaseRepository,
    mocks.taskRepository,
    mocks.initiativeRepository,
    mocks.projectRepository,
    mocks.phaseDispatchManager,
    mocks.dispatchManager,
    mocks.conflictResolutionService,
    mocks.eventBus,
    '/tmp/test-workspace',
  );
  orchestrator.start();
  return orchestrator;
}
/** Fire a successful `task:completed` event for `taskId` through the mock bus. */
function emitTaskCompleted(eventBus: ReturnType<typeof createMockEventBus>, taskId: string) {
  const payload = { taskId, agentId: 'agent-1', success: true, message: 'done' };
  const event: TaskCompletedEvent = { type: 'task:completed', timestamp: new Date(), payload };
  eventBus.emit(event);
}
describe('ExecutionOrchestrator', () => {
  let mocks: ReturnType<typeof createMocks>;
  beforeEach(() => {
    mocks = createMocks();
  });

  // Initiatives without a branch must still drive phase state transitions —
  // no git work should be attempted.
  describe('phase completion when initiative has no branch', () => {
    it('should transition phase to pending_review in review mode even without a branch', async () => {
      const task = {
        id: 'task-1',
        phaseId: 'phase-1',
        initiativeId: 'init-1',
        category: 'execute',
        status: 'completed',
      };
      const phase = { id: 'phase-1', initiativeId: 'init-1', name: 'Phase 1', status: 'in_progress' };
      const initiative = { id: 'init-1', branch: null, executionMode: 'review_per_phase' };
      vi.mocked(mocks.taskRepository.findById).mockResolvedValue(task as any);
      vi.mocked(mocks.phaseRepository.findById).mockResolvedValue(phase as any);
      vi.mocked(mocks.initiativeRepository.findById).mockResolvedValue(initiative as any);
      vi.mocked(mocks.taskRepository.findByPhaseId).mockResolvedValue([task] as any);
      createOrchestrator(mocks);
      emitTaskCompleted(mocks.eventBus, 'task-1');
      // Allow async handler to complete
      await vi.waitFor(() => {
        expect(mocks.phaseRepository.update).toHaveBeenCalledWith('phase-1', { status: 'pending_review' });
      });
    });
    it('should complete phase in yolo mode even without a branch', async () => {
      const task = {
        id: 'task-1',
        phaseId: 'phase-1',
        initiativeId: 'init-1',
        category: 'execute',
        status: 'completed',
      };
      const phase = { id: 'phase-1', initiativeId: 'init-1', name: 'Phase 1', status: 'in_progress' };
      const initiative = { id: 'init-1', branch: null, executionMode: 'yolo' };
      vi.mocked(mocks.taskRepository.findById).mockResolvedValue(task as any);
      vi.mocked(mocks.phaseRepository.findById).mockResolvedValue(phase as any);
      vi.mocked(mocks.initiativeRepository.findById).mockResolvedValue(initiative as any);
      vi.mocked(mocks.initiativeRepository.findByStatus).mockResolvedValue([]);
      vi.mocked(mocks.taskRepository.findByPhaseId).mockResolvedValue([task] as any);
      vi.mocked(mocks.phaseRepository.findByInitiativeId).mockResolvedValue([phase] as any);
      createOrchestrator(mocks);
      emitTaskCompleted(mocks.eventBus, 'task-1');
      await vi.waitFor(() => {
        expect(mocks.phaseDispatchManager.completePhase).toHaveBeenCalledWith('phase-1');
      });
      // Should NOT have attempted any branch merges
      expect(mocks.branchManager.mergeBranch).not.toHaveBeenCalled();
    });
  });

  // A failed task-branch merge must not wedge the phase state machine.
  describe('phase completion when merge fails', () => {
    it('should still check phase completion even if task merge throws', async () => {
      const task = {
        id: 'task-1',
        phaseId: 'phase-1',
        initiativeId: 'init-1',
        category: 'execute',
        status: 'completed',
      };
      const phase = { id: 'phase-1', initiativeId: 'init-1', name: 'Phase 1', status: 'in_progress' };
      const initiative = { id: 'init-1', branch: 'cw/test', executionMode: 'review_per_phase' };
      const project = { id: 'proj-1', name: 'test', url: 'https://example.com', defaultBranch: 'main' };
      vi.mocked(mocks.taskRepository.findById).mockResolvedValue(task as any);
      vi.mocked(mocks.phaseRepository.findById).mockResolvedValue(phase as any);
      vi.mocked(mocks.initiativeRepository.findById).mockResolvedValue(initiative as any);
      vi.mocked(mocks.taskRepository.findByPhaseId).mockResolvedValue([task] as any);
      vi.mocked(mocks.projectRepository.findProjectsByInitiativeId).mockResolvedValue([project] as any);
      // Merge fails
      vi.mocked(mocks.branchManager.mergeBranch).mockResolvedValue({
        success: false,
        message: 'conflict',
        conflicts: ['file.ts'],
      });
      createOrchestrator(mocks);
      emitTaskCompleted(mocks.eventBus, 'task-1');
      // Phase should still transition despite merge failure
      await vi.waitFor(() => {
        expect(mocks.phaseRepository.update).toHaveBeenCalledWith('phase-1', { status: 'pending_review' });
      });
    });
  });

  describe('phase completion with branch (normal flow)', () => {
    it('should merge task branch and transition phase when all tasks done', async () => {
      const task = {
        id: 'task-1',
        phaseId: 'phase-1',
        initiativeId: 'init-1',
        category: 'execute',
        status: 'completed',
      };
      const phase = { id: 'phase-1', initiativeId: 'init-1', name: 'Phase 1', status: 'in_progress' };
      const initiative = { id: 'init-1', branch: 'cw/test', executionMode: 'review_per_phase' };
      const project = { id: 'proj-1', name: 'test', url: 'https://example.com', defaultBranch: 'main' };
      vi.mocked(mocks.taskRepository.findById).mockResolvedValue(task as any);
      vi.mocked(mocks.phaseRepository.findById).mockResolvedValue(phase as any);
      vi.mocked(mocks.initiativeRepository.findById).mockResolvedValue(initiative as any);
      vi.mocked(mocks.taskRepository.findByPhaseId).mockResolvedValue([task] as any);
      vi.mocked(mocks.projectRepository.findProjectsByInitiativeId).mockResolvedValue([project] as any);
      vi.mocked(mocks.branchManager.branchExists).mockResolvedValue(true);
      vi.mocked(mocks.branchManager.mergeBranch).mockResolvedValue({ success: true, message: 'ok' });
      createOrchestrator(mocks);
      emitTaskCompleted(mocks.eventBus, 'task-1');
      await vi.waitFor(() => {
        expect(mocks.phaseRepository.update).toHaveBeenCalledWith('phase-1', { status: 'pending_review' });
      });
    });
    it('should not transition phase when some tasks are still pending', async () => {
      const task1 = {
        id: 'task-1',
        phaseId: 'phase-1',
        initiativeId: 'init-1',
        category: 'execute',
        status: 'completed',
      };
      // Second task still pending — the phase is not finished.
      const task2 = {
        id: 'task-2',
        phaseId: 'phase-1',
        initiativeId: 'init-1',
        category: 'execute',
        status: 'pending',
      };
      const phase = { id: 'phase-1', initiativeId: 'init-1', name: 'Phase 1', status: 'in_progress' };
      const initiative = { id: 'init-1', branch: 'cw/test', executionMode: 'review_per_phase' };
      vi.mocked(mocks.taskRepository.findById).mockResolvedValue(task1 as any);
      vi.mocked(mocks.phaseRepository.findById).mockResolvedValue(phase as any);
      vi.mocked(mocks.initiativeRepository.findById).mockResolvedValue(initiative as any);
      vi.mocked(mocks.taskRepository.findByPhaseId).mockResolvedValue([task1, task2] as any);
      vi.mocked(mocks.projectRepository.findProjectsByInitiativeId).mockResolvedValue([]);
      createOrchestrator(mocks);
      emitTaskCompleted(mocks.eventBus, 'task-1');
      // Give the async handler time to run
      await new Promise((r) => setTimeout(r, 50));
      expect(mocks.phaseRepository.update).not.toHaveBeenCalled();
      expect(mocks.phaseDispatchManager.completePhase).not.toHaveBeenCalled();
    });
  });

  describe('approveInitiative', () => {
    // Common happy-path setup: merge succeeds and reports previousRef
    // 'abc000' so rollback behaviour can be asserted.
    function setupApproveTest(mocks: ReturnType<typeof createMocks>) {
      const initiative = { id: 'init-1', branch: 'cw/test', status: 'pending_review' };
      const project = { id: 'proj-1', name: 'test', url: 'https://example.com', defaultBranch: 'main' };
      vi.mocked(mocks.initiativeRepository.findById).mockResolvedValue(initiative as any);
      vi.mocked(mocks.projectRepository.findProjectsByInitiativeId).mockResolvedValue([project] as any);
      vi.mocked(mocks.branchManager.branchExists).mockResolvedValue(true);
      vi.mocked(mocks.branchManager.mergeBranch).mockResolvedValue({ success: true, message: 'ok', previousRef: 'abc000' });
      return { initiative, project };
    }
    it('should roll back merge when push fails', async () => {
      setupApproveTest(mocks);
      vi.mocked(mocks.branchManager.pushBranch).mockRejectedValue(new Error('non-fast-forward'));
      const orchestrator = createOrchestrator(mocks);
      await expect(orchestrator.approveInitiative('init-1', 'merge_and_push')).rejects.toThrow('non-fast-forward');
      // Should have rolled back the merge by restoring the previous ref
      expect(mocks.branchManager.updateRef).toHaveBeenCalledWith(
        expect.any(String),
        'main',
        'abc000',
      );
      // Should NOT have marked initiative as completed
      expect(mocks.initiativeRepository.update).not.toHaveBeenCalled();
    });
    it('should complete initiative when push succeeds', async () => {
      setupApproveTest(mocks);
      const orchestrator = createOrchestrator(mocks);
      await orchestrator.approveInitiative('init-1', 'merge_and_push');
      expect(mocks.branchManager.updateRef).not.toHaveBeenCalled();
      expect(mocks.initiativeRepository.update).toHaveBeenCalledWith('init-1', { status: 'completed' });
    });
    it('should not attempt rollback for push_branch strategy', async () => {
      setupApproveTest(mocks);
      vi.mocked(mocks.branchManager.pushBranch).mockRejectedValue(new Error('auth failed'));
      const orchestrator = createOrchestrator(mocks);
      await expect(orchestrator.approveInitiative('init-1', 'push_branch')).rejects.toThrow('auth failed');
      // No merge happened, so no rollback needed
      expect(mocks.branchManager.updateRef).not.toHaveBeenCalled();
    });
  });
});

View File

@@ -11,12 +11,13 @@
* - Review per-phase: pause after each phase for diff review
*/
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent } from '../events/index.js';
import type { EventBus, TaskCompletedEvent, PhasePendingReviewEvent, PhaseChangesRequestedEvent, PhaseMergedEvent, TaskMergedEvent, PhaseQueuedEvent, AgentStoppedEvent, AgentCrashedEvent, InitiativePendingReviewEvent, InitiativeReviewApprovedEvent, InitiativeChangesRequestedEvent } from '../events/index.js';
import type { BranchManager } from '../git/branch-manager.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { TaskRepository } from '../db/repositories/task-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { AgentRepository } from '../db/repositories/agent-repository.js';
import type { DispatchManager, PhaseDispatchManager } from '../dispatch/types.js';
import type { ConflictResolutionService } from '../coordination/conflict-resolution-service.js';
import { phaseBranchName, taskBranchName } from '../git/branch-naming.js';
@@ -25,6 +26,9 @@ import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('execution-orchestrator');
/** Maximum number of automatic retries for crashed tasks; once exceeded the task is left in_progress and only a warning is logged */
const MAX_TASK_RETRIES = 3;
export class ExecutionOrchestrator {
/** Serialize merges per phase to avoid concurrent merge conflicts */
private phaseMergeLocks: Map<string, Promise<void>> = new Map();
@@ -44,6 +48,7 @@ export class ExecutionOrchestrator {
private conflictResolutionService: ConflictResolutionService,
private eventBus: EventBus,
private workspaceRoot: string,
private agentRepository?: AgentRepository,
) {}
/**
@@ -66,6 +71,18 @@ export class ExecutionOrchestrator {
});
});
// Auto-retry crashed agent tasks (up to MAX_TASK_RETRIES)
this.eventBus.on<AgentCrashedEvent>('agent:crashed', (event) => {
this.handleAgentCrashed(event).catch((err) => {
log.error({ err: err instanceof Error ? err.message : String(err) }, 'error handling agent:crashed');
});
});
// Recover in-memory dispatch queues from DB state (survives server restarts)
this.recoverDispatchQueues().catch((err) => {
log.error({ err: err instanceof Error ? err.message : String(err) }, 'dispatch queue recovery failed');
});
log.info('execution orchestrator started');
}
@@ -106,6 +123,27 @@ export class ExecutionOrchestrator {
this.scheduleDispatch();
}
/**
 * React to an agent crash by re-queuing its task for another attempt.
 *
 * Only tasks still marked in_progress are retried; each retry bumps the
 * task's retryCount. Once MAX_TASK_RETRIES is exceeded the task is left
 * untouched and a warning is logged instead.
 */
private async handleAgentCrashed(event: AgentCrashedEvent): Promise<void> {
  const { taskId, agentId, error } = event.payload;
  if (!taskId) return;

  const crashedTask = await this.taskRepository.findById(taskId);
  if (!crashedTask || crashedTask.status !== 'in_progress') return;

  const nextAttempt = (crashedTask.retryCount ?? 0) + 1;
  if (nextAttempt > MAX_TASK_RETRIES) {
    // NOTE(review): the task stays in_progress here — confirm no explicit
    // "blocked" transition is expected once retries are exhausted.
    log.warn({ taskId, agentId, retryCount: nextAttempt, error }, 'task exceeded max retries, leaving in_progress');
    return;
  }

  // Flip the task back to pending (with the bumped counter) before queuing
  // so the dispatcher observes a consistent state.
  await this.taskRepository.update(taskId, { status: 'pending', retryCount: nextAttempt });
  await this.dispatchManager.queue(taskId);
  log.info({ taskId, agentId, retryCount: nextAttempt, error }, 'crashed task re-queued for retry');
  this.scheduleDispatch();
}
private async runDispatchCycle(): Promise<void> {
this.dispatchRunning = true;
try {
@@ -140,27 +178,29 @@ export class ExecutionOrchestrator {
if (!task?.phaseId || !task.initiativeId) return;
const initiative = await this.initiativeRepository.findById(task.initiativeId);
if (!initiative?.branch) return;
const phase = await this.phaseRepository.findById(task.phaseId);
if (!phase) return;
// Skip merge/review tasks — they already work on the phase branch directly
if (task.category === 'merge' || task.category === 'review') return;
// Merge task branch into phase branch (only when branches exist)
if (initiative?.branch && task.category !== 'merge' && task.category !== 'review') {
try {
const initBranch = initiative.branch;
const phBranch = phaseBranchName(initBranch, phase.name);
const tBranch = taskBranchName(initBranch, task.id);
const initBranch = initiative.branch;
const phBranch = phaseBranchName(initBranch, phase.name);
const tBranch = taskBranchName(initBranch, task.id);
// Serialize merges per phase
const lock = this.phaseMergeLocks.get(task.phaseId) ?? Promise.resolve();
const mergeOp = lock.then(async () => {
await this.mergeTaskIntoPhase(taskId, task.phaseId!, tBranch, phBranch);
});
this.phaseMergeLocks.set(task.phaseId, mergeOp.catch(() => {}));
await mergeOp;
} catch (err) {
log.error({ taskId, err: err instanceof Error ? err.message : String(err) }, 'task merge failed, still checking phase completion');
}
}
// Serialize merges per phase
const lock = this.phaseMergeLocks.get(task.phaseId) ?? Promise.resolve();
const mergeOp = lock.then(async () => {
await this.mergeTaskIntoPhase(taskId, task.phaseId!, tBranch, phBranch);
});
this.phaseMergeLocks.set(task.phaseId, mergeOp.catch(() => {}));
await mergeOp;
// Check if all phase tasks are done
// Check if all phase tasks are done — always, regardless of branch/merge status
const phaseTasks = await this.taskRepository.findByPhaseId(task.phaseId);
const allDone = phaseTasks.every((t) => t.status === 'completed');
if (allDone) {
@@ -228,10 +268,13 @@ export class ExecutionOrchestrator {
if (!phase) return;
const initiative = await this.initiativeRepository.findById(phase.initiativeId);
if (!initiative?.branch) return;
if (!initiative) return;
if (initiative.executionMode === 'yolo') {
await this.mergePhaseIntoInitiative(phaseId);
// Merge phase branch into initiative branch (only when branches exist)
if (initiative.branch) {
await this.mergePhaseIntoInitiative(phaseId);
}
await this.phaseDispatchManager.completePhase(phaseId);
// Re-queue approved phases (self-healing: survives server restarts that wipe in-memory queue)
@@ -273,6 +316,18 @@ export class ExecutionOrchestrator {
const projects = await this.projectRepository.findProjectsByInitiativeId(phase.initiativeId);
// Store merge base before merging so we can reconstruct diffs for completed phases
for (const project of projects) {
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
try {
const mergeBase = await this.branchManager.getMergeBase(clonePath, initBranch, phBranch);
await this.phaseRepository.update(phaseId, { mergeBase });
break; // Only need one merge base (first project)
} catch {
// Phase branch may not exist in this project clone
}
}
for (const project of projects) {
const clonePath = await ensureProjectClone(project, this.workspaceRoot);
const result = await this.branchManager.mergeBranch(clonePath, phBranch, initBranch);
@@ -327,7 +382,14 @@ export class ExecutionOrchestrator {
*/
async requestChangesOnPhase(
phaseId: string,
unresolvedComments: Array<{ filePath: string; lineNumber: number; body: string }>,
unresolvedThreads: Array<{
id: string;
filePath: string;
lineNumber: number;
body: string;
author: string;
replies: Array<{ id: string; body: string; author: string }>;
}>,
summary?: string,
): Promise<{ taskId: string }> {
const phase = await this.phaseRepository.findById(phaseId);
@@ -339,16 +401,25 @@ export class ExecutionOrchestrator {
const initiative = await this.initiativeRepository.findById(phase.initiativeId);
if (!initiative) throw new Error(`Initiative not found: ${phase.initiativeId}`);
// Build revision task description from comments + summary
// Guard: don't create duplicate review tasks
const existingTasks = await this.taskRepository.findByPhaseId(phaseId);
const activeReview = existingTasks.find(
(t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
);
if (activeReview) {
return { taskId: activeReview.id };
}
// Build revision task description from threaded comments + summary
const lines: string[] = [];
if (summary) {
lines.push(`## Summary\n\n${summary}\n`);
}
if (unresolvedComments.length > 0) {
if (unresolvedThreads.length > 0) {
lines.push('## Review Comments\n');
// Group comments by file
const byFile = new Map<string, typeof unresolvedComments>();
for (const c of unresolvedComments) {
const byFile = new Map<string, typeof unresolvedThreads>();
for (const c of unresolvedThreads) {
const arr = byFile.get(c.filePath) ?? [];
arr.push(c);
byFile.set(c.filePath, arr);
@@ -356,9 +427,13 @@ export class ExecutionOrchestrator {
for (const [filePath, fileComments] of byFile) {
lines.push(`### ${filePath}\n`);
for (const c of fileComments) {
lines.push(`- **Line ${c.lineNumber}**: ${c.body}`);
lines.push(`#### Line ${c.lineNumber} [comment:${c.id}]`);
lines.push(`**${c.author}**: ${c.body}`);
for (const r of c.replies) {
lines.push(`> **${r.author}**: ${r.body}`);
}
lines.push('');
}
lines.push('');
}
}
@@ -388,12 +463,12 @@ export class ExecutionOrchestrator {
phaseId,
initiativeId: phase.initiativeId,
taskId: task.id,
commentCount: unresolvedComments.length,
commentCount: unresolvedThreads.length,
},
};
this.eventBus.emit(event);
log.info({ phaseId, taskId: task.id, commentCount: unresolvedComments.length }, 'changes requested on phase');
log.info({ phaseId, taskId: task.id, commentCount: unresolvedThreads.length }, 'changes requested on phase');
// Kick off dispatch
this.scheduleDispatch();
@@ -401,6 +476,81 @@ export class ExecutionOrchestrator {
return { taskId: task.id };
}
/**
 * Request changes on an initiative that's pending review.
 *
 * Ensures a "Finalization" phase exists (reviving it if it had finished),
 * adds a high-priority review task carrying the reviewer's summary, flips
 * the initiative back to active, and kicks off dispatch. Idempotent: an
 * already pending/in-progress review task in that phase is reused.
 */
async requestChangesOnInitiative(
  initiativeId: string,
  summary: string,
): Promise<{ taskId: string }> {
  const initiative = await this.initiativeRepository.findById(initiativeId);
  if (!initiative) throw new Error(`Initiative not found: ${initiativeId}`);
  if (initiative.status !== 'pending_review') {
    throw new Error(`Initiative ${initiativeId} is not pending review (status: ${initiative.status})`);
  }

  // Locate (or lazily create) the dedicated "Finalization" phase.
  const allPhases = await this.phaseRepository.findByInitiativeId(initiativeId);
  let finalization = allPhases.find((p) => p.name === 'Finalization');
  if (!finalization) {
    finalization = await this.phaseRepository.create({
      initiativeId,
      name: 'Finalization',
      status: 'in_progress',
    });
  } else if (finalization.status === 'completed' || finalization.status === 'pending_review') {
    // Revive a previously-finished phase so it can accept new work.
    await this.phaseRepository.update(finalization.id, { status: 'in_progress' as any });
  }

  // Idempotency guard: reuse an existing active review task instead of
  // stacking duplicates.
  const phaseTasks = await this.taskRepository.findByPhaseId(finalization.id);
  const openReview = phaseTasks.find(
    (t) => t.category === 'review' && (t.status === 'pending' || t.status === 'in_progress'),
  );
  if (openReview) {
    // Still reset the initiative so execution can resume.
    await this.initiativeRepository.update(initiativeId, { status: 'active' as any });
    this.scheduleDispatch();
    return { taskId: openReview.id };
  }

  // Create the review task that will address the feedback.
  const task = await this.taskRepository.create({
    phaseId: finalization.id,
    initiativeId,
    name: 'Address initiative review feedback',
    description: `## Summary\n\n${summary}`,
    category: 'review',
    priority: 'high',
  });

  // Back to active so the dispatcher will pick the new task up.
  await this.initiativeRepository.update(initiativeId, { status: 'active' as any });
  await this.dispatchManager.queue(task.id);

  const event: InitiativeChangesRequestedEvent = {
    type: 'initiative:changes_requested',
    timestamp: new Date(),
    payload: {
      initiativeId,
      phaseId: finalization.id,
      taskId: task.id,
    },
  };
  this.eventBus.emit(event);
  log.info({ initiativeId, phaseId: finalization.id, taskId: task.id }, 'changes requested on initiative');

  this.scheduleDispatch();
  return { taskId: task.id };
}
/**
* Re-queue approved phases for an initiative into the in-memory dispatch queue.
* Self-healing: ensures phases aren't lost if the server restarted since the
@@ -420,6 +570,63 @@ export class ExecutionOrchestrator {
}
}
/**
 * Rebuild the in-memory dispatch queues from persisted state on startup.
 *
 * For every active initiative: approved phases go back into the phase queue;
 * pending tasks of in_progress phases go back into the task queue; and
 * in_progress tasks whose agent is no longer alive are reset to pending and
 * re-queued (only possible when an agentRepository was provided).
 */
private async recoverDispatchQueues(): Promise<void> {
  const activeInitiatives = await this.initiativeRepository.findByStatus('active');
  let phasesRecovered = 0;
  let tasksRecovered = 0;

  for (const initiative of activeInitiatives) {
    for (const phase of await this.phaseRepository.findByInitiativeId(initiative.id)) {
      if (phase.status === 'approved') {
        // Approved but never dispatched — put it back in the phase queue.
        try {
          await this.phaseDispatchManager.queuePhase(phase.id);
          phasesRecovered++;
        } catch {
          // Already queued or status changed in the meantime — fine.
        }
        continue;
      }
      if (phase.status !== 'in_progress') continue;

      for (const task of await this.taskRepository.findByPhaseId(phase.id)) {
        if (task.status === 'pending') {
          try {
            await this.dispatchManager.queue(task.id);
            tasksRecovered++;
          } catch {
            // Already queued or task issue — skip.
          }
        } else if (task.status === 'in_progress' && this.agentRepository) {
          // An in_progress task is only healthy if its agent still lives.
          const agent = await this.agentRepository.findByTaskId(task.id);
          const agentAlive = !!agent && (agent.status === 'running' || agent.status === 'waiting_for_input');
          if (!agentAlive) {
            // Dead agent — reset the task so it can be dispatched again.
            await this.taskRepository.update(task.id, { status: 'pending' });
            await this.dispatchManager.queue(task.id);
            tasksRecovered++;
            log.info({ taskId: task.id, agentId: agent?.id }, 'recovered stuck in_progress task (dead agent)');
          }
        }
      }
    }
  }

  if (phasesRecovered > 0 || tasksRecovered > 0) {
    log.info({ phasesRecovered, tasksRecovered }, 'recovered dispatch queues from DB state');
    this.scheduleDispatch();
  }
}
/**
* Check if all phases for an initiative are completed.
* If so, set initiative to pending_review and emit event.
@@ -474,12 +681,32 @@ export class ExecutionOrchestrator {
continue;
}
// Fetch remote so local branches are up-to-date before merge/push
await this.branchManager.fetchRemote(clonePath);
if (strategy === 'merge_and_push') {
// Fast-forward local defaultBranch to match origin before merging
try {
await this.branchManager.fastForwardBranch(clonePath, project.defaultBranch);
} catch (ffErr) {
log.warn({ project: project.name, err: (ffErr as Error).message }, 'fast-forward of default branch failed — attempting merge anyway');
}
const result = await this.branchManager.mergeBranch(clonePath, initiative.branch, project.defaultBranch);
if (!result.success) {
throw new Error(`Failed to merge ${initiative.branch} into ${project.defaultBranch} for project ${project.name}: ${result.message}`);
}
await this.branchManager.pushBranch(clonePath, project.defaultBranch);
try {
await this.branchManager.pushBranch(clonePath, project.defaultBranch);
} catch (pushErr) {
// Roll back the merge so the diff doesn't disappear from the review tab.
// Without rollback, defaultBranch includes the initiative changes and the
// three-dot diff (defaultBranch...initiativeBranch) becomes empty.
if (result.previousRef) {
log.warn({ project: project.name, previousRef: result.previousRef }, 'push failed — rolling back merge');
await this.branchManager.updateRef(clonePath, project.defaultBranch, result.previousRef);
}
throw pushErr;
}
log.info({ initiativeId, project: project.name }, 'initiative branch merged into default and pushed');
} else {
await this.branchManager.pushBranch(clonePath, initiative.branch);

View File

@@ -6,7 +6,7 @@
* a worktree to be checked out.
*/
import type { MergeResult, BranchCommit } from './types.js';
import type { MergeResult, MergeabilityResult, BranchCommit } from './types.js';
export interface BranchManager {
/**
@@ -57,9 +57,41 @@ export interface BranchManager {
*/
diffCommit(repoPath: string, commitHash: string): Promise<string>;
/**
* Get the merge base (common ancestor) of two branches.
* Returns the commit hash of the merge base.
*/
getMergeBase(repoPath: string, branch1: string, branch2: string): Promise<string>;
/**
* Push a branch to a remote.
* Defaults to 'origin' if no remote specified.
*/
pushBranch(repoPath: string, branch: string, remote?: string): Promise<void>;
/**
* Dry-run merge check — determines if sourceBranch can be cleanly merged
* into targetBranch without actually performing the merge.
* Uses `git merge-tree --write-tree` (git 2.38+).
*/
checkMergeability(repoPath: string, sourceBranch: string, targetBranch: string): Promise<MergeabilityResult>;
/**
* Fetch all refs from a remote.
* Defaults to 'origin' if no remote specified.
*/
fetchRemote(repoPath: string, remote?: string): Promise<void>;
/**
* Fast-forward a local branch to match its remote-tracking counterpart.
* No-op if already up to date. Throws if fast-forward is not possible
* (i.e. the branches have diverged).
*/
fastForwardBranch(repoPath: string, branch: string, remote?: string): Promise<void>;
/**
* Force-update a branch ref to point at a specific commit.
* Used to roll back a merge when a subsequent push fails.
*/
updateRef(repoPath: string, branch: string, commitHash: string): Promise<void>;
}

View File

@@ -13,7 +13,7 @@
export type { WorktreeManager } from './types.js';
// Domain types
export type { Worktree, WorktreeDiff, MergeResult } from './types.js';
export type { Worktree, WorktreeDiff, MergeResult, MergeabilityResult } from './types.js';
// Adapters
export { SimpleGitWorktreeManager } from './manager.js';

View File

@@ -61,16 +61,35 @@ export class SimpleGitWorktreeManager implements WorktreeManager {
const worktreePath = path.join(this.worktreesDir, id);
log.info({ id, branch, baseBranch }, 'creating worktree');
// Create worktree with new branch
// git worktree add -b <branch> <path> <base-branch>
await this.git.raw([
'worktree',
'add',
'-b',
branch,
worktreePath,
baseBranch,
]);
// Safety: never force-reset a branch to its own base — this would nuke
// shared branches like the initiative branch if passed as both branch and baseBranch.
if (branch === baseBranch) {
throw new Error(`Worktree branch and baseBranch are the same (${branch}). Use a unique branch name.`);
}
// Create worktree — reuse existing branch or create new one
const branchExists = await this.branchExists(branch);
if (branchExists) {
// Branch exists from a previous run. Check if it has commits beyond baseBranch
// before resetting — a previous agent may have done real work on this branch.
try {
const aheadCount = await this.git.raw(['rev-list', '--count', `${baseBranch}..${branch}`]);
if (parseInt(aheadCount.trim(), 10) > 0) {
log.warn({ branch, baseBranch, aheadBy: aheadCount.trim() }, 'branch has commits beyond base, preserving');
} else {
await this.git.raw(['branch', '-f', branch, baseBranch]);
}
} catch {
// If rev-list fails (e.g. baseBranch doesn't exist yet), fall back to reset
await this.git.raw(['branch', '-f', branch, baseBranch]);
}
// Prune stale worktree references before adding new one
await this.git.raw(['worktree', 'prune']);
await this.git.raw(['worktree', 'add', worktreePath, branch]);
} else {
// git worktree add -b <branch> <path> <base-branch>
await this.git.raw(['worktree', 'add', '-b', branch, worktreePath, baseBranch]);
}
const worktree: Worktree = {
id,
@@ -327,6 +346,18 @@ export class SimpleGitWorktreeManager implements WorktreeManager {
return worktrees;
}
/**
 * Check whether a local branch ref exists in the repository
 * (via `git rev-parse --verify refs/heads/<branch>`).
 */
private async branchExists(branch: string): Promise<boolean> {
  try {
    // rev-parse --verify throws when the ref does not resolve.
    await this.git.raw(['rev-parse', '--verify', `refs/heads/${branch}`]);
  } catch {
    return false;
  }
  return true;
}
/**
* Parse the output of git diff --name-status.
*/

View File

@@ -6,12 +6,12 @@
* on project clones without requiring a worktree.
*/
import { join } from 'node:path';
import { join, resolve } from 'node:path';
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { simpleGit } from 'simple-git';
import type { BranchManager } from './branch-manager.js';
import type { MergeResult, BranchCommit } from './types.js';
import type { MergeResult, MergeabilityResult, BranchCommit } from './types.js';
import { createModuleLogger } from '../logger/index.js';
const log = createModuleLogger('branch-manager');
@@ -31,21 +31,32 @@ export class SimpleGitBranchManager implements BranchManager {
}
async mergeBranch(repoPath: string, sourceBranch: string, targetBranch: string): Promise<MergeResult> {
// Use an ephemeral worktree for merge safety
// Use an ephemeral worktree with a temp branch for merge safety.
// We can't check out targetBranch directly — it may already be checked out
// in the clone's main working tree or an agent worktree.
const tmpPath = mkdtempSync(join(tmpdir(), 'cw-merge-'));
const repoGit = simpleGit(repoPath);
const tempBranch = `cw-merge-${Date.now()}`;
try {
// Create ephemeral worktree on target branch
await repoGit.raw(['worktree', 'add', tmpPath, targetBranch]);
// Capture the target branch ref before merge so callers can roll back on push failure
const previousRef = (await repoGit.raw(['rev-parse', targetBranch])).trim();
// Create worktree with a temp branch starting at targetBranch's commit
await repoGit.raw(['worktree', 'add', '-b', tempBranch, tmpPath, targetBranch]);
const wtGit = simpleGit(tmpPath);
try {
await wtGit.merge([sourceBranch, '--no-edit']);
// Update the real target branch ref to the merge result.
// update-ref bypasses the "branch is checked out" guard.
const mergeCommit = (await wtGit.revparse(['HEAD'])).trim();
await repoGit.raw(['update-ref', `refs/heads/${targetBranch}`, mergeCommit]);
log.info({ repoPath, sourceBranch, targetBranch }, 'merge completed cleanly');
return { success: true, message: `Merged ${sourceBranch} into ${targetBranch}` };
return { success: true, message: `Merged ${sourceBranch} into ${targetBranch}`, previousRef };
} catch (mergeErr) {
// Check for merge conflicts
const status = await wtGit.status();
@@ -73,6 +84,10 @@ export class SimpleGitBranchManager implements BranchManager {
try { rmSync(tmpPath, { recursive: true, force: true }); } catch { /* ignore */ }
try { await repoGit.raw(['worktree', 'prune']); } catch { /* ignore */ }
}
// Delete the temp branch
try {
await repoGit.raw(['branch', '-D', tempBranch]);
} catch { /* ignore — may already be cleaned up */ }
}
}
@@ -141,9 +156,95 @@ export class SimpleGitBranchManager implements BranchManager {
return git.diff([`${commitHash}~1`, commitHash]);
}
/**
 * Resolve the merge base (nearest common ancestor commit) of two branches.
 * Returns the trimmed commit hash.
 */
async getMergeBase(repoPath: string, branch1: string, branch2: string): Promise<string> {
  const hash = await simpleGit(repoPath).raw(['merge-base', branch1, branch2]);
  return hash.trim();
}
/**
 * Push a branch to a remote (defaults to 'origin').
 *
 * If the push is rejected because the branch is currently checked out in a
 * local non-bare remote repo, temporarily sets receive.denyCurrentBranch to
 * "updateInstead" on that remote (which updates its working tree and index
 * to match, or rejects if the working tree is dirty) and retries once,
 * restoring the config afterwards.
 *
 * Fix: a stray unconditional `git.push` preceded the try/catch, so any push
 * failure threw before the checked-out-branch fallback could ever run.
 */
async pushBranch(repoPath: string, branch: string, remote = 'origin'): Promise<void> {
  const git = simpleGit(repoPath);
  try {
    await git.push(remote, branch);
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    if (!msg.includes('branch is currently checked out')) throw err;
    // Local non-bare repo with the branch checked out — temporarily allow it.
    const remoteUrl = (await git.remote(['get-url', remote]))?.trim();
    if (!remoteUrl) throw err;
    // Local remote URLs may be relative to this repo's path.
    const remotePath = resolve(repoPath, remoteUrl);
    const remoteGit = simpleGit(remotePath);
    await remoteGit.addConfig('receive.denyCurrentBranch', 'updateInstead');
    try {
      await git.push(remote, branch);
    } finally {
      // Always restore the remote's config, even if the retry fails.
      await remoteGit.raw(['config', '--unset', 'receive.denyCurrentBranch']);
    }
  }
  log.info({ repoPath, branch, remote }, 'branch pushed to remote');
}
async checkMergeability(repoPath: string, sourceBranch: string, targetBranch: string): Promise<MergeabilityResult> {
const git = simpleGit(repoPath);
// git merge-tree --write-tree outputs everything to stdout.
// simple-git's .raw() resolves with stdout even on exit code 1 (conflicts),
// so we parse the output text instead of relying on catch.
const output = await git.raw(['merge-tree', '--write-tree', targetBranch, sourceBranch]);
// Parse conflict file names from "CONFLICT (content): Merge conflict in <path>"
const conflictPattern = /CONFLICT \([^)]+\): (?:Merge conflict in|.* -> )(.+)/g;
const conflicts: string[] = [];
let match: RegExpExecArray | null;
while ((match = conflictPattern.exec(output)) !== null) {
conflicts.push(match[1].trim());
}
if (conflicts.length > 0) {
log.debug({ repoPath, sourceBranch, targetBranch, conflicts }, 'merge-tree check: conflicts');
return { mergeable: false, conflicts };
}
// Fallback: check for any CONFLICT text we couldn't parse specifically
if (output.includes('CONFLICT')) {
log.debug({ repoPath, sourceBranch, targetBranch }, 'merge-tree check: unparsed conflicts');
return { mergeable: false, conflicts: ['(unable to parse conflict details)'] };
}
log.debug({ repoPath, sourceBranch, targetBranch }, 'merge-tree check: clean');
return { mergeable: true };
}
async fetchRemote(repoPath: string, remote = 'origin'): Promise<void> {
const git = simpleGit(repoPath);
await git.fetch(remote);
log.info({ repoPath, remote }, 'fetched remote');
}
/**
 * Fast-forward a local branch to its remote-tracking counterpart.
 *
 * Throws when the branch has diverged from the remote. The ref is advanced
 * via update-ref rather than `git merge` so a dirty working tree in the
 * clone (uncommitted agent work) cannot block the update.
 */
async fastForwardBranch(repoPath: string, branch: string, remote = 'origin'): Promise<void> {
  const git = simpleGit(repoPath);
  const remoteBranch = `${remote}/${branch}`;

  // merge-base --is-ancestor exits non-zero when the local branch is not an
  // ancestor of the remote tip, i.e. a genuine fast-forward is impossible.
  try {
    await git.raw(['merge-base', '--is-ancestor', branch, remoteBranch]);
  } catch {
    throw new Error(`Cannot fast-forward ${branch}: it has diverged from ${remoteBranch}`);
  }

  const remoteTip = (await git.raw(['rev-parse', remoteBranch])).trim();
  await git.raw(['update-ref', `refs/heads/${branch}`, remoteTip]);
  log.info({ repoPath, branch, remoteBranch }, 'fast-forwarded branch');
}
async updateRef(repoPath: string, branch: string, commitHash: string): Promise<void> {
const git = simpleGit(repoPath);
await git.raw(['update-ref', `refs/heads/${branch}`, commitHash]);
log.info({ repoPath, branch, commitHash: commitHash.slice(0, 7) }, 'branch ref updated');
}
}

View File

@@ -56,6 +56,21 @@ export interface MergeResult {
conflicts?: string[];
/** Human-readable message describing the result */
message: string;
/** The target branch's commit hash before the merge (for rollback on push failure) */
previousRef?: string;
}
// =============================================================================
// Mergeability Check
// =============================================================================
/**
 * Result of a dry-run merge check.
 * No side effects — only tells you whether the merge would succeed.
 */
export interface MergeabilityResult {
  // True when the merge would apply cleanly with no conflicts.
  mergeable: boolean;
  // Conflicting file paths; only populated when mergeable is false.
  conflicts?: string[];
}
// =============================================================================

View File

@@ -164,7 +164,8 @@ describe('generateGatewayCaddyfile', () => {
const caddyfile = generateGatewayCaddyfile(previews, 9100);
expect(caddyfile).toContain('auto_https off');
expect(caddyfile).toContain('abc123.localhost:9100 {');
expect(caddyfile).toContain('abc123.localhost:80 {');
expect(caddyfile).toContain('handle /* {');
expect(caddyfile).toContain('reverse_proxy cw-preview-abc123-app:3000');
});
@@ -176,13 +177,14 @@ describe('generateGatewayCaddyfile', () => {
]);
const caddyfile = generateGatewayCaddyfile(previews, 9100);
expect(caddyfile).toContain('abc123.localhost:80 {');
expect(caddyfile).toContain('handle_path /api/*');
expect(caddyfile).toContain('reverse_proxy cw-preview-abc123-backend:8080');
expect(caddyfile).toContain('handle {');
expect(caddyfile).toContain('handle /* {');
expect(caddyfile).toContain('reverse_proxy cw-preview-abc123-frontend:3000');
});
it('generates multi-preview Caddyfile with separate subdomain blocks', () => {
it('generates separate subdomain blocks for each preview', () => {
const previews = new Map<string, GatewayRoute[]>();
previews.set('abc', [
{ containerName: 'cw-preview-abc-app', port: 3000, route: '/' },
@@ -192,8 +194,8 @@ describe('generateGatewayCaddyfile', () => {
]);
const caddyfile = generateGatewayCaddyfile(previews, 9100);
expect(caddyfile).toContain('abc.localhost:9100 {');
expect(caddyfile).toContain('xyz.localhost:9100 {');
expect(caddyfile).toContain('abc.localhost:80 {');
expect(caddyfile).toContain('xyz.localhost:80 {');
expect(caddyfile).toContain('reverse_proxy cw-preview-abc-app:3000');
expect(caddyfile).toContain('reverse_proxy cw-preview-xyz-app:5000');
});
@@ -209,10 +211,10 @@ describe('generateGatewayCaddyfile', () => {
const caddyfile = generateGatewayCaddyfile(previews, 9100);
const apiAuthIdx = caddyfile.indexOf('/api/auth');
const apiIdx = caddyfile.indexOf('handle_path /api/*');
const handleIdx = caddyfile.indexOf('handle {');
const rootIdx = caddyfile.indexOf('handle /* {');
expect(apiAuthIdx).toBeLessThan(apiIdx);
expect(apiIdx).toBeLessThan(handleIdx);
expect(apiIdx).toBeLessThan(rootIdx);
});
});

View File

@@ -2,7 +2,7 @@
* Gateway Manager
*
* Manages a single shared Caddy reverse proxy (the "gateway") that routes
* subdomain requests to per-preview compose stacks on a shared Docker network.
* subdomain-based requests to per-preview compose stacks on a shared Docker network.
*
* Architecture:
* .cw-previews/gateway/
@@ -195,18 +195,20 @@ export class GatewayManager {
/**
* Generate a Caddyfile for the gateway from all active preview routes.
*
* Each preview gets a subdomain block: `<previewId>.localhost:<port>`
* Uses subdomain-based routing: each preview gets its own `<previewId>.localhost:80` block.
* Chrome/Firefox resolve `*.localhost` to 127.0.0.1 natively — no DNS setup needed.
* Routes within a preview are sorted by specificity (longest path first).
*/
export function generateGatewayCaddyfile(
previews: Map<string, GatewayRoute[]>,
port: number,
_port: number,
): string {
// Caddy runs inside a container where Docker maps host:${port} → container:80.
// The Caddyfile must listen on the container-internal port (80), not the host port.
const lines: string[] = [
'{',
' auto_https off',
'}',
'',
];
for (const [previewId, routes] of previews) {
@@ -217,11 +219,12 @@ export function generateGatewayCaddyfile(
return b.route.length - a.route.length;
});
lines.push(`${previewId}.localhost:${port} {`);
lines.push('');
lines.push(`${previewId}.localhost:80 {`);
for (const route of sorted) {
if (route.route === '/') {
lines.push(` handle {`);
lines.push(` handle /* {`);
lines.push(` reverse_proxy ${route.containerName}:${route.port}`);
lines.push(` }`);
} else {
@@ -233,8 +236,9 @@ export function generateGatewayCaddyfile(
}
lines.push('}');
lines.push('');
}
lines.push('');
return lines.join('\n');
}

View File

@@ -1,7 +1,7 @@
/**
* Health Checker
*
* Polls service healthcheck endpoints through the gateway's subdomain routing
* Polls service healthcheck endpoints through the gateway's subdomain-based routing
* to verify that preview services are ready.
*/

View File

@@ -67,7 +67,7 @@ vi.mock('node:fs/promises', () => ({
}));
vi.mock('nanoid', () => ({
nanoid: vi.fn(() => 'abc123test'),
customAlphabet: vi.fn(() => vi.fn(() => 'abc123test')),
}));
import { PreviewManager } from './manager.js';
@@ -220,7 +220,7 @@ describe('PreviewManager', () => {
expect(result.projectId).toBe('proj-1');
expect(result.branch).toBe('feature-x');
expect(result.gatewayPort).toBe(9100);
expect(result.url).toBe('http://abc123test.localhost:9100');
expect(result.url).toBe('http://abc123test.localhost:9100/');
expect(result.mode).toBe('preview');
expect(result.status).toBe('running');
@@ -233,7 +233,7 @@ describe('PreviewManager', () => {
expect(buildingEvent).toBeDefined();
expect(readyEvent).toBeDefined();
expect((readyEvent!.payload as Record<string, unknown>).url).toBe(
'http://abc123test.localhost:9100',
'http://abc123test.localhost:9100/',
);
});
@@ -472,7 +472,7 @@ describe('PreviewManager', () => {
expect(previews).toHaveLength(2);
expect(previews[0].id).toBe('aaa');
expect(previews[0].gatewayPort).toBe(9100);
expect(previews[0].url).toBe('http://aaa.localhost:9100');
expect(previews[0].url).toBe('http://aaa.localhost:9100/');
expect(previews[0].mode).toBe('preview');
expect(previews[0].services).toHaveLength(1);
expect(previews[1].id).toBe('bbb');
@@ -573,7 +573,7 @@ describe('PreviewManager', () => {
expect(status!.status).toBe('running');
expect(status!.id).toBe('abc');
expect(status!.gatewayPort).toBe(9100);
expect(status!.url).toBe('http://abc.localhost:9100');
expect(status!.url).toBe('http://abc.localhost:9100/');
expect(status!.mode).toBe('preview');
});

View File

@@ -8,7 +8,7 @@
import { join } from 'node:path';
import { mkdir, writeFile, rm } from 'node:fs/promises';
import { nanoid } from 'nanoid';
import { customAlphabet } from 'nanoid';
import type { ProjectRepository } from '../db/repositories/project-repository.js';
import type { PhaseRepository } from '../db/repositories/phase-repository.js';
import type { InitiativeRepository } from '../db/repositories/initiative-repository.js';
@@ -116,7 +116,8 @@ export class PreviewManager {
);
// 4. Generate ID and prepare deploy dir
const id = nanoid(10);
const previewNanoid = customAlphabet('0123456789abcdefghijklmnopqrstuvwxyz', 10);
const id = previewNanoid();
const projectName = `${COMPOSE_PROJECT_PREFIX}${id}`;
const deployDir = join(this.workspaceRoot, PREVIEWS_DIR, id);
await mkdir(deployDir, { recursive: true });
@@ -238,7 +239,7 @@ export class PreviewManager {
await this.runSeeds(projectName, config);
// 11. Success
const url = `http://${id}.localhost:${gatewayPort}`;
const url = `http://${id}.localhost:${gatewayPort}/`;
log.info({ id, url }, 'preview deployment ready');
this.eventBus.emit<PreviewReadyEvent>({
@@ -604,7 +605,7 @@ export class PreviewManager {
projectId,
branch,
gatewayPort,
url: `http://${previewId}.localhost:${gatewayPort}`,
url: `http://${previewId}.localhost:${gatewayPort}/`,
mode,
status: 'running',
services: [],

View File

@@ -143,7 +143,7 @@ describe('Detail Workflow E2E', () => {
harness.setArchitectDetailComplete('detailer', [
{ number: 1, name: 'Task 1', content: 'First task', type: 'auto', dependencies: [] },
{ number: 2, name: 'Task 2', content: 'Second task', type: 'auto', dependencies: [1] },
{ number: 3, name: 'Verify', content: 'Verify all', type: 'checkpoint:human-verify', dependencies: [2] },
{ number: 3, name: 'Verify', content: 'Verify all', type: 'auto', dependencies: [2] },
]);
// Resume with all answers
@@ -261,7 +261,7 @@ describe('Detail Workflow E2E', () => {
tasks: [
{ number: 1, name: 'Schema', description: 'Create tables', type: 'auto', dependencies: [] },
{ number: 2, name: 'API', description: 'Create endpoints', type: 'auto', dependencies: [1] },
{ number: 3, name: 'Verify', description: 'Test flow', type: 'checkpoint:human-verify', dependencies: [2] },
{ number: 3, name: 'Verify', description: 'Test flow', type: 'auto', dependencies: [2] },
],
});
@@ -271,33 +271,31 @@ describe('Detail Workflow E2E', () => {
expect(tasks[0].name).toBe('Schema');
expect(tasks[1].name).toBe('API');
expect(tasks[2].name).toBe('Verify');
expect(tasks[2].type).toBe('checkpoint:human-verify');
expect(tasks[2].type).toBe('auto');
});
it('should handle all task types', async () => {
it('should create tasks with auto type', async () => {
const initiative = await harness.createInitiative('Task Types Test');
const phases = await harness.createPhasesFromPlan(initiative.id, [
{ name: 'Phase 1' },
]);
const detailTask = await harness.createDetailTask(phases[0].id, 'Mixed Tasks');
// Create tasks with all types
await harness.caller.createChildTasks({
parentTaskId: detailTask.id,
tasks: [
{ number: 1, name: 'Auto Task', description: 'Automated work', type: 'auto' },
{ number: 2, name: 'Human Verify', description: 'Visual check', type: 'checkpoint:human-verify', dependencies: [1] },
{ number: 3, name: 'Decision', description: 'Choose approach', type: 'checkpoint:decision', dependencies: [2] },
{ number: 4, name: 'Human Action', description: 'Manual step', type: 'checkpoint:human-action', dependencies: [3] },
{ number: 2, name: 'Second Task', description: 'More work', type: 'auto', dependencies: [1] },
{ number: 3, name: 'Third Task', description: 'Even more', type: 'auto', dependencies: [2] },
{ number: 4, name: 'Final Task', description: 'Last step', type: 'auto', dependencies: [3] },
],
});
const tasks = await harness.getChildTasks(detailTask.id);
expect(tasks).toHaveLength(4);
expect(tasks[0].type).toBe('auto');
expect(tasks[1].type).toBe('checkpoint:human-verify');
expect(tasks[2].type).toBe('checkpoint:decision');
expect(tasks[3].type).toBe('checkpoint:human-action');
for (const task of tasks) {
expect(task.type).toBe('auto');
}
});
it('should create task dependencies', async () => {
@@ -346,7 +344,7 @@ describe('Detail Workflow E2E', () => {
{ number: 1, name: 'Create user schema', content: 'Define User model', type: 'auto', dependencies: [] },
{ number: 2, name: 'Implement JWT', content: 'Token generation', type: 'auto', dependencies: [1] },
{ number: 3, name: 'Protected routes', content: 'Middleware', type: 'auto', dependencies: [2] },
{ number: 4, name: 'Verify auth', content: 'Test login flow', type: 'checkpoint:human-verify', dependencies: [3] },
{ number: 4, name: 'Verify auth', content: 'Test login flow', type: 'auto', dependencies: [3] },
]);
await harness.caller.spawnArchitectDetail({
@@ -367,7 +365,7 @@ describe('Detail Workflow E2E', () => {
{ number: 1, name: 'Create user schema', description: 'Define User model', type: 'auto', dependencies: [] },
{ number: 2, name: 'Implement JWT', description: 'Token generation', type: 'auto', dependencies: [1] },
{ number: 3, name: 'Protected routes', description: 'Middleware', type: 'auto', dependencies: [2] },
{ number: 4, name: 'Verify auth', description: 'Test login flow', type: 'checkpoint:human-verify', dependencies: [3] },
{ number: 4, name: 'Verify auth', description: 'Test login flow', type: 'auto', dependencies: [3] },
],
});
@@ -375,7 +373,7 @@ describe('Detail Workflow E2E', () => {
const tasks = await harness.getChildTasks(detailTask.id);
expect(tasks).toHaveLength(4);
expect(tasks[0].name).toBe('Create user schema');
expect(tasks[3].type).toBe('checkpoint:human-verify');
expect(tasks[3].type).toBe('auto');
// Agent should be idle
const finalAgent = await harness.caller.getAgent({ name: 'detailer' });

View File

@@ -202,6 +202,27 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) {
return candidates[0] ?? null;
}),
getActiveConflictAgent: publicProcedure
.input(z.object({ initiativeId: z.string().min(1) }))
.query(async ({ ctx, input }): Promise<AgentInfo | null> => {
const agentManager = requireAgentManager(ctx);
const allAgents = await agentManager.list();
const candidates = allAgents
.filter(
(a) =>
a.mode === 'execute' &&
a.initiativeId === input.initiativeId &&
a.name?.startsWith('conflict-') &&
['running', 'waiting_for_input', 'idle', 'crashed'].includes(a.status) &&
!a.userDismissedAt,
)
.sort(
(a, b) =>
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(),
);
return candidates[0] ?? null;
}),
getAgentOutput: publicProcedure
.input(agentIdentifierSchema)
.query(async ({ ctx, input }): Promise<string> => {

View File

@@ -91,6 +91,9 @@ export function changeSetProcedures(publicProcedure: ProcedureBuilder) {
}
}
// Mark reverted FIRST to avoid ghost state if entity deletion fails partway
await repo.markReverted(input.id);
// Apply reverts in reverse entry order
const reversedEntries = [...cs.entries].reverse();
for (const entry of reversedEntries) {
@@ -159,8 +162,6 @@ export function changeSetProcedures(publicProcedure: ProcedureBuilder) {
}
}
await repo.markReverted(input.id);
ctx.eventBus.emit({
type: 'changeset:reverted' as const,
timestamp: new Date(),

View File

@@ -7,7 +7,7 @@ import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import { requireAgentManager, requireInitiativeRepository, requireProjectRepository, requireTaskRepository, requireBranchManager, requireExecutionOrchestrator } from './_helpers.js';
import { deriveInitiativeActivity } from './initiative-activity.js';
import { buildRefinePrompt } from '../../agent/prompts/index.js';
import { buildRefinePrompt, buildConflictResolutionPrompt, buildConflictResolutionDescription } from '../../agent/prompts/index.js';
import type { PageForSerialization } from '../../agent/content-serializer.js';
import { ensureProjectClone } from '../../git/project-clones.js';
@@ -335,5 +335,145 @@ export function initiativeProcedures(publicProcedure: ProcedureBuilder) {
await orchestrator.approveInitiative(input.initiativeId, input.strategy);
return { success: true };
}),
requestInitiativeChanges: publicProcedure
.input(z.object({
initiativeId: z.string().min(1),
summary: z.string().trim().min(1),
}))
.mutation(async ({ ctx, input }) => {
const orchestrator = requireExecutionOrchestrator(ctx);
const result = await orchestrator.requestChangesOnInitiative(
input.initiativeId,
input.summary,
);
return { success: true, taskId: result.taskId };
}),
checkInitiativeMergeability: publicProcedure
.input(z.object({ initiativeId: z.string().min(1) }))
.query(async ({ ctx, input }) => {
const initiativeRepo = requireInitiativeRepository(ctx);
const projectRepo = requireProjectRepository(ctx);
const branchManager = requireBranchManager(ctx);
const initiative = await initiativeRepo.findById(input.initiativeId);
if (!initiative) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Initiative '${input.initiativeId}' not found` });
}
if (!initiative.branch) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'Initiative has no branch configured' });
}
const projects = await projectRepo.findProjectsByInitiativeId(input.initiativeId);
const allConflicts: string[] = [];
let mergeable = true;
for (const project of projects) {
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const result = await branchManager.checkMergeability(clonePath, initiative.branch, project.defaultBranch);
if (!result.mergeable) {
mergeable = false;
if (result.conflicts) allConflicts.push(...result.conflicts);
}
}
return {
mergeable,
conflictFiles: allConflicts,
targetBranch: projects[0]?.defaultBranch ?? 'main',
};
}),
spawnConflictResolutionAgent: publicProcedure
.input(z.object({
initiativeId: z.string().min(1),
provider: z.string().optional(),
}))
.mutation(async ({ ctx, input }) => {
const agentManager = requireAgentManager(ctx);
const initiativeRepo = requireInitiativeRepository(ctx);
const projectRepo = requireProjectRepository(ctx);
const taskRepo = requireTaskRepository(ctx);
const branchManager = requireBranchManager(ctx);
const initiative = await initiativeRepo.findById(input.initiativeId);
if (!initiative) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Initiative '${input.initiativeId}' not found` });
}
if (!initiative.branch) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'Initiative has no branch configured' });
}
const projects = await projectRepo.findProjectsByInitiativeId(input.initiativeId);
if (projects.length === 0) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'Initiative has no linked projects' });
}
// Auto-dismiss stale conflict agents
const allAgents = await agentManager.list();
const staleAgents = allAgents.filter(
(a) =>
a.mode === 'execute' &&
a.initiativeId === input.initiativeId &&
a.name?.startsWith('conflict-') &&
['crashed', 'idle'].includes(a.status) &&
!a.userDismissedAt,
);
for (const stale of staleAgents) {
await agentManager.dismiss(stale.id);
}
// Reject if active conflict agent already running
const activeConflictAgents = allAgents.filter(
(a) =>
a.mode === 'execute' &&
a.initiativeId === input.initiativeId &&
a.name?.startsWith('conflict-') &&
['running', 'waiting_for_input'].includes(a.status),
);
if (activeConflictAgents.length > 0) {
throw new TRPCError({
code: 'CONFLICT',
message: 'A conflict resolution agent is already running for this initiative',
});
}
// Re-check mergeability to get current conflict list
const project = projects[0];
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const mergeCheck = await branchManager.checkMergeability(clonePath, initiative.branch, project.defaultBranch);
if (mergeCheck.mergeable) {
throw new TRPCError({ code: 'BAD_REQUEST', message: 'No merge conflicts detected — merge is clean' });
}
const conflicts = mergeCheck.conflicts ?? [];
const targetBranch = project.defaultBranch;
// Create task
const task = await taskRepo.create({
initiativeId: input.initiativeId,
name: `Resolve conflicts: ${initiative.name}`,
description: buildConflictResolutionDescription(initiative.branch, targetBranch, conflicts),
category: 'merge',
status: 'in_progress',
});
// Spawn agent on a unique temp branch based off the initiative branch.
// Using initiative.branch directly as branchName would cause SimpleGitWorktreeManager.create()
// to run `git branch -f <branch> <base>`, force-resetting the initiative branch.
const tempBranch = `${initiative.branch}-conflict-${Date.now()}`;
const prompt = buildConflictResolutionPrompt(initiative.branch, targetBranch, conflicts);
return agentManager.spawn({
name: `conflict-${Date.now()}`,
taskId: task.id,
prompt,
mode: 'execute',
provider: input.provider,
initiativeId: input.initiativeId,
baseBranch: initiative.branch,
branchName: tempBranch,
});
}),
};
}

View File

@@ -53,7 +53,7 @@ export function phaseDispatchProcedures(publicProcedure: ProcedureBuilder) {
number: z.number().int().positive(),
name: z.string().min(1),
description: z.string(),
type: z.enum(['auto', 'checkpoint:human-verify', 'checkpoint:decision', 'checkpoint:human-action']).default('auto'),
type: z.enum(['auto']).default('auto'),
dependencies: z.array(z.number().int().positive()).optional(),
})),
}))

View File

@@ -6,7 +6,7 @@ import { TRPCError } from '@trpc/server';
import { z } from 'zod';
import type { Phase } from '../../db/schema.js';
import type { ProcedureBuilder } from '../trpc.js';
import { requirePhaseRepository, requireTaskRepository, requireBranchManager, requireInitiativeRepository, requireProjectRepository, requireExecutionOrchestrator, requireReviewCommentRepository } from './_helpers.js';
import { requirePhaseRepository, requireTaskRepository, requireBranchManager, requireInitiativeRepository, requireProjectRepository, requireExecutionOrchestrator, requireReviewCommentRepository, requireChangeSetRepository } from './_helpers.js';
import { phaseBranchName } from '../../git/branch-naming.js';
import { ensureProjectClone } from '../../git/project-clones.js';
@@ -98,6 +98,29 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
.mutation(async ({ ctx, input }) => {
const repo = requirePhaseRepository(ctx);
await repo.delete(input.id);
// Reconcile any applied changesets that created this phase.
// If all created phases in a changeset are now deleted, mark it reverted.
if (ctx.changeSetRepository) {
try {
const csRepo = requireChangeSetRepository(ctx);
const affectedChangeSets = await csRepo.findAppliedByCreatedEntity('phase', input.id);
for (const cs of affectedChangeSets) {
const createdPhaseIds = cs.entries
.filter(e => e.entityType === 'phase' && e.action === 'create')
.map(e => e.entityId);
const survivingPhases = await Promise.all(
createdPhaseIds.map(id => repo.findById(id)),
);
if (survivingPhases.every(p => p === null)) {
await csRepo.markReverted(cs.id);
}
}
} catch {
// Best-effort reconciliation — don't fail the delete
}
}
return { success: true };
}),
@@ -196,8 +219,8 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
if (!phase) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Phase '${input.phaseId}' not found` });
}
if (phase.status !== 'pending_review') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not pending review (status: ${phase.status})` });
if (phase.status !== 'pending_review' && phase.status !== 'completed') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not reviewable (status: ${phase.status})` });
}
const initiative = await initiativeRepo.findById(phase.initiativeId);
@@ -207,13 +230,15 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
const initBranch = initiative.branch;
const phBranch = phaseBranchName(initBranch, phase.name);
// For completed phases, use stored merge base; for pending_review, use initiative branch
const diffBase = (phase.status === 'completed' && phase.mergeBase) ? phase.mergeBase : initBranch;
const projects = await projectRepo.findProjectsByInitiativeId(phase.initiativeId);
let rawDiff = '';
for (const project of projects) {
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const diff = await branchManager.diffBranches(clonePath, initBranch, phBranch);
const diff = await branchManager.diffBranches(clonePath, diffBase, phBranch);
if (diff) {
rawDiff += diff + '\n';
}
@@ -247,8 +272,8 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
if (!phase) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Phase '${input.phaseId}' not found` });
}
if (phase.status !== 'pending_review') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not pending review (status: ${phase.status})` });
if (phase.status !== 'pending_review' && phase.status !== 'completed') {
throw new TRPCError({ code: 'BAD_REQUEST', message: `Phase is not reviewable (status: ${phase.status})` });
}
const initiative = await initiativeRepo.findById(phase.initiativeId);
@@ -258,13 +283,14 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
const initBranch = initiative.branch;
const phBranch = phaseBranchName(initBranch, phase.name);
const diffBase = (phase.status === 'completed' && phase.mergeBase) ? phase.mergeBase : initBranch;
const projects = await projectRepo.findProjectsByInitiativeId(phase.initiativeId);
const allCommits: Array<{ hash: string; shortHash: string; message: string; author: string; date: string; filesChanged: number; insertions: number; deletions: number }> = [];
for (const project of projects) {
const clonePath = await ensureProjectClone(project, ctx.workspaceRoot!);
const commits = await branchManager.listCommits(clonePath, initBranch, phBranch);
const commits = await branchManager.listCommits(clonePath, diffBase, phBranch);
allCommits.push(...commits);
}
@@ -320,6 +346,20 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
return repo.create(input);
}),
updateReviewComment: publicProcedure
.input(z.object({
id: z.string().min(1),
body: z.string().trim().min(1),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireReviewCommentRepository(ctx);
const comment = await repo.update(input.id, input.body);
if (!comment) {
throw new TRPCError({ code: 'NOT_FOUND', message: `Review comment '${input.id}' not found` });
}
return comment;
}),
resolveReviewComment: publicProcedure
.input(z.object({ id: z.string().min(1) }))
.mutation(async ({ ctx, input }) => {
@@ -342,25 +382,54 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
return comment;
}),
replyToReviewComment: publicProcedure
.input(z.object({
parentCommentId: z.string().min(1),
body: z.string().trim().min(1),
author: z.string().optional(),
}))
.mutation(async ({ ctx, input }) => {
const repo = requireReviewCommentRepository(ctx);
return repo.createReply(input.parentCommentId, input.body, input.author);
}),
requestPhaseChanges: publicProcedure
.input(z.object({
phaseId: z.string().min(1),
summary: z.string().optional(),
summary: z.string().trim().min(1).optional(),
}))
.mutation(async ({ ctx, input }) => {
const orchestrator = requireExecutionOrchestrator(ctx);
const reviewCommentRepo = requireReviewCommentRepository(ctx);
const allComments = await reviewCommentRepo.findByPhaseId(input.phaseId);
const unresolved = allComments
.filter((c: { resolved: boolean }) => !c.resolved)
.map((c: { filePath: string; lineNumber: number; body: string }) => ({
// Build threaded structure: unresolved root comments with their replies
const rootComments = allComments.filter((c) => !c.parentCommentId);
const repliesByParent = new Map<string, typeof allComments>();
for (const c of allComments) {
if (c.parentCommentId) {
const arr = repliesByParent.get(c.parentCommentId) ?? [];
arr.push(c);
repliesByParent.set(c.parentCommentId, arr);
}
}
const unresolvedThreads = rootComments
.filter((c) => !c.resolved)
.map((c) => ({
id: c.id,
filePath: c.filePath,
lineNumber: c.lineNumber,
body: c.body,
author: c.author,
replies: (repliesByParent.get(c.id) ?? []).map((r) => ({
id: r.id,
body: r.body,
author: r.author,
})),
}));
if (unresolved.length === 0 && !input.summary) {
if (unresolvedThreads.length === 0 && !input.summary) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: 'Add comments or a summary before requesting changes',
@@ -369,7 +438,7 @@ export function phaseProcedures(publicProcedure: ProcedureBuilder) {
const result = await orchestrator.requestChangesOnPhase(
input.phaseId,
unresolved,
unresolvedThreads,
input.summary,
);
return { success: true, taskId: result.taskId };

View File

@@ -2,7 +2,6 @@
* Subscription Router — SSE event streams
*/
import { z } from 'zod';
import type { ProcedureBuilder } from '../trpc.js';
import {
eventBusIterable,
@@ -17,42 +16,40 @@ import {
export function subscriptionProcedures(publicProcedure: ProcedureBuilder) {
return {
onEvent: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, ALL_EVENT_TYPES, signal);
}),
onAgentUpdate: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, AGENT_EVENT_TYPES, signal);
}),
onTaskUpdate: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, TASK_EVENT_TYPES, signal);
}),
onPageUpdate: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, PAGE_EVENT_TYPES, signal);
}),
onPreviewUpdate: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, PREVIEW_EVENT_TYPES, signal);
}),
// NOTE: No frontend view currently displays inter-agent conversation data.
// When a conversation view is added, add to its useLiveUpdates call:
// { prefix: 'conversation:', invalidate: ['<query-key>'] }
// and add the relevant mutation(s) to INVALIDATION_MAP in apps/web/src/lib/invalidation.ts.
onConversationUpdate: publicProcedure
.input(z.object({ lastEventId: z.string().nullish() }).optional())
.subscription(async function* (opts) {
const signal = opts.signal ?? new AbortController().signal;
yield* eventBusIterable(opts.ctx.eventBus, CONVERSATION_EVENT_TYPES, signal);

View File

@@ -10,6 +10,7 @@ import {
requireInitiativeRepository,
requirePhaseRepository,
requireDispatchManager,
requireChangeSetRepository,
} from './_helpers.js';
export function taskProcedures(publicProcedure: ProcedureBuilder) {
@@ -49,6 +50,14 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
message: `Task '${input.id}' not found`,
});
}
// Route through dispatchManager when completing — emits task:completed
// event so the orchestrator can check phase completion and merge branches
if (input.status === 'completed' && ctx.dispatchManager) {
await ctx.dispatchManager.completeTask(input.id);
return (await taskRepository.findById(input.id))!;
}
return taskRepository.update(input.id, { status: input.status });
}),
@@ -58,7 +67,7 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
name: z.string().min(1),
description: z.string().optional(),
category: z.enum(['execute', 'research', 'discuss', 'plan', 'detail', 'refine', 'verify', 'merge', 'review']).optional(),
type: z.enum(['auto', 'checkpoint:human-verify', 'checkpoint:decision', 'checkpoint:human-action']).optional(),
type: z.enum(['auto']).optional(),
}))
.mutation(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
@@ -88,7 +97,7 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
name: z.string().min(1),
description: z.string().optional(),
category: z.enum(['execute', 'research', 'discuss', 'plan', 'detail', 'refine', 'verify', 'merge', 'review']).optional(),
type: z.enum(['auto', 'checkpoint:human-verify', 'checkpoint:decision', 'checkpoint:human-action']).optional(),
type: z.enum(['auto']).optional(),
}))
.mutation(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
@@ -152,6 +161,29 @@ export function taskProcedures(publicProcedure: ProcedureBuilder) {
.mutation(async ({ ctx, input }) => {
const taskRepository = requireTaskRepository(ctx);
await taskRepository.delete(input.id);
// Reconcile any applied changesets that created this task.
// If all created tasks in a changeset are now deleted, mark it reverted.
if (ctx.changeSetRepository) {
try {
const csRepo = requireChangeSetRepository(ctx);
const affectedChangeSets = await csRepo.findAppliedByCreatedEntity('task', input.id);
for (const cs of affectedChangeSets) {
const createdTaskIds = cs.entries
.filter(e => e.entityType === 'task' && e.action === 'create')
.map(e => e.entityId);
const survivingTasks = await Promise.all(
createdTaskIds.map(id => taskRepository.findById(id)),
);
if (survivingTasks.every(t => t === null)) {
await csRepo.markReverted(cs.id);
}
}
} catch {
// Best-effort reconciliation — don't fail the delete
}
}
return { success: true };
}),