TemplateConfiguration parameters including database connection, publish callback, and consumer settings
An OutboxConsumer instance ready to start consuming messages
import { createOutboxConsumer } from '@arturwojnar/hermes-postgresql'
type DomainEvent =
| { type: 'PatientRegistered'; data: { patientId: string } }
| { type: 'OrderCreated'; data: { orderId: string } }
const outbox = createOutboxConsumer<DomainEvent>({
getOptions: () => ({
host: 'localhost',
port: 5432,
database: 'mydb',
user: 'user',
password: 'pass'
}),
publish: async (envelope) => {
await messageBroker.publish(envelope.message)
},
consumerName: 'my-service'
})
// Start consuming messages
const stop = await outbox.start()
// Queue message atomically with business logic
await sql.begin(async (sql) => {
// Business operation
await db.collection('patients').insertOne(patient, sql)
// Event publishing - same transaction!
await outbox.queue({
messageId: constructMessageId('PatientRegistered', patient.id),
messageType: 'PatientRegistered',
message: {
type: 'PatientRegistered',
data: { patientId: patient.id }
}
}, { tx: sql })
})
// Either both succeed or both fail - no inconsistency possible
import { useBasicAsyncOutboxConsumerPolicy } from '@arturwojnar/hermes-postgresql'
const outbox = createOutboxConsumer<DomainEvent>({
// ... other options
asyncOutbox: useBasicAsyncOutboxConsumerPolicy(Duration.ofSeconds(30))
})
// Critical events use main outbox (WAL-based, zero message loss)
await outbox.queue(criticalEvent, { tx: sql })
// Compensations use async outbox (polling-based, eventual delivery)
await outbox.send(compensationCommand)
// Tenant 1 consumer
const tenant1Outbox = createOutboxConsumer({
// ...
consumerName: 'order-service',
partitionKey: 'tenant-abc'
})
// Tenant 2 consumer (different partition, same consumer name)
const tenant2Outbox = createOutboxConsumer({
// ...
consumerName: 'order-service',
partitionKey: 'tenant-xyz'
})
const stopOutbox = await outbox.start()
process.on('SIGTERM', async () => {
console.log('Shutting down gracefully...')
await stopOutbox() // Waits for in-flight messages
await closeOtherResources()
process.exit(0)
})
Creates a new outbox consumer instance for PostgreSQL.
This is the main entry point for using Hermes PostgreSQL. It creates a consumer that leverages PostgreSQL Logical Replication to implement the Outbox Pattern, ensuring reliable at-least-once message delivery with zero message loss.
How It Works
publishcallback for each messageMessage - The type of domain messages/events this consumer will handle