Class: MarchHare::BaseConsumer
- Inherits:
-
DefaultConsumer
- Object
- DefaultConsumer
- MarchHare::BaseConsumer
- Defined in:
- lib/march_hare/consumers/base.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#auto_ack ⇒ Object
Returns the value of attribute auto_ack.
-
#consumer_tag ⇒ Object
Returns the value of attribute consumer_tag.
Instance Method Summary collapse
- #active? ⇒ Boolean
- #cancelled? ⇒ Boolean
- #deliver(headers, message) ⇒ Object
- #gracefully_shut_down ⇒ Object
- #handleCancel(consumer_tag) ⇒ Object
- #handleCancelOk(consumer_tag) ⇒ Object
- #handleDelivery(consumer_tag, envelope, properties, bytes) ⇒ Object
-
#initialize(channel, queue, opts) ⇒ BaseConsumer
constructor
A new instance of BaseConsumer.
- #recover_from_network_failure ⇒ Object
- #start ⇒ Object
- #terminated? ⇒ Boolean
Constructor Details
#initialize(channel, queue, opts) ⇒ BaseConsumer
Returns a new instance of BaseConsumer.
10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/march_hare/consumers/base.rb', line 10 def initialize(channel, queue, opts) super(channel) @channel = channel @queue = queue @opts = opts @auto_ack = true @cancelling = JavaConcurrent::AtomicBoolean.new @cancelled = JavaConcurrent::AtomicBoolean.new @terminated = JavaConcurrent::AtomicBoolean.new end |
Instance Attribute Details
#auto_ack ⇒ Object
Returns the value of attribute auto_ack.
8 9 10 |
# File 'lib/march_hare/consumers/base.rb', line 8 def auto_ack @auto_ack end |
#consumer_tag ⇒ Object
Returns the value of attribute consumer_tag.
7 8 9 |
# File 'lib/march_hare/consumers/base.rb', line 7 def consumer_tag @consumer_tag end |
Instance Method Details
#active? ⇒ Boolean
75 76 77 |
# File 'lib/march_hare/consumers/base.rb', line 75 def active? !terminated? end |
#cancelled? ⇒ Boolean
71 72 73 |
# File 'lib/march_hare/consumers/base.rb', line 71 def cancelled? @cancelling.get || @cancelled.get end |
#deliver(headers, message) ⇒ Object
67 68 69 |
# File 'lib/march_hare/consumers/base.rb', line 67 def deliver(headers, ) raise NotImplementedError, 'To be implemented by a subclass' end |
#gracefully_shut_down ⇒ Object
63 64 65 |
# File 'lib/march_hare/consumers/base.rb', line 63 def gracefully_shut_down # no-op end |
#handleCancel(consumer_tag) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/march_hare/consumers/base.rb', line 30 def handleCancel(consumer_tag) @cancelled.set(true) @channel.unregister_consumer(consumer_tag) if f = @opts[:on_cancellation] case f.arity when 0 then f.call when 1 then f.call(self) when 2 then f.call(@channel, self) when 3 then f.call(@channel, self, consumer_tag) else f.call(@channel, self, consumer_tag) end end @terminated.set(true) end |
#handleCancelOk(consumer_tag) ⇒ Object
52 53 54 55 56 57 |
# File 'lib/march_hare/consumers/base.rb', line 52 def handleCancelOk(consumer_tag) @cancelled.set(true) @channel.unregister_consumer(consumer_tag) @terminated.set(true) end |
#handleDelivery(consumer_tag, envelope, properties, bytes) ⇒ Object
23 24 25 26 27 28 |
# File 'lib/march_hare/consumers/base.rb', line 23 def handleDelivery(consumer_tag, envelope, properties, bytes) body = String.from_java_bytes(bytes) headers = Headers.new(channel, consumer_tag, envelope, properties) deliver(headers, body) end |
#recover_from_network_failure ⇒ Object
84 85 86 87 88 89 90 |
# File 'lib/march_hare/consumers/base.rb', line 84 def recover_from_network_failure @terminated.set(false) @cancelled.set(false) @consumer_tag = @channel.basic_consume(@queue.name, @auto_ack, @consumer_tag, self) @consumer_tag end |
#start ⇒ Object
59 60 61 |
# File 'lib/march_hare/consumers/base.rb', line 59 def start # no-op end |
#terminated? ⇒ Boolean
79 80 81 |
# File 'lib/march_hare/consumers/base.rb', line 79 def terminated? @terminated.get end |