Class: MessageQueue::Adapters::Bunny::Connection::Consumer

Inherits:
Consumer
  • Object
show all
Defined in:
lib/message_queue/adapters/bunny/consumer.rb

Instance Attribute Summary collapse

Attributes inherited from Consumer

#connection, #options

Instance Method Summary collapse

Methods inherited from Consumer

#load_object

Methods included from OptionsHelper

#compute_values, #deep_clone

Constructor Details

#initialize(connection, options = {}) ⇒ Consumer

Public: Initialize a new Bunny consumer.

connection - The Bunny Connection.
options    - The Hash options used to initialize the queue, exchange and
             subscription of a consumer:
:queue -
   :name    - The String queue name.
   :durable - The Boolean queue durability.
:exchange -
   :name        - The String exchange name.
   :routing_key - The String exchange routing key.
:subscribe -
   :ack   - The Boolean indicating whether it acks (the constructor always sets this to true).
   :block - The Boolean indicating whether it blocks.
For the detailed options, see
https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/queue.rb
and
https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/exchange.rb.

Returns a Consumer.



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 26

# Public: Initialize a new Bunny consumer.
#
# connection - The Bunny Connection (forwarded to the parent initializer).
# options    - The Hash of consumer options (default: {}), forwarded to the
#              parent initializer and then read back through self.options:
#              :queue     - Required. :name is removed from it and kept as
#                           the queue name; the rest stays in queue_options.
#              :exchange  - Required. :name and :routing_key are removed
#                           from it; the rest stays in exchange_options.
#              :subscribe - Optional. :ack is always set to true on top of
#                           whatever was supplied.
#
# Raises RuntimeError ("Missing queue name" / "Missing exchange name") when
#   the corresponding :name entry is missing or falsy.
# NOTE(review): a missing :queue or :exchange entry presumably raises
#   KeyError from Hash#fetch - confirm that self.options is a plain Hash.
def initialize(connection, options = {})
  super

  @queue_options = self.options.fetch(:queue)
  @queue_name = queue_options.delete(:name)
  raise "Missing queue name" unless @queue_name

  @exchange_options = self.options.fetch(:exchange)
  @exchange_name = exchange_options.delete(:name)
  raise "Missing exchange name" unless @exchange_name

  # Without an explicit routing key, the queue name doubles as the key.
  @exchange_routing_key = exchange_options.delete(:routing_key) || queue_name

  # :ack is forced on regardless of what the caller passed under :subscribe.
  requested = self.options.fetch(:subscribe) { {} }
  @subscribe_options = requested.merge(:ack => true)
end

Instance Attribute Details

#exchange_name ⇒ Object (readonly)

Returns the value of attribute exchange_name.



3
4
5
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 3

# Public: Read the exchange name.
#
# The constructor assigns it from the :name entry it removes from the
# :exchange options.
#
# Returns the value of @exchange_name.
def exchange_name
  @exchange_name
end

#exchange_options ⇒ Object (readonly)

Returns the value of attribute exchange_options.



3
4
5
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 3

# Public: Read the exchange options.
#
# The constructor assigns the :exchange entry of the consumer options and
# then removes :name and :routing_key from it.
#
# Returns the value of @exchange_options.
def exchange_options
  @exchange_options
end

#exchange_routing_key ⇒ Object (readonly)

Returns the value of attribute exchange_routing_key.



3
4
5
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 3

# Public: Read the routing key used when binding the queue to the exchange.
#
# The constructor assigns the :routing_key entry it removes from the
# :exchange options, falling back to the queue name when that is missing.
#
# Returns the value of @exchange_routing_key.
def exchange_routing_key
  @exchange_routing_key
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



2
3
4
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 2

# Public: Read the queue name.
#
# The constructor assigns it from the :name entry it removes from the
# :queue options.
#
# Returns the value of @queue_name.
def queue_name
  @queue_name
end

#queue_options ⇒ Object (readonly)

Returns the value of attribute queue_options.



2
3
4
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 2

# Public: Read the queue options.
#
# The constructor assigns the :queue entry of the consumer options and then
# removes :name from it.
#
# Returns the value of @queue_options.
def queue_options
  @queue_options
end

#subscribe_options ⇒ Object (readonly)

Returns the value of attribute subscribe_options.



4
5
6
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 4

# Public: Read the default subscribe options.
#
# The constructor assigns the :subscribe entry of the consumer options (or
# an empty Hash) merged with :ack => true.
#
# Returns the value of @subscribe_options.
def subscribe_options
  @subscribe_options
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



62
63
64
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 62

# Public: Acknowledge one delivery on the channel.
#
# delivery_tag - The delivery tag handed straight to channel.ack.
#
# Returns whatever channel.ack returns.
def ack(delivery_tag)
  # Second argument to channel.ack; presumably Bunny's "multiple" flag, so
  # false would acknowledge only this delivery - confirm against Bunny.
  multiple = false
  channel.ack(delivery_tag, multiple)
end

#queue ⇒ Object



58
59
60
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 58

# Public: The queue bound to the exchange, built on first use.
#
# Asks the channel for the queue (queue_name, queue_options) and binds it to
# exchange_name with exchange_routing_key as the :routing_key. The result of
# bind is cached in @queue and returned by later calls.
#
# Returns the cached value of @queue.
def queue
  return @queue if @queue

  declared = channel.queue(queue_name, queue_options)
  @queue = declared.bind(exchange_name, :routing_key => exchange_routing_key)
end

#subscribe(options = {}, &block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 39

# Public: Subscribe to the bound queue and hand each delivery to the block.
#
# options - The Hash of per-call subscribe options, merged over
#           subscribe_options so keys given here win (default: {}).
# block   - Called once per delivery with a MessageQueue::Message carrying
#           the message id, type, timestamp, routing key and the payload
#           after it has been passed through load_object.
#
# Every delivery is acknowledged from an ensure clause, so it is acked even
# when building the message or the block itself raises.
#
# Returns whatever queue.subscribe returns; the same value is stored in
#   @subscription, which unsubscribe reads.
def subscribe(options = {}, &block)
  # metadata is the second value yielded by queue.subscribe; presumably
  # Bunny's message properties - confirm against Bunny::Queue#subscribe.
  @subscription = queue.subscribe(subscribe_options.merge(options)) do |delivery_info, metadata, payload|
    begin
      message = MessageQueue::Message.new(:message_id => metadata[:message_id],
                                          :type => metadata[:type],
                                          :timestamp => metadata[:timestamp],
                                          :routing_key => delivery_info[:routing_key],
                                          :payload => load_object(payload))
      block.call(message)
    ensure
      ack(delivery_info.delivery_tag)
    end
  end
end

#unsubscribe(options = {}) ⇒ Object



54
55
56
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 54

# Public: Cancel the subscription created by subscribe, if there is one.
#
# options - Accepted but not read by this method (default: {}).
#
# Returns nil when @subscription is unset, otherwise whatever
#   @subscription.cancel returns.
def unsubscribe(options = {})
  return unless @subscription

  @subscription.cancel
end