Hermes MongoDB
    Preparing search index...

    Function createOutboxConsumer

    • Template

      Creates a new outbox consumer instance for MongoDB.

      This is the main entry point for using Hermes MongoDB. It creates a consumer that leverages MongoDB Change Streams to implement the Outbox Pattern, ensuring reliable at-least-once event delivery with transactional consistency.

      1. Publish events to the outbox collection within your MongoDB transactions
      2. MongoDB oplog captures changes durably (within retention window)
      3. Change Streams streams changes to Hermes in real-time
      4. Hermes invokes your publish callback for each event
      5. Acknowledgment happens only after successful callback completion

      ⚠️ Unlike PostgreSQL WAL, MongoDB's oplog has limited retention (typically hours). If a consumer is down longer than the oplog retention window, events will be lost. Monitor oplog retention with rs.printReplicationInfo().

      Event - The type of domain events this consumer will handle (use discriminated unions)

      Type Parameters

      • Event extends object

      Parameters

      Returns OutboxConsumer<Event>

      An OutboxConsumer instance ready to start consuming events

      import { createOutboxConsumer } from '@arturwojnar/hermes-mongodb'
      import { MongoClient } from 'mongodb'

      type DomainEvent =
      | { type: 'MedicineAssigned'; patientId: string; medicineId: string }
      | { type: 'TaskCompleted'; taskId: string; completedAt: Date }

      const client = new MongoClient('mongodb://localhost:27017')
      await client.connect()

      const outbox = createOutboxConsumer<DomainEvent>({
      client,
      db: client.db('hospital'),
      publish: async (event) => {
      // IMPORTANT: Throw error on failure to trigger retry
      await messageBroker.publish(event)
      }
      })

      // Start consuming events
      const stop = await outbox.start()
      // Publish event with business logic in same transaction
      await outbox.publish(
      {
      type: 'MedicineAssigned',
      patientId: 'patient-123',
      medicineId: 'med-456'
      },
      async (session, db) => {
      // Store assignment in same transaction
      await db.collection('medicine_assignments').insertOne({
      patientId: 'patient-123',
      medicineId: 'med-456',
      assignedAt: new Date()
      }, { session })
      }
      )
      // Either both succeed or both fail - no inconsistency possible
      await outbox.withScope(async ({ publish }) => {
      // All events in same transaction
      await publish({ type: 'OrderCreated', orderId: '123' })
      await publish({ type: 'InvoiceGenerated', invoiceId: '456' })
      await publish({ type: 'NotificationSent', userId: 'user-789' })
      })
      // Tenant 1 consumer
      const tenant1Outbox = createOutboxConsumer({
      client,
      db: client.db('hospital'),
      publish: async (event) => await broker.publish(event),
      partitionKey: 'tenant-abc'
      })

      // Tenant 2 consumer (can run concurrently)
      const tenant2Outbox = createOutboxConsumer({
      client,
      db: client.db('hospital'),
      publish: async (event) => await broker.publish(event),
      partitionKey: 'tenant-xyz'
      })
      const stop = await outbox.start()

      process.on('SIGTERM', async () => {
      console.log('Shutting down gracefully...')
      await stop() // Waits for in-flight events
      await client.close()
      process.exit(0)
      })
      const outbox = createOutboxConsumer<DomainEvent>({
      client,
      db: client.db('hospital'),
      publish: async (event) => {
      await messageBroker.publish(event)
      },
      onFailedPublish: (error) => {
      console.error('Failed to publish:', error)
      monitoring.increment('outbox.publish.failures')
      },
      onDbError: (error) => {
      console.error('Database error:', error)
      monitoring.alert('outbox.database.error')
      }
      })