feat(01.1-05): add event emission to server and shutdown handler
- CoordinationServer accepts optional EventBus parameter - Emit ServerStarted event on start (port, host, pid) - Emit ServerStopped event on stop (uptime) - Add event emission tests to server tests - Add GracefulShutdown tests for shutdown() and install()
This commit is contained in:
@@ -12,6 +12,7 @@ import { join } from 'node:path';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import type { ProcessManager } from '../process/index.js';
|
||||
import type { LogManager } from '../logging/index.js';
|
||||
import type { EventBus, ServerStartedEvent, ServerStoppedEvent } from '../events/index.js';
|
||||
|
||||
/**
|
||||
* Creates a mock ProcessManager for testing.
|
||||
@@ -41,6 +42,18 @@ function createMockLogManager(): LogManager {
|
||||
} as unknown as LogManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a mock EventBus for testing.
|
||||
*/
|
||||
function createMockEventBus(): EventBus & { emit: ReturnType<typeof vi.fn> } {
|
||||
return {
|
||||
emit: vi.fn(),
|
||||
on: vi.fn(),
|
||||
off: vi.fn(),
|
||||
once: vi.fn(),
|
||||
} as unknown as EventBus & { emit: ReturnType<typeof vi.fn> };
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a random high port to avoid conflicts.
|
||||
*/
|
||||
@@ -217,4 +230,60 @@ describe('CoordinationServer', () => {
|
||||
expect(parseInt(content.trim(), 10)).toBe(process.pid);
|
||||
});
|
||||
});
|
||||
|
||||
describe('event emission', () => {
|
||||
let eventBus: ReturnType<typeof createMockEventBus>;
|
||||
let serverWithEvents: CoordinationServer;
|
||||
|
||||
beforeEach(() => {
|
||||
eventBus = createMockEventBus();
|
||||
serverWithEvents = new CoordinationServer(
|
||||
{ port, pidFile },
|
||||
processManager,
|
||||
logManager,
|
||||
eventBus
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (serverWithEvents.isRunning()) {
|
||||
await serverWithEvents.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it('should emit ServerStarted event on start', async () => {
|
||||
await serverWithEvents.start();
|
||||
|
||||
expect(eventBus.emit).toHaveBeenCalledOnce();
|
||||
const emittedEvent = eventBus.emit.mock.calls[0][0] as ServerStartedEvent;
|
||||
expect(emittedEvent.type).toBe('server:started');
|
||||
expect(emittedEvent.payload.port).toBe(port);
|
||||
expect(emittedEvent.payload.host).toBe('127.0.0.1');
|
||||
expect(emittedEvent.payload.pid).toBe(process.pid);
|
||||
expect(emittedEvent.timestamp).toBeInstanceOf(Date);
|
||||
});
|
||||
|
||||
it('should emit ServerStopped event on stop', async () => {
|
||||
await serverWithEvents.start();
|
||||
eventBus.emit.mockClear(); // Clear the start event
|
||||
|
||||
await serverWithEvents.stop();
|
||||
|
||||
expect(eventBus.emit).toHaveBeenCalledOnce();
|
||||
const emittedEvent = eventBus.emit.mock.calls[0][0] as ServerStoppedEvent;
|
||||
expect(emittedEvent.type).toBe('server:stopped');
|
||||
expect(typeof emittedEvent.payload.uptime).toBe('number');
|
||||
expect(emittedEvent.payload.uptime).toBeGreaterThanOrEqual(0);
|
||||
expect(emittedEvent.timestamp).toBeInstanceOf(Date);
|
||||
});
|
||||
|
||||
it('should not emit events if eventBus is not provided', async () => {
|
||||
// Use the original server without eventBus
|
||||
await server.start();
|
||||
await server.stop();
|
||||
|
||||
// eventBus.emit should never have been called since server doesn't have it
|
||||
expect(eventBus.emit).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -13,6 +13,7 @@ import { join } from 'node:path';
|
||||
import type { ServerConfig, ServerState, HealthResponse, StatusResponse } from './types.js';
|
||||
import type { ProcessManager } from '../process/index.js';
|
||||
import type { LogManager } from '../logging/index.js';
|
||||
import type { EventBus, ServerStartedEvent, ServerStoppedEvent } from '../events/index.js';
|
||||
|
||||
/** Default port for the coordination server */
|
||||
const DEFAULT_PORT = 3847;
|
||||
@@ -34,13 +35,15 @@ export class CoordinationServer {
|
||||
private readonly config: ServerConfig;
|
||||
private readonly processManager: ProcessManager;
|
||||
private readonly logManager: LogManager;
|
||||
private readonly eventBus: EventBus | undefined;
|
||||
private server: Server | null = null;
|
||||
private state: ServerState | null = null;
|
||||
|
||||
constructor(
|
||||
config: Partial<ServerConfig>,
|
||||
processManager: ProcessManager,
|
||||
logManager: LogManager
|
||||
logManager: LogManager,
|
||||
eventBus?: EventBus
|
||||
) {
|
||||
this.config = {
|
||||
port: config.port ?? DEFAULT_PORT,
|
||||
@@ -49,6 +52,7 @@ export class CoordinationServer {
|
||||
};
|
||||
this.processManager = processManager;
|
||||
this.logManager = logManager;
|
||||
this.eventBus = eventBus;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -91,6 +95,20 @@ export class CoordinationServer {
|
||||
// Write PID file
|
||||
await this.writePidFile();
|
||||
|
||||
// Emit ServerStarted event
|
||||
if (this.eventBus) {
|
||||
const event: ServerStartedEvent = {
|
||||
type: 'server:started',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
port: this.config.port,
|
||||
host: this.config.host,
|
||||
pid: process.pid,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
console.log(`Coordination server listening on http://${this.config.host}:${this.config.port}`);
|
||||
}
|
||||
|
||||
@@ -102,6 +120,23 @@ export class CoordinationServer {
|
||||
return;
|
||||
}
|
||||
|
||||
// Calculate uptime before clearing state
|
||||
const uptime = this.state
|
||||
? Math.floor((Date.now() - this.state.startedAt.getTime()) / 1000)
|
||||
: 0;
|
||||
|
||||
// Emit ServerStopped event before stopping
|
||||
if (this.eventBus) {
|
||||
const event: ServerStoppedEvent = {
|
||||
type: 'server:stopped',
|
||||
timestamp: new Date(),
|
||||
payload: {
|
||||
uptime,
|
||||
},
|
||||
};
|
||||
this.eventBus.emit(event);
|
||||
}
|
||||
|
||||
// Close server
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
this.server!.close((err) => {
|
||||
|
||||
153
src/server/shutdown.test.ts
Normal file
153
src/server/shutdown.test.ts
Normal file
@@ -0,0 +1,153 @@
|
||||
/**
|
||||
* GracefulShutdown Tests
|
||||
*
|
||||
* Tests for the graceful shutdown handler.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||
import { GracefulShutdown } from './shutdown.js';
|
||||
import type { CoordinationServer } from './index.js';
|
||||
import type { ProcessManager } from '../process/index.js';
|
||||
import type { LogManager } from '../logging/index.js';
|
||||
|
||||
/**
|
||||
* Creates a mock CoordinationServer for testing.
|
||||
*/
|
||||
function createMockServer(): CoordinationServer & {
|
||||
stop: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
return {
|
||||
start: vi.fn(),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
isRunning: vi.fn().mockReturnValue(true),
|
||||
getPort: vi.fn().mockReturnValue(3847),
|
||||
getPidFile: vi.fn().mockReturnValue('/tmp/test.pid'),
|
||||
} as unknown as CoordinationServer & { stop: ReturnType<typeof vi.fn> };
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a mock ProcessManager for testing.
|
||||
*/
|
||||
function createMockProcessManager(): ProcessManager & {
|
||||
stopAll: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
return {
|
||||
spawn: vi.fn(),
|
||||
stop: vi.fn(),
|
||||
stopAll: vi.fn().mockResolvedValue(undefined),
|
||||
restart: vi.fn(),
|
||||
isRunning: vi.fn(),
|
||||
} as unknown as ProcessManager & { stopAll: ReturnType<typeof vi.fn> };
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a mock LogManager for testing.
|
||||
*/
|
||||
function createMockLogManager(): LogManager {
|
||||
return {
|
||||
ensureLogDir: vi.fn(),
|
||||
ensureProcessDir: vi.fn(),
|
||||
getProcessDir: vi.fn(),
|
||||
getLogPath: vi.fn(),
|
||||
listLogs: vi.fn(),
|
||||
cleanOldLogs: vi.fn(),
|
||||
getBaseDir: vi.fn(),
|
||||
} as unknown as LogManager;
|
||||
}
|
||||
|
||||
describe('GracefulShutdown', () => {
|
||||
let server: ReturnType<typeof createMockServer>;
|
||||
let processManager: ReturnType<typeof createMockProcessManager>;
|
||||
let logManager: LogManager;
|
||||
let shutdown: GracefulShutdown;
|
||||
|
||||
beforeEach(() => {
|
||||
server = createMockServer();
|
||||
processManager = createMockProcessManager();
|
||||
logManager = createMockLogManager();
|
||||
shutdown = new GracefulShutdown(server, processManager, logManager);
|
||||
|
||||
// Suppress console output during tests
|
||||
vi.spyOn(console, 'log').mockImplementation(() => {});
|
||||
vi.spyOn(console, 'error').mockImplementation(() => {});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe('shutdown()', () => {
|
||||
it('should call server.stop()', async () => {
|
||||
await shutdown.shutdown();
|
||||
|
||||
expect(server.stop).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should call processManager.stopAll()', async () => {
|
||||
await shutdown.shutdown();
|
||||
|
||||
expect(processManager.stopAll).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('should call server.stop() before processManager.stopAll()', async () => {
|
||||
const callOrder: string[] = [];
|
||||
|
||||
server.stop.mockImplementation(async () => {
|
||||
callOrder.push('server.stop');
|
||||
});
|
||||
processManager.stopAll.mockImplementation(async () => {
|
||||
callOrder.push('processManager.stopAll');
|
||||
});
|
||||
|
||||
await shutdown.shutdown();
|
||||
|
||||
expect(callOrder).toEqual(['server.stop', 'processManager.stopAll']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('install()', () => {
|
||||
let originalListeners: {
|
||||
SIGTERM: NodeJS.SignalsListener[];
|
||||
SIGINT: NodeJS.SignalsListener[];
|
||||
SIGHUP: NodeJS.SignalsListener[];
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
// Store original listeners
|
||||
originalListeners = {
|
||||
SIGTERM: process.listeners('SIGTERM') as NodeJS.SignalsListener[],
|
||||
SIGINT: process.listeners('SIGINT') as NodeJS.SignalsListener[],
|
||||
SIGHUP: process.listeners('SIGHUP') as NodeJS.SignalsListener[],
|
||||
};
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
// Remove all listeners we added and restore original ones
|
||||
process.removeAllListeners('SIGTERM');
|
||||
process.removeAllListeners('SIGINT');
|
||||
process.removeAllListeners('SIGHUP');
|
||||
|
||||
originalListeners.SIGTERM.forEach((l) => process.on('SIGTERM', l));
|
||||
originalListeners.SIGINT.forEach((l) => process.on('SIGINT', l));
|
||||
originalListeners.SIGHUP.forEach((l) => process.on('SIGHUP', l));
|
||||
});
|
||||
|
||||
it('should install SIGTERM handler', () => {
|
||||
const initialCount = process.listenerCount('SIGTERM');
|
||||
shutdown.install();
|
||||
expect(process.listenerCount('SIGTERM')).toBe(initialCount + 1);
|
||||
});
|
||||
|
||||
it('should install SIGINT handler', () => {
|
||||
const initialCount = process.listenerCount('SIGINT');
|
||||
shutdown.install();
|
||||
expect(process.listenerCount('SIGINT')).toBe(initialCount + 1);
|
||||
});
|
||||
|
||||
it('should install SIGHUP handler', () => {
|
||||
const initialCount = process.listenerCount('SIGHUP');
|
||||
shutdown.install();
|
||||
expect(process.listenerCount('SIGHUP')).toBe(initialCount + 1);
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user