How often to poll for new messages (default: 15 seconds)
A policy function that creates an async outbox consumer
import {
  createOutboxConsumer,
  useBasicAsyncOutboxConsumerPolicy
} from '@arturwojnar/hermes-postgresql'

const outbox = createOutboxConsumer<DomainEvent>({
  getOptions: () => ({ host: 'localhost', port: 5432, ... }),
  publish: async (envelope) => {
    await messageBroker.publish(envelope.message)
  },
  consumerName: 'my-service',
  asyncOutbox: useBasicAsyncOutboxConsumerPolicy()
})
// Poll every 30 seconds instead of default 15
asyncOutbox: useBasicAsyncOutboxConsumerPolicy(Duration.ofSeconds(30))
// Critical event - use main outbox (WAL-based, guaranteed delivery)
await sql.begin(async (sql) => {
  await db.createOrder(order, sql)
  await outbox.queue({
    messageId: constructMessageId('OrderCreated', order.id),
    messageType: 'OrderCreated',
    message: { type: 'OrderCreated', data: order }
  }, { tx: sql })
})

// Non-critical notification - use async outbox (polling-based)
await outbox.send({
  messageId: constructMessageId('SendWelcomeEmail', user.id),
  messageType: 'SendWelcomeEmail',
  message: { type: 'SendWelcomeEmail', data: { userId: user.id } }
})
// Main flow uses WAL-based outbox
// Declared outside the try block so the catch block can reference it;
// it is still undefined if addUserToIdP itself failed
let subject
try {
  subject = await addUserToIdP(email)
  await storePatient(patientId, subject, sql)
  await outbox.queue(patientRegisteredEvent, { tx: sql })
} catch (error) {
  // Compensation uses async outbox
  await outbox.send({
    messageId: constructMessageId('RevertRegistration', patientId),
    messageType: 'RevertRegistration',
    message: {
      type: 'RevertRegistration',
      data: { patientId, subject }
    }
  })
}
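The examples above call constructMessageId without defining it. The sketch below is one hypothetical way such a helper could look, assuming the only requirement is a deterministic ID derived from the message type and an entity identifier. The name matches the examples, but the body and the `:` separator are assumptions, not code taken from Hermes.

```typescript
// Hypothetical helper: the examples use constructMessageId but never define it.
// Joining type and entity ID yields the same ID for the same logical message,
// so a retried send can be recognised as a duplicate downstream.
const constructMessageId = (
  messageType: string,
  entityId: string | number,
): string => `${messageType}:${entityId}`

// Hypothetical consumer-side guard that skips message IDs it has already seen.
const seenMessageIds = new Set<string>()

const handleOnce = (messageId: string, handler: () => void): boolean => {
  if (seenMessageIds.has(messageId)) return false // duplicate, skip
  seenMessageIds.add(messageId)
  handler()
  return true
}
```

A deterministic ID matters most on the compensation path, where the same RevertRegistration message may be sent more than once for a given patient.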
Creates a basic async outbox consumer policy.
The async outbox is a secondary message queue for non-critical messages that don't require the guarantees of PostgreSQL Logical Replication. It uses polling instead of WAL streaming.
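To make the polling idea concrete, here is a minimal, self-contained sketch of a polling-based queue. It is not Hermes's implementation: the class InMemoryAsyncOutbox, its in-memory array (standing in for a database table), and the stop-at-first-failure retry behaviour are all assumptions chosen for illustration. Only the 15-second default interval comes from the description above.

```typescript
// Illustrative only: an in-memory stand-in for a polled outbox table.
type Envelope<T> = { messageId: string; messageType: string; message: T }
type Publish<T> = (envelope: Envelope<T>) => Promise<void>

class InMemoryAsyncOutbox<T> {
  private pending: Envelope<T>[] = []
  private busy = false
  private timer: ReturnType<typeof setInterval> | undefined
  private readonly publish: Publish<T>
  private readonly pollIntervalMs: number

  constructor(publish: Publish<T>, pollIntervalMs: number = 15_000) {
    this.publish = publish
    this.pollIntervalMs = pollIntervalMs
  }

  // Store the message; nothing is published until the next poll.
  send(envelope: Envelope<T>): void {
    this.pending.push(envelope)
  }

  // One polling pass: publish pending messages in order and stop at the
  // first failure, leaving that message queued for the next pass.
  async pollOnce(): Promise<number> {
    if (this.busy) return 0 // avoid overlapping passes
    this.busy = true
    let delivered = 0
    try {
      while (this.pending.length > 0) {
        const next = this.pending[0]
        if (next === undefined) break
        try {
          await this.publish(next)
        } catch {
          break
        }
        this.pending.shift()
        delivered++
      }
    } finally {
      this.busy = false
    }
    return delivered
  }

  start(): void {
    this.timer = setInterval(() => void this.pollOnce(), this.pollIntervalMs)
  }

  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer)
    this.timer = undefined
  }
}
```

The trade-off this illustrates is latency: a message waits up to one polling interval before it is published, which is acceptable for non-critical messages and avoids depending on a replication stream.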
When to Use Async Outbox
Benefits Over Main Outbox