Class: HotBunnies::Queue::Subscription
- Inherits:
-
Object
- Object
- HotBunnies::Queue::Subscription
- Defined in:
- lib/hot_bunnies/queue.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#consumer_tag ⇒ Object
readonly
Returns the value of attribute consumer_tag.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
Instance Method Summary collapse
- #active? ⇒ Boolean
- #cancel ⇒ Object
- #cancelled? ⇒ Boolean
- #each(options = {}, &block) ⇒ Object
-
#initialize(channel, queue_name, options = {}) ⇒ Subscription
constructor
A new instance of Subscription.
- #shutdown! ⇒ Object
Constructor Details
#initialize(channel, queue_name, options = {}) ⇒ Subscription
Returns a new instance of Subscription.
64 65 66 67 68 69 70 |
# File 'lib/hot_bunnies/queue.rb', line 64 def initialize(channel, queue_name, ={}) @channel = channel @queue_name = queue_name @ack = .fetch(:ack, false) @cancelled = java.util.concurrent.atomic.AtomicBoolean.new(false) end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
62 63 64 |
# File 'lib/hot_bunnies/queue.rb', line 62 def channel @channel end |
#consumer_tag ⇒ Object (readonly)
Returns the value of attribute consumer_tag.
62 63 64 |
# File 'lib/hot_bunnies/queue.rb', line 62 def consumer_tag @consumer_tag end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
62 63 64 |
# File 'lib/hot_bunnies/queue.rb', line 62 def queue_name @queue_name end |
Instance Method Details
#active? ⇒ Boolean
105 106 107 |
# File 'lib/hot_bunnies/queue.rb', line 105 def active? !@cancelled.get && !@subscriber.nil? && !@subscriber.consumer_tag.nil? end |
#cancel ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/hot_bunnies/queue.rb', line 88 def cancel raise 'Can\'t cancel: the subscriber haven\'t received an OK yet' if !self.active? @channel.basic_cancel(@subscriber.consumer_tag) # RabbitMQ Java client won't clear consumer_tag from cancelled consumers, # so we have to do this. Sharing consumers # between threads in general is a can of worms but someone somewhere # will almost certainly do it, so. MK. @cancelled.set(true) maybe_shutdown_executor end |
#cancelled? ⇒ Boolean
101 102 103 |
# File 'lib/hot_bunnies/queue.rb', line 101 def cancelled? @cancelled.get end |
#each(options = {}, &block) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/hot_bunnies/queue.rb', line 72 def each(={}, &block) raise 'The subscription already has a message listener' if @subscriber if .fetch(:blocking, true) run(&block) else if [:executor] @shut_down_executor = false @executor = [:executor] else @shut_down_executor = true @executor = java.util.concurrent.Executors.new_single_thread_executor end @executor.submit { run(&block) } end end |
#shutdown! ⇒ Object
109 110 111 112 113 |
# File 'lib/hot_bunnies/queue.rb', line 109 def shutdown! if @executor && @shut_down_executor @executor.shutdown_now end end |