Class: Zold::AsyncEntrance

Inherits:
Object
  • Object
show all
Defined in:
lib/zold/node/async_entrance.rb

Overview

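The entrance that stores pushed wallets in a queue directory and processes them asynchronously in a pool of threads.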

Constant Summary collapse

THREADS =

How many threads to use for processing

Concurrent.processor_count * 8
MAX_QUEUE =

Maximum number of items in the queue. If there are more, push() requests are rejected.

128

Instance Method Summary collapse

Constructor Details

#initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance

Returns a new instance of AsyncEntrance.



45
46
47
48
49
50
51
52
53
54
# File 'lib/zold/node/async_entrance.rb', line 45

# Stores the wrapped entrance, the queue directory and the logger, and
# creates the mutex that #push uses to serialize file writes.
#
# @param entrance the entrance to delegate to; must not be nil
# @param dir [String] path of the directory where pushed wallets are stored
# @param log the logger; must not be nil
# @raise [RuntimeError] if entrance, dir or log is nil, or dir is not a String
def initialize(entrance, dir, log: Log::Quiet.new)
  raise "Entrance can't be nil" if entrance.nil?
  @entrance = entrance
  # Nil is reported separately from any other non-String value.
  case dir
  when nil then raise "Directory can't be nil"
  when String then @dir = dir
  else raise 'Directory must be of type String'
  end
  raise "Log can't be nil" if log.nil?
  @log = log
  @mutex = Mutex.new
end

Instance Method Details

#push(id, body) ⇒ Object

Always returns an array with a single ID of the pushed wallet



99
100
101
102
103
104
105
# File 'lib/zold/node/async_entrance.rb', line 99

# Queues a wallet for later processing by writing its body into a file
# named after the wallet ID inside the queue directory.
#
# @param id the wallet ID; its #to_s is used as the file name
# @param body the content written to the file
# @return [Array] an array with the single ID of the pushed wallet
# @raise [RuntimeError] if the queue holds more than MAX_QUEUE items
def push(id, body)
  # Read the queue once, so the size reported in the error message is
  # exactly the size that was compared against the limit.
  total = queue.count
  raise "Queue is too long (#{total} wallets), try again later" if total > AsyncEntrance::MAX_QUEUE
  # Writes are serialized so concurrent pushes do not interleave.
  @mutex.synchronize do
    AtomicFile.new(File.join(@dir, id.to_s)).write(body)
  end
  [id]
end

#start ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/zold/node/async_entrance.rb', line 67

# Starts the wrapped entrance and, inside it, a fixed pool of worker threads
# that repeatedly call #take until the pool starts shutting down. Yields
# self to the given block while the workers are running, and stops the pool
# when the block finishes (normally or with an exception).
#
# @yieldparam entrance [AsyncEntrance] this instance
def start
  @entrance.start do
    FileUtils.mkdir_p(@dir)
    @pool = Concurrent::FixedThreadPool.new(
      AsyncEntrance::THREADS, max_queue: AsyncEntrance::MAX_QUEUE, fallback_policy: :abort
    )
    AsyncEntrance::THREADS.times do |t|
      @pool.post do
        Thread.current.name = "async-e##{t}"
        loop do
          VerboseThread.new(@log).run(true) { take }
          break if @pool.shuttingdown?
          # Pause for a random fraction of a second between attempts.
          # The divisor must be a Float: integer division would always
          # give 0 and turn this loop into a busy spin.
          sleep Random.rand(100) / 100.0
        end
      end
    end
    begin
      yield(self)
    ensure
      @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}")
      @pool.shutdown
      # Give the workers 10 seconds to finish, then kill them.
      if @pool.wait_for_termination(10)
        @log.info('Async entrance terminated peacefully')
      else
        @pool.kill
        @log.info('Async entrance was killed')
      end
    end
  end
end

#to_json ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/zold/node/async_entrance.rb', line 56

# Builds a Hash describing the state of this entrance and merges it over
# the Hash returned by the wrapped entrance's #to_json.
#
# Added entries: +:queue+ (number of queued items), +:'pool.length'+,
# +:'pool.running'+ and +'queue_age'+ (seconds elapsed since the first
# queued file was modified, or 0 when the queue is empty).
#
# @return [Hash] the merged Hash
def to_json
  # Read the queue once, so the reported count and age describe the same
  # snapshot.
  items = queue
  json = {
    queue: items.count,
    'pool.length': @pool.length,
    'pool.running': @pool.running?
  }
  # This key is a String (the others are Symbols); kept as is for callers.
  json['queue_age'] = items.empty? ? 0 : Time.now - File.mtime(File.join(@dir, items.first))
  @entrance.to_json.merge(json)
end