Codewalkers/.planning/research/ARCHITECTURE.md
Lukas May 0ff65b0b02 feat: Rename application from "Codewalk District" to "Codewalkers"
Update all user-facing strings (HTML title, manifest, header logo,
browser title updater), code comments, and documentation references.
Folder name retained as-is.
2026-03-05 12:05:08 +01:00


# Task Orchestration Architecture Research
- **Domain**: Multi-agent orchestration / Developer tooling
- **Researched**: 2026-01-30
- **Confidence**: HIGH
---
## System Overview
```
                            Codewalkers Architecture
================================================================================
                              +------------------+
                              |    CLI Entry     |
                              |   (cw binary)    |
                              +--------+---------+
                                       |
                    +------------------+------------------+
                    |                                     |
           +--------v--------+                   +--------v--------+
           |   Server Mode   |                   |   Client Mode   |
           |  (Background)   |                   |   (Commands)    |
           +--------+--------+                   +--------+--------+
                    |                                     |
                    +------------------+------------------+
                                       |
                              +--------v--------+
                              |   tRPC Router   |
                              |   (API Layer)   |
                              +--------+--------+
                                       |
        +-------------+----------------+----------------+-------------+
        |             |                |                |             |
+-------v------+ +----v-----+ +--------v-------+ +------v------+ +----v-----+
|     Task     | |  Agent   | |    Session     | |  Workspace  | |  Config  |
|  Scheduler   | |   Pool   | |    Manager     | |   Manager   | |  Store   |
+--------------+ +----------+ +----------------+ +-------------+ +----------+
        |             |                |                |             |
        +-------------+----------------+----------------+-------------+
                                       |
                              +--------v--------+
                              |    Event Bus    |
                              | (EventEmitter)  |
                              +--------+--------+
                                       |
        +-------------+----------------+----------------+-------------+
        |             |                |                |             |
+-------v------+ +----v-----+ +--------v-------+ +------v------+ +----v-----+
|    SQLite    | | Process  | |   MCP STDIO    | |    File     | |  Event   |
|   Database   | | Spawner  | |   Transport    | |   Watcher   | |  Store   |
+--------------+ +----------+ +----------------+ +-------------+ +----------+
                              Infrastructure Layer
================================================================================
```
---
## Component Responsibilities
| Component | Responsibility | Pattern | Key Interfaces |
|-----------|---------------|---------|----------------|
| **CLI Entry** | Single binary entry point for all modes | Command Pattern | `run()`, `serve()`, `spawn()` |
| **tRPC Router** | Type-safe API layer between client/server | RPC Pattern | Procedures (Query/Mutation/Subscription) |
| **Task Scheduler** | Job queue with priority, retry, delay | Task Queue | `enqueue()`, `dequeue()`, `complete()`, `fail()` |
| **Agent Pool** | Manage Claude Code agent lifecycle | Pool Pattern | `acquire()`, `release()`, `spawn()`, `terminate()` |
| **Session Manager** | Track active sessions and state | State Machine | `create()`, `pause()`, `resume()`, `close()` |
| **Workspace Manager** | File system operations, project context | Repository Pattern | `scan()`, `watch()`, `sync()` |
| **Event Bus** | Decoupled module communication | Pub/Sub | `emit()`, `on()`, `off()` |
| **SQLite Database** | Persistent storage for tasks, state | Repository Pattern | Repositories per entity |
| **Process Spawner** | Child process lifecycle management | Supervisor Pattern | `spawn()`, `monitor()`, `restart()` |
| **MCP Transport** | STDIO-based agent communication | Message Protocol | `send()`, `receive()`, `handshake()` |
| **File Watcher** | Detect file system changes | Observer Pattern | `watch()`, `onChange()`, `onAdd()` |
| **Event Store** | Persist domain events (optional CQRS) | Event Sourcing | `append()`, `read()`, `replay()` |
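
The table lists the Session Manager as a state machine with `create()`, `pause()`, `resume()` and `close()`. The transition table below is a minimal sketch of that idea. The state names (`active`, `paused`, `closed`) and the helper `nextSessionState` are assumptions made for illustration; the source only names the operations.

```typescript
// Hypothetical session states; the source only names the operations.
type SessionState = 'active' | 'paused' | 'closed';
type SessionAction = 'pause' | 'resume' | 'close';

// Allowed transitions: current state -> action -> next state.
const sessionTransitions: Record<SessionState, Partial<Record<SessionAction, SessionState>>> = {
  active: { pause: 'paused', close: 'closed' },
  paused: { resume: 'active', close: 'closed' },
  closed: {}, // terminal: no action is valid once a session is closed
};

function nextSessionState(state: SessionState, action: SessionAction): SessionState {
  const next = sessionTransitions[state][action];
  if (next === undefined) {
    throw new Error(`Invalid session transition: ${action} from ${state}`);
  }
  return next;
}
```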
---
## Recommended Project Structure
```
src/
├── bin/
│   └── cw.ts  # CLI entry point
├── application/  # Use Cases (Orchestration)
│   ├── commands/  # Write operations
│   │   ├── spawn-agent.ts
│   │   ├── enqueue-task.ts
│   │   └── create-session.ts
│   ├── queries/  # Read operations
│   │   ├── list-agents.ts
│   │   ├── get-task-status.ts
│   │   └── session-summary.ts
│   └── services/  # Application services
│       ├── task-orchestrator.ts
│       └── session-coordinator.ts
├── domain/  # Core Business Logic
│   ├── agent/
│   │   ├── agent.ts  # Aggregate root
│   │   ├── agent-pool.ts
│   │   └── agent.repository.ts  # Port (interface)
│   ├── task/
│   │   ├── task.ts  # Aggregate root
│   │   ├── task-queue.ts
│   │   ├── task.repository.ts  # Port (interface)
│   │   └── task.events.ts  # Domain events
│   ├── session/
│   │   ├── session.ts
│   │   └── session.repository.ts
│   └── workspace/
│       ├── workspace.ts
│       └── workspace.repository.ts
├── infrastructure/  # Adapters (Implementations)
│   ├── persistence/
│   │   ├── sqlite/
│   │   │   ├── connection.ts
│   │   │   ├── migrations/
│   │   │   ├── task.repository.sqlite.ts
│   │   │   ├── agent.repository.sqlite.ts
│   │   │   └── session.repository.sqlite.ts
│   │   └── repositories.ts  # Repository factory
│   ├── process/
│   │   ├── process-spawner.ts
│   │   ├── process-supervisor.ts
│   │   └── stdio-transport.ts
│   ├── mcp/
│   │   ├── mcp-client.ts
│   │   └── mcp-server.ts
│   ├── filesystem/
│   │   ├── file-watcher.ts
│   │   └── workspace-scanner.ts
│   └── events/
│       ├── event-bus.ts  # In-memory EventEmitter
│       └── event-store.ts  # SQLite-backed persistence
├── interfaces/  # Entry Points
│   ├── api/
│   │   ├── router.ts  # tRPC root router
│   │   ├── procedures/
│   │   │   ├── agent.procedures.ts
│   │   │   ├── task.procedures.ts
│   │   │   └── session.procedures.ts
│   │   └── context.ts  # tRPC context
│   ├── cli/
│   │   ├── commands/
│   │   │   ├── spawn.ts
│   │   │   ├── status.ts
│   │   │   └── serve.ts
│   │   └── parser.ts
│   └── server/
│       └── http-server.ts
├── shared/  # Cross-cutting concerns
│   ├── types/
│   ├── errors/
│   ├── utils/
│   └── constants/
└── config/
    ├── index.ts
    └── schema.ts  # Zod validation
```
---
## Architectural Patterns
### 1. Hexagonal Architecture (Ports & Adapters)
The core principle: business logic at the center, external concerns at the edges.
```typescript
// domain/task/task.repository.ts (PORT - Interface)
export interface TaskRepository {
findById(id: TaskId): Promise<Task | null>;
findPending(): Promise<Task[]>;
save(task: Task): Promise<void>;
delete(id: TaskId): Promise<void>;
}
// infrastructure/persistence/sqlite/task.repository.sqlite.ts (ADAPTER)
export class SqliteTaskRepository implements TaskRepository {
constructor(private db: Database) {}
async findById(id: TaskId): Promise<Task | null> {
const row = this.db.prepare(
'SELECT * FROM tasks WHERE id = ?'
).get(id.value);
return row ? TaskMapper.toDomain(row) : null;
}
async findPending(): Promise<Task[]> {
const rows = this.db.prepare(
'SELECT * FROM tasks WHERE status = ? ORDER BY priority DESC, created_at ASC'
).all('pending');
return rows.map(TaskMapper.toDomain);
}
async save(task: Task): Promise<void> {
const data = TaskMapper.toPersistence(task);
this.db.prepare(`
INSERT OR REPLACE INTO tasks (id, type, payload, status, priority, created_at, updated_at)
VALUES (@id, @type, @payload, @status, @priority, @created_at, @updated_at)
`).run(data);
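}
// Required by the TaskRepository port; without it the class does not satisfy the interface
async delete(id: TaskId): Promise<void> {
this.db.prepare('DELETE FROM tasks WHERE id = ?').run(id.value);
}
}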
```
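One payoff of the port/adapter split is that the same port can be backed by something other than SQLite, for example in unit tests. The following is a minimal sketch of an in-memory adapter. The `Task` and `TaskId` shapes are simplified stand-ins, and `InMemoryTaskRepository` and its `size` getter are hypothetical names that do not appear in the source.

```typescript
// Simplified stand-ins for the domain types (assumptions for this sketch).
type TaskId = { value: string };
type TaskStatus = 'pending' | 'processing' | 'completed' | 'failed';
interface Task {
  id: TaskId;
  status: TaskStatus;
  priority: number;
  createdAt: Date;
}

// The port, as declared in domain/task/task.repository.ts.
interface TaskRepository {
  findById(id: TaskId): Promise<Task | null>;
  findPending(): Promise<Task[]>;
  save(task: Task): Promise<void>;
  delete(id: TaskId): Promise<void>;
}

// Hypothetical test adapter: same port, no database.
class InMemoryTaskRepository implements TaskRepository {
  private tasks = new Map<string, Task>();

  get size(): number {
    return this.tasks.size;
  }

  async findById(id: TaskId): Promise<Task | null> {
    return this.tasks.get(id.value) ?? null;
  }

  async findPending(): Promise<Task[]> {
    // Mirror the SQL ordering: priority DESC, then created_at ASC.
    return [...this.tasks.values()]
      .filter((task) => task.status === 'pending')
      .sort((a, b) => b.priority - a.priority || a.createdAt.getTime() - b.createdAt.getTime());
  }

  async save(task: Task): Promise<void> {
    this.tasks.set(task.id.value, task);
  }

  async delete(id: TaskId): Promise<void> {
    this.tasks.delete(id.value);
  }
}
```

Application services that depend only on `TaskRepository` can then be exercised against this adapter without opening a database file.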
### 2. Task Queue with SQLite
A lightweight, persistent task queue without external dependencies.
```typescript
// domain/task/task-queue.ts
export interface TaskQueuePort {
enqueue(task: Task): Promise<void>;
dequeue(): Promise<Task | null>;
acknowledge(taskId: TaskId): Promise<void>;
fail(taskId: TaskId, error: Error): Promise<void>;
requeue(taskId: TaskId, delay?: number): Promise<void>;
}
// infrastructure/persistence/sqlite/task-queue.sqlite.ts
export class SqliteTaskQueue implements TaskQueuePort {
private processingTimeout: number = 30000; // 30s
// The connection is injected; it was previously declared but never assigned
constructor(private db: Database) {}
async enqueue(task: Task): Promise<void> {
this.db.prepare(`
INSERT INTO task_queue (id, payload, status, priority, run_at, created_at)
VALUES (?, ?, 'pending', ?, ?, ?)
`).run(
task.id.value,
JSON.stringify(task.payload),
task.priority,
task.runAt?.toISOString() ?? new Date().toISOString(),
new Date().toISOString()
);
}
async dequeue(): Promise<Task | null> {
// Atomic claim: SELECT + UPDATE in transaction
const task = this.db.transaction(() => {
// Reclaim stale tasks (processing timeout)
this.db.prepare(`
UPDATE task_queue
SET status = 'pending', claimed_at = NULL
WHERE status = 'processing'
AND claimed_at < datetime('now', '-' || ? || ' seconds')
`).run(this.processingTimeout / 1000);
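// Claim next available task. run_at is stored as an ISO-8601 string, so normalize it
// with datetime() before comparing; a raw string comparison against datetime('now') is unreliable
const row = this.db.prepare(`
SELECT * FROM task_queue
WHERE status = 'pending' AND datetime(run_at) <= datetime('now')
ORDER BY priority DESC, created_at ASC
LIMIT 1
`).get();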
if (!row) return null;
this.db.prepare(`
UPDATE task_queue
SET status = 'processing', claimed_at = datetime('now')
WHERE id = ?
`).run(row.id);
return row;
})();
return task ? TaskMapper.toDomain(task) : null;
}
async acknowledge(taskId: TaskId): Promise<void> {
this.db.prepare(`
UPDATE task_queue
SET status = 'completed', completed_at = datetime('now')
WHERE id = ?
`).run(taskId.value);
}
async fail(taskId: TaskId, error: Error): Promise<void> {
this.db.prepare(`
UPDATE task_queue
SET status = 'failed',
error = ?,
retry_count = retry_count + 1,
failed_at = datetime('now')
WHERE id = ?
`).run(error.message, taskId.value);
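}
// Required by TaskQueuePort. Assumes `delay` is in milliseconds, like processingTimeout
async requeue(taskId: TaskId, delay: number = 0): Promise<void> {
this.db.prepare(`
UPDATE task_queue
SET status = 'pending',
claimed_at = NULL,
run_at = datetime('now', '+' || ? || ' seconds')
WHERE id = ?
`).run(delay / 1000, taskId.value);
}
}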
```
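The queue above records `retry_count` in `fail()` and accepts a delay in `requeue()`, but it does not say when a failed task should be retried. One possible policy is capped exponential backoff, sketched below. `RetryPolicy`, `RetryDecision` and `decideRetry` are hypothetical names, and the policy itself is an assumption rather than something the source prescribes.

```typescript
// Hypothetical retry policy; none of these names appear in the source.
interface RetryPolicy {
  maxRetries: number;  // give up once retry_count reaches this value
  baseDelayMs: number; // delay before the first retry
  maxDelayMs: number;  // upper bound on any single delay
}

type RetryDecision =
  | { action: 'requeue'; delayMs: number }
  | { action: 'give-up' };

// retryCount is the number of failures recorded so far (retry_count in the table).
function decideRetry(retryCount: number, policy: RetryPolicy): RetryDecision {
  if (retryCount >= policy.maxRetries) {
    return { action: 'give-up' };
  }
  // Double the delay on every failure, capped at maxDelayMs.
  const delayMs = Math.min(policy.baseDelayMs * 2 ** retryCount, policy.maxDelayMs);
  return { action: 'requeue', delayMs };
}
```

A caller could pass `delayMs` to `requeue(taskId, delayMs)` on a `requeue` decision and leave the task in the `failed` state otherwise.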
### 3. Process Supervision Pattern
Managing Claude Code agent processes with automatic restart.
```typescript
// infrastructure/process/process-supervisor.ts
import { spawn, ChildProcess } from 'child_process';
import { EventEmitter } from 'events';
export interface SupervisorConfig {
command: string;
args: string[];
maxRestarts: number;
restartDelay: number;
env?: Record<string, string>;
}
export class ProcessSupervisor extends EventEmitter {
private process: ChildProcess | null = null;
private restartCount = 0;
private isRunning = false;
constructor(
private id: string,
private config: SupervisorConfig
) {
super();
}
async start(): Promise<void> {
if (this.isRunning) return;
this.isRunning = true;
await this.spawn();
}
private async spawn(): Promise<void> {
this.process = spawn(this.config.command, this.config.args, {
stdio: ['pipe', 'pipe', 'pipe'],
env: { ...process.env, ...this.config.env },
});
this.process.on('exit', (code, signal) => {
this.emit('exit', { code, signal });
if (this.isRunning && code !== 0) {
this.handleCrash();
}
});
this.process.stdout?.on('data', (data) => {
this.emit('stdout', data.toString());
});
this.process.stderr?.on('data', (data) => {
this.emit('stderr', data.toString());
});
this.emit('started', { pid: this.process.pid });
}
private async handleCrash(): Promise<void> {
if (this.restartCount >= this.config.maxRestarts) {
this.emit('max-restarts', { restartCount: this.restartCount });
this.isRunning = false;
return;
}
this.restartCount++;
this.emit('restarting', {
attempt: this.restartCount,
delay: this.config.restartDelay
});
await this.delay(this.config.restartDelay);
if (this.isRunning) {
await this.spawn();
}
}
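async stop(): Promise<void> {
this.isRunning = false;
const child = this.process;
if (child) {
child.kill('SIGTERM');
// Give the process time to shut down gracefully
await this.delay(1000);
// `killed` only reports that a signal was sent, so check whether the process has actually exited
if (child.exitCode === null && child.signalCode === null) {
child.kill('SIGKILL');
}
}
}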
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
```
### 4. Worker Pool Pattern
Managing a pool of Claude Code agents with Piscina-inspired design.
```typescript
// domain/agent/agent-pool.ts
export interface AgentPoolConfig {
minAgents: number;
maxAgents: number;
idleTimeout: number;
taskTimeout: number;
}
export class AgentPool {
private agents: Map<string, Agent> = new Map();
private available: Agent[] = [];
private pending: Array<{
resolve: (agent: Agent) => void;
reject: (error: Error) => void;
}> = [];
constructor(
private config: AgentPoolConfig,
private spawner: AgentSpawner,
private eventBus: EventBus
) {}
async acquire(): Promise<Agent> {
// Try to get an available agent
if (this.available.length > 0) {
const agent = this.available.pop()!;
agent.markBusy();
return agent;
}
// Spawn new agent if under limit
if (this.agents.size < this.config.maxAgents) {
const agent = await this.spawnAgent();
agent.markBusy();
return agent;
}
// Wait for an agent to become available
return new Promise((resolve, reject) => {
// Keep a reference to the queued entry so the timeout can find and remove it
const entry = {
resolve: (agent: Agent) => {
clearTimeout(timeout);
resolve(agent);
},
reject,
};
const timeout = setTimeout(() => {
const idx = this.pending.indexOf(entry);
if (idx !== -1) this.pending.splice(idx, 1);
reject(new Error('Agent acquisition timeout'));
}, this.config.taskTimeout);
this.pending.push(entry);
});
}
release(agent: Agent): void {
agent.markIdle();
// Fulfill pending request if any
if (this.pending.length > 0) {
const { resolve } = this.pending.shift()!;
agent.markBusy();
resolve(agent);
return;
}
// Return to pool
this.available.push(agent);
this.eventBus.emit('agent:released', { agentId: agent.id });
// Schedule idle cleanup
this.scheduleIdleCleanup(agent);
}
private async spawnAgent(): Promise<Agent> {
const agent = await this.spawner.spawn();
this.agents.set(agent.id, agent);
this.eventBus.emit('agent:spawned', { agentId: agent.id });
return agent;
}
private scheduleIdleCleanup(agent: Agent): void {
setTimeout(() => {
if (agent.isIdle && this.agents.size > this.config.minAgents) {
this.terminateAgent(agent);
}
}, this.config.idleTimeout);
}
private async terminateAgent(agent: Agent): Promise<void> {
this.agents.delete(agent.id);
const idx = this.available.indexOf(agent);
if (idx !== -1) this.available.splice(idx, 1);
await agent.terminate();
this.eventBus.emit('agent:terminated', { agentId: agent.id });
}
}
```
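A pool only stays bounded if every `acquire()` is paired with a `release()`, including when the work throws. The helper below is a minimal sketch of that pairing. `withAgent` and the `Pool` interface are hypothetical; `Pool` is just the subset of `AgentPool` the helper relies on.

```typescript
// Minimal shape of the pool this helper relies on (a subset of AgentPool).
interface Pool<A> {
  acquire(): Promise<A>;
  release(agent: A): void;
}

// Hypothetical helper: run `work` with an agent and always hand the agent back.
async function withAgent<A, R>(pool: Pool<A>, work: (agent: A) => Promise<R>): Promise<R> {
  const agent = await pool.acquire();
  try {
    return await work(agent);
  } finally {
    pool.release(agent); // runs on success and on failure
  }
}
```

With this in place a scheduler could write `await withAgent(pool, (agent) => agent.execute(task))` (where `agent.execute` is the call shown in the task execution flow) without tracking the release itself.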
### 5. Event Bus for Module Communication
```typescript
// infrastructure/events/event-bus.ts
import { EventEmitter } from 'events';
export type DomainEvent = {
type: string;
payload: unknown;
timestamp: Date;
correlationId?: string;
};
export interface EventBusPort {
emit(event: DomainEvent): void;
on(eventType: string, handler: (event: DomainEvent) => void): void;
off(eventType: string, handler: (event: DomainEvent) => void): void;
once(eventType: string, handler: (event: DomainEvent) => void): void;
}
export class InMemoryEventBus implements EventBusPort {
private emitter = new EventEmitter();
private eventStore?: EventStore;
constructor(eventStore?: EventStore) {
this.eventStore = eventStore;
this.emitter.setMaxListeners(100);
}
emit(event: DomainEvent): void {
const enrichedEvent = {
...event,
timestamp: event.timestamp ?? new Date(),
};
// Persist event if store is configured
if (this.eventStore) {
this.eventStore.append(enrichedEvent);
}
this.emitter.emit(event.type, enrichedEvent);
this.emitter.emit('*', enrichedEvent); // Wildcard listener
}
on(eventType: string, handler: (event: DomainEvent) => void): void {
this.emitter.on(eventType, handler);
}
off(eventType: string, handler: (event: DomainEvent) => void): void {
this.emitter.off(eventType, handler);
}
once(eventType: string, handler: (event: DomainEvent) => void): void {
this.emitter.once(eventType, handler);
}
}
```
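Other parts of this document subscribe with namespace patterns such as `task:*`, but Node's `EventEmitter` only matches exact event names, and the bus above offers a single catch-all `'*'` channel. One way to bridge the two is a small matcher applied to events received on `'*'`. The function below is a sketch; `matchesEventPattern` and the supported pattern syntax are assumptions.

```typescript
// Returns true when `eventType` matches `pattern`.
// Supported patterns (an assumption of this sketch): an exact name, '*',
// or a namespace wildcard with a trailing ':*' such as 'task:*'.
function matchesEventPattern(eventType: string, pattern: string): boolean {
  if (pattern === '*' || pattern === eventType) {
    return true;
  }
  if (pattern.length > 2 && pattern.slice(-2) === ':*') {
    // Keep the ':' in the prefix so 'task:*' does not match 'tasks:created'.
    const prefix = pattern.slice(0, -1);
    return eventType.slice(0, prefix.length) === prefix;
  }
  return false;
}
```

A pattern subscription could then be registered as `bus.on('*', (event) => { if (matchesEventPattern(event.type, 'task:*')) handler(event); })`.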
### 6. MCP STDIO Transport
```typescript
// infrastructure/process/stdio-transport.ts
import { ChildProcess } from 'child_process';
import { createInterface, Interface } from 'readline';
export interface MCPMessage {
jsonrpc: '2.0';
id?: number | string;
method?: string;
params?: unknown;
result?: unknown;
error?: { code: number; message: string; data?: unknown };
}
export class StdioTransport {
private readline: Interface;
private messageId = 0;
private pendingRequests = new Map<
number | string,
{ resolve: (result: unknown) => void; reject: (error: Error) => void }
>();
constructor(private process: ChildProcess) {
this.readline = createInterface({
input: process.stdout!,
crlfDelay: Infinity,
});
this.readline.on('line', (line) => this.handleLine(line));
}
private handleLine(line: string): void {
try {
const message: MCPMessage = JSON.parse(line);
if (message.id !== undefined && (message.result !== undefined || message.error)) {
// Response to our request
const pending = this.pendingRequests.get(message.id);
if (pending) {
this.pendingRequests.delete(message.id);
if (message.error) {
pending.reject(new Error(message.error.message));
} else {
pending.resolve(message.result);
}
}
} else if (message.method) {
// Notification or request from server
this.handleServerMessage(message);
}
} catch (error) {
console.error('Failed to parse MCP message:', error);
}
}
async request(method: string, params?: unknown): Promise<unknown> {
const id = ++this.messageId;
const message: MCPMessage = {
jsonrpc: '2.0',
id,
method,
params,
};
return new Promise((resolve, reject) => {
this.pendingRequests.set(id, { resolve, reject });
this.send(message);
});
}
notify(method: string, params?: unknown): void {
const message: MCPMessage = {
jsonrpc: '2.0',
method,
params,
};
this.send(message);
}
private send(message: MCPMessage): void {
this.process.stdin!.write(JSON.stringify(message) + '\n');
}
private handleServerMessage(message: MCPMessage): void {
// Handle notifications from the agent
// Override in subclass or use event emitter
}
close(): void {
this.readline.close();
for (const [id, pending] of this.pendingRequests) {
pending.reject(new Error('Transport closed'));
}
this.pendingRequests.clear();
}
}
```
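`handleLine` treats every message that carries a `method` the same way, although JSON-RPC 2.0 distinguishes requests (which have an `id` and expect a reply) from notifications (which have none). The classifier below sketches that distinction. `classifyMessage` and `MessageKind` are hypothetical names, and `MCPMessage` is repeated only to keep the example self-contained.

```typescript
// Same shape as the MCPMessage interface above.
interface MCPMessage {
  jsonrpc: '2.0';
  id?: number | string;
  method?: string;
  params?: unknown;
  result?: unknown;
  error?: { code: number; message: string; data?: unknown };
}

type MessageKind = 'request' | 'notification' | 'response' | 'invalid';

function classifyMessage(message: MCPMessage): MessageKind {
  const hasId = message.id !== undefined;
  if (message.method !== undefined) {
    // A method with an id expects a reply; without one it is fire-and-forget.
    return hasId ? 'request' : 'notification';
  }
  if (hasId && (message.result !== undefined || message.error !== undefined)) {
    return 'response';
  }
  return 'invalid';
}
```

A transport could use the `request` case to send an error reply for methods it does not support instead of silently dropping them.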
### 7. tRPC Router Setup
```typescript
// interfaces/api/router.ts
import { initTRPC } from '@trpc/server';
import { z } from 'zod';
import type { Context } from './context';
const t = initTRPC.context<Context>().create();
export const router = t.router;
export const publicProcedure = t.procedure;
// interfaces/api/procedures/task.procedures.ts
export const taskRouter = router({
enqueue: publicProcedure
.input(z.object({
type: z.string(),
payload: z.unknown(),
priority: z.number().optional().default(0),
runAt: z.coerce.date().optional(), // Dates arrive as strings over JSON unless a transformer is configured
}))
.mutation(async ({ ctx, input }) => {
const command = new EnqueueTaskCommand(input);
return ctx.commandBus.execute(command);
}),
status: publicProcedure
.input(z.object({ taskId: z.string() }))
.query(async ({ ctx, input }) => {
const query = new GetTaskStatusQuery(input.taskId);
return ctx.queryBus.execute(query);
}),
list: publicProcedure
.input(z.object({
status: z.enum(['pending', 'processing', 'completed', 'failed']).optional(),
limit: z.number().optional().default(50),
}))
.query(async ({ ctx, input }) => {
const query = new ListTasksQuery(input);
return ctx.queryBus.execute(query);
}),
// Real-time task updates via subscription
onUpdate: publicProcedure
.input(z.object({ taskId: z.string().optional() }))
.subscription(async function* ({ ctx, input }) {
const eventBus = ctx.eventBus;
for await (const event of eventBus.subscribe('task:*')) {
if (!input.taskId || event.payload.taskId === input.taskId) {
yield event;
}
}
}),
});
// Root router
export const appRouter = router({
task: taskRouter,
agent: agentRouter,
session: sessionRouter,
workspace: workspaceRouter,
});
export type AppRouter = typeof appRouter;
```
### 8. File Watcher with Chokidar
```typescript
// infrastructure/filesystem/file-watcher.ts
import chokidar, { FSWatcher } from 'chokidar';
import { EventEmitter } from 'events';
export interface FileWatcherConfig {
paths: string[];
ignored?: string[];
persistent?: boolean;
usePolling?: boolean;
interval?: number;
}
export class FileWatcher extends EventEmitter {
private watcher: FSWatcher | null = null;
constructor(private config: FileWatcherConfig) {
super();
}
start(): void {
this.watcher = chokidar.watch(this.config.paths, {
ignored: this.config.ignored ?? [
'**/node_modules/**',
'**/.git/**',
'**/dist/**',
],
persistent: this.config.persistent ?? true,
usePolling: this.config.usePolling ?? false,
interval: this.config.interval ?? 100,
ignoreInitial: true,
awaitWriteFinish: {
stabilityThreshold: 100,
pollInterval: 50,
},
});
this.watcher
.on('add', (path) => this.emit('add', { path, type: 'file' }))
.on('addDir', (path) => this.emit('add', { path, type: 'directory' }))
.on('change', (path) => this.emit('change', { path }))
.on('unlink', (path) => this.emit('remove', { path, type: 'file' }))
.on('unlinkDir', (path) => this.emit('remove', { path, type: 'directory' }))
.on('error', (error) => this.emit('error', error))
.on('ready', () => this.emit('ready'));
}
async stop(): Promise<void> {
if (this.watcher) {
await this.watcher.close();
this.watcher = null;
}
}
addPath(path: string): void {
this.watcher?.add(path);
}
removePath(path: string): void {
this.watcher?.unwatch(path);
}
}
```
---
## Data Flow Diagrams
### Task Execution Flow
```
User Command                Database                Agent Pool
     |                          |                        |
     | 1. cw run "task"         |                        |
     v                          |                        |
+----------+                    |                        |
| CLI/API  |                    |                        |
+----+-----+                    |                        |
     |                          |                        |
     | 2. EnqueueTaskCommand    |                        |
     v                          |                        |
+----------+                    |                        |
| Command  |                    |                        |
| Handler  |                    |                        |
+----+-----+                    |                        |
     |                          |                        |
     | 3. taskRepo.save()       |                        |
     +------------------------->|                        |
     |                          | (persisted)            |
     | 4. emit('task:enqueued') |                        |
     v                          |                        |
+----------+                    |                        |
|  Event   |                    |                        |
|   Bus    |                    |                        |
+----+-----+                    |                        |
     |                          |                        |
     | 5. Scheduler picks up    |                        |
     v                          |                        |
+----------+                    |                        |
|   Task   | 6. dequeue()       |                        |
| Scheduler|<-------------------+                        |
+----+-----+                    |                        |
     |                          |                        |
     | 7. pool.acquire()        |                        |
     +-------------------------------------------------->|
     |                          |                        |
     |                          | 8. agent.execute()     |
     |                          |   +------------------->|
     |                          |   |                    |
     |                          |   | 9. MCP Protocol    |
     |                          |   |    (STDIO)         |
     |                          |   |                    |
     |                          |   | 10. result         |
     |                          |   |<-------------------|
     |                          |   |                    |
     | 11. acknowledge(taskId)  |   |                    |
     +------------------------->|   |                    |
     |                          |   |                    |
     | 12. pool.release()       |   |                    |
     +-------------------------------------------------->|
     |                          |                        |
     | 13. emit('task:completed')                        |
     v                          |                        |
```
### Event Flow for Multi-Agent Coordination
```
                         +-------------+
                         |  Event Bus  |
                         +------+------+
                                |
        +-----------------------+-----------------------+
        |                       |                       |
        v                       v                       v
+---------------+       +---------------+       +---------------+
| Task Module   |       | Agent Module  |       | Session Module|
| Subscribers:  |       | Subscribers:  |       | Subscribers:  |
|  - task:*     |       |  - agent:*    |       |  - session:*  |
+-------+-------+       +-------+-------+       +-------+-------+
        |                       |                       |
        v                       v                       v
+---------------+       +---------------+       +---------------+
| task:enqueued |       | agent:spawned |       | session:start |
| task:started  |       | agent:busy    |       | session:pause |
| task:completed|       | agent:idle    |       | session:resume|
| task:failed   |       | agent:error   |       | session:close |
+---------------+       +---------------+       +---------------+
```
---
## Scaling Considerations
### For SQLite + Local Processes (Single Machine)
| Concern | Solution | Limit |
|---------|----------|-------|
| **Concurrent agents** | Agent pool with configurable max | ~10-20 agents per machine (memory bound) |
| **Task throughput** | SQLite WAL mode + connection pool | ~10,000-15,000 tasks/sec (rough estimate; depends on hardware and payload size) |
| **Write contention** | Single writer pattern, queue writes | Serialize via transaction |
| **Memory pressure** | Streaming results, lazy loading | Monitor with V8 heap stats |
| **File descriptors** | Limit open agent processes | `ulimit -n` considerations |
### SQLite Optimization for Task Queue
```typescript
// infrastructure/persistence/sqlite/connection.ts
import Database from 'better-sqlite3';
export function createDatabase(path: string): Database.Database {
const db = new Database(path);
// Enable WAL mode for better concurrent read/write
db.pragma('journal_mode = WAL');
// Faster writes, acceptable for local use
db.pragma('synchronous = NORMAL');
// Increase cache size (negative = KB)
db.pragma('cache_size = -64000'); // 64MB
// Enable foreign keys
db.pragma('foreign_keys = ON');
// Busy timeout for lock contention
db.pragma('busy_timeout = 5000');
return db;
}
```
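The `SqliteTaskQueue` queries in the Task Queue section assume a `task_queue` table that this document never defines. The DDL below is a sketch of what that table could look like. The column names are taken from those queries; the column types, defaults, `CHECK` constraint, index and the constant name `TASK_QUEUE_SCHEMA` are assumptions.

```typescript
// Hypothetical schema: column names come from the SqliteTaskQueue queries,
// everything else (types, defaults, constraint, index) is an assumption.
const TASK_QUEUE_SCHEMA = `
  CREATE TABLE IF NOT EXISTS task_queue (
    id           TEXT PRIMARY KEY,
    payload      TEXT NOT NULL,
    status       TEXT NOT NULL DEFAULT 'pending'
                 CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
    priority     INTEGER NOT NULL DEFAULT 0,
    run_at       TEXT NOT NULL,
    created_at   TEXT NOT NULL,
    claimed_at   TEXT,
    completed_at TEXT,
    failed_at    TEXT,
    error        TEXT,
    retry_count  INTEGER NOT NULL DEFAULT 0
  );

  -- Supports the claim query: filter by status, then order by priority and age.
  CREATE INDEX IF NOT EXISTS idx_task_queue_claim
    ON task_queue (status, priority DESC, created_at);
`;
```

With better-sqlite3 the script could be applied once at startup with `db.exec(TASK_QUEUE_SCHEMA)`, for example right after `createDatabase()` returns.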
### When NOT to Scale (Anti-Pattern Avoidance)
```
DON'T:
- Run multiple `cw` servers on same SQLite file (write contention)
- Spawn unlimited agents (memory exhaustion)
- Use polling intervals < 100ms (CPU waste)
- Store large blobs in task payloads (use file references)
DO:
- Single server process, multiple agent child processes
- Bounded agent pool (min: 1, max: CPU cores)
- Use SQLite WAL mode
- Store task results as file references, not inline
```
---
## Anti-Patterns to Avoid
### 1. Distributed Queue Without Need
```typescript
// BAD: Using Redis/BullMQ when SQLite suffices
const queue = new Queue('tasks', { connection: redisConfig });
// GOOD: SQLite-backed queue for local orchestration
const queue = new SqliteTaskQueue(db);
```
### 2. Tight Module Coupling
```typescript
// BAD: Direct cross-module imports
import { AgentRepository } from '../agent/agent.repository.sqlite';
class TaskService {
constructor() {
this.agentRepo = new AgentRepository(db); // Direct coupling!
}
}
// GOOD: Dependency injection via ports
class TaskService {
constructor(private agentRepo: AgentRepositoryPort) {}
}
```
### 3. Synchronous Process Management
```typescript
// BAD: Blocking on child process
const result = execSync('claude-code run task');
// GOOD: Async with proper supervision
const supervisor = new ProcessSupervisor('agent-1', config); // id first, then config ('agent-1' is a placeholder)
supervisor.on('exit', handleExit);
await supervisor.start();
```
### 4. Unbounded Event Listeners
```typescript
// BAD: Memory leak from unremoved listeners
eventBus.on('task:completed', handler);
// ... never removed
// GOOD: Proper cleanup (EventBusPort.on() returns void, so remove the handler with off())
eventBus.on('task:completed', handler);
// Later...
eventBus.off('task:completed', handler);
```
### 5. Missing Transaction Boundaries
```typescript
// BAD: Race condition in dequeue
const [task] = await taskRepo.findPending(); // Another process could grab this
await taskRepo.updateStatus(task.id, 'processing');
// GOOD: Atomic claim in transaction
const task = db.transaction(() => {
const row = db.prepare('SELECT ...').get();
if (!row) return null; // Nothing to claim
// SQL string literals take single quotes; double quotes denote identifiers
db.prepare("UPDATE ... SET status = 'processing' WHERE id = ?").run(row.id);
return row;
})();
```
### 6. Polling Without Backoff
```typescript
// BAD: Constant polling
setInterval(() => checkForTasks(), 100);
// GOOD: Exponential backoff or event-driven
let delay = 100;
async function poll() {
const task = await queue.dequeue();
if (task) {
delay = 100; // Reset on success
await handleTask(task); // Not named `process`, which would shadow Node's global
} else {
delay = Math.min(delay * 2, 5000); // Backoff
}
setTimeout(poll, delay);
}
```
---
## Integration Points
### 1. CLI Entry Point
```typescript
#!/usr/bin/env node
// bin/cw.ts (the shebang must be the first line of the file)
import { Command } from 'commander';
import { createContainer } from '../config/container';
const program = new Command();
const container = createContainer();
program
.name('cw')
.description('Codewalkers - Multi-agent workspace orchestrator')
.version('0.1.0');
program
.command('serve')
.description('Start the orchestration server')
.option('-p, --port <port>', 'Server port', '3000')
.action(async (options) => {
const server = container.resolve('Server');
await server.start(Number(options.port)); // Commander passes option values as strings
});
program
.command('spawn')
.description('Spawn a Claude Code agent')
.argument('<task>', 'Task description')
.action(async (task) => {
const client = container.resolve('TRPCClient');
const result = await client.task.enqueue.mutate({ type: 'run', payload: { task } });
console.log('Task enqueued:', result.id);
});
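// parseAsync() waits for the async action handlers and surfaces their errors
program.parseAsync().catch((error) => {
console.error(error);
process.exit(1);
});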
```
### 2. Server Mode Integration
```typescript
// interfaces/server/http-server.ts
import { createHTTPServer } from '@trpc/server/adapters/standalone';
import { appRouter } from '../api/router';
import { createContext } from '../api/context';
export class Server {
private httpServer: ReturnType<typeof createHTTPServer>;
constructor(private container: Container) {}
async start(port: number): Promise<void> {
this.httpServer = createHTTPServer({
router: appRouter,
createContext: () => createContext(this.container),
});
this.httpServer.listen(port);
console.log(`Server listening on port ${port}`);
// Start background services
await this.container.resolve('TaskScheduler').start();
await this.container.resolve('AgentPool').initialize();
}
async stop(): Promise<void> {
await this.container.resolve('TaskScheduler').stop();
await this.container.resolve('AgentPool').shutdown();
this.httpServer.server.close();
}
}
```
### 3. MCP Server for External Tools
```typescript
// infrastructure/mcp/mcp-server.ts
export class MCPServer {
private tools: Map<string, ToolHandler> = new Map();
registerTool(name: string, handler: ToolHandler): void {
this.tools.set(name, handler);
}
async handleRequest(request: MCPMessage): Promise<MCPMessage> {
if (request.method === 'tools/list') {
return this.listTools();
}
if (request.method === 'tools/call') {
const { name, arguments: args } = request.params as ToolCallParams;
const handler = this.tools.get(name);
if (!handler) {
return this.error(request.id, -32601, `Tool not found: ${name}`);
}
const result = await handler(args);
return this.success(request.id, result);
}
return this.error(request.id, -32601, 'Method not found');
}
}
```
---
## Sources
### Task Queue & Job Processing
- [BullMQ Documentation](https://docs.bullmq.io)
- [BullMQ GitHub](https://github.com/taskforcesh/bullmq)
- [Better Stack - Job Scheduling with BullMQ](https://betterstack.com/community/guides/scaling-nodejs/bullmq-scheduled-tasks/)
- [DigitalOcean - Async Tasks with BullMQ](https://www.digitalocean.com/community/tutorials/how-to-handle-asynchronous-tasks-with-node-js-and-bullmq)
- [SQLite Background Job System](https://jasongorman.uk/writing/sqlite-background-job-system/)
- [plainjob - SQLite Job Queue](https://github.com/justplainstuff/plainjob)
### Process Management & Supervision
- [Node.js Child Process Documentation](https://nodejs.org/api/child_process.html)
- [PM2 Documentation](https://pm2.keymetrics.io/docs/usage/quick-start/)
- [PM2 Cluster Mode](https://pm2.keymetrics.io/docs/usage/cluster-mode/)
- [TheLinuxCode - Mastering Child Processes](https://thelinuxcode.com/mastering-node-js-child-processes-an-in-depth-practical-guide/)
### Worker Pools
- [Piscina GitHub](https://github.com/piscinajs/piscina)
- [Piscina Documentation](https://piscinajs.dev/)
- [NearForm - Learning Piscina](https://www.nearform.com/blog/learning-to-swim-with-piscina-the-node-js-worker-pool/)
- [Workerpool npm](https://www.npmjs.com/package/workerpool)
- [Advanced Web - Worker Pools in Node.js](https://advancedweb.hu/using-worker-pools-in-nodejs/)
### Hexagonal Architecture
- [Software Patterns Lexicon - Ports and Adapters](https://softwarepatternslexicon.com/patterns-ts/7/7/2/)
- [Better Programming - Ports and Adapters with TypeScript](https://betterprogramming.pub/how-to-ports-and-adapter-with-typescript-32a50a0fc9eb)
- [Alex Rusin - Ports & Adapters Guide](https://blog.alexrusin.com/future-proof-your-code-a-guide-to-ports-adapters-hexagonal-architecture/)
- [DEV - Hexagonal Architecture Examples](https://dev.to/dyarleniber/hexagonal-architecture-and-clean-architecture-with-examples-48oi)
### Event-Driven Architecture
- [Medium - EDA with Node.js](https://medium.com/@erickzanetti/event-driven-architecture-eda-with-node-js-a-modern-approach-and-challenges-82e7d9932b34)
- [FreeCodeCamp - Event-Based Architectures](https://www.freecodecamp.org/news/event-based-architectures-in-javascript-a-handbook-for-devs/)
- [GeeksforGeeks - Node.js Event Architecture](https://www.geeksforgeeks.org/explain-the-event-driven-architecture-of-node-js/)
### File System Watching
- [Chokidar GitHub](https://github.com/paulmillr/chokidar)
- [Chokidar npm](https://www.npmjs.com/package/chokidar)
### Workflow Engines & CI/CD
- [Temporal Documentation](https://docs.temporal.io/evaluate/use-cases-design-patterns)
- [Temporal Blog - Workflow Engine Principles](https://temporal.io/blog/workflow-engine-principles)
- [Temporal TypeScript SDK](https://github.com/temporalio/sdk-typescript)
- [GitHub Actions Runner Architecture](https://depot.dev/blog/github-actions-runner-architecture-part-1-the-listener)
- [nektos/act - Local GitHub Actions](https://github.com/nektos/act)
### tRPC
- [tRPC Documentation](https://trpc.io/docs/)
- [Better Stack - From REST to tRPC](https://betterstack.com/community/guides/scaling-nodejs/trpc-explained/)
- [Medium - Understanding tRPC](https://medium.com/@ignatovich.dm/understanding-trpc-building-type-safe-apis-in-typescript-45258c6c3b73)
### Model Context Protocol
- [MCP Architecture Overview](https://modelcontextprotocol.io/docs/learn/architecture)
- [IBM - What is MCP](https://www.ibm.com/think/topics/model-context-protocol)
- [Google Cloud - MCP Guide](https://cloud.google.com/discover/what-is-model-context-protocol)
- [Composio - MCP Explained](https://composio.dev/blog/what-is-model-context-protocol-mcp-explained)
### Modular Monolith
- [Software Architecture Guild - Modular Monolith](https://software-architecture-guild.com/guide/architecture/styles/modular-monolith/)
- [DEV - Structuring Modular Monoliths](https://dev.to/xoubaman/modular-monolith-3fg1)
- [Medium - Scalable Express API with Modular Monolith](https://medium.com/@mwwtstq/building-a-scalable-express-api-using-clean-architecture-and-a-modular-monolith-with-typescript-c855614b05dc)
- [GitHub - Modular Monolith Node.js](https://github.com/mgce/modular-monolith-nodejs)
### CQRS & Event Sourcing
- [Event-Driven.io - Emmett](https://event-driven.io/en/introducing_emmett/)
- [Medium - CQRS From Scratch with TypeScript](https://medium.com/swlh/cqrs-from-scratch-with-typescript-e2ccf7fc2b64)
- [DEV - Event Sourcing in TypeScript](https://dev.to/dariowoollover/an-opinionated-guide-to-event-sourcing-in-typescript-kickoff-42d6)