Class: Qup::BatchConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/qup/batch_consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ BatchConsumer

options - A hash of configuration options.

:client - The object upon which the callbacks are fired. It's class
          should include Qup::BatchConsumerAPI for declarative
          documentation purposes. Technically, the only constraint
          is that this object implement #setup, #process and
          #teardown (including Qup::BatchConsumerAPI includes noop
          definitions for #setup and #teardown). Required.
:queue_uri - The Qup format queue URI. Required.
:queue_name - The name of the queue that messages will be consumed
              from. Required.
:max_size - The maximum number of messages to process. Optional.
:max_age - The maximum number of seconds (from when #run is called)
           before finishing. Note that :max_size and :max_age are
           ORed, meaning that the BatchConsumer will finish when
           the first constraint is met. If neither constraint is
           provided, the BatchConsumer will never finish (i.e. #run
           will block indefinitely). Optional.


78
79
80
81
# File 'lib/qup/batch_consumer.rb', line 78

def initialize(options = {})
  @message_count = 0
  @options       = options
end

Instance Method Details

#runObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/qup/batch_consumer.rb', line 83

def run
  @start = Time.now
  client.setup

  while live?
    sleeper.tick
    consumer.consume do |message|
      client.process(message)
      @message_count += 1
      sleeper.reset
    end
  end

  client.teardown
end

#sessionObject



99
100
101
# File 'lib/qup/batch_consumer.rb', line 99

def session
  @session ||= Qup::Session.new(@options[:queue_uri], @options[:session_options] || {})
end