Hermes PostgreSQL

    Class OutboxConsumer<Message>

    Implementation of the Outbox Pattern for PostgreSQL using Logical Replication.

    This class manages the lifecycle of consuming messages from PostgreSQL's Write-Ahead Log (WAL) via logical replication and publishing them to external message brokers. It provides:

    • At-least-once delivery guaranteed by PostgreSQL WAL retention
    • Zero message loss through transactional consistency
    • Real-time streaming without polling overhead
    • Horizontal scaling via partition keys
    • Optional async outbox for non-critical messages

    How it works:

    1. Application queues messages to the outbox table within database transactions
    2. PostgreSQL WAL captures all changes durably
    3. Logical replication streams changes to this consumer in real-time
    4. Consumer invokes your publish callback for each message
    5. Messages are acknowledged only after successful publish
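
The last step is what gives at-least-once semantics: a message that fails to publish is never acknowledged, so it is delivered again. The following is a minimal in-memory sketch of that idea only. `InMemoryLog` and its methods are hypothetical names used for illustration and are not part of Hermes; the real consumer reads from the WAL and calls an async publish callback.

```typescript
// Hypothetical in-memory model of "acknowledge only after successful publish".
// None of these names come from Hermes; they only illustrate the semantics.
type Envelope<T> = { position: number; message: T }

class InMemoryLog<T> {
  private entries: Envelope<T>[] = []
  private acked = 0 // position of the last acknowledged entry

  append(message: T): void {
    this.entries.push({ position: this.entries.length + 1, message })
  }

  // Delivers every unacknowledged entry in order; stops at the first failure.
  deliver(publish: (envelope: Envelope<T>) => void): void {
    for (const envelope of this.entries.slice(this.acked)) {
      try {
        publish(envelope)
      } catch {
        return // not acknowledged, so the entry is redelivered next time
      }
      this.acked = envelope.position // acknowledge only after success
    }
  }
}

const log = new InMemoryLog<string>()
log.append('PatientRegistered')
log.append('PatientUpdated')

const published: string[] = []
let failOnce = true

const publish = (envelope: Envelope<string>): void => {
  if (envelope.message === 'PatientUpdated' && failOnce) {
    failOnce = false
    throw new Error('broker unavailable')
  }
  published.push(envelope.message)
}

log.deliver(publish) // second message fails and stays unacknowledged
log.deliver(publish) // only the unacknowledged message is redelivered
```

In this model a crash between a successful publish and the acknowledgement would cause the same message to be published again, which is why subscribers of an at-least-once stream should tolerate duplicates.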

    Message - The type of domain events/messages to publish

    Basic usage with transactional event publishing

    import { createOutboxConsumer } from '@arturwojnar/hermes-postgresql'

    type PatientEvent =
      | { type: 'PatientRegistered'; patientId: string; name: string }
      | { type: 'PatientUpdated'; patientId: string; changes: object }

    const outbox = createOutboxConsumer<PatientEvent>({
      getOptions: () => ({
        host: 'localhost',
        database: 'hospital',
        user: 'user',
        password: 'pass'
      }),
      publish: async (envelopes) => {
        for (const envelope of envelopes) {
          await messageBroker.publish(envelope.message)
        }
      },
      consumerName: 'patient-service'
    })

    // Start consuming
    const stop = await outbox.start()

    // Queue events with business logic in same transaction
    const sql = outbox.getDbConnection()
    await sql.begin(async (tx) => {
      await tx`INSERT INTO patients (id, name) VALUES (${id}, ${name})`
      await outbox.queue({
        messageId: id,
        messageType: 'PatientRegistered',
        message: { type: 'PatientRegistered', patientId: id, name }
      }, { tx })
    })

    Type Parameters

    • Message extends JSONValue

    Methods

    • getCreationParams() - Returns the consumer creation parameters used to initialize this consumer.

      Useful for debugging, logging, or creating derived consumers with modified configuration.

      Returns ConsumerCreationParams<Message>

      The original ConsumerCreationParams passed to the consumer

      Logging consumer configuration

      const outbox = createOutboxConsumer({ ... })
      const params = outbox.getCreationParams()

      console.log('Consumer name:', params.consumerName)
      console.log('Partition key:', params.partitionKey)
      console.log('Serialization:', params.serialization)

      Creating a derived consumer with different partition key

      const baseParams = outbox.getCreationParams()

      const tenant2Outbox = createOutboxConsumer({
        ...baseParams,
        partitionKey: 'tenant-2' // Different partition, same config
      })
    • getDbConnection() - Returns the active PostgreSQL database connection.

      Use this connection for querying and transactional operations. The connection is established when start is called and remains active until the consumer is stopped.

      Returns HermesSql

      The active postgres.js HermesSql connection instance

      Throws if called before start, because the connection is not yet established.
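
The "throws before start" behaviour is a common guard around a lazily created resource. The sketch below models it without a database. `LazyConsumer` and `FakeSql` are hypothetical stand-ins, and the error message is invented for illustration; the source does not state what Hermes actually throws here.

```typescript
// Hypothetical sketch of a "connection available only after start()" guard.
// FakeSql and LazyConsumer are illustrative names, not Hermes types.
type FakeSql = { query: (text: string) => string }

class LazyConsumer {
  private sql: FakeSql | undefined

  start(): void {
    // A real consumer would open a database connection here.
    this.sql = { query: (text) => `executed: ${text}` }
  }

  getDbConnection(): FakeSql {
    if (!this.sql) {
      throw new Error('Connection not established: call start() first')
    }
    return this.sql
  }
}

const consumer = new LazyConsumer()

let earlyError: string | undefined
try {
  consumer.getDbConnection() // called too early, so the guard throws
} catch (error) {
  earlyError = error instanceof Error ? error.message : String(error)
}

consumer.start()
const result = consumer.getDbConnection().query('SELECT 1')
```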

      Querying within the same database connection

      const outbox = createOutboxConsumer({ ... })
      await outbox.start()

      const sql = outbox.getDbConnection()

      // Use for business queries
      const patients = await sql`SELECT * FROM patients WHERE active = true`

      // Use for transactional event publishing
      await sql.begin(async (tx) => {
        const [patient] = await tx`
          INSERT INTO patients (id, name) VALUES (${id}, ${name})
          RETURNING *
        `
        await outbox.queue({
          messageId: patient.id,
          messageType: 'PatientRegistered',
          message: { type: 'PatientRegistered', patientId: patient.id }
        }, { tx })
      })

      Error handling when called too early

      const outbox = createOutboxConsumer({ ... })

      try {
        const sql = outbox.getDbConnection() // Throws!
      } catch (error) {
        console.error('Must call start() first')
      }

      await outbox.start()
      const sql = outbox.getDbConnection() // Now works

      See also:

      • start - Must be called first to establish connection
      • queue - Uses this connection for message insertion
    • queue() - Queues one or more messages to the main outbox table for delivery via logical replication.

      Messages are inserted into the outbox table and will be streamed via PostgreSQL WAL to the consumer's publish callback. This method ensures:

      • Transactional consistency - Messages committed only if transaction succeeds
      • At-least-once delivery - WAL guarantees messages won't be lost
      • Zero data/event inconsistency - Business logic and events in same transaction

      Transaction handling:

      • If options.tx is provided, uses that transaction
      • If already in a transaction (savepoint exists), uses current transaction
      • Otherwise, creates a new transaction automatically
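
The three transaction rules above form a simple precedence order. The sketch below restates that order as a pure function. `resolveTransaction`, `Tx`, and the `current` parameter are hypothetical; how Hermes detects an ambient transaction is not described here beyond "savepoint exists".

```typescript
// Hypothetical model of the transaction choice: explicit, then ambient, then new.
type Tx = { id: string }
type TxSource = 'provided' | 'current' | 'new'

function resolveTransaction(
  provided: Tx | undefined,
  current: Tx | undefined,
  begin: () => Tx,
): { tx: Tx; source: TxSource } {
  if (provided) return { tx: provided, source: 'provided' } // options.tx wins
  if (current) return { tx: current, source: 'current' } // reuse ambient transaction
  return { tx: begin(), source: 'new' } // otherwise open a fresh one
}

let counter = 0
const begin = (): Tx => ({ id: `auto-${++counter}` })

const a = resolveTransaction({ id: 'explicit' }, { id: 'ambient' }, begin)
const b = resolveTransaction(undefined, { id: 'ambient' }, begin)
const c = resolveTransaction(undefined, undefined, begin)
```

Only the third call invokes `begin`, so a new transaction is opened only when neither an explicit nor an ambient one exists.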

      When queuing multiple messages:

      • All messages inserted in same transaction
      • Either all succeed or all fail (atomicity guaranteed)
      • Messages delivered in order within same partition key
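
To make the batch semantics concrete, here is a small in-memory model: a batch is appended all-or-nothing, and insertion order is preserved per partition key. `InMemoryOutbox` is a hypothetical class, and the empty-`messageId` check is an invented failure condition standing in for any database error. In Hermes the atomicity comes from the database transaction, not from up-front validation.

```typescript
// Hypothetical in-memory outbox: a batch is appended all-or-nothing,
// and messages keep their insertion order within each partition key.
type Queued = { messageId: string; partitionKey: string }

class InMemoryOutbox {
  private rows: Queued[] = []

  // Validates the whole batch first, then appends it in one step.
  queue(batch: Queued[]): void {
    for (const message of batch) {
      if (message.messageId === '') {
        throw new Error('messageId must not be empty') // nothing is appended
      }
    }
    this.rows.push(...batch)
  }

  byPartition(partitionKey: string): string[] {
    return this.rows
      .filter((row) => row.partitionKey === partitionKey)
      .map((row) => row.messageId)
  }

  get size(): number {
    return this.rows.length
  }
}

const outbox = new InMemoryOutbox()
outbox.queue([
  { messageId: 'a1', partitionKey: 'tenant-1' },
  { messageId: 'b1', partitionKey: 'tenant-2' },
  { messageId: 'a2', partitionKey: 'tenant-1' },
])

let rejected = false
try {
  outbox.queue([
    { messageId: 'a3', partitionKey: 'tenant-1' },
    { messageId: '', partitionKey: 'tenant-1' }, // invalid, fails the whole batch
  ])
} catch {
  rejected = true
}
```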

      Parameters

      • message: MessageEnvelope<Message> | MessageEnvelope<Message>[]

        A single message or array of messages to queue

      • Optional options: PublishOptions

        Optional configuration

        Options for publishing messages to the outbox.

        • Optional partitionKey?: string

          Optional partition key for horizontal scaling. Messages with the same partition key are processed by the same consumer.

        • Optional tx?: TransactionSql

          Optional transaction to include the message in. When provided, the message is only committed if the transaction succeeds.

      Returns Promise<void>

      Promise that resolves when message(s) are committed to outbox table

      Throws if database insertion fails or the connection is not established.

      Queue message with business logic in same transaction

      const outbox = createOutboxConsumer<PatientEvent>({ ... })
      await outbox.start()

      const sql = outbox.getDbConnection()

      await sql.begin(async (tx) => {
        // Insert business data
        const [patient] = await tx`
          INSERT INTO patients (id, name, email)
          VALUES (${id}, ${name}, ${email})
          RETURNING *
        `

        // Queue event - guaranteed consistent with data
        await outbox.queue({
          messageId: patient.id,
          messageType: 'PatientRegistered',
          message: {
            type: 'PatientRegistered',
            patientId: patient.id,
            name: patient.name
          }
        }, { tx })
      })
      // Either both succeed or both fail - no inconsistency possible

      Queue multiple messages atomically

      await sql.begin(async (tx) => {
        // Update order status
        await tx`UPDATE orders SET status = 'completed' WHERE id = ${orderId}`

        // Queue multiple related events
        await outbox.queue([
          {
            messageId: `order-${orderId}-completed`,
            messageType: 'OrderCompleted',
            message: { type: 'OrderCompleted', orderId }
          },
          {
            messageId: `invoice-${orderId}`,
            messageType: 'InvoiceGenerated',
            message: { type: 'InvoiceGenerated', orderId, amount: 100 }
          }
        ], { tx })
      })

      Using partition keys for horizontal scaling

      // Queue events to different partitions
      await sql.begin(async (tx) => {
        await tx`INSERT INTO tenant1_data ...`
        await outbox.queue(
          { messageId: '1', messageType: 'DataChanged', message: event1 },
          { tx, partitionKey: 'tenant-1' }
        )

        await tx`INSERT INTO tenant2_data ...`
        await outbox.queue(
          { messageId: '2', messageType: 'DataChanged', message: event2 },
          { tx, partitionKey: 'tenant-2' }
        )
      })

      Queue without explicit transaction (auto-transaction)

      // Hermes creates transaction automatically
      await outbox.queue({
        messageId: '123',
        messageType: 'NotificationSent',
        message: { type: 'NotificationSent', userId: '123' }
      })
      // Message inserted in its own transaction

      Deterministic message IDs for idempotency

      import { createHash } from 'crypto'

      // Generate deterministic ID from business data
      const messageId = createHash('sha256')
        .update(`patient-registered-${patient.id}`)
        .digest('hex')

      await sql.begin(async (tx) => {
        await tx`INSERT INTO patients ...`
        await outbox.queue({
          messageId, // Same patient.id always generates same messageId
          messageType: 'PatientRegistered',
          message: { type: 'PatientRegistered', patientId: patient.id }
        }, { tx })
      })
      // If transaction retries, same messageId prevents duplicate events
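
A deterministic messageId is useful only if something checks it. One possible use, sketched below, is a subscriber-side guard that skips IDs it has already processed. `IdempotentHandler` and `deterministicId` are hypothetical names, a plain string replaces the SHA-256 digest to keep the sketch dependency-free, and whether Hermes itself deduplicates on messageId is not stated here.

```typescript
// Derives the same ID for the same business key (a plain-string stand-in
// for the hashed ID in the example above).
function deterministicId(kind: string, businessKey: string): string {
  return `${kind}-${businessKey}`
}

// Hypothetical subscriber-side guard: processes each messageId at most once.
class IdempotentHandler {
  private seen = new Set<string>()
  public handled = 0

  handle(messageId: string): boolean {
    if (this.seen.has(messageId)) return false // duplicate delivery, skipped
    this.seen.add(messageId)
    this.handled += 1
    return true
  }
}

const first = deterministicId('patient-registered', 'p-42')
const retry = deterministicId('patient-registered', 'p-42') // same business key
const other = deterministicId('patient-registered', 'p-43')

const handler = new IdempotentHandler()
const results = [first, retry, other].map((id) => handler.handle(id))
```

A production version would keep the seen IDs in durable storage rather than in memory, so the guard survives restarts.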
    • send() - Sends one or more messages to the async outbox for delivery via polling.

      The async outbox is a separate, polling-based queue for non-critical messages like:

      • Compensation commands
      • Cleanup operations
      • Notifications
      • Telemetry events

      Unlike queue, messages sent via send():

      • ✅ Don't consume WAL storage (keeps WAL clean)
      • ✅ Suitable for high-volume, non-critical events
      • ❌ No at-least-once delivery guarantee during crashes
      • ❌ Polling-based (slightly higher latency)

      Use send() (async outbox) for:

      • Compensation commands that can be retried
      • Non-critical notifications
      • Telemetry/analytics events
      • Cleanup operations

      Use queue (main outbox) for:

      • Critical business events
      • Events that must be delivered
      • Events requiring transactional consistency
      • Events in the critical path
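
One way to apply this guidance consistently is to centralize the choice in a small routing helper. The sketch below is an assumption-laden illustration: `dispatch`, `OutboxLike`, and `CRITICAL_TYPES` are hypothetical, the interface is synchronous for brevity (the documented methods return promises), and the list of critical types is an example only.

```typescript
// Hypothetical wrapper that routes by criticality; OutboxLike only mirrors
// the two method names documented here and is not the Hermes interface.
type Envelope = { messageId: string; messageType: string; message: unknown }

interface OutboxLike {
  queue(envelope: Envelope): void
  send(envelope: Envelope): void
}

// Example classification; a real service would define its own list.
const CRITICAL_TYPES = new Set(['OrderCreated', 'PaymentCompleted'])

function dispatch(outbox: OutboxLike, envelope: Envelope): 'queue' | 'send' {
  if (CRITICAL_TYPES.has(envelope.messageType)) {
    outbox.queue(envelope) // must be delivered: main, WAL-based outbox
    return 'queue'
  }
  outbox.send(envelope) // best effort: polling-based async outbox
  return 'send'
}

// Recording stub so the routing can be observed without a database.
const calls: string[] = []
const stub: OutboxLike = {
  queue: (e) => { calls.push(`queue:${e.messageType}`) },
  send: (e) => { calls.push(`send:${e.messageType}`) },
}

const routeA = dispatch(stub, { messageId: '1', messageType: 'OrderCreated', message: {} })
const routeB = dispatch(stub, { messageId: '2', messageType: 'PageView', message: {} })
```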

      The async outbox must be enabled during consumer creation:

      import { useBasicAsyncOutboxConsumerPolicy } from '@arturwojnar/hermes-postgresql'

      const outbox = createOutboxConsumer({
        // ... other config
        asyncOutbox: useBasicAsyncOutboxConsumerPolicy()
      })

      Parameters

      Returns Promise<void>

      Promise that resolves when message(s) are inserted into async outbox table

      Throws if the async outbox hasn't been initialized (check the asyncOutbox parameter in ConsumerCreationParams).

      Using async outbox for compensations

      import { useBasicAsyncOutboxConsumerPolicy } from '@arturwojnar/hermes-postgresql'

      const outbox = createOutboxConsumer({
        // ... connection config
        asyncOutbox: useBasicAsyncOutboxConsumerPolicy(),
        publish: async (envelopes) => {
          for (const envelope of envelopes) {
            await broker.publish(envelope.message)
          }
        },
        consumerName: 'order-service'
      })

      await outbox.start()

      const sql = outbox.getDbConnection()

      await sql.begin(async (tx) => {
        // Critical business event - use main outbox (WAL-based)
        await tx`INSERT INTO orders (id, status) VALUES (${id}, 'pending')`
        await outbox.queue({
          messageId: id,
          messageType: 'OrderCreated',
          message: { type: 'OrderCreated', orderId: id }
        }, { tx })

        // Non-critical compensation - use async outbox (polling-based)
        await outbox.send({
          messageId: `cleanup-${id}`,
          messageType: 'CleanupScheduled',
          message: { type: 'CleanupScheduled', orderId: id, delay: 3600 }
        }, tx)
      })

      Error handling when async outbox not configured

      const outbox = createOutboxConsumer({
        // ... config WITHOUT asyncOutbox
      })

      await outbox.start()

      try {
        await outbox.send({
          messageId: '123',
          messageType: 'Notification',
          message: { type: 'Notification', text: 'Hello' }
        })
      } catch (error) {
        console.error("AsyncOutbox hasn't been initialized")
        // Solution: Add asyncOutbox to consumer config
      }

      High-volume analytics events

      // Analytics events don't need WAL guarantees
      await outbox.send([
        {
          messageId: `page-view-${Date.now()}-1`,
          messageType: 'PageView',
          message: { type: 'PageView', page: '/home', userId: '123' }
        },
        {
          messageId: `page-view-${Date.now()}-2`,
          messageType: 'PageView',
          message: { type: 'PageView', page: '/products', userId: '456' }
        }
      ])
      // Sent via polling, doesn't consume WAL storage

      Mixing critical and non-critical events

      await sql.begin(async (tx) => {
        // Critical: Payment processed
        await tx`UPDATE payments SET status = 'completed' WHERE id = ${paymentId}`
        await outbox.queue({
          messageId: paymentId,
          messageType: 'PaymentCompleted',
          message: { type: 'PaymentCompleted', paymentId, amount: 100 }
        }, { tx })

        // Non-critical: Send email notification
        await outbox.send({
          messageId: `email-${paymentId}`,
          messageType: 'SendEmailNotification',
          message: {
            type: 'SendEmailNotification',
            to: 'user@example.com',
            subject: 'Payment Received'
          }
        }, tx)
      })
    • start() - Starts the outbox consumer and begins processing messages via PostgreSQL logical replication.

      This method:

      1. Establishes database connections (one for queries, one for replication)
      2. Runs database migrations to create outbox table and replication slot
      3. Loads or creates consumer state for LSN tracking
      4. Starts streaming changes from PostgreSQL WAL
      5. Optionally starts async outbox consumer (if configured)

      The consumer will continue running until the returned stop function is called.

      Returns Promise<Stop>

      A promise that resolves to a Stop function. Call this function to gracefully shutdown the consumer and close all connections.

      Throws HermesConsumerAlreadyTakenError if another consumer is already using this replication slot (same consumerName + partitionKey combination). Only one consumer can use a replication slot at a time.

      Throws if the database connection fails, migrations fail, or logical replication cannot be established.
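
The exclusivity rule can be pictured as a registry keyed by consumerName and partitionKey. The sketch below is an in-memory model only: `SlotRegistry` and `ConsumerAlreadyTakenError` are hypothetical stand-ins (the latter loosely mirrors the documented HermesConsumerAlreadyTakenError fields), and in Hermes the exclusivity is enforced through the PostgreSQL replication slot rather than in process memory.

```typescript
// Hypothetical error carrying the same two fields the documented error exposes.
class ConsumerAlreadyTakenError extends Error {
  constructor(
    public readonly consumerName: string,
    public readonly partitionKey: string,
  ) {
    super(`Consumer ${consumerName} (${partitionKey}) is already taken`)
  }
}

// Hypothetical registry modelling "one consumer per consumerName + partitionKey".
class SlotRegistry {
  private taken = new Set<string>()

  // Claims the slot and returns a release function, or throws if it is in use.
  acquire(consumerName: string, partitionKey: string): () => void {
    const key = `${consumerName}/${partitionKey}`
    if (this.taken.has(key)) {
      throw new ConsumerAlreadyTakenError(consumerName, partitionKey)
    }
    this.taken.add(key)
    return () => { this.taken.delete(key) }
  }
}

const registry = new SlotRegistry()
const release = registry.acquire('billing-service', 'tenant-1')
registry.acquire('billing-service', 'tenant-2') // different partition: allowed

let conflict = ''
try {
  registry.acquire('billing-service', 'tenant-1') // same pair: rejected
} catch (error) {
  conflict = error instanceof Error ? error.message : String(error)
}

release()
registry.acquire('billing-service', 'tenant-1') // free again after release
```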

      Basic start and stop

      const outbox = createOutboxConsumer({ ... })

      // Start consuming
      const stop = await outbox.start()
      console.log('Outbox consumer started')

      // Later, gracefully shutdown
      await stop()
      console.log('Outbox consumer stopped')

      Handling already-taken consumer error

      import { HermesConsumerAlreadyTakenError } from '@arturwojnar/hermes-postgresql'

      try {
        const stop = await outbox.start()
      } catch (error) {
        if (error instanceof HermesConsumerAlreadyTakenError) {
          console.error('Consumer is already running elsewhere')
          console.error('Consumer name:', error.consumerName)
          console.error('Partition key:', error.partitionKey)

          // Options:
          // 1. Stop the other consumer first
          // 2. Use a different partition key
          // 3. Use a different consumer name
        }
        throw error
      }

      Graceful shutdown with timeout

      const outbox = createOutboxConsumer({ ... })
      const stop = await outbox.start()

      // Handle shutdown signals
      process.on('SIGTERM', async () => {
        console.log('Shutting down...')

        // Stop consumer with timeout
        const timeout = setTimeout(() => {
          console.error('Shutdown timeout, forcing exit')
          process.exit(1)
        }, 5000)

        await stop()
        clearTimeout(timeout)
        console.log('Shutdown complete')
      })

      Running multiple consumers with different partition keys

      // Scale horizontally by partitioning
      const tenant1Outbox = createOutboxConsumer({
        // ... connection config
        consumerName: 'billing-service',
        partitionKey: 'tenant-1'
      })

      const tenant2Outbox = createOutboxConsumer({
        // ... connection config
        consumerName: 'billing-service',
        partitionKey: 'tenant-2'
      })

      // Both can run simultaneously
      const stop1 = await tenant1Outbox.start()
      const stop2 = await tenant2Outbox.start()

      // Later, stop both
      await Promise.all([stop1(), stop2()])