/** * Backfill script for agent_metrics table. * * Reads all existing agent_log_chunks rows and populates agent_metrics with * accumulated counts of questions, subagent spawns, and compaction events. * * Intended to be run once per production database after applying the migration * that introduces the agent_metrics table. * * Idempotency note: Uses ON CONFLICT DO UPDATE with additive increments to match * the ongoing insertChunk write-path behavior. Running against an empty * agent_metrics table is fully safe. Running a second time will double-count — * only run this script once per database, immediately after applying the migration. */ import { asc, sql } from 'drizzle-orm'; import { createDatabase, DrizzleDatabase, agentLogChunks, agentMetrics } from '../db/index.js'; const BATCH_SIZE = 500; const LOG_EVERY = 1000; /** * Core backfill function. Accepts a DrizzleDatabase for testability. */ export async function backfillMetrics(db: DrizzleDatabase): Promise { const accumulator = new Map(); let offset = 0; let totalChunks = 0; let malformedCount = 0; while (true) { const batch = await db .select({ agentId: agentLogChunks.agentId, content: agentLogChunks.content }) .from(agentLogChunks) .orderBy(asc(agentLogChunks.createdAt)) .limit(BATCH_SIZE) .offset(offset); if (batch.length === 0) break; for (const chunk of batch) { let parsed: unknown; try { parsed = JSON.parse(chunk.content); } catch { malformedCount++; totalChunks++; if (totalChunks % LOG_EVERY === 0) { console.log(`Processed ${totalChunks} chunks...`); } continue; } if (typeof parsed !== 'object' || parsed === null) { totalChunks++; if (totalChunks % LOG_EVERY === 0) { console.log(`Processed ${totalChunks} chunks...`); } continue; } const obj = parsed as Record; const type = obj['type']; const name = obj['name']; if (type === 'tool_use' && name === 'AskUserQuestion') { const input = obj['input'] as Record | undefined; const questions = input?.['questions']; const count = Array.isArray(questions) ? questions.length : 0; if (count > 0) { const entry = accumulator.get(chunk.agentId) ?? { questionsCount: 0, subagentsCount: 0, compactionsCount: 0 }; entry.questionsCount += count; accumulator.set(chunk.agentId, entry); } } else if (type === 'tool_use' && name === 'Agent') { const entry = accumulator.get(chunk.agentId) ?? { questionsCount: 0, subagentsCount: 0, compactionsCount: 0 }; entry.subagentsCount += 1; accumulator.set(chunk.agentId, entry); } else if (type === 'system' && obj['subtype'] === 'init' && obj['source'] === 'compact') { const entry = accumulator.get(chunk.agentId) ?? { questionsCount: 0, subagentsCount: 0, compactionsCount: 0 }; entry.compactionsCount += 1; accumulator.set(chunk.agentId, entry); } totalChunks++; if (totalChunks % LOG_EVERY === 0) { console.log(`Processed ${totalChunks} chunks...`); } } offset += BATCH_SIZE; } // Upsert accumulated counts into agent_metrics. // Uses additive ON CONFLICT DO UPDATE to match the ongoing insertChunk behavior. for (const [agentId, counts] of accumulator) { await db .insert(agentMetrics) .values({ agentId, questionsCount: counts.questionsCount, subagentsCount: counts.subagentsCount, compactionsCount: counts.compactionsCount, updatedAt: new Date(), }) .onConflictDoUpdate({ target: agentMetrics.agentId, set: { questionsCount: sql`${agentMetrics.questionsCount} + ${counts.questionsCount}`, subagentsCount: sql`${agentMetrics.subagentsCount} + ${counts.subagentsCount}`, compactionsCount: sql`${agentMetrics.compactionsCount} + ${counts.compactionsCount}`, updatedAt: new Date(), }, }); } console.log( `Backfill complete: ${accumulator.size} agents updated, ${totalChunks} chunks processed, ${malformedCount} malformed chunks skipped` ); } /** * CLI wrapper — opens a database from a path, then delegates to backfillMetrics. */ export async function backfillMetricsFromPath(dbPath: string): Promise { const db = createDatabase(dbPath); await backfillMetrics(db); }