Class: HotBunnies::Queue::BlockingCallbackConsumer
- Inherits:
-
CallbackConsumer
- Object
- DefaultConsumer
- BaseConsumer
- CallbackConsumer
- HotBunnies::Queue::BlockingCallbackConsumer
- Includes:
- JavaConcurrent
- Defined in:
- lib/hot_bunnies/queue.rb
Instance Method Summary collapse
- #deliver(*pair) ⇒ Object
-
#initialize(channel, buffer_size, callback) ⇒ BlockingCallbackConsumer
constructor
A new instance of BlockingCallbackConsumer.
- #start ⇒ Object
Methods inherited from CallbackConsumer
Methods inherited from BaseConsumer
#cancel, #handleCancel, #handleCancelOk, #handleDelivery
Constructor Details
#initialize(channel, buffer_size, callback) ⇒ BlockingCallbackConsumer
Returns a new instance of BlockingCallbackConsumer.
221 222 223 224 225 226 227 228 |
# File 'lib/hot_bunnies/queue.rb', line 221 def initialize(channel, buffer_size, callback) super(channel, callback) if buffer_size @internal_queue = ArrayBlockingQueue.new(buffer_size) else @internal_queue = LinkedBlockingQueue.new end end |
Instance Method Details
#deliver(*pair) ⇒ Object
248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/hot_bunnies/queue.rb', line 248 def deliver(*pair) if @cancelling || @cancelled || JavaConcurrent::Thread.current_thread.interrupted? @internal_queue.offer(pair) else begin @internal_queue.put(pair) rescue InterruptedException => e JavaConcurrent::Thread.current_thread.interrupt end end end |
#start ⇒ Object
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/hot_bunnies/queue.rb', line 230 def start interrupted = false until @cancelled || JavaConcurrent::Thread.current_thread.interrupted? begin pair = @internal_queue.take callback(*pair) if pair rescue InterruptedException => e interrupted = true end end while (pair = @internal_queue.poll) callback(*pair) end if interrupted JavaConcurrent::Thread.current_thread.interrupt end end |