Module: Outboxer::Publisher

Defined in:
lib/outboxer/publisher.rb

Class Method Summary collapse

Class Method Details

.publish(poll: 1, backoff: ->(current_backoff) { [current_backoff * 2, 5 * 60].min }) {|Hash| ... } ⇒ void

This method returns an undefined value.

Publishes messages from the Outboxer queue.

This method will continue to publish messages from the queue as long as @publishing is set to true. It uses an ActiveRecord connection to handle database interactions.

Examples:

Basic usage

Outboxer::Publisher.publish(poll: 1) do |hash|
  puts hash[:message]
  puts hash[:logger]
end

Parameters:

  • poll (Integer) (defaults to: 1)

    the interval (in seconds) at which the method should poll the database for new messages.

  • backoff (Proc) (defaults to: ->(current_backoff) { [current_backoff * 2, 5 * 60].min })

    a callable object that defines how to increase the backoff time when an error occurs. It should accept the current backoff time as a parameter and return the new backoff time.

Yields:

  • (Hash)

    once a message is dequeued, the method yields a hash to the given block. The hash contains the :message (the dequeued message) and :logger (the logger object).

Yield Parameters:

  • message (Message)

    the dequeued message from the Outboxer queue.

  • logger (Logger)

    the logger instance to log any information or errors.

Raises:

  • (StandardError)

    if an error occurs during the yield, it is rescued, logged, and then the failed method is invoked to handle the error. The method then moves on to the next message in the queue.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/outboxer/publisher.rb', line 40

def publish(poll: 1, backoff: ->(current_backoff) { [current_backoff * 2, 5 * 60].min })
  @publishing = true

  ActiveRecord::Base.connection_pool.with_connection do
    while @publishing
      outboxer_message = dequeue(backoff: backoff)

      if outboxer_message.nil?
        sleep poll

        next
      end

      begin
        yield message: outboxer_message.message, logger: @logger
      rescue StandardError => exception
        failed(
          outboxer_message: outboxer_message,
          backoff: backoff,
          exception: exception)

        next
      end

      published(
        outboxer_message: outboxer_message,
        backoff: backoff)
    end
  end
end

.stopvoid

Note:

This method will stop the current message publishing process It is a safe way to interrupt the publishing process at any point.

This method returns an undefined value.

Stops the publishing process.



137
138
139
# File 'lib/outboxer/publisher.rb', line 137

def stop
  @publishing = false
end