/** * Drizzle Log Chunk Repository Adapter * * Implements LogChunkRepository interface using Drizzle ORM. */ import { eq, asc, max, inArray, sql } from 'drizzle-orm'; import { nanoid } from 'nanoid'; import type { DrizzleDatabase } from '../../index.js'; import { agentLogChunks, agentMetrics } from '../../schema.js'; import type { LogChunkRepository } from '../log-chunk-repository.js'; export class DrizzleLogChunkRepository implements LogChunkRepository { constructor(private db: DrizzleDatabase) {} async insertChunk(data: { agentId: string; agentName: string; sessionNumber: number; content: string; }): Promise { // better-sqlite3 is synchronous — transaction callback must be sync, use .run() not await this.db.transaction((tx) => { // 1. Always insert the chunk row first tx.insert(agentLogChunks).values({ id: nanoid(), agentId: data.agentId, agentName: data.agentName, sessionNumber: data.sessionNumber, content: data.content, createdAt: new Date(), }).run(); // 2. Parse content and determine metric increments // Wrap only the parse + upsert block — chunk insert is not rolled back on parse failure try { const parsed = JSON.parse(data.content); let deltaQuestions = 0; let deltaSubagents = 0; let deltaCompactions = 0; if (parsed.type === 'tool_use' && parsed.name === 'AskUserQuestion') { deltaQuestions = parsed.input?.questions?.length ?? 0; } else if (parsed.type === 'tool_use' && parsed.name === 'Agent') { deltaSubagents = 1; } else if (parsed.type === 'system' && parsed.subtype === 'init' && parsed.source === 'compact') { deltaCompactions = 1; } // 3. Only upsert if there is something to increment if (deltaQuestions > 0 || deltaSubagents > 0 || deltaCompactions > 0) { tx.insert(agentMetrics) .values({ agentId: data.agentId, questionsCount: deltaQuestions, subagentsCount: deltaSubagents, compactionsCount: deltaCompactions, updatedAt: new Date(), }) .onConflictDoUpdate({ target: agentMetrics.agentId, set: { questionsCount: sql`${agentMetrics.questionsCount} + ${deltaQuestions}`, subagentsCount: sql`${agentMetrics.subagentsCount} + ${deltaSubagents}`, compactionsCount: sql`${agentMetrics.compactionsCount} + ${deltaCompactions}`, updatedAt: new Date(), }, }) .run(); } } catch { // Malformed JSON — skip metric upsert, chunk insert already committed within transaction } }); } async findByAgentId(agentId: string): Promise<{ content: string; sessionNumber: number; createdAt: Date }[]> { return this.db .select({ content: agentLogChunks.content, sessionNumber: agentLogChunks.sessionNumber, createdAt: agentLogChunks.createdAt, }) .from(agentLogChunks) .where(eq(agentLogChunks.agentId, agentId)) .orderBy(asc(agentLogChunks.createdAt)); } async findByAgentIds(agentIds: string[]): Promise<{ agentId: string; content: string; sessionNumber: number; createdAt: Date }[]> { if (agentIds.length === 0) return []; return this.db .select({ agentId: agentLogChunks.agentId, content: agentLogChunks.content, sessionNumber: agentLogChunks.sessionNumber, createdAt: agentLogChunks.createdAt, }) .from(agentLogChunks) .where(inArray(agentLogChunks.agentId, agentIds)) .orderBy(asc(agentLogChunks.createdAt)); } async deleteByAgentId(agentId: string): Promise { await this.db .delete(agentLogChunks) .where(eq(agentLogChunks.agentId, agentId)); } async getSessionCount(agentId: string): Promise { const result = await this.db .select({ maxSession: max(agentLogChunks.sessionNumber) }) .from(agentLogChunks) .where(eq(agentLogChunks.agentId, agentId)); return result[0]?.maxSession ?? 0; } async findMetricsByAgentIds(agentIds: string[]): Promise<{ agentId: string; questionsCount: number; subagentsCount: number; compactionsCount: number; }[]> { if (agentIds.length === 0) return []; return this.db .select({ agentId: agentMetrics.agentId, questionsCount: agentMetrics.questionsCount, subagentsCount: agentMetrics.subagentsCount, compactionsCount: agentMetrics.compactionsCount, }) .from(agentMetrics) .where(inArray(agentMetrics.agentId, agentIds)); } }