Class: Zold::AsyncEntrance
- Inherits:
-
Object
- Object
- Zold::AsyncEntrance
- Defined in:
- lib/zold/node/async_entrance.rb
Overview
The entrance
Instance Method Summary collapse
-
#initialize(entrance, log: Log::Quiet.new) ⇒ AsyncEntrance
constructor
A new instance of AsyncEntrance.
- #push(id, body) ⇒ Object
- #start ⇒ Object
- #to_json ⇒ Object
Constructor Details
#initialize(entrance, log: Log::Quiet.new) ⇒ AsyncEntrance
Returns a new instance of AsyncEntrance.
32 33 34 35 36 37 |
# File 'lib/zold/node/async_entrance.rb', line 32 def initialize(entrance, log: Log::Quiet.new) raise 'Entrance can\'t be nil' if entrance.nil? @entrance = entrance raise 'Log can\'t be nil' if log.nil? @log = log end |
Instance Method Details
#push(id, body) ⇒ Object
71 72 73 74 75 76 77 78 79 |
# File 'lib/zold/node/async_entrance.rb', line 71 def push(id, body) @pool.post do VerboseThread.new(@log).run(true) do @entrance.push(id, body) end end @log.debug("Pushed #{id}/#{body.length}b to #{@entrance.class.name}, \ pool: #{@pool.length}/#{@pool.queue_length}") end |
#start ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/zold/node/async_entrance.rb', line 39 def start @entrance.start do @pool = Concurrent::FixedThreadPool.new( Concurrent.processor_count, max_queue: Concurrent.processor_count * 10, fallback_policy: :abort ) begin yield(self) ensure @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}") @pool.shutdown if @pool.wait_for_termination(10) @log.info('Async entrance terminated peacefully') else @pool.kill @log.info('Async entrance was killed') end end end end |
#to_json ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/zold/node/async_entrance.rb', line 61 def to_json @entrance.to_json.merge( 'pool.completed_task_count': @pool.completed_task_count, 'pool.largest_length': @pool.largest_length, 'pool.length': @pool.length, 'pool.queue_length': @pool.queue_length, 'pool.running': @pool.running? ) end |