From 8ee02f60131bc8b2ecaba66919b054d62599db42 Mon Sep 17 00:00:00 2001 From: Lukas May Date: Wed, 4 Mar 2026 12:05:21 +0100 Subject: [PATCH] fix: Spawn chat agent in background to avoid blocking event loop sendChatMessage was awaiting the full agentManager.spawn() which includes worktree creation, ~50 writeFileSync calls for input files, credential setup, and process spawning. This blocked the Node.js event loop long enough to cause ECONNREFUSED on the SSE connection. Now the mutation returns immediately after storing the user message and creating the task. The heavy spawn work runs in a detached promise. On failure, a system message is stored so the UI can display the error. --- apps/server/trpc/routers/chat-session.ts | 98 +++++++++++++++--------- 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/apps/server/trpc/routers/chat-session.ts b/apps/server/trpc/routers/chat-session.ts index 6f3fb5e..9c2625e 100644 --- a/apps/server/trpc/routers/chat-session.ts +++ b/apps/server/trpc/routers/chat-session.ts @@ -8,8 +8,6 @@ import type { ProcedureBuilder } from '../trpc.js'; import { requireAgentManager, requireInitiativeRepository, - requirePhaseRepository, - requirePageRepository, requireTaskRepository, requireChatSessionRepository, } from './_helpers.js'; @@ -83,22 +81,12 @@ export function chatSessionProcedures(publicProcedure: ProcedureBuilder) { } } - // Spawn fresh agent with chat history + context + // Gather data needed for spawn (fast DB reads) before returning const messages = await chatRepo.findMessagesBySessionId(session.id); const chatHistory: ChatHistoryEntry[] = messages .slice(-MAX_HISTORY_MESSAGES) .map(m => ({ role: m.role as 'user' | 'assistant' | 'system', content: m.content })); - const context = await gatherInitiativeContext( - ctx.phaseRepository, - ctx.taskRepository, - ctx.pageRepository, - input.initiativeId, - ); - - const prompt = buildChatPrompt(input.targetType, input.targetId, chatHistory, input.message); - - // Create a task for the chat agent const targetName = input.targetType === 'phase' ? (await ctx.phaseRepository?.findById(input.targetId))?.name ?? input.targetId : (await ctx.taskRepository?.findById(input.targetId))?.name ?? input.targetId; @@ -111,34 +99,68 @@ export function chatSessionProcedures(publicProcedure: ProcedureBuilder) { status: 'in_progress', }); - // Determine target phase/task for input context - const targetPhase = input.targetType === 'phase' - ? await ctx.phaseRepository?.findById(input.targetId) - : undefined; - const targetTask = input.targetType === 'task' - ? await ctx.taskRepository?.findById(input.targetId) - : undefined; + const prompt = buildChatPrompt(input.targetType, input.targetId, chatHistory, input.message); - const agent = await agentManager.spawn({ - taskId: task.id, - prompt, - mode: 'chat', - provider: input.provider, - initiativeId: input.initiativeId, - inputContext: { - initiative, - phase: targetPhase ?? undefined, - task: targetTask ?? undefined, - pages: context.pages.length > 0 ? context.pages : undefined, - phases: context.phases.length > 0 ? context.phases : undefined, - tasks: context.tasks.length > 0 ? context.tasks : undefined, - }, - }); + // Spawn agent in background — worktree creation + file I/O is heavy + // and blocks the event loop, causing ECONNREFUSED on the SSE connection. + // The UI detects the agent via SSE events (agent:spawned) + query invalidation. + const sessionId = session.id; + void (async () => { + try { + const context = await gatherInitiativeContext( + ctx.phaseRepository, + ctx.taskRepository, + ctx.pageRepository, + input.initiativeId, + ); - // Link agent to session - await chatRepo.updateSession(session.id, { agentId: agent.id }); + const targetPhase = input.targetType === 'phase' + ? await ctx.phaseRepository?.findById(input.targetId) + : undefined; + const targetTask = input.targetType === 'task' + ? await ctx.taskRepository?.findById(input.targetId) + : undefined; - return { sessionId: session.id, agentId: agent.id, action: 'spawned' as const }; + const agent = await agentManager.spawn({ + taskId: task.id, + prompt, + mode: 'chat', + provider: input.provider, + initiativeId: input.initiativeId, + inputContext: { + initiative, + phase: targetPhase ?? undefined, + task: targetTask ?? undefined, + pages: context.pages.length > 0 ? context.pages : undefined, + phases: context.phases.length > 0 ? context.phases : undefined, + tasks: context.tasks.length > 0 ? context.tasks : undefined, + }, + }); + + await chatRepo.updateSession(sessionId, { agentId: agent.id }); + + ctx.eventBus.emit({ + type: 'chat:message_created' as const, + timestamp: new Date(), + payload: { chatSessionId: sessionId, role: 'system' as const }, + }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + // Store error as system message so the UI can display it + await chatRepo.createMessage({ + chatSessionId: sessionId, + role: 'system', + content: `Failed to start agent: ${msg}`, + }); + ctx.eventBus.emit({ + type: 'chat:message_created' as const, + timestamp: new Date(), + payload: { chatSessionId: sessionId, role: 'system' as const }, + }); + } + })(); + + return { sessionId: session.id, agentId: null, action: 'spawned' as const }; }), getChatSession: publicProcedure