fix: Spawn chat agent in background to avoid blocking event loop
sendChatMessage was awaiting the full agentManager.spawn() which includes worktree creation, ~50 writeFileSync calls for input files, credential setup, and process spawning. This blocked the Node.js event loop long enough to cause ECONNREFUSED on the SSE connection. Now the mutation returns immediately after storing the user message and creating the task. The heavy spawn work runs in a detached promise. On failure, a system message is stored so the UI can display the error.
This commit is contained in:
@@ -8,8 +8,6 @@ import type { ProcedureBuilder } from '../trpc.js';
|
||||
import {
|
||||
requireAgentManager,
|
||||
requireInitiativeRepository,
|
||||
requirePhaseRepository,
|
||||
requirePageRepository,
|
||||
requireTaskRepository,
|
||||
requireChatSessionRepository,
|
||||
} from './_helpers.js';
|
||||
@@ -83,22 +81,12 @@ export function chatSessionProcedures(publicProcedure: ProcedureBuilder) {
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn fresh agent with chat history + context
|
||||
// Gather data needed for spawn (fast DB reads) before returning
|
||||
const messages = await chatRepo.findMessagesBySessionId(session.id);
|
||||
const chatHistory: ChatHistoryEntry[] = messages
|
||||
.slice(-MAX_HISTORY_MESSAGES)
|
||||
.map(m => ({ role: m.role as 'user' | 'assistant' | 'system', content: m.content }));
|
||||
|
||||
const context = await gatherInitiativeContext(
|
||||
ctx.phaseRepository,
|
||||
ctx.taskRepository,
|
||||
ctx.pageRepository,
|
||||
input.initiativeId,
|
||||
);
|
||||
|
||||
const prompt = buildChatPrompt(input.targetType, input.targetId, chatHistory, input.message);
|
||||
|
||||
// Create a task for the chat agent
|
||||
const targetName = input.targetType === 'phase'
|
||||
? (await ctx.phaseRepository?.findById(input.targetId))?.name ?? input.targetId
|
||||
: (await ctx.taskRepository?.findById(input.targetId))?.name ?? input.targetId;
|
||||
@@ -111,34 +99,68 @@ export function chatSessionProcedures(publicProcedure: ProcedureBuilder) {
|
||||
status: 'in_progress',
|
||||
});
|
||||
|
||||
// Determine target phase/task for input context
|
||||
const targetPhase = input.targetType === 'phase'
|
||||
? await ctx.phaseRepository?.findById(input.targetId)
|
||||
: undefined;
|
||||
const targetTask = input.targetType === 'task'
|
||||
? await ctx.taskRepository?.findById(input.targetId)
|
||||
: undefined;
|
||||
const prompt = buildChatPrompt(input.targetType, input.targetId, chatHistory, input.message);
|
||||
|
||||
const agent = await agentManager.spawn({
|
||||
taskId: task.id,
|
||||
prompt,
|
||||
mode: 'chat',
|
||||
provider: input.provider,
|
||||
initiativeId: input.initiativeId,
|
||||
inputContext: {
|
||||
initiative,
|
||||
phase: targetPhase ?? undefined,
|
||||
task: targetTask ?? undefined,
|
||||
pages: context.pages.length > 0 ? context.pages : undefined,
|
||||
phases: context.phases.length > 0 ? context.phases : undefined,
|
||||
tasks: context.tasks.length > 0 ? context.tasks : undefined,
|
||||
},
|
||||
});
|
||||
// Spawn agent in background — worktree creation + file I/O is heavy
|
||||
// and blocks the event loop, causing ECONNREFUSED on the SSE connection.
|
||||
// The UI detects the agent via SSE events (agent:spawned) + query invalidation.
|
||||
const sessionId = session.id;
|
||||
void (async () => {
|
||||
try {
|
||||
const context = await gatherInitiativeContext(
|
||||
ctx.phaseRepository,
|
||||
ctx.taskRepository,
|
||||
ctx.pageRepository,
|
||||
input.initiativeId,
|
||||
);
|
||||
|
||||
// Link agent to session
|
||||
await chatRepo.updateSession(session.id, { agentId: agent.id });
|
||||
const targetPhase = input.targetType === 'phase'
|
||||
? await ctx.phaseRepository?.findById(input.targetId)
|
||||
: undefined;
|
||||
const targetTask = input.targetType === 'task'
|
||||
? await ctx.taskRepository?.findById(input.targetId)
|
||||
: undefined;
|
||||
|
||||
return { sessionId: session.id, agentId: agent.id, action: 'spawned' as const };
|
||||
const agent = await agentManager.spawn({
|
||||
taskId: task.id,
|
||||
prompt,
|
||||
mode: 'chat',
|
||||
provider: input.provider,
|
||||
initiativeId: input.initiativeId,
|
||||
inputContext: {
|
||||
initiative,
|
||||
phase: targetPhase ?? undefined,
|
||||
task: targetTask ?? undefined,
|
||||
pages: context.pages.length > 0 ? context.pages : undefined,
|
||||
phases: context.phases.length > 0 ? context.phases : undefined,
|
||||
tasks: context.tasks.length > 0 ? context.tasks : undefined,
|
||||
},
|
||||
});
|
||||
|
||||
await chatRepo.updateSession(sessionId, { agentId: agent.id });
|
||||
|
||||
ctx.eventBus.emit({
|
||||
type: 'chat:message_created' as const,
|
||||
timestamp: new Date(),
|
||||
payload: { chatSessionId: sessionId, role: 'system' as const },
|
||||
});
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error ? err.message : String(err);
|
||||
// Store error as system message so the UI can display it
|
||||
await chatRepo.createMessage({
|
||||
chatSessionId: sessionId,
|
||||
role: 'system',
|
||||
content: `Failed to start agent: ${msg}`,
|
||||
});
|
||||
ctx.eventBus.emit({
|
||||
type: 'chat:message_created' as const,
|
||||
timestamp: new Date(),
|
||||
payload: { chatSessionId: sessionId, role: 'system' as const },
|
||||
});
|
||||
}
|
||||
})();
|
||||
|
||||
return { sessionId: session.id, agentId: null, action: 'spawned' as const };
|
||||
}),
|
||||
|
||||
getChatSession: publicProcedure
|
||||
|
||||
Reference in New Issue
Block a user