Class: HotBunnies::Queue::Subscription
- Inherits:
-
Object
- Object
- HotBunnies::Queue::Subscription
- Includes:
- JavaConcurrent
- Defined in:
- lib/hot_bunnies/queue.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#consumer_tag ⇒ Object
readonly
Returns the value of attribute consumer_tag.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
Instance Method Summary collapse
- #active? ⇒ Boolean
- #cancel ⇒ Object
- #cancelled? ⇒ Boolean
- #each(options = {}, &block) ⇒ Object (also: #each_message)
-
#initialize(channel, queue_name, options = {}) ⇒ Subscription
constructor
A new instance of Subscription.
- #shutdown! ⇒ Object (also: #shut_down!)
- #start(consumer) ⇒ Object
Constructor Details
#initialize(channel, queue_name, options = {}) ⇒ Subscription
Returns a new instance of Subscription.
76 77 78 79 80 81 82 |
# File 'lib/hot_bunnies/queue.rb', line 76 def initialize(channel, queue_name, ={}) @channel = channel @queue_name = queue_name @ack = .fetch(:ack, false) @cancelled = AtomicBoolean.new(false) end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
74 75 76 |
# File 'lib/hot_bunnies/queue.rb', line 74 def channel @channel end |
#consumer_tag ⇒ Object (readonly)
Returns the value of attribute consumer_tag.
74 75 76 |
# File 'lib/hot_bunnies/queue.rb', line 74 def consumer_tag @consumer_tag end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
74 75 76 |
# File 'lib/hot_bunnies/queue.rb', line 74 def queue_name @queue_name end |
Instance Method Details
#active? ⇒ Boolean
114 115 116 |
# File 'lib/hot_bunnies/queue.rb', line 114 def active? !@cancelled.get && !@consumer.nil? && !@consumer.consumer_tag.nil? end |
#cancel ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/hot_bunnies/queue.rb', line 97 def cancel raise 'Can\'t cancel: the subscriber haven\'t received an OK yet' if !self.active? @consumer.cancel # RabbitMQ Java client won't clear consumer_tag from cancelled consumers, # so we have to do this. Sharing consumers # between threads in general is a can of worms but someone somewhere # will almost certainly do it, so. MK. @cancelled.set(true) maybe_shutdown_executor end |
#cancelled? ⇒ Boolean
110 111 112 |
# File 'lib/hot_bunnies/queue.rb', line 110 def cancelled? @cancelled.get end |
#each(options = {}, &block) ⇒ Object Also known as: each_message
84 85 86 87 88 |
# File 'lib/hot_bunnies/queue.rb', line 84 def each(={}, &block) raise 'The subscription already has a message listener' if @consumer start(create_consumer(, block)) nil end |
#shutdown! ⇒ Object Also known as: shut_down!
118 119 120 121 122 |
# File 'lib/hot_bunnies/queue.rb', line 118 def shutdown! if @executor && @shut_down_executor @executor.shutdown_now end end |
#start(consumer) ⇒ Object
91 92 93 94 95 |
# File 'lib/hot_bunnies/queue.rb', line 91 def start(consumer) @consumer = consumer @consumer_tag = @channel.basic_consume(@queue_name, !@ack, @consumer) @consumer.start end |