TemplateConfiguration parameters including database connection, publish callback, and consumer settings
An OutboxConsumer instance ready to start consuming events
import { createOutboxConsumer } from '@arturwojnar/hermes-mongodb'
import { MongoClient } from 'mongodb'
type DomainEvent =
| { type: 'MedicineAssigned'; patientId: string; medicineId: string }
| { type: 'TaskCompleted'; taskId: string; completedAt: Date }
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const outbox = createOutboxConsumer<DomainEvent>({
client,
db: client.db('hospital'),
publish: async (event) => {
// IMPORTANT: Throw error on failure to trigger retry
await messageBroker.publish(event)
}
})
// Start consuming events
const stop = await outbox.start()
// Publish event with business logic in same transaction
await outbox.publish(
{
type: 'MedicineAssigned',
patientId: 'patient-123',
medicineId: 'med-456'
},
async (session, db) => {
// Store assignment in same transaction
await db.collection('medicine_assignments').insertOne({
patientId: 'patient-123',
medicineId: 'med-456',
assignedAt: new Date()
}, { session })
}
)
// Either both succeed or both fail - no inconsistency possible
await outbox.withScope(async ({ publish }) => {
// All events in same transaction
await publish({ type: 'OrderCreated', orderId: '123' })
await publish({ type: 'InvoiceGenerated', invoiceId: '456' })
await publish({ type: 'NotificationSent', userId: 'user-789' })
})
// Tenant 1 consumer
const tenant1Outbox = createOutboxConsumer({
client,
db: client.db('hospital'),
publish: async (event) => await broker.publish(event),
partitionKey: 'tenant-abc'
})
// Tenant 2 consumer (can run concurrently)
const tenant2Outbox = createOutboxConsumer({
client,
db: client.db('hospital'),
publish: async (event) => await broker.publish(event),
partitionKey: 'tenant-xyz'
})
const stop = await outbox.start()
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully...')
await stop() // Waits for in-flight events
await client.close()
process.exit(0)
})
const outbox = createOutboxConsumer<DomainEvent>({
client,
db: client.db('hospital'),
publish: async (event) => {
await messageBroker.publish(event)
},
onFailedPublish: (error) => {
console.error('Failed to publish:', error)
monitoring.increment('outbox.publish.failures')
},
onDbError: (error) => {
console.error('Database error:', error)
monitoring.alert('outbox.database.error')
}
})
Creates a new outbox consumer instance for MongoDB.
This is the main entry point for using Hermes MongoDB. It creates a consumer that leverages MongoDB Change Streams to implement the Outbox Pattern, ensuring reliable at-least-once event delivery with transactional consistency.
How It Works
publishcallback for each eventImportant: Oplog Retention
⚠️ Unlike PostgreSQL WAL, MongoDB's oplog has limited retention (typically hours). If a consumer is down longer than the oplog retention window, events will be lost. Monitor oplog retention with
rs.printReplicationInfo().Event - The type of domain events this consumer will handle (use discriminated unions)