Class: Zold::AsyncEntrance

Inherits:
Object
  • Object
show all
Defined in:
lib/zold/node/async_entrance.rb

Overview

The entrance

Instance Method Summary collapse

Constructor Details

#initialize(entrance, log: Log::Quiet.new) ⇒ AsyncEntrance

Returns a new instance of AsyncEntrance.



32
33
34
35
36
37
# File 'lib/zold/node/async_entrance.rb', line 32

def initialize(entrance, log: Log::Quiet.new)
  raise 'Entrance can\'t be nil' if entrance.nil?
  @entrance = entrance
  raise 'Log can\'t be nil' if log.nil?
  @log = log
end

Instance Method Details

#push(id, body) ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/zold/node/async_entrance.rb', line 71

def push(id, body)
  @pool.post do
    VerboseThread.new(@log).run(true) do
      @entrance.push(id, body)
    end
  end
  @log.debug("Pushed #{id}/#{body.length}b to #{@entrance.class.name}, \
pool: #{@pool.length}/#{@pool.queue_length}")
end

#startObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/zold/node/async_entrance.rb', line 39

def start
  @entrance.start do
    @pool = Concurrent::FixedThreadPool.new(
      Concurrent.processor_count,
      max_queue: Concurrent.processor_count * 10,
      fallback_policy: :abort
    )
    begin
      yield(self)
    ensure
      @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}")
      @pool.shutdown
      if @pool.wait_for_termination(10)
        @log.info('Async entrance terminated peacefully')
      else
        @pool.kill
        @log.info('Async entrance was killed')
      end
    end
  end
end

#to_jsonObject



61
62
63
64
65
66
67
68
69
# File 'lib/zold/node/async_entrance.rb', line 61

def to_json
  @entrance.to_json.merge(
    'pool.completed_task_count': @pool.completed_task_count,
    'pool.largest_length': @pool.largest_length,
    'pool.length': @pool.length,
    'pool.queue_length': @pool.queue_length,
    'pool.running': @pool.running?
  )
end