feat(01.1-05): add event emission to server and shutdown handler

- CoordinationServer accepts optional EventBus parameter
- Emit ServerStarted event on start (port, host, pid)
- Emit ServerStopped event on stop (uptime)
- Add event emission tests to server tests
- Add GracefulShutdown tests for shutdown() and install()
This commit is contained in:
Lukas May
2026-01-30 14:03:24 +01:00
parent b556c10a69
commit 1f255ee8e3
3 changed files with 258 additions and 1 deletions

View File

@@ -12,6 +12,7 @@ import { join } from 'node:path';
import { randomUUID } from 'node:crypto';
import type { ProcessManager } from '../process/index.js';
import type { LogManager } from '../logging/index.js';
import type { EventBus, ServerStartedEvent, ServerStoppedEvent } from '../events/index.js';
/**
* Creates a mock ProcessManager for testing.
@@ -41,6 +42,18 @@ function createMockLogManager(): LogManager {
} as unknown as LogManager;
}
/**
* Creates a mock EventBus for testing.
*/
function createMockEventBus(): EventBus & { emit: ReturnType<typeof vi.fn> } {
return {
emit: vi.fn(),
on: vi.fn(),
off: vi.fn(),
once: vi.fn(),
} as unknown as EventBus & { emit: ReturnType<typeof vi.fn> };
}
/**
* Gets a random high port to avoid conflicts.
*/
@@ -217,4 +230,60 @@ describe('CoordinationServer', () => {
expect(parseInt(content.trim(), 10)).toBe(process.pid);
});
});
describe('event emission', () => {
let eventBus: ReturnType<typeof createMockEventBus>;
let serverWithEvents: CoordinationServer;
beforeEach(() => {
eventBus = createMockEventBus();
serverWithEvents = new CoordinationServer(
{ port, pidFile },
processManager,
logManager,
eventBus
);
});
afterEach(async () => {
if (serverWithEvents.isRunning()) {
await serverWithEvents.stop();
}
});
it('should emit ServerStarted event on start', async () => {
await serverWithEvents.start();
expect(eventBus.emit).toHaveBeenCalledOnce();
const emittedEvent = eventBus.emit.mock.calls[0][0] as ServerStartedEvent;
expect(emittedEvent.type).toBe('server:started');
expect(emittedEvent.payload.port).toBe(port);
expect(emittedEvent.payload.host).toBe('127.0.0.1');
expect(emittedEvent.payload.pid).toBe(process.pid);
expect(emittedEvent.timestamp).toBeInstanceOf(Date);
});
it('should emit ServerStopped event on stop', async () => {
await serverWithEvents.start();
eventBus.emit.mockClear(); // Clear the start event
await serverWithEvents.stop();
expect(eventBus.emit).toHaveBeenCalledOnce();
const emittedEvent = eventBus.emit.mock.calls[0][0] as ServerStoppedEvent;
expect(emittedEvent.type).toBe('server:stopped');
expect(typeof emittedEvent.payload.uptime).toBe('number');
expect(emittedEvent.payload.uptime).toBeGreaterThanOrEqual(0);
expect(emittedEvent.timestamp).toBeInstanceOf(Date);
});
it('should not emit events if eventBus is not provided', async () => {
// Use the original server without eventBus
await server.start();
await server.stop();
// eventBus.emit should never have been called since server doesn't have it
expect(eventBus.emit).not.toHaveBeenCalled();
});
});
});

View File

@@ -13,6 +13,7 @@ import { join } from 'node:path';
import type { ServerConfig, ServerState, HealthResponse, StatusResponse } from './types.js';
import type { ProcessManager } from '../process/index.js';
import type { LogManager } from '../logging/index.js';
import type { EventBus, ServerStartedEvent, ServerStoppedEvent } from '../events/index.js';
/** Default port for the coordination server */
const DEFAULT_PORT = 3847;
@@ -34,13 +35,15 @@ export class CoordinationServer {
private readonly config: ServerConfig;
private readonly processManager: ProcessManager;
private readonly logManager: LogManager;
private readonly eventBus: EventBus | undefined;
private server: Server | null = null;
private state: ServerState | null = null;
constructor(
config: Partial<ServerConfig>,
processManager: ProcessManager,
logManager: LogManager
logManager: LogManager,
eventBus?: EventBus
) {
this.config = {
port: config.port ?? DEFAULT_PORT,
@@ -49,6 +52,7 @@ export class CoordinationServer {
};
this.processManager = processManager;
this.logManager = logManager;
this.eventBus = eventBus;
}
/**
@@ -91,6 +95,20 @@ export class CoordinationServer {
// Write PID file
await this.writePidFile();
// Emit ServerStarted event
if (this.eventBus) {
const event: ServerStartedEvent = {
type: 'server:started',
timestamp: new Date(),
payload: {
port: this.config.port,
host: this.config.host,
pid: process.pid,
},
};
this.eventBus.emit(event);
}
console.log(`Coordination server listening on http://${this.config.host}:${this.config.port}`);
}
@@ -102,6 +120,23 @@ export class CoordinationServer {
return;
}
// Calculate uptime before clearing state
const uptime = this.state
? Math.floor((Date.now() - this.state.startedAt.getTime()) / 1000)
: 0;
// Emit ServerStopped event before stopping
if (this.eventBus) {
const event: ServerStoppedEvent = {
type: 'server:stopped',
timestamp: new Date(),
payload: {
uptime,
},
};
this.eventBus.emit(event);
}
// Close server
await new Promise<void>((resolve, reject) => {
this.server!.close((err) => {

153
src/server/shutdown.test.ts Normal file
View File

@@ -0,0 +1,153 @@
/**
* GracefulShutdown Tests
*
* Tests for the graceful shutdown handler.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { GracefulShutdown } from './shutdown.js';
import type { CoordinationServer } from './index.js';
import type { ProcessManager } from '../process/index.js';
import type { LogManager } from '../logging/index.js';
/**
* Creates a mock CoordinationServer for testing.
*/
function createMockServer(): CoordinationServer & {
stop: ReturnType<typeof vi.fn>;
} {
return {
start: vi.fn(),
stop: vi.fn().mockResolvedValue(undefined),
isRunning: vi.fn().mockReturnValue(true),
getPort: vi.fn().mockReturnValue(3847),
getPidFile: vi.fn().mockReturnValue('/tmp/test.pid'),
} as unknown as CoordinationServer & { stop: ReturnType<typeof vi.fn> };
}
/**
* Creates a mock ProcessManager for testing.
*/
function createMockProcessManager(): ProcessManager & {
stopAll: ReturnType<typeof vi.fn>;
} {
return {
spawn: vi.fn(),
stop: vi.fn(),
stopAll: vi.fn().mockResolvedValue(undefined),
restart: vi.fn(),
isRunning: vi.fn(),
} as unknown as ProcessManager & { stopAll: ReturnType<typeof vi.fn> };
}
/**
* Creates a mock LogManager for testing.
*/
function createMockLogManager(): LogManager {
return {
ensureLogDir: vi.fn(),
ensureProcessDir: vi.fn(),
getProcessDir: vi.fn(),
getLogPath: vi.fn(),
listLogs: vi.fn(),
cleanOldLogs: vi.fn(),
getBaseDir: vi.fn(),
} as unknown as LogManager;
}
describe('GracefulShutdown', () => {
let server: ReturnType<typeof createMockServer>;
let processManager: ReturnType<typeof createMockProcessManager>;
let logManager: LogManager;
let shutdown: GracefulShutdown;
beforeEach(() => {
server = createMockServer();
processManager = createMockProcessManager();
logManager = createMockLogManager();
shutdown = new GracefulShutdown(server, processManager, logManager);
// Suppress console output during tests
vi.spyOn(console, 'log').mockImplementation(() => {});
vi.spyOn(console, 'error').mockImplementation(() => {});
});
afterEach(() => {
vi.restoreAllMocks();
});
describe('shutdown()', () => {
it('should call server.stop()', async () => {
await shutdown.shutdown();
expect(server.stop).toHaveBeenCalledOnce();
});
it('should call processManager.stopAll()', async () => {
await shutdown.shutdown();
expect(processManager.stopAll).toHaveBeenCalledOnce();
});
it('should call server.stop() before processManager.stopAll()', async () => {
const callOrder: string[] = [];
server.stop.mockImplementation(async () => {
callOrder.push('server.stop');
});
processManager.stopAll.mockImplementation(async () => {
callOrder.push('processManager.stopAll');
});
await shutdown.shutdown();
expect(callOrder).toEqual(['server.stop', 'processManager.stopAll']);
});
});
describe('install()', () => {
let originalListeners: {
SIGTERM: NodeJS.SignalsListener[];
SIGINT: NodeJS.SignalsListener[];
SIGHUP: NodeJS.SignalsListener[];
};
beforeEach(() => {
// Store original listeners
originalListeners = {
SIGTERM: process.listeners('SIGTERM') as NodeJS.SignalsListener[],
SIGINT: process.listeners('SIGINT') as NodeJS.SignalsListener[],
SIGHUP: process.listeners('SIGHUP') as NodeJS.SignalsListener[],
};
});
afterEach(() => {
// Remove all listeners we added and restore original ones
process.removeAllListeners('SIGTERM');
process.removeAllListeners('SIGINT');
process.removeAllListeners('SIGHUP');
originalListeners.SIGTERM.forEach((l) => process.on('SIGTERM', l));
originalListeners.SIGINT.forEach((l) => process.on('SIGINT', l));
originalListeners.SIGHUP.forEach((l) => process.on('SIGHUP', l));
});
it('should install SIGTERM handler', () => {
const initialCount = process.listenerCount('SIGTERM');
shutdown.install();
expect(process.listenerCount('SIGTERM')).toBe(initialCount + 1);
});
it('should install SIGINT handler', () => {
const initialCount = process.listenerCount('SIGINT');
shutdown.install();
expect(process.listenerCount('SIGINT')).toBe(initialCount + 1);
});
it('should install SIGHUP handler', () => {
const initialCount = process.listenerCount('SIGHUP');
shutdown.install();
expect(process.listenerCount('SIGHUP')).toBe(initialCount + 1);
});
});
});