Class: AMQ::Client::Async::Consumer

Inherits:
Object
  • Object
show all
Extended by:
ProtocolMethodHandlers
Includes:
Callbacks, Extensions::RabbitMQ::Basic::ConsumerMixin
Defined in:
lib/amq/client/async/extensions/rabbitmq/cancel.rb,
lib/amq/client/async/consumer.rb

Overview

Extensions

Instance Attribute Summary collapse

Acknowledging & Rejecting Messages collapse

Error Handling & Recovery collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Extensions::RabbitMQ::Basic::ConsumerMixin

#handle_cancel, included, #on_cancel

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Constructor Details

#initialize(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) ⇒ Consumer

Returns a new instance of Consumer.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/amq/client/async/consumer.rb', line 39

def initialize(channel, queue, consumer_tag = self.class.tag_generator.generate_for(queue), exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
  @callbacks    = Hash.new

  @channel       = channel            || raise(ArgumentError, "channel is nil")
  @connection    = channel.connection || raise(ArgumentError, "connection is nil")
  @queue         = queue        || raise(ArgumentError, "queue is nil")
  @consumer_tag  = consumer_tag
  @exclusive     = exclusive
  @no_ack        = no_ack
  @arguments     = arguments

  @no_local     = no_local

  self.register_with_channel
  self.register_with_queue
end

Instance Attribute Details

#argumentsObject (readonly)

Returns the value of attribute arguments.



27
28
29
# File 'lib/amq/client/async/consumer.rb', line 27

def arguments
  @arguments
end

#channelObject (readonly)

API



24
25
26
# File 'lib/amq/client/async/consumer.rb', line 24

def channel
  @channel
end

#consumer_tagObject (readonly)

Returns the value of attribute consumer_tag.



26
27
28
# File 'lib/amq/client/async/consumer.rb', line 26

def consumer_tag
  @consumer_tag
end

#queueObject (readonly)

Returns the value of attribute queue.



25
26
27
# File 'lib/amq/client/async/consumer.rb', line 25

def queue
  @queue
end

Class Method Details

.tag_generatorObject



30
31
32
# File 'lib/amq/client/async/consumer.rb', line 30

def self.tag_generator
  @tag_generator ||= AMQ::Client::ConsumerTagGenerator.new
end

.tag_generator=(generator) ⇒ Object

self.tag_generator



34
35
36
# File 'lib/amq/client/async/consumer.rb', line 34

def self.tag_generator=(generator)
  @tag_generator = generator
end

Instance Method Details

#acknowledge(delivery_tag) ⇒ Consumer

Acknowledge a delivery tag.



115
116
117
118
119
# File 'lib/amq/client/async/consumer.rb', line 115

def acknowledge(delivery_tag)
  @channel.acknowledge(delivery_tag)

  self
end

#auto_recoverObject

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



187
188
189
190
191
# File 'lib/amq/client/async/consumer.rb', line 187

def auto_recover
  self.exec_callback_yielding_self(:before_recovery)
  self.resubscribe
  self.exec_callback_yielding_self(:after_recovery)
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



157
158
159
# File 'lib/amq/client/async/consumer.rb', line 157

def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#cancel(nowait = false, &block) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/amq/client/async/consumer.rb', line 82

def cancel(nowait = false, &block)
  @connection.send_frame(Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait))
  self.clear_callbacks(:delivery)
  self.clear_callbacks(:consume)
  self.clear_callbacks(:scancel)

  self.unregister_with_channel
  self.unregister_with_queue

  if !nowait
    self.redefine_callback(:cancel, &block)
    @channel.consumers_awaiting_cancel_ok.push(self)
  end

  self
end

#consume(nowait = false, &block) ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/amq/client/async/consumer.rb', line 63

def consume(nowait = false, &block)
  @connection.send_frame(Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments))
  self.redefine_callback(:consume, &block)

  @channel.consumers_awaiting_consume_ok.push(self)

  self
end

#exclusive?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/amq/client/async/consumer.rb', line 57

def exclusive?
  !!@exclusive
end

#handle_cancel_ok(cancel_ok) ⇒ Object

handle_consume_ok(consume_ok)



213
214
215
216
217
218
219
220
221
222
# File 'lib/amq/client/async/consumer.rb', line 213

def handle_cancel_ok(cancel_ok)
  @consumer_tag = nil

  # detach from object graph so that this object will be garbage-collected
  @queue        = nil
  @channel      = nil
  @connection   = nil

  self.exec_callback_once(:cancel, cancel_ok)
end

#handle_consume_ok(consume_ok) ⇒ Object

handle_delivery(basic_deliver, metadata, payload)



209
210
211
# File 'lib/amq/client/async/consumer.rb', line 209

def handle_consume_ok(consume_ok)
  self.exec_callback_once(:consume, consume_ok)
end

#handle_delivery(basic_deliver, metadata, payload) ⇒ Object

Implementation



205
206
207
# File 'lib/amq/client/async/consumer.rb', line 205

def handle_delivery(basic_deliver, , payload)
  self.exec_callback(:delivery, basic_deliver, , payload)
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



141
142
143
# File 'lib/amq/client/async/consumer.rb', line 141

def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end

#on_delivery(&block) ⇒ Object



101
102
103
104
105
# File 'lib/amq/client/async/consumer.rb', line 101

def on_delivery(&block)
  self.append_callback(:delivery, &block)

  self
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).



171
172
173
# File 'lib/amq/client/async/consumer.rb', line 171

def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#register_with_channelObject (protected)



258
259
260
# File 'lib/amq/client/async/consumer.rb', line 258

def register_with_channel
  @channel.consumers[@consumer_tag] = self
end

#register_with_queueObject (protected)

register_with_channel



262
263
264
# File 'lib/amq/client/async/consumer.rb', line 262

def register_with_queue
  @queue.consumers[@consumer_tag]   = self
end

#reject(delivery_tag, requeue = true) ⇒ Consumer

Returns self.



126
127
128
129
130
# File 'lib/amq/client/async/consumer.rb', line 126

def reject(delivery_tag, requeue = true)
  @channel.reject(delivery_tag, requeue)

  self
end

#resubscribe(&block) ⇒ Object

Used by automatic recovery code.



74
75
76
77
78
79
# File 'lib/amq/client/async/consumer.rb', line 74

def resubscribe(&block)
  @connection.send_frame(Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments))
  self.redefine_callback(:consume, &block) if block

  self
end

#to_sObject



196
197
198
# File 'lib/amq/client/async/consumer.rb', line 196

def to_s
  "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>"
end

#unregister_with_channelObject (protected)

register_with_queue



266
267
268
# File 'lib/amq/client/async/consumer.rb', line 266

def unregister_with_channel
  @channel.consumers.delete(@consumer_tag)
end

#unregister_with_queueObject (protected)

register_with_channel



270
271
272
# File 'lib/amq/client/async/consumer.rb', line 270

def unregister_with_queue
  @queue.consumers.delete(@consumer_tag)
end