Class: Zold::AsyncEntrance

Inherits:
Object
  • Object
show all
Defined in:
lib/zold/node/async_entrance.rb

Overview

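The asynchronous entrance: it wraps another entrance, stores pushed wallets as files in a queue directory, and runs a pool of background threads while started.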

Constant Summary collapse

THREADS =

How many threads to use for processing

Concurrent.processor_count * 8
MAX_QUEUE =

Maximum queue length

Concurrent.processor_count * 64

Instance Method Summary collapse

Constructor Details

#initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance

Returns a new instance of AsyncEntrance.



43
44
45
46
47
48
49
50
51
52
# File 'lib/zold/node/async_entrance.rb', line 43

# Builds the entrance around another entrance and a queue directory.
#
# Each argument is validated and stored in turn, so the first invalid
# argument (in the order entrance, dir, log) decides which error is raised.
#
# @param entrance the wrapped entrance; must not be nil
# @param dir [String] path of the directory used for queued files
# @param log the logger; must not be nil
# @raise [RuntimeError] if any argument is nil or +dir+ is not a String
def initialize(entrance, dir, log: Log::Quiet.new)
  if entrance.nil?
    raise "Entrance can't be nil"
  end
  @entrance = entrance

  if dir.nil?
    raise "Directory can't be nil"
  end
  unless dir.is_a?(String)
    raise "Directory must be of type String"
  end
  @dir = dir

  if log.nil?
    raise "Log can't be nil"
  end
  @log = log

  # Guards writes into the queue directory (see #push).
  @mutex = Mutex.new
end

Instance Method Details

#push(id, body) ⇒ Object

Always returns an array with a single ID of the pushed wallet



97
98
99
100
101
102
103
# File 'lib/zold/node/async_entrance.rb', line 97

# Stores the body in the queue directory as a file named after the ID.
#
# The push is rejected when the queue already holds more than
# AsyncEntrance::MAX_QUEUE entries. The write happens under the
# instance mutex.
#
# @param id the wallet ID; its +to_s+ form is used as the file name
# @param body the content written to the file
# @return [Array] a one-element array holding +id+
# @raise [RuntimeError] if the queue is too long
def push(id, body)
  if queue.count > AsyncEntrance::MAX_QUEUE
    raise "Queue is too long (#{queue.count} wallets), try again later"
  end
  target = File.join(@dir, id.to_s)
  @mutex.synchronize { File.write(target, body) }
  [id]
end

#start ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/zold/node/async_entrance.rb', line 65

# Starts the wrapped entrance and, inside its block, a fixed-size thread
# pool whose AsyncEntrance::THREADS workers repeatedly call +take+.
# Yields +self+ to the given block and, once that block exits (normally
# or through an exception), shuts the pool down.
#
# The pool gets up to 10 seconds to terminate on its own; after that it
# is killed.
#
# @yieldparam entrance [AsyncEntrance] this instance
def start
  @entrance.start do
    FileUtils.mkdir_p(@dir)
    @pool = Concurrent::FixedThreadPool.new(
      AsyncEntrance::THREADS, max_queue: AsyncEntrance::MAX_QUEUE, fallback_policy: :abort
    )
    AsyncEntrance::THREADS.times do |t|
      @pool.post do
        Thread.current.name = "async-e##{t}"
        loop do
          VerboseThread.new(@log).run(true) { take }
          break if @pool.shuttingdown?
          # Random pause of 0.00..0.99 seconds between polls. The divisor
          # must be a Float: Integer / Integer truncates, which made the
          # pause always zero and turned this loop into a busy spin.
          sleep Random.rand(100) / 100.0
        end
      end
    end
    begin
      yield(self)
    ensure
      @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}")
      @pool.shutdown
      if @pool.wait_for_termination(10)
        @log.info('Async entrance terminated peacefully')
      else
        @pool.kill
        @log.info('Async entrance was killed')
      end
    end
  end
end

#to_json ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/zold/node/async_entrance.rb', line 54

# Reports the state of this entrance on top of the wrapped entrance's
# own report.
#
# The queue is listed once, so the reported count and age describe the
# same snapshot. @pool must already be assigned (it is set in #start).
#
# @return the result of merging these entries into +@entrance.to_json+:
#   +:queue+ (number of queued entries), +:'pool.length'+,
#   +:'pool.running'+ and +'queue_age'+ (seconds since the first listed
#   entry was modified, or 0 when the queue is empty)
def to_json
  pending = queue
  stats = {
    'queue': pending.count,
    'pool.length': @pool.length,
    'pool.running': @pool.running?
  }
  stats['queue_age'] = pending.empty? ? 0 : Time.now - File.mtime(File.join(@dir, pending[0]))
  @entrance.to_json.merge(stats)
end