feat: add agent_metrics write+read path to LogChunkRepository
Wrap insertChunk in a synchronous better-sqlite3 transaction that upserts agent_metrics counters atomically on every chunk insert. Malformed JSON skips the upsert but always preserves the chunk row. Add findMetricsByAgentIds to the interface and Drizzle adapter for efficient bulk metric reads. Add 8-test suite covering all write/read paths and edge cases. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
129
apps/server/db/repositories/drizzle/log-chunk.test.ts
Normal file
129
apps/server/db/repositories/drizzle/log-chunk.test.ts
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
import { describe, it, expect, beforeEach } from 'vitest';
|
||||||
|
import { DrizzleLogChunkRepository } from './log-chunk.js';
|
||||||
|
import { createTestDatabase } from './test-helpers.js';
|
||||||
|
import type { DrizzleDatabase } from '../../index.js';
|
||||||
|
|
||||||
|
describe('DrizzleLogChunkRepository', () => {
|
||||||
|
let db: DrizzleDatabase;
|
||||||
|
let repo: DrizzleLogChunkRepository;
|
||||||
|
const testAgentId = 'agent-test-001';
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
db = createTestDatabase();
|
||||||
|
repo = new DrizzleLogChunkRepository(db);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('AskUserQuestion chunk — questionsCount upserted correctly', async () => {
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: JSON.stringify({ type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}, {}] } }),
|
||||||
|
});
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||||
|
expect(metrics).toEqual([{
|
||||||
|
agentId: testAgentId,
|
||||||
|
questionsCount: 2,
|
||||||
|
subagentsCount: 0,
|
||||||
|
compactionsCount: 0,
|
||||||
|
}]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('Agent tool chunk — subagentsCount incremented', async () => {
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: JSON.stringify({ type: 'tool_use', name: 'Agent' }),
|
||||||
|
});
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||||
|
expect(metrics).toEqual([{
|
||||||
|
agentId: testAgentId,
|
||||||
|
questionsCount: 0,
|
||||||
|
subagentsCount: 1,
|
||||||
|
compactionsCount: 0,
|
||||||
|
}]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('Compaction event — compactionsCount incremented', async () => {
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: JSON.stringify({ type: 'system', subtype: 'init', source: 'compact' }),
|
||||||
|
});
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||||
|
expect(metrics).toEqual([{
|
||||||
|
agentId: testAgentId,
|
||||||
|
questionsCount: 0,
|
||||||
|
subagentsCount: 0,
|
||||||
|
compactionsCount: 1,
|
||||||
|
}]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('Irrelevant chunk type — no metrics row created', async () => {
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: JSON.stringify({ type: 'text', text: 'hello' }),
|
||||||
|
});
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||||
|
expect(metrics).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('Malformed JSON chunk — chunk persisted, metrics row absent', async () => {
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: 'not-valid-json',
|
||||||
|
});
|
||||||
|
const chunks = await repo.findByAgentId(testAgentId);
|
||||||
|
expect(chunks).toHaveLength(1);
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||||
|
expect(metrics).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('Multiple inserts, same agent — counts accumulate additively', async () => {
|
||||||
|
// 3 Agent tool chunks
|
||||||
|
for (let i = 0; i < 3; i++) {
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: JSON.stringify({ type: 'tool_use', name: 'Agent' }),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// 1 AskUserQuestion with 2 questions
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: JSON.stringify({ type: 'tool_use', name: 'AskUserQuestion', input: { questions: [{}, {}] } }),
|
||||||
|
});
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||||
|
expect(metrics).toEqual([{
|
||||||
|
agentId: testAgentId,
|
||||||
|
questionsCount: 2,
|
||||||
|
subagentsCount: 3,
|
||||||
|
compactionsCount: 0,
|
||||||
|
}]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('findMetricsByAgentIds with empty array — returns []', async () => {
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([]);
|
||||||
|
expect(metrics).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('findMetricsByAgentIds with agentId that has no metrics row — returns []', async () => {
|
||||||
|
await repo.insertChunk({
|
||||||
|
agentId: testAgentId,
|
||||||
|
agentName: 'test-agent',
|
||||||
|
sessionNumber: 1,
|
||||||
|
content: JSON.stringify({ type: 'text', text: 'hello' }),
|
||||||
|
});
|
||||||
|
const metrics = await repo.findMetricsByAgentIds([testAgentId]);
|
||||||
|
expect(metrics).toEqual([]);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -4,10 +4,10 @@
|
|||||||
* Implements LogChunkRepository interface using Drizzle ORM.
|
* Implements LogChunkRepository interface using Drizzle ORM.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { eq, asc, max, inArray } from 'drizzle-orm';
|
import { eq, asc, max, inArray, sql } from 'drizzle-orm';
|
||||||
import { nanoid } from 'nanoid';
|
import { nanoid } from 'nanoid';
|
||||||
import type { DrizzleDatabase } from '../../index.js';
|
import type { DrizzleDatabase } from '../../index.js';
|
||||||
import { agentLogChunks } from '../../schema.js';
|
import { agentLogChunks, agentMetrics } from '../../schema.js';
|
||||||
import type { LogChunkRepository } from '../log-chunk-repository.js';
|
import type { LogChunkRepository } from '../log-chunk-repository.js';
|
||||||
|
|
||||||
export class DrizzleLogChunkRepository implements LogChunkRepository {
|
export class DrizzleLogChunkRepository implements LogChunkRepository {
|
||||||
@@ -19,13 +19,58 @@ export class DrizzleLogChunkRepository implements LogChunkRepository {
|
|||||||
sessionNumber: number;
|
sessionNumber: number;
|
||||||
content: string;
|
content: string;
|
||||||
}): Promise<void> {
|
}): Promise<void> {
|
||||||
await this.db.insert(agentLogChunks).values({
|
// better-sqlite3 is synchronous — transaction callback must be sync, use .run() not await
|
||||||
|
this.db.transaction((tx) => {
|
||||||
|
// 1. Always insert the chunk row first
|
||||||
|
tx.insert(agentLogChunks).values({
|
||||||
id: nanoid(),
|
id: nanoid(),
|
||||||
agentId: data.agentId,
|
agentId: data.agentId,
|
||||||
agentName: data.agentName,
|
agentName: data.agentName,
|
||||||
sessionNumber: data.sessionNumber,
|
sessionNumber: data.sessionNumber,
|
||||||
content: data.content,
|
content: data.content,
|
||||||
createdAt: new Date(),
|
createdAt: new Date(),
|
||||||
|
}).run();
|
||||||
|
|
||||||
|
// 2. Parse content and determine metric increments
|
||||||
|
// Wrap only the parse + upsert block — chunk insert is not rolled back on parse failure
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(data.content);
|
||||||
|
let deltaQuestions = 0;
|
||||||
|
let deltaSubagents = 0;
|
||||||
|
let deltaCompactions = 0;
|
||||||
|
|
||||||
|
if (parsed.type === 'tool_use' && parsed.name === 'AskUserQuestion') {
|
||||||
|
deltaQuestions = parsed.input?.questions?.length ?? 0;
|
||||||
|
} else if (parsed.type === 'tool_use' && parsed.name === 'Agent') {
|
||||||
|
deltaSubagents = 1;
|
||||||
|
} else if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.source === 'compact') {
|
||||||
|
deltaCompactions = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Only upsert if there is something to increment
|
||||||
|
if (deltaQuestions > 0 || deltaSubagents > 0 || deltaCompactions > 0) {
|
||||||
|
tx.insert(agentMetrics)
|
||||||
|
.values({
|
||||||
|
agentId: data.agentId,
|
||||||
|
questionsCount: deltaQuestions,
|
||||||
|
subagentsCount: deltaSubagents,
|
||||||
|
compactionsCount: deltaCompactions,
|
||||||
|
updatedAt: new Date(),
|
||||||
|
})
|
||||||
|
.onConflictDoUpdate({
|
||||||
|
target: agentMetrics.agentId,
|
||||||
|
set: {
|
||||||
|
questionsCount: sql`${agentMetrics.questionsCount} + ${deltaQuestions}`,
|
||||||
|
subagentsCount: sql`${agentMetrics.subagentsCount} + ${deltaSubagents}`,
|
||||||
|
compactionsCount: sql`${agentMetrics.compactionsCount} + ${deltaCompactions}`,
|
||||||
|
updatedAt: new Date(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.run();
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Malformed JSON — skip metric upsert, chunk insert already committed within transaction
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -69,4 +114,22 @@ export class DrizzleLogChunkRepository implements LogChunkRepository {
|
|||||||
|
|
||||||
return result[0]?.maxSession ?? 0;
|
return result[0]?.maxSession ?? 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async findMetricsByAgentIds(agentIds: string[]): Promise<{
|
||||||
|
agentId: string;
|
||||||
|
questionsCount: number;
|
||||||
|
subagentsCount: number;
|
||||||
|
compactionsCount: number;
|
||||||
|
}[]> {
|
||||||
|
if (agentIds.length === 0) return [];
|
||||||
|
return this.db
|
||||||
|
.select({
|
||||||
|
agentId: agentMetrics.agentId,
|
||||||
|
questionsCount: agentMetrics.questionsCount,
|
||||||
|
subagentsCount: agentMetrics.subagentsCount,
|
||||||
|
compactionsCount: agentMetrics.compactionsCount,
|
||||||
|
})
|
||||||
|
.from(agentMetrics)
|
||||||
|
.where(inArray(agentMetrics.agentId, agentIds));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,4 +27,16 @@ export interface LogChunkRepository {
|
|||||||
deleteByAgentId(agentId: string): Promise<void>;
|
deleteByAgentId(agentId: string): Promise<void>;
|
||||||
|
|
||||||
getSessionCount(agentId: string): Promise<number>;
|
getSessionCount(agentId: string): Promise<number>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Batch-fetch pre-computed metrics for multiple agent IDs.
|
||||||
|
* Returns one row per agent that has metrics. Agents with no
|
||||||
|
* matching row in agent_metrics are omitted (not returned as zeros).
|
||||||
|
*/
|
||||||
|
findMetricsByAgentIds(agentIds: string[]): Promise<{
|
||||||
|
agentId: string;
|
||||||
|
questionsCount: number;
|
||||||
|
subagentsCount: number;
|
||||||
|
compactionsCount: number;
|
||||||
|
}[]>;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user