Hermes MongoDB
    Preparing search index...

    Type Alias ConsumerCreationParams<Event>Template

    Configuration parameters for creating a MongoDB outbox consumer.

    Event - Type of events handled by the consumer (use discriminated unions for multiple event types)

    Basic configuration

    type DomainEvent =
    | { type: 'MedicineAssigned'; patientId: string; medicineId: string }
    | { type: 'TaskCompleted'; taskId: string }

    const outbox = createOutboxConsumer<DomainEvent>({
    client: mongoClient,
    db: mongoClient.db('hospital'),
    publish: async (event) => {
    await messageBroker.publish(event)
    }
    })

    Full configuration with all options

    const outbox = createOutboxConsumer<DomainEvent>({
    client: mongoClient,
    db: mongoClient.db('hospital'),
    publish: async (event) => {
    // IMPORTANT: Throw error on failure to trigger retry
    await messageBroker.publish(event)
    },
    partitionKey: 'tenant-123',
    waitAfterFailedPublishMs: 5000,
    shouldDisposeOnSigterm: true,
    saveTimestamps: false,
    onFailedPublish: (error) => {
    console.error('Publish failed:', error)
    },
    onDbError: (error) => {
    console.error('Database error:', error)
    },
    now: () => new Date()
    })
    type ConsumerCreationParams<Event> = {
        client: MongoClient;
        db: Db;
        now?: NowFunction;
        onDbError?: ErrorCallback;
        onFailedPublish?: ErrorCallback;
        partitionKey?: string;
        publish: (event: Event) => AsyncOrSync<void> | never;
        saveTimestamps?: boolean;
        shouldDisposeOnSigterm?: boolean;
        waitAfterFailedPublishMs?: number;
    }

    Type Parameters

    • Event
    Index

    Properties

    client: MongoClient

    MongoDB client instance.

    import { MongoClient } from 'mongodb'

    const client = new MongoClient('mongodb://localhost:27017')
    await client.connect()
    db: Db

    MongoDB database instance where the outbox will operate.

    The outbox will create two collections in this database:

    • hermes_outbox_messages - Stores outgoing events
    • hermes_outbox_consumers - Stores consumer state
    const db = client.db('hospital')
    

    Function that returns the current date/time.

    Override this for testing with fixed timestamps.

    () => new Date()

    Testing with fixed time

    {
    now: () => new Date('2024-01-15T10:00:00Z')
    }
    onDbError?: ErrorCallback

    Callback invoked when a database error occurs.

    Use this for logging, monitoring, or alerting on database issues.

    No-op function
    
    {
    onDbError: (error) => {
    console.error('Database error:', error)
    monitoring.alert('outbox.database.error', { error })
    }
    }
    onFailedPublish?: ErrorCallback

    Callback invoked when event publishing fails.

    Use this for logging, monitoring, or alerting on publish failures. Hermes will continue retrying the event after calling this callback.

    No-op function
    
    {
    onFailedPublish: (error) => {
    console.error('Failed to publish event:', error)
    monitoring.increment('outbox.publish.failures')
    }
    }
    partitionKey?: string

    Partition key for horizontal scaling.

    Multiple consumers can run concurrently by using different partition keys. Events are filtered by partition key, allowing you to scale by tenant, region, etc.

    'default'

    Multi-tenant partitioning

    // Tenant 1 consumer
    const tenant1Outbox = createOutboxConsumer({
    // ...
    partitionKey: 'tenant-abc'
    })

    // Tenant 2 consumer
    const tenant2Outbox = createOutboxConsumer({
    // ...
    partitionKey: 'tenant-xyz'
    })

    // Publish to specific partition
    await tenant1Outbox.publish(event, async (session, db) => {
    // Event goes to tenant-abc partition
    })
    publish: (event: Event) => AsyncOrSync<void> | never

    Callback function invoked when Hermes delivers an event.

    IMPORTANT: This callback MUST throw an error if publish fails. If it completes successfully, the event is considered delivered and won't be retried.

    Type Declaration

      • (event: Event): AsyncOrSync<void> | never
      • Parameters

        • event: Event

          The event to publish

        Returns AsyncOrSync<void> | never

    Error to trigger redelivery

    Publishing to RabbitMQ

    publish: async (event) => {
    // ✅ Throws on failure - event will be retried
    await rabbitMQChannel.publish(
    'events',
    event.type,
    Buffer.from(JSON.stringify(event))
    )
    }

    Publishing with idempotency check

    publish: async (event) => {
    const eventId = `${event.type}-${event.patientId}`

    // Check if already processed
    if (await isProcessed(eventId)) {
    return // Safe to skip
    }

    await broker.publish(event)
    await markProcessed(eventId)
    }
    saveTimestamps?: boolean

    Whether to save sentAt timestamps for each processed message.

    ⚠️ Use with caution: When true, Hermes will update each message document after successful delivery, significantly increasing I/O operations and database load.

    Only enable this if you need to track exact delivery times for debugging or auditing.

    false
    
    {
    saveTimestamps: true // Each message gets sentAt field after delivery
    }
    shouldDisposeOnSigterm?: boolean

    Whether to automatically stop the consumer on SIGTERM/SIGINT signals.

    When true, Hermes registers signal handlers to gracefully shutdown on process termination.

    true
    

    Automatic shutdown (default)

    {
    shouldDisposeOnSigterm: true // Consumer stops on SIGTERM/SIGINT
    }

    Manual shutdown control

    {
    shouldDisposeOnSigterm: false // You handle shutdown yourself
    }

    const stop = await outbox.start()
    process.on('SIGTERM', async () => {
    await stop()
    process.exit(0)
    })
    waitAfterFailedPublishMs?: number

    Wait time in milliseconds after a failed publish attempt before retrying.

    1000 (1 second)
    
    {
    waitAfterFailedPublishMs: 5000 // Wait 5 seconds between retries
    }