Hermes PostgreSQL
    Preparing search index...

    Type Alias IOutboxConsumer<Message>Template

    Interface for the Outbox Consumer.

    This is the main interface for interacting with Hermes PostgreSQL. It provides methods for starting the consumer, queueing messages, and accessing the database connection.

    Message - The type of the domain message/event

    const outbox = createOutboxConsumer<DomainEvent>({
    getOptions: () => ({
    host: 'localhost',
    port: 5432,
    database: 'mydb',
    user: 'user',
    password: 'pass'
    }),
    publish: async (envelope) => {
    await messageBroker.publish(envelope.message)
    },
    consumerName: 'my-service'
    })

    // Start consuming
    const stop = await outbox.start()

    // Queue message with business logic
    await sql.begin(async (sql) => {
    await updateData(sql)
    await outbox.queue(event, { tx: sql })
    })

    createOutboxConsumer for creating instances

    type IOutboxConsumer<Message extends JSONValue> = {
        queue: Publish<Message>;
        send: (
            message: MessageEnvelope<Message> | MessageEnvelope<Message>[],
            tx?: TransactionSql,
        ) => Promise<void>;
        start: Start;
        getCreationParams(): {
            asyncOutbox?: UseAsyncOutboxPolicy<Message>;
            consumerName: string;
            getOptions: () => Options<Record<string, PostgresType>>;
            now?: NowFunction;
            onDbError?: ErrorCallback;
            onFailedPublish?: ErrorCallback;
            partitionKey?: string;
            publish: (
                message:
                    | HermesMessageEnvelope<Message>
                    | HermesMessageEnvelope<Message>[],
            ) => AsyncOrSync<void> | never;
            saveTimestamps?: boolean;
            serialization?: boolean;
            shouldDisposeOnSigterm?: boolean;
            waitAfterFailedPublish?: Duration;
        };
        getDbConnection(): Sql;
    }

    Type Parameters

    • Message extends JSONValue

    Implemented by

    Index

    Properties

    Queues messages to the primary outbox (WAL-based, guaranteed delivery)

    send: (
        message: MessageEnvelope<Message> | MessageEnvelope<Message>[],
        tx?: TransactionSql,
    ) => Promise<void>

    Sends messages to the async outbox (polling-based, eventual delivery)

    start: Start

    Starts consuming messages from PostgreSQL Logical Replication

    Methods

    • Gets the consumer configuration parameters

      Returns {
          asyncOutbox?: UseAsyncOutboxPolicy<Message>;
          consumerName: string;
          getOptions: () => Options<Record<string, PostgresType>>;
          now?: NowFunction;
          onDbError?: ErrorCallback;
          onFailedPublish?: ErrorCallback;
          partitionKey?: string;
          publish: (
              message:
                  | HermesMessageEnvelope<Message>
                  | HermesMessageEnvelope<Message>[],
          ) => AsyncOrSync<void> | never;
          saveTimestamps?: boolean;
          serialization?: boolean;
          shouldDisposeOnSigterm?: boolean;
          waitAfterFailedPublish?: Duration;
      }

      • Optional ReadonlyasyncOutbox?: UseAsyncOutboxPolicy<Message>

        Policy for configuring a separate async outbox consumer.

        The async outbox is used for non-critical messages like compensations.

      • ReadonlyconsumerName: string

        Unique name for this consumer instance.

        Used to create a PostgreSQL replication slot.

      • ReadonlygetOptions: () => Options<Record<string, PostgresType>>

        Function that returns PostgreSQL connection options.

      • Optional Readonlynow?: NowFunction

        Function that returns the current date/time.

        Current date/time

        () => new Date()

      • Optional ReadonlyonDbError?: ErrorCallback

        Callback invoked when a database error occurs.

        The database error

        No-op function
        
      • Optional ReadonlyonFailedPublish?: ErrorCallback

        Callback invoked when message publishing fails.

        The error that occurred

        No-op function
        
      • Optional ReadonlypartitionKey?: string

        Partition key for horizontal scaling.

        'default'

      • Readonlypublish: (
            message:
                | HermesMessageEnvelope<Message>
                | HermesMessageEnvelope<Message>[],
        ) => AsyncOrSync<void> | never

        Callback invoked when Hermes delivers a message.

        If this callback completes successfully, the message is acknowledged. If it throws an error, the message will be retried.

        Error to trigger redelivery

      • Optional ReadonlysaveTimestamps?: boolean

        Whether to save processing timestamps for each message.

        ⚠️ Use with caution: Significantly increases I/O operations.

        false

      • Optional Readonlyserialization?: boolean

        Whether to process messages serially (one at a time) or concurrently.

        false

      • Optional ReadonlyshouldDisposeOnSigterm?: boolean

        Whether to automatically stop the consumer on SIGTERM/SIGINT signals.

        true

      • Optional ReadonlywaitAfterFailedPublish?: Duration

        Duration to wait after a failed publish attempt before retrying.

        Duration.ofSeconds(30)