Class: Zold::AsyncEntrance
- Inherits:
- Object (Object → Zold::AsyncEntrance)
- Defined in:
- lib/zold/node/async_entrance.rb
Overview
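The asynchronous entrance. It wraps another entrance, stores pushed wallets as files in a queue directory, and processes them in a pool of background threads.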
Constant Summary
- THREADS = [Concurrent.processor_count, 4].max
  How many threads to use for processing
- MAX_QUEUE = Concurrent.processor_count * 64
  Maximum queue length, in wallets
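As a rough illustration of how these two values scale with the machine, the sketch below recomputes them with Ruby's standard `Etc.nprocessors` instead of `Concurrent.processor_count`. That substitution is an assumption made so the snippet runs without the concurrent-ruby gem, and the two may report different counts on some platforms. The helper names `threads_for` and `max_queue_for` are hypothetical and not part of Zold.

```ruby
require 'etc'

# Hypothetical helper mirroring THREADS: never fewer than four workers.
def threads_for(cpus)
  [cpus, 4].max
end

# Hypothetical helper mirroring MAX_QUEUE: 64 queued wallets per processor.
def max_queue_for(cpus)
  cpus * 64
end

# Etc.nprocessors is a stdlib stand-in for Concurrent.processor_count.
cpus = Etc.nprocessors
puts "cpus=#{cpus} threads=#{threads_for(cpus)} max_queue=#{max_queue_for(cpus)}"
```

On a two-core machine this gives four threads and a limit of 128 wallets: the thread count has a floor, while the queue limit scales with the core count only.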
Instance Method Summary
- #initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance
  constructor
  A new instance of AsyncEntrance.
- #push(id, body) ⇒ Object
  Always returns an array with a single ID of the pushed wallet.
- #start ⇒ Object
- #to_json ⇒ Object
Constructor Details
#initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance
Returns a new instance of AsyncEntrance.
# File 'lib/zold/node/async_entrance.rb', line 45
def initialize(entrance, dir, log: Log::Quiet.new)
  @entrance = entrance
  @dir = dir
  @log = log
  @mutex = Mutex.new
end
Instance Method Details
#push(id, body) ⇒ Object
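Writes the wallet body to a file in the queue and always returns an array with a single ID of the pushed wallet. Raises an error if the queue already holds more than MAX_QUEUE wallets.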
# File 'lib/zold/node/async_entrance.rb', line 104
def push(id, body)
  raise "Queue is too long (#{queue.count} wallets), try again later" if queue.count > AsyncEntrance::MAX_QUEUE
  start = Time.now
  Futex.new(file(id), log: @log).open do |f|
    IO.write(f, body)
  end
  @log.debug("Added #{id}/#{Size.new(body.length)} to the queue at pos.#{queue.count} \
in #{Age.new(start, limit: 0.05)}")
  [id]
end
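The push logic above can be sketched as a small file-backed queue: check the current length, write the body to a file named after the ID, and return the ID in an array. The class `FileQueue` below is a hypothetical stand-in and not part of Zold. It uses only the standard library, omits the `Futex` file locking and the logging, and assumes queued items are simply files in one directory.

```ruby
require 'tmpdir'

# Hypothetical, simplified stand-in for the queue handling in AsyncEntrance.
class FileQueue
  def initialize(dir, max)
    @dir = dir
    @max = max
  end

  # Names of the queued files, ordered by modification time.
  def queue
    Dir.children(@dir).sort_by { |f| File.mtime(File.join(@dir, f)) }
  end

  # Rejects the item when the queue is over the limit; otherwise writes
  # the body to a file named after the ID and returns [id].
  def push(id, body)
    count = queue.count
    raise "Queue is too long (#{count} items), try again later" if count > @max
    File.write(File.join(@dir, id), body)
    [id]
  end
end

Dir.mktmpdir do |dir|
  q = FileQueue.new(dir, 1)
  p q.push('a', 'first')
  p q.push('b', 'second')
  begin
    q.push('c', 'third')
  rescue RuntimeError => e
    puts e.message
  end
end
```

Because the comparison is `>` rather than `>=`, the queue can hold one item more than the limit before a push is rejected, which matches the check in the original method.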
#start ⇒ Object
# File 'lib/zold/node/async_entrance.rb', line 63
def start
  @entrance.start do
    FileUtils.mkdir_p(@dir)
    @pool = Concurrent::FixedThreadPool.new(
      AsyncEntrance::THREADS,
      max_queue: AsyncEntrance::MAX_QUEUE,
      fallback_policy: :abort
    )
    AsyncEntrance::THREADS.times do |t|
      @pool.post do
        Thread.current.name = "async-e##{t}"
        loop do
          VerboseThread.new(@log).run(true) { take }
          break if @pool.shuttingdown?
          # Pause for a random fraction of a second (float division).
          sleep Random.rand(100) / 100.0
        end
      end
    end
    begin
      yield(self)
      cycle = 0
      until queue.empty?
        @log.info("Stopping async entrance, #{queue.count} still in the queue (cycle=#{cycle})...")
        cycle += 1
        raise "Can't wait for async entrance to stop for so long" if cycle > 10
        sleep 1
      end
    ensure
      @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}")
      @pool.shutdown
      if @pool.wait_for_termination(10)
        @log.info("Async entrance terminated peacefully with #{queue.count} wallets left in the queue")
      else
        @pool.kill
        @log.info("Async entrance was killed, #{queue.count} wallets left in the queue")
      end
    end
  end
end
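The shape of `#start` (fixed workers polling a queue, a bounded wait for the queue to drain, then a shutdown with a grace period and a forced kill as the fallback) can be sketched with plain Ruby threads. The helper `drain` below is hypothetical and not part of Zold. It uses the standard `Thread` and `Queue` classes instead of `Concurrent::FixedThreadPool`, keeps the queue in memory rather than on disk, and uses shorter sleeps so the example finishes quickly.

```ruby
# Hypothetical helper, not part of Zold: runs callables on a fixed number
# of threads, waits a bounded time for the queue to drain, then stops.
def drain(jobs, threads: 4, grace: 10)
  queue = Queue.new
  jobs.each { |j| queue << j }
  results = Queue.new
  stopping = false
  workers = Array.new(threads) do |t|
    Thread.new do
      Thread.current.name = "worker##{t}"
      until stopping
        begin
          # Non-blocking pop raises ThreadError when the queue is empty.
          results << queue.pop(true).call
        rescue ThreadError
          sleep 0.01 # nothing to do, poll again shortly
        end
      end
    end
  end
  begin
    cycle = 0
    until queue.empty?
      cycle += 1
      raise "Can't wait for workers to stop for so long" if cycle > 10
      sleep 0.1
    end
  ensure
    stopping = true
    # Give each worker the grace period, then kill it if it is still alive.
    workers.each { |w| w.kill unless w.join(grace) }
  end
  Array.new(results.size) { results.pop }
end

p drain([-> { 1 + 1 }, -> { 2 * 3 }, -> { 10 - 3 }], threads: 2).sort
```

Results are collected only after the workers have been joined, since a job may still be running at the moment the queue becomes empty.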
#to_json ⇒ Object
# File 'lib/zold/node/async_entrance.rb', line 52
def to_json
  opts = queue
  json = {
    'queue': opts.count,
    'pool.length': @pool.length,
    'pool.running': @pool.running?
  }
  json['queue_age'] = opts.empty? ? 0 : Time.now - File.mtime(File.join(@dir, opts[0]))
  @entrance.to_json.merge(json)
end
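The `queue_age` value is the number of seconds since the first queued file was last modified, or zero when the queue is empty. A minimal sketch of that calculation follows, assuming the queue lists the oldest file first (the ordering of `queue` is not shown on this page). The helper `queue_stats` is hypothetical, and it uses symbol keys throughout, whereas the original method mixes symbol keys with the string key `'queue_age'`.

```ruby
require 'tmpdir'

# Hypothetical helper, not part of Zold: summarises a queue directory.
# `now` can be injected so the age calculation is easy to check.
def queue_stats(dir, now: Time.now)
  files = Dir.children(dir).sort_by { |f| File.mtime(File.join(dir, f)) }
  # Age of the oldest file in seconds, or 0 for an empty queue.
  age = files.empty? ? 0 : now - File.mtime(File.join(dir, files[0]))
  { queue: files.count, queue_age: age }
end

Dir.mktmpdir do |dir|
  p queue_stats(dir)
  File.write(File.join(dir, 'wallet-1'), 'body')
  p queue_stats(dir, now: Time.now + 5)
end
```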