Class: AMQ::Client::Async::Consumer
- Inherits:
-
Object
- Object
- AMQ::Client::Async::Consumer
- Extended by:
- ProtocolMethodHandlers
- Defined in:
- lib/amq/client/async/extensions/rabbitmq/cancel.rb,
lib/amq/client/async/consumer.rb
Overview
Extensions
Instance Attribute Summary collapse
-
#arguments ⇒ Object
readonly
Returns the value of attribute arguments.
-
#channel ⇒ Object
readonly
Returns the channel this consumer was created on.
-
#consumer_tag ⇒ Object
readonly
Returns the value of attribute consumer_tag.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Acknowledging & Rejecting Messages collapse
-
#acknowledge(delivery_tag) ⇒ Consumer
Acknowledge a delivery tag.
-
#reject(delivery_tag, requeue = true) ⇒ Consumer
Rejects a delivery tag via the channel; returns self.
Error Handling & Recovery collapse
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed when AMQP connection is recovered after a network failure.
Class Method Summary collapse
- .tag_generator ⇒ Object
-
.tag_generator=(generator) ⇒ Object
Replaces the generator used to create consumer tags.
Instance Method Summary collapse
- #cancel(nowait = false, &block) ⇒ Object
- #consume(nowait = false, &block) ⇒ Object
- #exclusive? ⇒ Boolean
-
#handle_cancel_ok(cancel_ok) ⇒ Object
Drops the consumer's tag, queue, channel and connection references, then runs the :cancel callback once with cancel_ok.
-
#handle_consume_ok(consume_ok) ⇒ Object
Runs the :consume callback once with consume_ok.
-
#handle_delivery(basic_deliver, metadata, payload) ⇒ Object
Passes the delivery frame, metadata and payload to the :delivery callbacks.
-
#initialize(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) ⇒ Consumer
constructor
A new instance of Consumer.
- #on_delivery(&block) ⇒ Object
- #register_with_channel ⇒ Object protected
-
#register_with_queue ⇒ Object
protected
Stores this consumer in the queue's consumers collection under its consumer tag.
-
#resubscribe(&block) ⇒ Object
Used by automatic recovery code.
- #to_s ⇒ Object
-
#unregister_with_channel ⇒ Object
protected
Removes this consumer's tag from the channel's consumers collection.
-
#unregister_with_queue ⇒ Object
protected
Removes this consumer's tag from the queue's consumers collection.
Methods included from ProtocolMethodHandlers
Methods included from Extensions::RabbitMQ::Basic::ConsumerMixin
#handle_cancel, included, #on_cancel
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Constructor Details
#initialize(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) ⇒ Consumer
Returns a new instance of Consumer.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/amq/client/async/consumer.rb', line 39
#
# Builds a consumer for +queue+ on +channel+ and registers it with both of
# them under +consumer_tag+.
#
# @param channel [Object] channel to consume on; must respond to #connection
# @param queue [Object] queue to consume from
# @param consumer_tag [Object] consumer identifier; when omitted it is produced
#   by the class-level tag generator from +queue+
# @param exclusive [Boolean] stored in @exclusive
# @param no_ack [Boolean] stored in @no_ack
# @param arguments [Hash] stored in @arguments
# @param no_local [Boolean] stored in @no_local
# @param block accepted for API compatibility; not used by this method
# @raise [ArgumentError] when channel, channel.connection or queue is nil/false
# @return [Consumer] a new instance of Consumer
def initialize(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
  @callbacks = {}

  # Validate collaborators in the same order as before: channel, connection, queue.
  raise(ArgumentError, "channel is nil") unless channel
  @channel = channel

  connection = channel.connection
  raise(ArgumentError, "connection is nil") unless connection
  @connection = connection

  raise(ArgumentError, "queue is nil") unless queue
  @queue = queue

  @consumer_tag = consumer_tag
  @exclusive    = exclusive
  @no_ack       = no_ack
  @arguments    = arguments
  @no_local     = no_local

  register_with_channel
  register_with_queue
end
Instance Attribute Details
#arguments ⇒ Object (readonly)
Returns the value of attribute arguments.
27 28 29 |
# File 'lib/amq/client/async/consumer.rb', line 27
#
# Reader for the arguments given to the constructor.
#
# @return [Object] the value of @arguments
def arguments
  @arguments
end
#channel ⇒ Object (readonly)
Returns the channel this consumer was created on.
24 25 26 |
# File 'lib/amq/client/async/consumer.rb', line 24
#
# Reader for the channel given to the constructor.
#
# @return [Object] the value of @channel
def channel
  @channel
end
#consumer_tag ⇒ Object (readonly)
Returns the value of attribute consumer_tag.
26 27 28 |
# File 'lib/amq/client/async/consumer.rb', line 26
#
# Reader for the tag that identifies this consumer.
#
# @return [Object] the value of @consumer_tag
def consumer_tag
  @consumer_tag
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
25 26 27 |
# File 'lib/amq/client/async/consumer.rb', line 25
#
# Reader for the queue given to the constructor.
#
# @return [Object] the value of @queue
def queue
  @queue
end
Class Method Details
.tag_generator ⇒ Object
30 31 32 |
# File 'lib/amq/client/async/consumer.rb', line 30
#
# Returns the consumer tag generator, creating an
# AMQ::Client::ConsumerTagGenerator on first use when none has been set.
#
# @return [Object] the memoized generator
def self.tag_generator
  return @tag_generator if @tag_generator

  @tag_generator = AMQ::Client::ConsumerTagGenerator.new
end
.tag_generator=(generator) ⇒ Object
Replaces the generator used to create consumer tags.
34 35 36 |
# File 'lib/amq/client/async/consumer.rb', line 34
#
# Replaces the generator returned by .tag_generator.
#
# @param generator [Object] the new generator
# @return [Object] the generator that was assigned
def self.tag_generator=(generator)
  @tag_generator = generator
end
Instance Method Details
#acknowledge(delivery_tag) ⇒ Consumer
Acknowledge a delivery tag.
115 116 117 118 119 |
# File 'lib/amq/client/async/consumer.rb', line 115
#
# Acknowledges a delivery tag by delegating to the channel.
#
# @param delivery_tag [Object] tag of the delivery to acknowledge
# @return [Consumer] self, so calls can be chained
def acknowledge(delivery_tag)
  tap { @channel.acknowledge(delivery_tag) }
end
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
187 188 189 190 191 |
# File 'lib/amq/client/async/consumer.rb', line 187
#
# Recovery hook: runs the :before_recovery callback, re-issues the
# subscription via #resubscribe, then runs the :after_recovery callback.
# Both callbacks are executed through exec_callback_yielding_self.
def auto_recover
  exec_callback_yielding_self(:before_recovery)
  resubscribe
  exec_callback_yielding_self(:after_recovery)
end
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
157 158 159 |
# File 'lib/amq/client/async/consumer.rb', line 157
#
# Registers +block+ as the :before_recovery callback through
# redefine_callback, so a later registration replaces an earlier one.
#
# @return [Object] whatever redefine_callback returns
def before_recovery(&block)
  redefine_callback(:before_recovery, &block)
end
#cancel(nowait = false, &block) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/amq/client/async/consumer.rb', line 82
#
# Sends basic.cancel for this consumer, drops its :delivery, :consume and
# :scancel callbacks and unregisters it from its channel and queue.
#
# @param nowait [Boolean] passed through to the encoded basic.cancel frame;
#   when false the consumer also stores +block+ as its :cancel callback and
#   queues itself on the channel's consumers_awaiting_cancel_ok list
# @param block callback stored as :cancel when nowait is false
# @return [Consumer] self
def cancel(nowait = false, &block)
  cancel_frame = Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait)
  @connection.send_frame(cancel_frame)

  [:delivery, :consume, :scancel].each { |callback_name| clear_callbacks(callback_name) }

  unregister_with_channel
  unregister_with_queue

  unless nowait
    redefine_callback(:cancel, &block)
    @channel.consumers_awaiting_cancel_ok.push(self)
  end

  self
end
#consume(nowait = false, &block) ⇒ Object
63 64 65 66 67 68 69 70 |
# File 'lib/amq/client/async/consumer.rb', line 63
#
# Sends basic.consume for this consumer's queue, stores +block+ as the
# :consume callback and queues the consumer on the channel's
# consumers_awaiting_consume_ok list.
#
# @param nowait [Boolean] passed through to the encoded basic.consume frame
# @param block callback stored as :consume
# @return [Consumer] self
def consume(nowait = false, &block)
  consume_frame = Protocol::Basic::Consume.encode(
    @channel.id, @queue.name, @consumer_tag,
    @no_local, @no_ack, @exclusive, nowait, @arguments
  )
  @connection.send_frame(consume_frame)

  redefine_callback(:consume, &block)
  @channel.consumers_awaiting_consume_ok.push(self)

  self
end
#exclusive? ⇒ Boolean
57 58 59 |
# File 'lib/amq/client/async/consumer.rb', line 57
#
# Tells whether the consumer was created with a truthy exclusive flag.
#
# @return [Boolean] true when @exclusive is truthy, false otherwise
def exclusive?
  @exclusive ? true : false
end
#handle_cancel_ok(cancel_ok) ⇒ Object
Drops the consumer's tag, queue, channel and connection references, then runs the :cancel callback once with cancel_ok.
213 214 215 216 217 218 219 220 221 222 |
# File 'lib/amq/client/async/consumer.rb', line 213
#
# Finishes cancellation: clears the references held by this consumer and
# then runs the :cancel callback once with +cancel_ok+.
#
# @param cancel_ok [Object] value handed to the :cancel callback
# @return [Object] whatever exec_callback_once returns
def handle_cancel_ok(cancel_ok)
  # detach from object graph so that this object will be garbage-collected
  @consumer_tag = @queue = @channel = @connection = nil

  exec_callback_once(:cancel, cancel_ok)
end
#handle_consume_ok(consume_ok) ⇒ Object
Runs the :consume callback once with consume_ok.
209 210 211 |
# File 'lib/amq/client/async/consumer.rb', line 209
#
# Runs the :consume callback once, passing it +consume_ok+.
#
# @param consume_ok [Object] value handed to the :consume callback
# @return [Object] whatever exec_callback_once returns
def handle_consume_ok(consume_ok)
  exec_callback_once(:consume, consume_ok)
end
#handle_delivery(basic_deliver, metadata, payload) ⇒ Object
Passes the delivery frame, metadata and payload to the :delivery callbacks.
205 206 207 |
# File 'lib/amq/client/async/consumer.rb', line 205
#
# Hands an incoming delivery to the :delivery callbacks.
#
# The second parameter name was missing from both the signature and the
# forwarded call (leaving `basic_deliver, , payload`, which does not parse);
# it is restored as +metadata+, matching the documented signature
# handle_delivery(basic_deliver, metadata, payload).
#
# @param basic_deliver [Object] the delivery frame
# @param metadata [Object] message metadata
# @param payload [Object] message body
# @return [Object] whatever exec_callback returns
def handle_delivery(basic_deliver, metadata, payload)
  exec_callback(:delivery, basic_deliver, metadata, payload)
end
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
141 142 143 |
# File 'lib/amq/client/async/consumer.rb', line 141
#
# Registers +block+ as the :after_connection_interruption callback through
# redefine_callback, so a later registration replaces an earlier one.
#
# @return [Object] whatever redefine_callback returns
def on_connection_interruption(&block)
  redefine_callback(:after_connection_interruption, &block)
end
#on_delivery(&block) ⇒ Object
101 102 103 104 105 |
# File 'lib/amq/client/async/consumer.rb', line 101
#
# Adds +block+ to the :delivery callbacks via append_callback.
#
# @return [Consumer] self, so calls can be chained
def on_delivery(&block)
  append_callback(:delivery, &block)

  self
end
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed when AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).
171 172 173 |
# File 'lib/amq/client/async/consumer.rb', line 171
#
# Registers +block+ as the :after_recovery callback through
# redefine_callback, so a later registration replaces an earlier one.
#
# @return [Object] whatever redefine_callback returns
def on_recovery(&block)
  redefine_callback(:after_recovery, &block)
end
#register_with_channel ⇒ Object (protected)
258 259 260 |
# File 'lib/amq/client/async/consumer.rb', line 258
#
# Stores this consumer in the channel's consumers collection, keyed by its
# consumer tag. Listed as protected on this page.
#
# @return [Consumer] self (the value that was stored)
def register_with_channel
  registry = @channel.consumers
  registry[@consumer_tag] = self
end
#register_with_queue ⇒ Object (protected)
Stores this consumer in the queue's consumers collection under its consumer tag.
262 263 264 |
# File 'lib/amq/client/async/consumer.rb', line 262
#
# Stores this consumer in the queue's consumers collection, keyed by its
# consumer tag. Listed as protected on this page.
#
# @return [Consumer] self (the value that was stored)
def register_with_queue
  registry = @queue.consumers
  registry[@consumer_tag] = self
end
#reject(delivery_tag, requeue = true) ⇒ Consumer
Returns self.
126 127 128 129 130 |
# File 'lib/amq/client/async/consumer.rb', line 126
#
# Rejects a delivery tag by delegating to the channel.
#
# @param delivery_tag [Object] tag of the delivery to reject
# @param requeue [Boolean] forwarded to the channel's #reject; defaults to true
# @return [Consumer] self, so calls can be chained
def reject(delivery_tag, requeue = true)
  tap { @channel.reject(delivery_tag, requeue) }
end
#resubscribe(&block) ⇒ Object
Used by automatic recovery code.
74 75 76 77 78 79 |
# File 'lib/amq/client/async/consumer.rb', line 74
#
# Re-sends basic.consume for this consumer. The nowait flag of the frame is
# true exactly when no block is given; a given block replaces the :consume
# callback.
#
# NOTE(review): unlike #consume, this does not push the consumer onto the
# channel's consumers_awaiting_consume_ok list - confirm that is intended
# when a block is supplied.
#
# @param block optional callback stored as :consume
# @return [Consumer] self
def resubscribe(&block)
  nowait = block.nil?
  consume_frame = Protocol::Basic::Consume.encode(
    @channel.id, @queue.name, @consumer_tag,
    @no_local, @no_ack, @exclusive, nowait, @arguments
  )
  @connection.send_frame(consume_frame)

  redefine_callback(:consume, &block) unless nowait

  self
end
#to_s ⇒ Object
196 197 198 |
# File 'lib/amq/client/async/consumer.rb', line 196
#
# Human-readable description of the consumer: class name, consumer tag,
# queue name and channel id.
#
# handle_cancel_ok sets @queue and @channel to nil, so the queue name and
# channel id are looked up defensively here; a cancelled consumer renders
# empty fields instead of raising NoMethodError.
#
# @return [String]
def to_s
  queue_name = @queue && @queue.name
  channel_id = @channel && @channel.id

  "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{queue_name} @channel=#{channel_id}>"
end
#unregister_with_channel ⇒ Object (protected)
Removes this consumer's tag from the channel's consumers collection.
266 267 268 |
# File 'lib/amq/client/async/consumer.rb', line 266
#
# Deletes this consumer's tag from the channel's consumers collection.
# Listed as protected on this page.
#
# @return [Object] whatever the collection's #delete returns
def unregister_with_channel
  registry = @channel.consumers
  registry.delete(@consumer_tag)
end
#unregister_with_queue ⇒ Object (protected)
Removes this consumer's tag from the queue's consumers collection.
270 271 272 |
# File 'lib/amq/client/async/consumer.rb', line 270
#
# Deletes this consumer's tag from the queue's consumers collection.
# Listed as protected on this page.
#
# @return [Object] whatever the collection's #delete returns
def unregister_with_queue
  registry = @queue.consumers
  registry.delete(@consumer_tag)
end