const Event = require('./Event.js');
const EventTypes = require('./EventTypes.js');
const SlackNotifier = require('../notifiers/slack-notifier');
const AdminEmailNotifier = require('../notifiers/admin-email-notifier');
const BaseEventBusAdapter = require('./adapters/BaseEventBusAdapter.js');
const MemoryEventBusAdapter = require('./adapters/MemoryAdapter.js');
// Singleton state
let adapter = new MemoryEventBusAdapter();
/**
* @module Event System
* @fileoverview EventBus - Event-driven pub/sub system for decoupled architecture.
*
* EventBus implements the Observer Pattern (also called Pub/Sub) to enable loosely-coupled
* communication between modules. This is a core component of the plugin system and allows
* services to emit events without knowing which plugins are listening.
*
* **Observer Pattern:**
* ```
* ┌──────────────┐ ┌──────────────┐
* │ Service │ emit('event') │ EventBus │
* │ (Subject) │ ──────────────────>│ (Mediator) │
* └──────────────┘ └──────┬───────┘
* │ notify
* ┌───────────────────────┼───────────────────────┐
* ↓ ↓ ↓
* ┌──────────┐ ┌──────────┐ ┌──────────┐
* │ Plugin A │ │ Plugin B │ │ Plugin C │
* │(Observer)│ │(Observer)│ │(Observer)│
* └──────────┘ └──────────┘ └──────────┘
* ```
*
* **Architecture Integration:**
* ```
* HTTP Layer (routes)
* ↓
* Service Layer
* ├─> emit events ──────────> EventBus
* ↓ ↓
* Repository Layer Plugins (on events)
* ├─> Analytics Plugin
* ├─> Notification Plugin
* ├─> Cache Invalidation Plugin
* └─> Audit Log Plugin
* ```
*
* **Production Use Cases:**
* 1. **Plugin System**: Services emit events (icon.created, user.signup) and plugins react
* 2. **Cache Invalidation**: Clear related caches when data changes
* 3. **Analytics**: Track business events without coupling to analytics code
* 4. **Notifications**: Send Slack/Email alerts on critical events
* 5. **Audit Logging**: Record all mutations for compliance
* 6. **Distributed Systems**: Redis adapter enables cross-instance communication
*
* **Adapter Pattern:**
* ```
* ┌─────────────┐
* │ EventBus │ ← Application uses this interface
* └──────┬──────┘
* │ (adapter: BaseEventBusAdapter)
* ├────────────────────┬─────────────────────
* ↓ ↓
* ┌────────────────┐ ┌────────────────┐
* │ MemoryAdapter │ │ RedisAdapter │
* └────────────────┘ └────────────────┘
* (single instance) (distributed)
* ```
*
* **Error Handling:**
* - Handlers are automatically wrapped with try/catch
* - Failed handlers don't crash the application
* - Errors can trigger Slack/Email notifications
* - Other handlers continue executing even if one fails
* - Promise.allSettled ensures notification delivery
*
* **Memory Management:**
* - WeakMaps for handler tracking (garbage collection friendly)
* - Automatic cleanup when handlers are removed
* - No memory leaks from handler registrations
*
* @example
* // Service emits events automatically (via withPluggable mixin)
* const icon = await iconService.create({ name: 'home', svgPath: '...' });
* // Emits: 'icon.created' event
*
* @example
* // Plugin listens for events
* eventBus.on('icon.created', async (icon) => {
* await analyticsService.track('Icon Created', {
* iconId: icon.id,
* setId: icon.setId
* });
* });
*
* @example
* // Error handling with notifications
* eventBus.on('order.completed', async (order) => {
* await processPayment(order); // Might throw error
* }, {
* onError: { notify: ['slack', 'email'] }
* });
* // If processPayment fails, Slack and Email are notified
*
* @example
* // Distributed events with Redis adapter
* const redisAdapter = new RedisEventBusAdapter(redisClient);
* eventBus.setAdapter(redisAdapter);
* // Now events are distributed across all server instances
*
* @example
* // Memory-safe one-time handlers
* eventBus.once('app.ready', async () => {
* console.log('Application started');
* await seedDatabase();
* });
* // Handler auto-removed after execution, no memory leak
*
* @see {@link withPluggable} For automatic event emission in services
* @see {@link EventTypes} For predefined event types
* @see {@link Event} For event wrapper with metadata
*/
/**
* Event-driven pub/sub system with adapter pattern and error recovery.
*
* EventBus is the backbone of the plugin architecture. It enables:
* - **Decoupling**: Services emit events without knowing who's listening
* - **Extensibility**: Add new plugins without modifying existing code
* - **Resilience**: Handler failures don't crash the application
* - **Observability**: Monitor events via Slack/Email notifications
* - **Scalability**: Distribute events across instances via Redis adapter
*
* **How It Works:**
* 1. Services (with `withPluggable` mixin) emit events after operations
* 2. EventBus wraps handlers with error handling and notification logic
* 3. All registered handlers execute asynchronously (non-blocking)
* 4. Failed handlers trigger notifications but don't affect others
*
* **Adapter Swapping:**
* ```javascript
* // Development: In-memory events (single instance)
* const eventBus = new EventBus();
* eventBus.setAdapter(new MemoryEventBusAdapter());
*
* // Production: Distributed events (multi-instance)
* const redisClient = redis.createClient({ url: process.env.REDIS_URL });
* await redisClient.connect();
* eventBus.setAdapter(new RedisEventBusAdapter(redisClient));
* ```
*
* **Memory Management:**
* - Uses WeakMaps to track handler configs (no strong references)
* - Handlers are garbage collected when no longer referenced
* - off() method cleans up WeakMap entries explicitly
*
* **Performance:**
* - Memory adapter: ~0.01ms per emit (synchronous, in-process)
* - Redis adapter: ~1-5ms per emit (network overhead, pub/sub)
* - Handlers execute async (don't block service methods)
*
* **Event Naming Convention:**
* Use `entity.operation` format: `icon.created`, `user.signup`, `order.completed`
* See EventTypes.js for predefined constants.
*
* @class EventBus
*
* @example
* // Basic usage
* const eventBus = new EventBus();
*
* eventBus.on('user.signup', async (user) => {
* console.log('New user:', user.email);
* await sendWelcomeEmail(user);
* });
*
* eventBus.emit('user.signup', { id: 123, email: 'user@example.com' });
*
* @example
* // Plugin system integration
* class AnalyticsPlugin {
* constructor(eventBus) {
* eventBus.on('icon.created', this.trackIconCreation.bind(this));
* eventBus.on('icon.updated', this.trackIconUpdate.bind(this));
* eventBus.on('icon.deleted', this.trackIconDeletion.bind(this));
* }
*
* async trackIconCreation(icon) {
* await analytics.track('Icon Created', { iconId: icon.id });
* }
* }
*
* @example
* // Cache invalidation pattern
* eventBus.on('icon.updated', async (icon) => {
* await cache.clearCache({ baseKey: 'icons' });
* console.log('Cleared icon cache after update');
* });
*
* @example
* // Error handling with notifications
* eventBus.on('payment.failed', async (payment) => {
* await notifyFinanceTeam(payment);
* throw new Error('Payment processing failed'); // Will trigger notifications
* }, {
* onError: { notify: ['slack', 'email'] }
* });
*
* @example
* // Distributed events across server instances
* // Instance 1:
* eventBus.emit('user.login', { userId: 123 });
*
* // Instance 2 (different server):
* eventBus.on('user.login', async (data) => {
* console.log('User logged in on another instance:', data.userId);
* });
* // Works with RedisEventBusAdapter!
*/
class EventBus {
/**
* Construct EventBus with default notifiers and memory-efficient tracking.
*
* Initializes:
* - SlackNotifier for error alerts to Slack channels
* - AdminEmailNotifier for error emails to administrators
* - WeakMaps for memory-safe handler configuration tracking
* - Default MemoryAdapter for single-instance events
*
* **WeakMap Benefits:**
* - Handlers can be garbage collected when no longer referenced
* - No memory leaks from forgotten handlers
* - Automatic cleanup without explicit off() calls
*
* @example
* // Basic construction
* const eventBus = new EventBus();
*
* @example
* // With custom adapter
* const eventBus = new EventBus();
* eventBus.setAdapter(new RedisEventBusAdapter(redisClient));
*
* @example
* // Singleton pattern (recommended for application-wide use)
* // event-bus-singleton.js
* const EventBus = require('./EventBus');
* module.exports = new EventBus();
*
* // app.js
* const eventBus = require('./event-bus-singleton');
* eventBus.on('app.ready', () => console.log('App ready'));
*/
constructor() {
/**
* Slack notifier for error alerts.
* @type {SlackNotifier}
* @private
*/
this.slackNotifier = new SlackNotifier();
/**
* Email notifier for error alerts.
* @type {AdminEmailNotifier}
* @private
*/
this.emailNotifier = new AdminEmailNotifier();
/**
* Maps handlers to their configuration options.
* @type {WeakMap}
* @private
*/
this.handlerConfigs = new WeakMap();
/**
* Maps original handlers to their wrapped versions.
* @type {WeakMap}
* @private
*/
this.wrappedHandlers = new WeakMap();
}
/**
* Sets the event bus adapter (Memory, Redis, etc.).
*
* Allows swapping the underlying pub/sub implementation without
* changing consumer code. Useful for switching from in-memory
* events to distributed events via Redis.
*
* @param {BaseEventBusAdapter} nextAdapter - The new adapter instance
* @throws {Error} If adapter is not a BaseEventBusAdapter instance
*
* @example
* // Switch to Redis adapter
* const redisAdapter = new RedisEventBusAdapter(redisClient);
* eventBus.setAdapter(redisAdapter);
*
* @example
* // Use memory adapter (default)
* eventBus.setAdapter(new MemoryEventBusAdapter());
*/
setAdapter(nextAdapter) {
if (!(nextAdapter instanceof BaseEventBusAdapter)) {
throw new Error('EventBus.setAdapter requires an BaseEventBusAdapter instance');
}
adapter = nextAdapter;
}
/**
* Subscribes to an event with an async handler function.
*
* Handlers are automatically wrapped with error handling. Failed handlers
* won't crash the application and can optionally notify via Slack/Email.
*
* @param {string} event - The event type to listen for (e.g., EventTypes.USER_SIGNUP)
* @param {Function} handler - Async function to handle the event: `async (payload) => void`
* @param {Object} [config={}] - Handler configuration
* @param {Object} [config.onError] - Error handling configuration
* @param {Array<string>} [config.onError.notify] - Notifiers to use on error: ['slack', 'email']
*
* @example
* // Basic subscription
* eventBus.on(EventTypes.USER_SIGNUP, async (user) => {
* console.log('New user:', user.email);
* await sendWelcomeEmail(user);
* });
*
* @example
* // With error notifications
* eventBus.on(EventTypes.ORDER_COMPLETED, async (order) => {
* await processOrder(order);
* }, {
* onError: { notify: ['slack', 'email'] }
* });
*
* @example
* // Multiple handlers for same event
* eventBus.on(EventTypes.USER_VERIFY_EMAIL, sendWelcomeEmail);
* eventBus.on(EventTypes.USER_VERIFY_EMAIL, createCouponCode);
* eventBus.on(EventTypes.USER_VERIFY_EMAIL, notifySlack);
*/
on(event, handler, config = {}) {
if (!event || !handler) return;
const wrapped = async (payload) => {
await this.safeRun(event, handler, payload);
};
this.handlerConfigs.set(handler, config);
this.wrappedHandlers.set(handler, wrapped);
adapter.on(event, wrapped);
}
/**
* Unsubscribes a handler from an event.
*
* Removes the handler registration and cleans up associated WeakMap entries.
*
* @param {string} event - The event type
* @param {Function} handler - The original handler function passed to `on()`
*
* @example
* const handler = async (user) => console.log(user);
* eventBus.on(EventTypes.USER_SIGNUP, handler);
*
* // Later, unsubscribe
* eventBus.off(EventTypes.USER_SIGNUP, handler);
*
* @example
* // Conditional subscription
* if (config.enableNotifications) {
* eventBus.on(EventTypes.ORDER_COMPLETED, notifyHandler);
* } else {
* eventBus.off(EventTypes.ORDER_COMPLETED, notifyHandler);
* }
*/
off(event, handler) {
if (!event || !handler) return;
const wrapped = this.wrappedHandlers.get(handler);
if (!wrapped) return;
adapter.off(event, wrapped);
this.handlerConfigs.delete(handler);
this.wrappedHandlers.delete(handler);
}
/**
* Subscribes to an event for a single execution.
*
* The handler will be automatically unsubscribed after it executes once.
*
* @param {string} event - The event type
* @param {Function} handler - Async function to handle the event once
* @param {Object} [config={}] - Handler configuration (same as `on()`)
*
* @example
* // Execute only on first user signup
* eventBus.once(EventTypes.USER_SIGNUP, async (user) => {
* console.log('First user signed up:', user.email);
* await sendFounderEmail(user);
* });
*
* @example
* // Wait for specific event (promise pattern)
* function waitForOrderComplete(orderId) {
* return new Promise(resolve => {
* eventBus.once(EventTypes.ORDER_COMPLETED, (order) => {
* if (order.id === orderId) resolve(order);
* });
* });
* }
*/
once(event, handler, config = {}) {
if (!event || !handler) return;
const wrapped = async (payload) => {
await this.safeRun(event, handler, payload);
};
this.handlerConfigs.set(handler, config);
this.wrappedHandlers.set(handler, wrapped);
adapter.once(event, wrapped);
}
/**
* Emits an event with optional payload data.
*
* All registered handlers for this event will be called asynchronously.
* Handlers run independently - one handler's failure won't affect others.
*
* @param {string} event - The event type to emit
* @param {*} [payload] - Data to pass to handlers (any type)
*
* @returns {boolean} True if event was emitted, false if event name invalid
*
* @example
* // Emit with object payload
* eventBus.emit(EventTypes.USER_SIGNUP, {
* id: 123,
* email: 'user@example.com',
* displayName: 'John Doe'
* });
*
* @example
* // Emit without payload
* eventBus.emit(EventTypes.SYSTEM_READY);
*
* @example
* // Emit with primitive payload
* eventBus.emit(EventTypes.ORDER_COMPLETED, orderId);
*
* @example
* // Emit after database update
* await iconService.createIcon(data);
* eventBus.emit(EventTypes.ICON_CREATED, icon);
* // All plugins listening to ICON_CREATED will execute
*/
emit(event, payload) {
if (!event) return false;
adapter.emit(event, Event.create(event, payload));
return true;
}
/**
* Removes all event handlers and clears internal state.
*
* Useful for testing or resetting the event bus to a clean state.
*
* @example
* // Clean up after tests
* afterEach(() => {
* eventBus.clear();
* });
*
* @example
* // Reset event bus
* eventBus.clear();
* eventBus.on(EventTypes.USER_SIGNUP, newHandler);
*/
clear() {
adapter.clear();
this.handlerConfigs = new WeakMap();
this.wrappedHandlers = new WeakMap();
}
/**
* Safely executes a handler with error handling and notifications.
*
* If the handler throws an error:
* 1. Error is logged to console
* 2. Configured notifiers are triggered (Slack, Email)
* 3. Event bus continues operating normally
*
* This prevents one failing handler from crashing the application
* or preventing other handlers from executing.
*
* @async
* @param {string} eventName - The event name (for error reporting)
* @param {Function} handler - The handler function to execute
* @param {*} payload - The event payload to pass to handler
*
* @private
*
* @example
* // This is called automatically by on(), once(), etc.
* // You don't need to call it directly
*/
async safeRun(eventName, handler, payload) {
try {
await handler(payload);
}
catch (error) {
console.error(`Error in plugin handler for event "${eventName}":`, error);
const config = this.handlerConfigs.get(handler) || {};
const notifiers = config?.onError?.notify || [];
const subject = `Error in plugin handler for ${eventName}`;
const tasks = [];
if (notifiers.includes('slack')) {
tasks.push(this.slackNotifier.notify(subject, error));
}
if (notifiers.includes('email')) {
tasks.push(this.emailNotifier.notify(subject, error));
}
await Promise.allSettled(tasks);
}
}
}
module.exports = EventBus;
Source