Each SSE client registers a listener per event type (30+ types), so a few concurrent connections easily exceed the previous limit of 100. Listeners are properly cleaned up on disconnect via eventBusIterable's finally block, so this is not a real leak.
69 lines
1.8 KiB
TypeScript
69 lines
1.8 KiB
TypeScript
/**
|
||
* Event Emitter Bus - Adapter Implementation
|
||
*
|
||
* In-process EventBus implementation using Node.js EventEmitter.
|
||
* This is the ADAPTER for the EventBus PORT.
|
||
*
|
||
* Can be swapped for RabbitMQ, Kafka, WebSocket, etc. later
|
||
* without changing consumers.
|
||
*/
|
||
|
||
import { EventEmitter } from 'node:events';
|
||
import type { DomainEvent, EventBus } from './types.js';
|
||
|
||
/**
|
||
* EventEmitter-based implementation of the EventBus interface.
|
||
*
|
||
* Wraps Node.js EventEmitter to provide type-safe event handling
|
||
* that conforms to the EventBus port interface.
|
||
*/
|
||
export class EventEmitterBus implements EventBus {
|
||
private emitter: EventEmitter;
|
||
|
||
constructor() {
|
||
this.emitter = new EventEmitter();
|
||
// SSE subscriptions register per-event-type listeners (30+ types × N clients).
|
||
// Listeners are properly cleaned up on disconnect, so disable the warning.
|
||
this.emitter.setMaxListeners(0);
|
||
}
|
||
|
||
/**
|
||
* Emit an event to all subscribed handlers.
|
||
* The event's type property determines which handlers receive it.
|
||
*/
|
||
emit<T extends DomainEvent>(event: T): void {
|
||
this.emitter.emit(event.type, event);
|
||
}
|
||
|
||
/**
|
||
* Subscribe to events of a specific type.
|
||
*/
|
||
on<T extends DomainEvent>(
|
||
eventType: T['type'],
|
||
handler: (event: T) => void
|
||
): void {
|
||
this.emitter.on(eventType, handler);
|
||
}
|
||
|
||
/**
|
||
* Unsubscribe from events of a specific type.
|
||
*/
|
||
off<T extends DomainEvent>(
|
||
eventType: T['type'],
|
||
handler: (event: T) => void
|
||
): void {
|
||
this.emitter.off(eventType, handler);
|
||
}
|
||
|
||
/**
|
||
* Subscribe to a single occurrence of an event type.
|
||
* Handler is automatically removed after first invocation.
|
||
*/
|
||
once<T extends DomainEvent>(
|
||
eventType: T['type'],
|
||
handler: (event: T) => void
|
||
): void {
|
||
this.emitter.once(eventType, handler);
|
||
}
|
||
}
|