# new EventBus()
Event-driven pub/sub system with adapter pattern and error recovery.
EventBus is the backbone of the plugin architecture. It enables:
- Decoupling: Services emit events without knowing who's listening
- Extensibility: Add new plugins without modifying existing code
- Resilience: Handler failures don't crash the application
- Observability: Monitor events via Slack/Email notifications
- Scalability: Distribute events across instances via Redis adapter
How It Works:
- Services (with the withPluggable mixin) emit events after operations
- EventBus wraps handlers with error handling and notification logic
- All registered handlers execute asynchronously (non-blocking)
- Failed handlers trigger notifications but don't affect others
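The wrapping step described above can be sketched roughly as follows. This is not the library's implementation: `SketchEventBus` and its `notify` hook are hypothetical stand-ins, and only the `on`/`emit` shape and the `onError.notify` option come from this page.

```javascript
// Hypothetical stand-in for the real EventBus; only the wrapping idea is shown.
class SketchEventBus {
  constructor(notify = () => {}) {
    this.handlers = new Map(); // event name -> array of wrapped handlers
    this.notify = notify;      // assumed hook, called when a handler fails
  }

  on(event, handler, config = {}) {
    // Wrap the handler so a thrown error or rejected promise is contained.
    const wrapped = async (payload) => {
      try {
        await handler(payload);
      } catch (error) {
        const channels = (config.onError && config.onError.notify) || [];
        this.notify({ event, error, channels });
      }
    };
    if (!this.handlers.has(event)) this.handlers.set(event, []);
    this.handlers.get(event).push(wrapped);
  }

  emit(event, payload) {
    if (typeof event !== 'string' || event.length === 0) return false;
    for (const wrapped of this.handlers.get(event) || []) {
      // Not awaited: the caller is not blocked by slow handlers.
      wrapped(payload);
    }
    return true;
  }
}

// Usage: the failing handler is reported, the second handler still runs.
const failures = [];
const calls = [];
const bus = new SketchEventBus((failure) => failures.push(failure));
bus.on('payment.failed', async () => { throw new Error('boom'); },
  { onError: { notify: ['slack'] } });
bus.on('payment.failed', async (payment) => { calls.push(payment.id); });
const emitted = bus.emit('payment.failed', { id: 42 });
```

Because each handler runs inside its own `try`/`catch`, a rejection in one wrapper never reaches the loop in `emit`, which is one plausible way to get the isolation the list describes.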
Adapter Swapping:
// Development: In-memory events (single instance)
const eventBus = new EventBus();
eventBus.setAdapter(new MemoryEventBusAdapter());
// Production: Distributed events (multi-instance)
const redisClient = redis.createClient({ url: process.env.REDIS_URL });
await redisClient.connect();
eventBus.setAdapter(new RedisEventBusAdapter(redisClient));
Memory Management:
- Uses WeakMaps to track handler configs (no strong references)
- Handlers are garbage collected when no longer referenced
- off() method cleans up WeakMap entries explicitly
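As an illustration of the WeakMap approach, here is a minimal sketch. The names `handlerConfigs`, `register`, and `unregister` are hypothetical; the real EventBus internals are not shown on this page and may differ.

```javascript
// Hypothetical registry; the real EventBus internals may differ.
const handlerConfigs = new WeakMap(); // handler function -> config object

function register(handler, config = {}) {
  // The WeakMap key does not keep the handler alive on its own.
  handlerConfigs.set(handler, config);
}

function unregister(handler) {
  // Explicit cleanup, mirroring what off() is described as doing.
  return handlerConfigs.delete(handler);
}

const onSignup = async (user) => user.email;
register(onSignup, { onError: { notify: ['slack'] } });
const trackedBefore = handlerConfigs.has(onSignup);
const removed = unregister(onSignup);
const trackedAfter = handlerConfigs.has(onSignup);
```

A WeakMap entry can only be collected once nothing else references its key, so any listener list that holds the handler strongly still has to drop it for collection to happen.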
Performance:
- Memory adapter: ~0.01ms per emit (synchronous, in-process)
- Redis adapter: ~1-5ms per emit (network overhead, pub/sub)
- Handlers execute async (don't block service methods)
Event Naming Convention:
Use entity.operation format: icon.created, user.signup, order.completed
See EventTypes.js for predefined constants.
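A constants module and a name check following this convention might look like the sketch below. The contents of EventTypes.js are not shown on this page, so the constant set here is an assumption built from the three example names above, and `isValidEventName` with its pattern is a hypothetical helper.

```javascript
// Hypothetical constants; the real EventTypes.js may define different names.
const EventTypes = Object.freeze({
  ICON_CREATED: 'icon.created',
  USER_SIGNUP: 'user.signup',
  ORDER_COMPLETED: 'order.completed',
});

// Assumed rule: lowercase entity, a single dot, lowercase operation.
const EVENT_NAME_PATTERN = /^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$/;

function isValidEventName(name) {
  return typeof name === 'string' && EVENT_NAME_PATTERN.test(name);
}

const allConstantsValid = Object.values(EventTypes).every(isValidEventName);
```

Using constants instead of string literals means a typo in an event name fails as an undefined property rather than as a handler that silently never fires.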
Examples
// Basic usage
const eventBus = new EventBus();
eventBus.on('user.signup', async (user) => {
  console.log('New user:', user.email);
  await sendWelcomeEmail(user);
});
eventBus.emit('user.signup', { id: 123, email: 'user@example.com' });
// Plugin system integration
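class AnalyticsPlugin {
  constructor(eventBus) {
    eventBus.on('icon.created', this.trackIconCreation.bind(this));
    eventBus.on('icon.updated', this.trackIconUpdate.bind(this));
    eventBus.on('icon.deleted', this.trackIconDeletion.bind(this));
  }
  async trackIconCreation(icon) {
    await analytics.track('Icon Created', { iconId: icon.id });
  }
  // Every method bound in the constructor must exist, or .bind() throws a TypeError
  async trackIconUpdate(icon) {
    await analytics.track('Icon Updated', { iconId: icon.id });
  }
  async trackIconDeletion(icon) {
    await analytics.track('Icon Deleted', { iconId: icon.id });
  }
}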
// Cache invalidation pattern
eventBus.on('icon.updated', async (icon) => {
  await cache.clearCache({ baseKey: 'icons' });
  console.log('Cleared icon cache after update');
});
// Error handling with notifications
eventBus.on('payment.failed', async (payment) => {
  await notifyFinanceTeam(payment);
  throw new Error('Payment processing failed'); // Will trigger notifications
}, {
  onError: { notify: ['slack', 'email'] }
});
// Distributed events across server instances
// Instance 1:
eventBus.emit('user.login', { userId: 123 });
// Instance 2 (different server):
eventBus.on('user.login', async (data) => {
  console.log('User logged in on another instance:', data.userId);
});
// Cross-instance delivery requires RedisEventBusAdapter on both instances;
// the memory adapter only reaches handlers in the same process.
Methods
# clear()
Removes all event handlers and clears internal state.
Useful for testing or resetting the event bus to a clean state.
Examples
// Clean up after tests
afterEach(() => {
  eventBus.clear();
});
// Reset event bus
eventBus.clear();
eventBus.on(EventTypes.USER_SIGNUP, newHandler);
# emit(event, [payload]) → {boolean}
Emits an event with optional payload data.
All registered handlers for this event are called asynchronously. Handlers run independently: one handler's failure does not affect the others.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| event | string | | The event type to emit |
| payload | * | <optional> | Data to pass to handlers (any type) |

Returns:
True if the event was emitted, false if the event name is invalid.
Type: boolean
Examples
// Emit with object payload
eventBus.emit(EventTypes.USER_SIGNUP, {
  id: 123,
  email: 'user@example.com',
  displayName: 'John Doe'
});
// Emit without payload
eventBus.emit(EventTypes.SYSTEM_READY);
// Emit with primitive payload
eventBus.emit(EventTypes.ORDER_COMPLETED, orderId);
// Emit after database update (assumes createIcon() resolves to the created icon)
const icon = await iconService.createIcon(data);
eventBus.emit(EventTypes.ICON_CREATED, icon);
// All plugins listening to ICON_CREATED will execute
# off(event, handler)
Unsubscribes a handler from an event.
Removes the handler registration and cleans up associated WeakMap entries.
Parameters:
| Name | Type | Description |
|---|---|---|
| event | string | The event type |
| handler | function | The original handler function passed to |
Examples
const handler = async (user) => console.log(user);
eventBus.on(EventTypes.USER_SIGNUP, handler);
// Later, unsubscribe
eventBus.off(EventTypes.USER_SIGNUP, handler);
// Conditional subscription
if (config.enableNotifications) {
  eventBus.on(EventTypes.ORDER_COMPLETED, notifyHandler);
} else {
  eventBus.off(EventTypes.ORDER_COMPLETED, notifyHandler);
}
# on(event, handler, [config])
Subscribes to an event with an async handler function.
Handlers are automatically wrapped with error handling. Failed handlers won't crash the application and can optionally notify via Slack/Email.
Parameters:
| Name | Type | Attributes | Default | Description |
|---|---|---|---|---|
| event | string | | | The event type to listen for (e.g., EventTypes.USER_SIGNUP) |
| handler | function | | | Async function to handle the event: |
| config | Object | <optional> | {} | Handler configuration |
| config.onError | Object | <optional> | | Error handling configuration |
| config.onError.notify | Array.<string> | <optional> | | Notifiers to use on error: ['slack', 'email'] |
Examples
// Basic subscription
eventBus.on(EventTypes.USER_SIGNUP, async (user) => {
  console.log('New user:', user.email);
  await sendWelcomeEmail(user);
});
// With error notifications
eventBus.on(EventTypes.ORDER_COMPLETED, async (order) => {
  await processOrder(order);
}, {
  onError: { notify: ['slack', 'email'] }
});
// Multiple handlers for same event
eventBus.on(EventTypes.USER_VERIFY_EMAIL, sendWelcomeEmail);
eventBus.on(EventTypes.USER_VERIFY_EMAIL, createCouponCode);
eventBus.on(EventTypes.USER_VERIFY_EMAIL, notifySlack);
# once(event, handler, [config])
Subscribes to an event for a single execution.
The handler will be automatically unsubscribed after it executes once.
Parameters:
| Name | Type | Attributes | Default | Description |
|---|---|---|---|---|
| event | string | | | The event type |
| handler | function | | | Async function to handle the event once |
| config | Object | <optional> | {} | Handler configuration (same as |
Examples
// Execute only on first user signup
eventBus.once(EventTypes.USER_SIGNUP, async (user) => {
  console.log('First user signed up:', user.email);
  await sendFounderEmail(user);
});
// Wait for the next occurrence of an event (promise pattern)
// once() unsubscribes after the first ORDER_COMPLETED for any order, so
// filtering by order id inside the handler could leave the promise pending forever.
function waitForNextOrderComplete() {
  return new Promise(resolve => {
    eventBus.once(EventTypes.ORDER_COMPLETED, resolve);
  });
}
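One common way to get single-execution behavior is to build it on top of on() and off(). The sketch below shows that technique; `TinyBus` is a hypothetical minimal bus written only to make the example runnable, and the actual once() implementation may work differently.

```javascript
// Hypothetical minimal bus; only here to make the once() sketch runnable.
class TinyBus {
  constructor() { this.listeners = new Map(); }

  on(event, handler) {
    if (!this.listeners.has(event)) this.listeners.set(event, new Set());
    this.listeners.get(event).add(handler);
  }

  off(event, handler) {
    const set = this.listeners.get(event);
    if (set) set.delete(handler);
  }

  emit(event, payload) {
    // Copy first so handlers can unsubscribe while we iterate.
    for (const handler of [...(this.listeners.get(event) || [])]) handler(payload);
    return true;
  }

  once(event, handler) {
    const wrapper = (payload) => {
      // Unsubscribe before running, so a nested emit cannot fire it twice.
      this.off(event, wrapper);
      return handler(payload);
    };
    this.on(event, wrapper);
  }
}

const tiny = new TinyBus();
const seen = [];
tiny.once('user.signup', (user) => seen.push(user.email));
tiny.emit('user.signup', { email: 'first@example.com' });
tiny.emit('user.signup', { email: 'second@example.com' });
// seen now holds only 'first@example.com'
```

Note that the registered function is the wrapper, not the original handler, so a design like this needs extra bookkeeping if callers should be able to cancel a pending once() subscription through off().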
# setAdapter(nextAdapter)
Sets the event bus adapter (Memory, Redis, etc.).
Allows swapping the underlying pub/sub implementation without changing consumer code. Useful for switching from in-memory events to distributed events via Redis.
Parameters:
| Name | Type | Description |
|---|---|---|
| nextAdapter | BaseEventBusAdapter | The new adapter instance |

Throws:
If adapter is not a BaseEventBusAdapter instance
Type: Error
Examples
// Switch to Redis adapter
const redisAdapter = new RedisEventBusAdapter(redisClient);
eventBus.setAdapter(redisAdapter);
// Use memory adapter (default)
eventBus.setAdapter(new MemoryEventBusAdapter());
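The type check and swap described for setAdapter() could be sketched as below. The real BaseEventBusAdapter interface is not documented on this page, so the `subscribe`/`publish` methods, `InMemoryAdapter`, and `AdapterHost` are all hypothetical; only the instanceof check and the thrown Error reflect the documented behavior.

```javascript
// Hypothetical shapes: the real BaseEventBusAdapter API is not shown on this page.
class BaseEventBusAdapter {
  subscribe(event, listener) { throw new Error('subscribe() not implemented'); }
  publish(event, payload) { throw new Error('publish() not implemented'); }
}

class InMemoryAdapter extends BaseEventBusAdapter {
  constructor() { super(); this.listeners = new Map(); }

  subscribe(event, listener) {
    if (!this.listeners.has(event)) this.listeners.set(event, []);
    this.listeners.get(event).push(listener);
  }

  publish(event, payload) {
    for (const listener of this.listeners.get(event) || []) listener(payload);
  }
}

class AdapterHost {
  constructor() { this.adapter = new InMemoryAdapter(); }

  setAdapter(nextAdapter) {
    // Reject anything that does not extend the base class.
    if (!(nextAdapter instanceof BaseEventBusAdapter)) {
      throw new Error('Adapter must be an instance of BaseEventBusAdapter');
    }
    this.adapter = nextAdapter;
  }
}

// Usage: a duck-typed object is rejected, a real subclass is accepted.
const host = new AdapterHost();
let rejected = false;
try {
  host.setAdapter({ publish() {} });
} catch (error) {
  rejected = true;
}
const received = [];
const nextAdapter = new InMemoryAdapter();
nextAdapter.subscribe('user.login', (data) => received.push(data.userId));
host.setAdapter(nextAdapter);
host.adapter.publish('user.login', { userId: 123 });
```

In this sketch, subscriptions live on the adapter, so handlers registered on the old adapter are not carried over by the swap; whether the real EventBus re-registers existing handlers is not stated on this page.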