Class: BBK::App::Dispatcher::DirectStreamStrategy
- Inherits:
-
Object
- Object
- BBK::App::Dispatcher::DirectStreamStrategy
- Defined in:
- lib/bbk/app/dispatcher/direct_stream_strategy.rb
Instance Method Summary collapse
-
#initialize(pool, logger:) ⇒ DirectStreamStrategy
constructor
A new instance of DirectStreamStrategy.
- #push(message) ⇒ Object (also: #<<)
- #run(consumers, &block) ⇒ Object
- #stop(timeout = 5) ⇒ Object
Constructor Details
#initialize(pool, logger:) ⇒ DirectStreamStrategy
Returns a new instance of DirectStreamStrategy.
8 9 10 11 12 |
# File 'lib/bbk/app/dispatcher/direct_stream_strategy.rb', line 8 def initialize(pool, logger:) @pool = pool @logger = logger @stopped = false end |
Instance Method Details
#push(message) ⇒ Object Also known as: <<
31 32 33 34 |
# File 'lib/bbk/app/dispatcher/direct_stream_strategy.rb', line 31 def push() @logger.debug "[#{self.class}] Consumed message #{message.headers}" @pool.post(, &@block) unless @stopped end |
#run(consumers, &block) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/bbk/app/dispatcher/direct_stream_strategy.rb', line 14 def run(consumers, &block) @block = block @unblocker = Queue.new consumers.each {|cons| cons.run(self) } @pool.wait_for_termination(0) begin @pool.shutdown rescue StandardError nil end @pool.kill unless @pool.wait_for_termination(@stop_queue_timeout) ensure @unblocker.push(:ok) end |
#stop(timeout = 5) ⇒ Object
37 38 39 40 41 42 43 |
# File 'lib/bbk/app/dispatcher/direct_stream_strategy.rb', line 37 def stop(timeout = 5) @stopped = true @stop_queue_timeout = timeout @pool.shutdown rescue nil @unblocker.pop end |