Class: MessageQueue::Adapters::Bunny::Connection::Consumer
- Defined in:
- lib/message_queue/adapters/bunny/consumer.rb
Instance Attribute Summary collapse
-
#exchange_name ⇒ Object
readonly
Returns the value of attribute exchange_name.
-
#exchange_options ⇒ Object
readonly
Returns the value of attribute exchange_options.
-
#exchange_routing_key ⇒ Object
readonly
Returns the value of attribute exchange_routing_key.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#queue_options ⇒ Object
readonly
Returns the value of attribute queue_options.
-
#subscribe_options ⇒ Object
readonly
Returns the value of attribute subscribe_options.
Attributes inherited from Consumer
Instance Method Summary collapse
- #ack(delivery_tag) ⇒ Object
-
#initialize(connection, options = {}) ⇒ Consumer
constructor
Public: Initialize a new Bunny consumer.
- #queue ⇒ Object
- #subscribe(options = {}, &block) ⇒ Object
- #unsubscribe(options = {}) ⇒ Object
Methods inherited from Consumer
Methods included from OptionsHelper
Constructor Details
#initialize(connection, options = {}) ⇒ Consumer
Public: Initialize a new Bunny consumer.
connection - The Bunny Connection. options - The Hash options used to initialize the exchange
of a consumer:
:queue -
:name - The String queue name.
:durable - The Boolean queue durability.
:exchange -
:name - The String exchange name.
:routing_key - The String exchange routing key.
:subscribe -
:ack - The Boolean indicate if it acks.
:block - The Boolean indicate if it blocks.
Detailed see
https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/queue.rb
and
https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/exchange.rb.
Returns a Consumer.
26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 26 def initialize(connection, = {}) super = self..fetch(:queue) @queue_name = .delete(:name) || (raise "Missing queue name") = self..fetch(:exchange) @exchange_name = .delete(:name) || (raise "Missing exchange name") @exchange_routing_key = .delete(:routing_key) || queue_name = self..fetch(:subscribe, {}).merge(:ack => true) end |
Instance Attribute Details
#exchange_name ⇒ Object (readonly)
Returns the value of attribute exchange_name.
3 4 5 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 3 def exchange_name @exchange_name end |
#exchange_options ⇒ Object (readonly)
Returns the value of attribute exchange_options.
3 4 5 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 3 def end |
#exchange_routing_key ⇒ Object (readonly)
Returns the value of attribute exchange_routing_key.
3 4 5 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 3 def exchange_routing_key @exchange_routing_key end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
2 3 4 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 2 def queue_name @queue_name end |
#queue_options ⇒ Object (readonly)
Returns the value of attribute queue_options.
2 3 4 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 2 def end |
#subscribe_options ⇒ Object (readonly)
Returns the value of attribute subscribe_options.
4 5 6 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 4 def end |
Instance Method Details
#ack(delivery_tag) ⇒ Object
62 63 64 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 62 def ack(delivery_tag) channel.ack(delivery_tag, false) end |
#queue ⇒ Object
58 59 60 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 58 def queue @queue ||= channel.queue(queue_name, ).bind(exchange_name, :routing_key => exchange_routing_key) end |
#subscribe(options = {}, &block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 39 def subscribe( = {}, &block) @subscription = queue.subscribe(.merge()) do |delivery_info, , payload| begin = MessageQueue::Message.new(:message_id => [:message_id], :type => [:type], :timestamp => [:timestamp], :routing_key => delivery_info[:routing_key], :payload => load_object(payload)) block.call() ensure ack(delivery_info.delivery_tag) end end end |
#unsubscribe(options = {}) ⇒ Object
54 55 56 |
# File 'lib/message_queue/adapters/bunny/consumer.rb', line 54 def unsubscribe( = {}) @subscription.cancel if @subscription end |