TemplateReturns the consumer creation parameters used to initialize this consumer.
Useful for debugging, logging, or creating derived consumers with modified configuration.
The original ConsumerCreationParams passed to the consumer
Returns the active PostgreSQL database connection.
Use this connection for querying and transactional operations. The connection is established when start is called and remains active until the consumer is stopped.
The active postgres.js HermesSql connection instance
If called before start - connection not yet established
Querying within the same database connection
const outbox = createOutboxConsumer({ ... })
await outbox.start()
const sql = outbox.getDbConnection()
// Use for business queries
const patients = await sql`SELECT * FROM patients WHERE active = true`
// Use for transactional event publishing
await sql.begin(async (tx) => {
const [patient] = await tx`
INSERT INTO patients (id, name) VALUES (${id}, ${name})
RETURNING *
`
await outbox.queue({
messageId: patient.id,
messageType: 'PatientRegistered',
message: { type: 'PatientRegistered', patientId: patient.id }
}, { tx })
})
Queues one or more messages to the main outbox table for delivery via logical replication.
Messages are inserted into the outbox table and will be streamed via PostgreSQL WAL to the
consumer's publish callback. This method ensures:
options.tx is provided, uses that transactionWhen queuing multiple messages:
A single message or array of messages to queue
Optionaloptions: PublishOptionsOptional configuration
Options for publishing messages to the outbox.
OptionalpartitionKey?: stringOptional partition key for horizontal scaling. Messages with the same partition key are processed by the same consumer.
Optionaltx?: TransactionSqlOptional transaction to include the message in. When provided, the message is only committed if the transaction succeeds.
Promise that resolves when message(s) are committed to outbox table
Queue message with business logic in same transaction
const outbox = createOutboxConsumer<PatientEvent>({ ... })
await outbox.start()
const sql = outbox.getDbConnection()
await sql.begin(async (tx) => {
// Insert business data
const [patient] = await tx`
INSERT INTO patients (id, name, email)
VALUES (${id}, ${name}, ${email})
RETURNING *
`
// Queue event - guaranteed consistent with data
await outbox.queue({
messageId: patient.id,
messageType: 'PatientRegistered',
message: {
type: 'PatientRegistered',
patientId: patient.id,
name: patient.name
}
}, { tx })
})
// Either both succeed or both fail - no inconsistency possible
Queue multiple messages atomically
await sql.begin(async (tx) => {
// Update order status
await tx`UPDATE orders SET status = 'completed' WHERE id = ${orderId}`
// Queue multiple related events
await outbox.queue([
{
messageId: `order-${orderId}-completed`,
messageType: 'OrderCompleted',
message: { type: 'OrderCompleted', orderId }
},
{
messageId: `invoice-${orderId}`,
messageType: 'InvoiceGenerated',
message: { type: 'InvoiceGenerated', orderId, amount: 100 }
}
], { tx })
})
Using partition keys for horizontal scaling
// Queue events to different partitions
await sql.begin(async (tx) => {
await tx`INSERT INTO tenant1_data ...`
await outbox.queue(
{ messageId: '1', messageType: 'DataChanged', message: event1 },
{ tx, partitionKey: 'tenant-1' }
)
await tx`INSERT INTO tenant2_data ...`
await outbox.queue(
{ messageId: '2', messageType: 'DataChanged', message: event2 },
{ tx, partitionKey: 'tenant-2' }
)
})
Queue without explicit transaction (auto-transaction)
// Hermes creates transaction automatically
await outbox.queue({
messageId: '123',
messageType: 'NotificationSent',
message: { type: 'NotificationSent', userId: '123' }
})
// Message inserted in its own transaction
Deterministic message IDs for idempotency
import { createHash } from 'crypto'
// Generate deterministic ID from business data
const messageId = createHash('sha256')
.update(`patient-registered-${patient.id}`)
.digest('hex')
await sql.begin(async (tx) => {
await tx`INSERT INTO patients ...`
await outbox.queue({
messageId, // Same patient.id always generates same messageId
messageType: 'PatientRegistered',
message: { type: 'PatientRegistered', patientId: patient.id }
}, { tx })
})
// If transaction retries, same messageId prevents duplicate events
Sends one or more messages to the async outbox for delivery via polling.
The async outbox is a separate, polling-based queue for non-critical messages like:
Unlike queue, messages sent via send():
send() vs queue()Use send() (async outbox) for:
Use queue (main outbox) for:
The async outbox must be enabled during consumer creation:
import { useBasicAsyncOutboxConsumerPolicy } from '@arturwojnar/hermes-postgresql'
const outbox = createOutboxConsumer({
// ... other config
asyncOutbox: useBasicAsyncOutboxConsumerPolicy()
})
A single message or array of messages to send
Optionaltx: TransactionSql<{}>Optional PostgreSQL transaction to use
Promise that resolves when message(s) are inserted into async outbox table
If async outbox hasn't been initialized (check asyncOutbox parameter in
ConsumerCreationParams)
Using async outbox for compensations
import { useBasicAsyncOutboxConsumerPolicy } from '@arturwojnar/hermes-postgresql'
const outbox = createOutboxConsumer({
// ... connection config
asyncOutbox: useBasicAsyncOutboxConsumerPolicy(),
publish: async (envelopes) => {
for (const envelope of envelopes) {
await broker.publish(envelope.message)
}
},
consumerName: 'order-service'
})
await outbox.start()
const sql = outbox.getDbConnection()
await sql.begin(async (tx) => {
// Critical business event - use main outbox (WAL-based)
await tx`INSERT INTO orders (id, status) VALUES (${id}, 'pending')`
await outbox.queue({
messageId: id,
messageType: 'OrderCreated',
message: { type: 'OrderCreated', orderId: id }
}, { tx })
// Non-critical compensation - use async outbox (polling-based)
await outbox.send({
messageId: `cleanup-${id}`,
messageType: 'CleanupScheduled',
message: { type: 'CleanupScheduled', orderId: id, delay: 3600 }
}, tx)
})
Error handling when async outbox not configured
const outbox = createOutboxConsumer({
// ... config WITHOUT asyncOutbox
})
await outbox.start()
try {
await outbox.send({
messageId: '123',
messageType: 'Notification',
message: { type: 'Notification', text: 'Hello' }
})
} catch (error) {
console.error("AsyncOutbox hasn't been initialized")
// Solution: Add asyncOutbox to consumer config
}
High-volume analytics events
// Analytics events don't need WAL guarantees
await outbox.send([
{
messageId: `page-view-${Date.now()}-1`,
messageType: 'PageView',
message: { type: 'PageView', page: '/home', userId: '123' }
},
{
messageId: `page-view-${Date.now()}-2`,
messageType: 'PageView',
message: { type: 'PageView', page: '/products', userId: '456' }
}
])
// Sent via polling, doesn't consume WAL storage
Mixing critical and non-critical events
await sql.begin(async (tx) => {
// Critical: Payment processed
await tx`UPDATE payments SET status = 'completed' WHERE id = ${paymentId}`
await outbox.queue({
messageId: paymentId,
messageType: 'PaymentCompleted',
message: { type: 'PaymentCompleted', paymentId, amount: 100 }
}, { tx })
// Non-critical: Send email notification
await outbox.send({
messageId: `email-${paymentId}`,
messageType: 'SendEmailNotification',
message: {
type: 'SendEmailNotification',
to: 'user@example.com',
subject: 'Payment Received'
}
}, tx)
})
Starts the outbox consumer and begins processing messages via PostgreSQL logical replication.
This method:
The consumer will continue running until the returned stop function is called.
A promise that resolves to a Stop function. Call this function to gracefully shutdown the consumer and close all connections.
If another consumer is already using this replication slot (same consumerName + partitionKey combination). Only one consumer can use a replication slot at a time.
Basic start and stop
const outbox = createOutboxConsumer({ ... })
// Start consuming
const stop = await outbox.start()
console.log('Outbox consumer started')
// Later, gracefully shutdown
await stop()
console.log('Outbox consumer stopped')
Handling already-taken consumer error
import { HermesConsumerAlreadyTakenError } from '@arturwojnar/hermes-postgresql'
try {
const stop = await outbox.start()
} catch (error) {
if (error instanceof HermesConsumerAlreadyTakenError) {
console.error('Consumer is already running elsewhere')
console.error('Consumer name:', error.consumerName)
console.error('Partition key:', error.partitionKey)
// Options:
// 1. Stop the other consumer first
// 2. Use a different partition key
// 3. Use a different consumer name
}
throw error
}
Graceful shutdown with timeout
const outbox = createOutboxConsumer({ ... })
const stop = await outbox.start()
// Handle shutdown signals
process.on('SIGTERM', async () => {
console.log('Shutting down...')
// Stop consumer with timeout
const timeout = setTimeout(() => {
console.error('Shutdown timeout, forcing exit')
process.exit(1)
}, 5000)
await stop()
clearTimeout(timeout)
console.log('Shutdown complete')
})
Running multiple consumers with different partition keys
// Scale horizontally by partitioning
const tenant1Outbox = createOutboxConsumer({
// ... connection config
consumerName: 'billing-service',
partitionKey: 'tenant-1'
})
const tenant2Outbox = createOutboxConsumer({
// ... connection config
consumerName: 'billing-service',
partitionKey: 'tenant-2'
})
// Both can run simultaneously
const stop1 = await tenant1Outbox.start()
const stop2 = await tenant2Outbox.start()
// Later, stop both
await Promise.all([stop1(), stop2()])
Implementation of the Outbox Pattern for PostgreSQL using Logical Replication.
This class manages the lifecycle of consuming messages from PostgreSQL's Write-Ahead Log (WAL) via logical replication and publishing them to external message brokers. It provides:
How It Works
publishcallback for each messageMessage - The type of domain events/messages to publish
Example
Basic usage with transactional event publishing
See