Merge branch 'cw/radar-screen-performance' into cw-merge-1772829950184
This commit is contained in:
@@ -13,6 +13,8 @@ import { createDefaultTrpcClient } from './trpc-client.js';
|
||||
import { createContainer } from '../container.js';
|
||||
import { findWorkspaceRoot, writeCwrc, defaultCwConfig } from '../config/index.js';
|
||||
import { createModuleLogger } from '../logger/index.js';
|
||||
import { backfillMetricsFromPath } from '../scripts/backfill-metrics.js';
|
||||
import { getDbPath } from '../db/index.js';
|
||||
|
||||
/** Environment variable for custom port */
|
||||
const CW_PORT_ENV = 'CW_PORT';
|
||||
@@ -134,6 +136,22 @@ export function createCli(serverHandler?: (port?: number) => Promise<void>): Com
|
||||
}
|
||||
});
|
||||
|
||||
// Backfill metrics command (standalone — no server, no tRPC)
|
||||
program
|
||||
.command('backfill-metrics')
|
||||
.description('Populate agent_metrics table from existing agent_log_chunks (run once after upgrading)')
|
||||
.option('--db <path>', 'Path to the SQLite database file (defaults to configured DB path)')
|
||||
.action(async (options: { db?: string }) => {
|
||||
const dbPath = options.db ?? getDbPath();
|
||||
console.log(`Backfilling metrics from ${dbPath}...`);
|
||||
try {
|
||||
await backfillMetricsFromPath(dbPath);
|
||||
} catch (error) {
|
||||
console.error('Backfill failed:', (error as Error).message);
|
||||
process.exit(1);
|
||||
}
|
||||
});
|
||||
|
||||
// Agent command group
|
||||
const agentCommand = program
|
||||
.command('agent')
|
||||
|
||||
48
apps/server/db/repositories/drizzle/agent-metrics.test.ts
Normal file
48
apps/server/db/repositories/drizzle/agent-metrics.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { createTestDatabase } from './test-helpers.js';
|
||||
import { agentMetrics } from '../../schema.js';
|
||||
|
||||
describe('agentMetrics table', () => {
|
||||
it('select from empty agentMetrics returns []', async () => {
|
||||
const db = createTestDatabase();
|
||||
const rows = await db.select().from(agentMetrics);
|
||||
expect(rows).toEqual([]);
|
||||
});
|
||||
|
||||
it('insert and select a metrics row round-trips correctly', async () => {
|
||||
const db = createTestDatabase();
|
||||
await db.insert(agentMetrics).values({
|
||||
agentId: 'agent-abc',
|
||||
questionsCount: 3,
|
||||
subagentsCount: 1,
|
||||
compactionsCount: 0,
|
||||
updatedAt: new Date('2024-01-01T00:00:00Z'),
|
||||
});
|
||||
const rows = await db.select().from(agentMetrics);
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].agentId).toBe('agent-abc');
|
||||
expect(rows[0].questionsCount).toBe(3);
|
||||
expect(rows[0].subagentsCount).toBe(1);
|
||||
expect(rows[0].compactionsCount).toBe(0);
|
||||
});
|
||||
|
||||
it('agentId is primary key — duplicate insert throws', async () => {
|
||||
const db = createTestDatabase();
|
||||
await db.insert(agentMetrics).values({
|
||||
agentId: 'agent-dup',
|
||||
questionsCount: 0,
|
||||
subagentsCount: 0,
|
||||
compactionsCount: 0,
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
await expect(
|
||||
db.insert(agentMetrics).values({
|
||||
agentId: 'agent-dup',
|
||||
questionsCount: 1,
|
||||
subagentsCount: 0,
|
||||
compactionsCount: 0,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
129
apps/server/db/repositories/drizzle/log-chunk.test.ts
Normal file
129
apps/server/db/repositories/drizzle/log-chunk.test.ts
Normal file
@@ -0,0 +1,129 @@
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import { DrizzleLogChunkRepository } from './log-chunk.js';
|
||||
import { createTestDatabase } from './test-helpers.js';
|
||||
import type { DrizzleDatabase } from '../../index.js';
|
||||
|
||||
describe('DrizzleLogChunkRepository', () => {
|
||||
let db: DrizzleDatabase;
|
||||
let repo: DrizzleLogChunkRepository;
|
||||
const testAgentId = 'agent-test-001';
|
||||
|
||||
beforeEach(() => {
|
||||
db = createTestDatabase();
|
||||
repo = new DrizzleLogChunkRepository(db);
|
||||
});
|
||||
|
||||
it('AskUserQuestion chunk — questionsCount upserted correctly', async () => {
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}, {}] } }),
|
||||
});
|
||||
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||
expect(metrics).toEqual([{
|
||||
agentId: testAgentId,
|
||||
questionsCount: 2,
|
||||
subagentsCount: 0,
|
||||
compactionsCount: 0,
|
||||
}]);
|
||||
});
|
||||
|
||||
it('Agent tool chunk — subagentsCount incremented', async () => {
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'tool_use', name: 'Agent' }),
|
||||
});
|
||||
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||
expect(metrics).toEqual([{
|
||||
agentId: testAgentId,
|
||||
questionsCount: 0,
|
||||
subagentsCount: 1,
|
||||
compactionsCount: 0,
|
||||
}]);
|
||||
});
|
||||
|
||||
it('Compaction event — compactionsCount incremented', async () => {
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'system', subtype: 'init', source: 'compact' }),
|
||||
});
|
||||
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||
expect(metrics).toEqual([{
|
||||
agentId: testAgentId,
|
||||
questionsCount: 0,
|
||||
subagentsCount: 0,
|
||||
compactionsCount: 1,
|
||||
}]);
|
||||
});
|
||||
|
||||
it('Irrelevant chunk type — no metrics row created', async () => {
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'text', text: 'hello' }),
|
||||
});
|
||||
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||
expect(metrics).toEqual([]);
|
||||
});
|
||||
|
||||
it('Malformed JSON chunk — chunk persisted, metrics row absent', async () => {
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: 'not-valid-json',
|
||||
});
|
||||
const chunks = await repo.findByAgentId(testAgentId);
|
||||
expect(chunks).toHaveLength(1);
|
||||
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||
expect(metrics).toEqual([]);
|
||||
});
|
||||
|
||||
it('Multiple inserts, same agent — counts accumulate additively', async () => {
|
||||
// 3 Agent tool chunks
|
||||
for (let i = 0; i < 3; i++) {
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'tool_use', name: 'Agent' }),
|
||||
});
|
||||
}
|
||||
// 1 AskUserQuestion with 2 questions
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}, {}] } }),
|
||||
});
|
||||
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||
expect(metrics).toEqual([{
|
||||
agentId: testAgentId,
|
||||
questionsCount: 2,
|
||||
subagentsCount: 3,
|
||||
compactionsCount: 0,
|
||||
}]);
|
||||
});
|
||||
|
||||
it('findMetricsByAgentIds with empty array — returns []', async () => {
|
||||
const metrics = await repo.findMetricsByAgentIds([]);
|
||||
expect(metrics).toEqual([]);
|
||||
});
|
||||
|
||||
it('findMetricsByAgentIds with agentId that has no metrics row — returns []', async () => {
|
||||
await repo.insertChunk({
|
||||
agentId: testAgentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'text', text: 'hello' }),
|
||||
});
|
||||
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||
expect(metrics).toEqual([]);
|
||||
});
|
||||
});
|
||||
@@ -4,10 +4,10 @@
|
||||
* Implements LogChunkRepository interface using Drizzle ORM.
|
||||
*/
|
||||
|
||||
import { eq, asc, max, inArray } from 'drizzle-orm';
|
||||
import { eq, asc, max, inArray, sql } from 'drizzle-orm';
|
||||
import { nanoid } from 'nanoid';
|
||||
import type { DrizzleDatabase } from '../../index.js';
|
||||
import { agentLogChunks } from '../../schema.js';
|
||||
import { agentLogChunks, agentMetrics } from '../../schema.js';
|
||||
import type { LogChunkRepository } from '../log-chunk-repository.js';
|
||||
|
||||
export class DrizzleLogChunkRepository implements LogChunkRepository {
|
||||
@@ -19,13 +19,58 @@ export class DrizzleLogChunkRepository implements LogChunkRepository {
|
||||
sessionNumber: number;
|
||||
content: string;
|
||||
}): Promise<void> {
|
||||
await this.db.insert(agentLogChunks).values({
|
||||
id: nanoid(),
|
||||
agentId: data.agentId,
|
||||
agentName: data.agentName,
|
||||
sessionNumber: data.sessionNumber,
|
||||
content: data.content,
|
||||
createdAt: new Date(),
|
||||
// better-sqlite3 is synchronous — transaction callback must be sync, use .run() not await
|
||||
this.db.transaction((tx) => {
|
||||
// 1. Always insert the chunk row first
|
||||
tx.insert(agentLogChunks).values({
|
||||
id: nanoid(),
|
||||
agentId: data.agentId,
|
||||
agentName: data.agentName,
|
||||
sessionNumber: data.sessionNumber,
|
||||
content: data.content,
|
||||
createdAt: new Date(),
|
||||
}).run();
|
||||
|
||||
// 2. Parse content and determine metric increments
|
||||
// Wrap only the parse + upsert block — chunk insert is not rolled back on parse failure
|
||||
try {
|
||||
const parsed = JSON.parse(data.content);
|
||||
let deltaQuestions = 0;
|
||||
let deltaSubagents = 0;
|
||||
let deltaCompactions = 0;
|
||||
|
||||
if (parsed.type === 'tool_use' && parsed.name === 'AskUserQuestion') {
|
||||
deltaQuestions = parsed.input?.questions?.length ?? 0;
|
||||
} else if (parsed.type === 'tool_use' && parsed.name === 'Agent') {
|
||||
deltaSubagents = 1;
|
||||
} else if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.source === 'compact') {
|
||||
deltaCompactions = 1;
|
||||
}
|
||||
|
||||
// 3. Only upsert if there is something to increment
|
||||
if (deltaQuestions > 0 || deltaSubagents > 0 || deltaCompactions > 0) {
|
||||
tx.insert(agentMetrics)
|
||||
.values({
|
||||
agentId: data.agentId,
|
||||
questionsCount: deltaQuestions,
|
||||
subagentsCount: deltaSubagents,
|
||||
compactionsCount: deltaCompactions,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: agentMetrics.agentId,
|
||||
set: {
|
||||
questionsCount: sql`${agentMetrics.questionsCount} + ${deltaQuestions}`,
|
||||
subagentsCount: sql`${agentMetrics.subagentsCount} + ${deltaSubagents}`,
|
||||
compactionsCount: sql`${agentMetrics.compactionsCount} + ${deltaCompactions}`,
|
||||
updatedAt: new Date(),
|
||||
},
|
||||
})
|
||||
.run();
|
||||
}
|
||||
} catch {
|
||||
// Malformed JSON — skip metric upsert, chunk insert already committed within transaction
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -69,4 +114,22 @@ export class DrizzleLogChunkRepository implements LogChunkRepository {
|
||||
|
||||
return result[0]?.maxSession ?? 0;
|
||||
}
|
||||
|
||||
async findMetricsByAgentIds(agentIds: string[]): Promise<{
|
||||
agentId: string;
|
||||
questionsCount: number;
|
||||
subagentsCount: number;
|
||||
compactionsCount: number;
|
||||
}[]> {
|
||||
if (agentIds.length === 0) return [];
|
||||
return this.db
|
||||
.select({
|
||||
agentId: agentMetrics.agentId,
|
||||
questionsCount: agentMetrics.questionsCount,
|
||||
subagentsCount: agentMetrics.subagentsCount,
|
||||
compactionsCount: agentMetrics.compactionsCount,
|
||||
})
|
||||
.from(agentMetrics)
|
||||
.where(inArray(agentMetrics.agentId, agentIds));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,4 +27,16 @@ export interface LogChunkRepository {
|
||||
deleteByAgentId(agentId: string): Promise<void>;
|
||||
|
||||
getSessionCount(agentId: string): Promise<number>;
|
||||
|
||||
/**
|
||||
* Batch-fetch pre-computed metrics for multiple agent IDs.
|
||||
* Returns one row per agent that has metrics. Agents with no
|
||||
* matching row in agent_metrics are omitted (not returned as zeros).
|
||||
*/
|
||||
findMetricsByAgentIds(agentIds: string[]): Promise<{
|
||||
agentId: string;
|
||||
questionsCount: number;
|
||||
subagentsCount: number;
|
||||
compactionsCount: number;
|
||||
}[]>;
|
||||
}
|
||||
|
||||
@@ -513,6 +513,21 @@ export const agentLogChunks = sqliteTable('agent_log_chunks', {
|
||||
export type AgentLogChunk = InferSelectModel<typeof agentLogChunks>;
|
||||
export type NewAgentLogChunk = InferInsertModel<typeof agentLogChunks>;
|
||||
|
||||
// ============================================================================
|
||||
// AGENT METRICS
|
||||
// ============================================================================
|
||||
|
||||
export const agentMetrics = sqliteTable('agent_metrics', {
|
||||
agentId: text('agent_id').primaryKey(),
|
||||
questionsCount: integer('questions_count').notNull().default(0),
|
||||
subagentsCount: integer('subagents_count').notNull().default(0),
|
||||
compactionsCount: integer('compactions_count').notNull().default(0),
|
||||
updatedAt: integer('updated_at', { mode: 'timestamp' }).notNull(),
|
||||
});
|
||||
|
||||
export type AgentMetrics = InferSelectModel<typeof agentMetrics>;
|
||||
export type NewAgentMetrics = InferInsertModel<typeof agentMetrics>;
|
||||
|
||||
// ============================================================================
|
||||
// CONVERSATIONS (inter-agent communication)
|
||||
// ============================================================================
|
||||
|
||||
7
apps/server/drizzle/0037_eager_devos.sql
Normal file
7
apps/server/drizzle/0037_eager_devos.sql
Normal file
@@ -0,0 +1,7 @@
|
||||
CREATE TABLE `agent_metrics` (
|
||||
`agent_id` text PRIMARY KEY NOT NULL,
|
||||
`questions_count` integer DEFAULT 0 NOT NULL,
|
||||
`subagents_count` integer DEFAULT 0 NOT NULL,
|
||||
`compactions_count` integer DEFAULT 0 NOT NULL,
|
||||
`updated_at` integer NOT NULL
|
||||
);
|
||||
@@ -2,7 +2,7 @@
|
||||
"version": "6",
|
||||
"dialect": "sqlite",
|
||||
"id": "c84e499f-7df8-4091-b2a5-6b12847898bd",
|
||||
"prevId": "5fbe1151-1dfb-4b0c-a7fa-2177369543fd",
|
||||
"prevId": "443071fe-31d6-498a-9f4a-4a3ff24a46fc",
|
||||
"tables": {
|
||||
"accounts": {
|
||||
"name": "accounts",
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
2029
apps/server/drizzle/meta/0037_snapshot.json
Normal file
2029
apps/server/drizzle/meta/0037_snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -260,6 +260,13 @@
|
||||
"when": 1772798869413,
|
||||
"tag": "0036_icy_silvermane",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 37,
|
||||
"version": "6",
|
||||
"when": 1772828694292,
|
||||
"tag": "0037_eager_devos",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
131
apps/server/scripts/backfill-metrics.test.ts
Normal file
131
apps/server/scripts/backfill-metrics.test.ts
Normal file
@@ -0,0 +1,131 @@
|
||||
/**
|
||||
* Tests for the backfill-metrics script.
|
||||
*
|
||||
* Uses an in-memory test database to verify that backfillMetrics correctly
|
||||
* accumulates counts from agent_log_chunks and upserts into agent_metrics.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach } from 'vitest';
|
||||
import { createTestDatabase } from '../db/repositories/drizzle/test-helpers.js';
|
||||
import type { DrizzleDatabase } from '../db/index.js';
|
||||
import { agentLogChunks, agentMetrics } from '../db/index.js';
|
||||
import { backfillMetrics } from './backfill-metrics.js';
|
||||
import { nanoid } from 'nanoid';
|
||||
import { eq } from 'drizzle-orm';
|
||||
|
||||
async function insertChunk(db: DrizzleDatabase, agentId: string, content: object | string) {
|
||||
await db.insert(agentLogChunks).values({
|
||||
id: nanoid(),
|
||||
agentId,
|
||||
agentName: 'test-agent',
|
||||
sessionNumber: 1,
|
||||
content: typeof content === 'string' ? content : JSON.stringify(content),
|
||||
createdAt: new Date(),
|
||||
});
|
||||
}
|
||||
|
||||
describe('backfillMetrics', () => {
|
||||
let db: DrizzleDatabase;
|
||||
|
||||
beforeEach(() => {
|
||||
db = createTestDatabase();
|
||||
});
|
||||
|
||||
it('AskUserQuestion chunks — questionsCount correct', async () => {
|
||||
await insertChunk(db, 'agent-a', { type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}, {}] } });
|
||||
await insertChunk(db, 'agent-a', { type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}] } });
|
||||
|
||||
await backfillMetrics(db);
|
||||
|
||||
const rows = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-a'));
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].questionsCount).toBe(3);
|
||||
expect(rows[0].subagentsCount).toBe(0);
|
||||
expect(rows[0].compactionsCount).toBe(0);
|
||||
});
|
||||
|
||||
it('Agent tool chunks — subagentsCount correct', async () => {
|
||||
await insertChunk(db, 'agent-b', { type: 'tool_use', name: 'Agent' });
|
||||
await insertChunk(db, 'agent-b', { type: 'tool_use', name: 'Agent' });
|
||||
|
||||
await backfillMetrics(db);
|
||||
|
||||
const rows = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-b'));
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].questionsCount).toBe(0);
|
||||
expect(rows[0].subagentsCount).toBe(2);
|
||||
expect(rows[0].compactionsCount).toBe(0);
|
||||
});
|
||||
|
||||
it('Compaction chunks — compactionsCount correct', async () => {
|
||||
await insertChunk(db, 'agent-c', { type: 'system', subtype: 'init', source: 'compact' });
|
||||
|
||||
await backfillMetrics(db);
|
||||
|
||||
const rows = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-c'));
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].questionsCount).toBe(0);
|
||||
expect(rows[0].subagentsCount).toBe(0);
|
||||
expect(rows[0].compactionsCount).toBe(1);
|
||||
});
|
||||
|
||||
it('Irrelevant chunk type — no metrics row created', async () => {
|
||||
await insertChunk(db, 'agent-d', { type: 'text', text: 'hello' });
|
||||
|
||||
await backfillMetrics(db);
|
||||
|
||||
const rows = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-d'));
|
||||
expect(rows).toEqual([]);
|
||||
});
|
||||
|
||||
it('Malformed JSON chunk — skipped, no crash', async () => {
|
||||
await insertChunk(db, 'agent-e', 'not-valid-json');
|
||||
await insertChunk(db, 'agent-e', { type: 'tool_use', name: 'Agent' });
|
||||
|
||||
await expect(backfillMetrics(db)).resolves.not.toThrow();
|
||||
|
||||
const rows = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-e'));
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].subagentsCount).toBe(1);
|
||||
});
|
||||
|
||||
it('Multiple agents — counts isolated per agent', async () => {
|
||||
await insertChunk(db, 'agent-f', { type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}, {}, {}] } });
|
||||
await insertChunk(db, 'agent-f', { type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}, {}, {}] } });
|
||||
await insertChunk(db, 'agent-g', { type: 'tool_use', name: 'Agent' });
|
||||
|
||||
await backfillMetrics(db);
|
||||
|
||||
const rowsF = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-f'));
|
||||
expect(rowsF).toHaveLength(1);
|
||||
expect(rowsF[0].questionsCount).toBe(6);
|
||||
expect(rowsF[0].subagentsCount).toBe(0);
|
||||
expect(rowsF[0].compactionsCount).toBe(0);
|
||||
|
||||
const rowsG = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-g'));
|
||||
expect(rowsG).toHaveLength(1);
|
||||
expect(rowsG[0].questionsCount).toBe(0);
|
||||
expect(rowsG[0].subagentsCount).toBe(1);
|
||||
expect(rowsG[0].compactionsCount).toBe(0);
|
||||
});
|
||||
|
||||
it('Empty database — completes without error', async () => {
|
||||
await expect(backfillMetrics(db)).resolves.not.toThrow();
|
||||
|
||||
const rows = await db.select().from(agentMetrics);
|
||||
expect(rows).toEqual([]);
|
||||
});
|
||||
|
||||
it('Re-run idempotency note — second run doubles counts', async () => {
|
||||
// Documented behavior: run only once against a fresh agent_metrics table
|
||||
await insertChunk(db, 'agent-h', { type: 'tool_use', name: 'Agent' });
|
||||
|
||||
await backfillMetrics(db);
|
||||
const rowsAfterFirst = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-h'));
|
||||
expect(rowsAfterFirst[0].subagentsCount).toBe(1);
|
||||
|
||||
await backfillMetrics(db);
|
||||
const rowsAfterSecond = await db.select().from(agentMetrics).where(eq(agentMetrics.agentId, 'agent-h'));
|
||||
expect(rowsAfterSecond[0].subagentsCount).toBe(2);
|
||||
});
|
||||
});
|
||||
128
apps/server/scripts/backfill-metrics.ts
Normal file
128
apps/server/scripts/backfill-metrics.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
/**
|
||||
* Backfill script for agent_metrics table.
|
||||
*
|
||||
* Reads all existing agent_log_chunks rows and populates agent_metrics with
|
||||
* accumulated counts of questions, subagent spawns, and compaction events.
|
||||
*
|
||||
* Intended to be run once per production database after applying the migration
|
||||
* that introduces the agent_metrics table.
|
||||
*
|
||||
* Idempotency note: Uses ON CONFLICT DO UPDATE with additive increments to match
|
||||
* the ongoing insertChunk write-path behavior. Running against an empty
|
||||
* agent_metrics table is fully safe. Running a second time will double-count —
|
||||
* only run this script once per database, immediately after applying the migration.
|
||||
*/
|
||||
|
||||
import { asc, sql } from 'drizzle-orm';
|
||||
import { createDatabase, DrizzleDatabase, agentLogChunks, agentMetrics } from '../db/index.js';
|
||||
|
||||
const BATCH_SIZE = 500;
|
||||
const LOG_EVERY = 1000;
|
||||
|
||||
/**
|
||||
* Core backfill function. Accepts a DrizzleDatabase for testability.
|
||||
*/
|
||||
export async function backfillMetrics(db: DrizzleDatabase): Promise<void> {
|
||||
const accumulator = new Map<string, { questionsCount: number; subagentsCount: number; compactionsCount: number }>();
|
||||
let offset = 0;
|
||||
let totalChunks = 0;
|
||||
let malformedCount = 0;
|
||||
|
||||
while (true) {
|
||||
const batch = await db
|
||||
.select({ agentId: agentLogChunks.agentId, content: agentLogChunks.content })
|
||||
.from(agentLogChunks)
|
||||
.orderBy(asc(agentLogChunks.createdAt))
|
||||
.limit(BATCH_SIZE)
|
||||
.offset(offset);
|
||||
|
||||
if (batch.length === 0) break;
|
||||
|
||||
for (const chunk of batch) {
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(chunk.content);
|
||||
} catch {
|
||||
malformedCount++;
|
||||
totalChunks++;
|
||||
if (totalChunks % LOG_EVERY === 0) {
|
||||
console.log(`Processed ${totalChunks} chunks...`);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (typeof parsed !== 'object' || parsed === null) {
|
||||
totalChunks++;
|
||||
if (totalChunks % LOG_EVERY === 0) {
|
||||
console.log(`Processed ${totalChunks} chunks...`);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
const obj = parsed as Record<string, unknown>;
|
||||
const type = obj['type'];
|
||||
const name = obj['name'];
|
||||
|
||||
if (type === 'tool_use' && name === 'AskUserQuestion') {
|
||||
const input = obj['input'] as Record<string, unknown> | undefined;
|
||||
const questions = input?.['questions'];
|
||||
const count = Array.isArray(questions) ? questions.length : 0;
|
||||
if (count > 0) {
|
||||
const entry = accumulator.get(chunk.agentId) ?? { questionsCount: 0, subagentsCount: 0, compactionsCount: 0 };
|
||||
entry.questionsCount += count;
|
||||
accumulator.set(chunk.agentId, entry);
|
||||
}
|
||||
} else if (type === 'tool_use' && name === 'Agent') {
|
||||
const entry = accumulator.get(chunk.agentId) ?? { questionsCount: 0, subagentsCount: 0, compactionsCount: 0 };
|
||||
entry.subagentsCount += 1;
|
||||
accumulator.set(chunk.agentId, entry);
|
||||
} else if (type === 'system' && obj['subtype'] === 'init' && obj['source'] === 'compact') {
|
||||
const entry = accumulator.get(chunk.agentId) ?? { questionsCount: 0, subagentsCount: 0, compactionsCount: 0 };
|
||||
entry.compactionsCount += 1;
|
||||
accumulator.set(chunk.agentId, entry);
|
||||
}
|
||||
|
||||
totalChunks++;
|
||||
if (totalChunks % LOG_EVERY === 0) {
|
||||
console.log(`Processed ${totalChunks} chunks...`);
|
||||
}
|
||||
}
|
||||
|
||||
offset += BATCH_SIZE;
|
||||
}
|
||||
|
||||
// Upsert accumulated counts into agent_metrics.
|
||||
// Uses additive ON CONFLICT DO UPDATE to match the ongoing insertChunk behavior.
|
||||
for (const [agentId, counts] of accumulator) {
|
||||
await db
|
||||
.insert(agentMetrics)
|
||||
.values({
|
||||
agentId,
|
||||
questionsCount: counts.questionsCount,
|
||||
subagentsCount: counts.subagentsCount,
|
||||
compactionsCount: counts.compactionsCount,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: agentMetrics.agentId,
|
||||
set: {
|
||||
questionsCount: sql`${agentMetrics.questionsCount} + ${counts.questionsCount}`,
|
||||
subagentsCount: sql`${agentMetrics.subagentsCount} + ${counts.subagentsCount}`,
|
||||
compactionsCount: sql`${agentMetrics.compactionsCount} + ${counts.compactionsCount}`,
|
||||
updatedAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
console.log(
|
||||
`Backfill complete: ${accumulator.size} agents updated, ${totalChunks} chunks processed, ${malformedCount} malformed chunks skipped`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* CLI wrapper — opens a database from a path, then delegates to backfillMetrics.
|
||||
*/
|
||||
export async function backfillMetricsFromPath(dbPath: string): Promise<void> {
|
||||
const db = createDatabase(dbPath);
|
||||
await backfillMetrics(db);
|
||||
}
|
||||
@@ -326,6 +326,47 @@ describe('agent.listForRadar', () => {
|
||||
expect(row!.subagentsCount).toBe(0);
|
||||
expect(row!.compactionsCount).toBe(0);
|
||||
});
|
||||
|
||||
it('returns zero counts for agent with no metrics row', async () => {
|
||||
const agents = new MockAgentManager();
|
||||
const ctx = makeCtx(agents);
|
||||
|
||||
const now = new Date();
|
||||
// Agent with no log chunks at all — no agent_metrics row will exist
|
||||
agents.addAgent({ id: 'agent-no-chunks', name: 'no-chunks-agent', status: 'running', createdAt: now });
|
||||
|
||||
const caller = createAgentCaller(ctx);
|
||||
const result = await caller.listForRadar({ timeRange: 'all' });
|
||||
|
||||
const row = result.find(r => r.id === 'agent-no-chunks');
|
||||
expect(row).toBeDefined();
|
||||
expect(row!.questionsCount).toBe(0);
|
||||
expect(row!.subagentsCount).toBe(0);
|
||||
expect(row!.compactionsCount).toBe(0);
|
||||
});
|
||||
|
||||
it('listForRadar response does not contain chunk content field', async () => {
|
||||
const agents = new MockAgentManager();
|
||||
const ctx = makeCtx(agents);
|
||||
const { logChunkRepo } = getRepos(ctx);
|
||||
|
||||
const now = new Date();
|
||||
agents.addAgent({ id: 'agent-content', name: 'content-agent', status: 'running', createdAt: now });
|
||||
|
||||
await logChunkRepo.insertChunk({
|
||||
agentId: 'agent-content',
|
||||
agentName: 'content-agent',
|
||||
sessionNumber: 1,
|
||||
content: JSON.stringify({ type: 'tool_use', name: 'Agent', input: { description: 'do stuff', prompt: 'some prompt' } }),
|
||||
});
|
||||
|
||||
const caller = createAgentCaller(ctx);
|
||||
const result = await caller.listForRadar({ timeRange: 'all' });
|
||||
|
||||
for (const row of result) {
|
||||
expect(row).not.toHaveProperty('content');
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// =============================================================================
|
||||
|
||||
@@ -475,8 +475,8 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) {
|
||||
const uniqueTaskIds = [...new Set(filteredAgents.map(a => a.taskId).filter(Boolean) as string[])];
|
||||
const uniqueInitiativeIds = [...new Set(filteredAgents.map(a => a.initiativeId).filter(Boolean) as string[])];
|
||||
|
||||
const [chunks, messageCounts, taskResults, initiativeResults] = await Promise.all([
|
||||
logChunkRepo.findByAgentIds(matchingIds),
|
||||
const [metrics, messageCounts, taskResults, initiativeResults] = await Promise.all([
|
||||
logChunkRepo.findMetricsByAgentIds(matchingIds),
|
||||
conversationRepo.countByFromAgentIds(matchingIds),
|
||||
Promise.all(uniqueTaskIds.map(id => taskRepo.findById(id))),
|
||||
Promise.all(uniqueInitiativeIds.map(id => initiativeRepo.findById(id))),
|
||||
@@ -486,37 +486,14 @@ export function agentProcedures(publicProcedure: ProcedureBuilder) {
|
||||
const taskMap = new Map(taskResults.filter(Boolean).map(t => [t!.id, t!.name]));
|
||||
const initiativeMap = new Map(initiativeResults.filter(Boolean).map(i => [i!.id, i!.name]));
|
||||
const messagesMap = new Map(messageCounts.map(m => [m.agentId, m.count]));
|
||||
|
||||
// Group chunks by agentId
|
||||
const chunksByAgent = new Map<string, typeof chunks>();
|
||||
for (const chunk of chunks) {
|
||||
const existing = chunksByAgent.get(chunk.agentId);
|
||||
if (existing) {
|
||||
existing.push(chunk);
|
||||
} else {
|
||||
chunksByAgent.set(chunk.agentId, [chunk]);
|
||||
}
|
||||
}
|
||||
const metricsMap = new Map(metrics.map(m => [m.agentId, m]));
|
||||
|
||||
// Build result rows
|
||||
return filteredAgents.map(agent => {
|
||||
const agentChunks = chunksByAgent.get(agent.id) ?? [];
|
||||
let questionsCount = 0;
|
||||
let subagentsCount = 0;
|
||||
let compactionsCount = 0;
|
||||
|
||||
for (const chunk of agentChunks) {
|
||||
try {
|
||||
const parsed = JSON.parse(chunk.content);
|
||||
if (parsed.type === 'tool_use' && parsed.name === 'AskUserQuestion') {
|
||||
questionsCount += parsed.input?.questions?.length ?? 0;
|
||||
} else if (parsed.type === 'tool_use' && parsed.name === 'Agent') {
|
||||
subagentsCount++;
|
||||
} else if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.source === 'compact') {
|
||||
compactionsCount++;
|
||||
}
|
||||
} catch { /* skip malformed */ }
|
||||
}
|
||||
const agentMetrics = metricsMap.get(agent.id);
|
||||
const questionsCount = agentMetrics?.questionsCount ?? 0;
|
||||
const subagentsCount = agentMetrics?.subagentsCount ?? 0;
|
||||
const compactionsCount = agentMetrics?.compactionsCount ?? 0;
|
||||
|
||||
return {
|
||||
id: agent.id,
|
||||
|
||||
@@ -55,3 +55,27 @@ Migrations 0000–0007 were generated by `drizzle-kit generate`. Migrations 0008
|
||||
- **Migration files are immutable.** Once committed, never edit them. Make a new migration instead.
|
||||
- **Keep schema.ts in sync.** The schema file is the source of truth for TypeScript types; migrations are the source of truth for database DDL. Both must reflect the same structure.
|
||||
- **Test with `npm test`** after generating migrations to verify they work with in-memory databases.
|
||||
|
||||
## Post-migration backfill scripts
|
||||
|
||||
Some schema additions require a one-time data backfill because SQLite migrations cannot execute Node.js logic (e.g., JSON parsing). In these cases, the migration creates the table structure, and a separate Node.js script populates it from existing data.
|
||||
|
||||
### agent_metrics backfill
|
||||
|
||||
**When to run:** After deploying the migration that creates the `agent_metrics` table (introduced in the Radar Screen Performance initiative). Run this once per production database after upgrading.
|
||||
|
||||
**Command:**
|
||||
```sh
|
||||
cw backfill-metrics
|
||||
# Or with a custom DB path:
|
||||
cw backfill-metrics --db /path/to/codewalkers.db
|
||||
```
|
||||
|
||||
**What it does:**
|
||||
- Reads all existing `agent_log_chunks` rows in batches of 500 (ordered by `createdAt ASC`)
|
||||
- Parses each chunk's `content` JSON to count `AskUserQuestion` tool calls, `Agent` spawns, and compaction events
|
||||
- Upserts the accumulated counts into `agent_metrics` using additive conflict resolution
|
||||
|
||||
**Idempotency:** The script uses `ON CONFLICT DO UPDATE` with additive increments, matching the ongoing write-path behavior. Running it against an empty `agent_metrics` table is fully safe. Running it a second time will double-count — only run it once per database, immediately after applying the migration.
|
||||
|
||||
**Batch size:** 500 rows per query, to avoid loading the full `agent_log_chunks` table into memory. Progress is logged every 1,000 chunks.
|
||||
|
||||
Reference in New Issue
Block a user