Class: Zold::AsyncEntrance
- Inherits:
-
Object
- Object
- Zold::AsyncEntrance
- Defined in:
- lib/zold/node/async_entrance.rb
Overview
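The asynchronous entrance: it wraps another entrance, stores pushed wallet bodies as files in a directory, and runs a pool of background worker threads.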
Constant Summary collapse
- THREADS =
How many threads to use for processing
Concurrent.processor_count * 8
- MAX_QUEUE =
Maximum queue length (pushes are rejected once the queue grows beyond it)
Concurrent.processor_count * 64
Instance Method Summary collapse
-
#initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance
constructor
A new instance of AsyncEntrance.
-
#push(id, body) ⇒ Object
Always returns an array with a single ID of the pushed wallet.
- #start ⇒ Object
- #to_json ⇒ Object
Constructor Details
#initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance
Returns a new instance of AsyncEntrance.
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/zold/node/async_entrance.rb', line 43
#
# Builds a new AsyncEntrance.
#
# @param entrance [Object] the wrapped entrance; must not be nil
# @param dir [String] the directory used by this entrance; must be a String
# @param log [Object] the log to write to; must not be nil
# @raise [RuntimeError] if +entrance+, +dir+ or +log+ is nil, or if +dir+
#   is not a String
def initialize(entrance, dir, log: Log::Quiet.new)
  raise "Entrance can't be nil" if entrance.nil?
  @entrance = entrance
  # Both directory checks happen before @dir is assigned.
  if dir.nil?
    raise "Directory can't be nil"
  elsif !dir.is_a?(String)
    raise 'Directory must be of type String'
  end
  @dir = dir
  raise "Log can't be nil" if log.nil?
  @log = log
  # Used by #push to serialize writes into the directory.
  @mutex = Mutex.new
end
Instance Method Details
#push(id, body) ⇒ Object
Always returns an array with a single ID of the pushed wallet
97 98 99 100 101 102 103 |
# File 'lib/zold/node/async_entrance.rb', line 97
#
# Saves the wallet body into the directory, in a file named after the ID.
#
# @param id [#to_s] ID of the wallet; its string form is used as the file name
# @param body [String] content of the wallet to store
# @return [Array] always an array with the single ID that was pushed
# @raise [RuntimeError] if the queue already holds more than
#   AsyncEntrance::MAX_QUEUE items
def push(id, body)
  # NOTE(review): `queue` is defined outside this listing; it is counted
  # once for the check and once more for the message.
  if queue.count > AsyncEntrance::MAX_QUEUE
    raise "Queue is too long (#{queue.count} wallets), try again later"
  end
  target = File.join(@dir, id.to_s)
  # Only the file write itself is performed under the mutex.
  @mutex.synchronize { File.write(target, body) }
  [id]
end
#start ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/zold/node/async_entrance.rb', line 65
#
# Starts the wrapped entrance, creates the directory and a fixed-size
# thread pool, launches AsyncEntrance::THREADS polling workers, and yields
# this object to the given block. When the block finishes (normally or
# with an exception) the pool is shut down; if it does not terminate
# within 10 seconds it is killed.
#
# @yield [AsyncEntrance] this instance, while the workers are running
def start
  @entrance.start do
    FileUtils.mkdir_p(@dir)
    @pool = Concurrent::FixedThreadPool.new(
      AsyncEntrance::THREADS,
      max_queue: AsyncEntrance::MAX_QUEUE,
      fallback_policy: :abort
    )
    AsyncEntrance::THREADS.times do |t|
      @pool.post do
        Thread.current.name = "async-e##{t}"
        loop do
          # NOTE(review): `take` is defined outside this listing; presumably
          # it processes one queued wallet -- confirm.
          VerboseThread.new(@log).run(true) { take }
          break if @pool.shuttingdown?
          # Pause for a random 0.00..0.99 seconds between polls. The divisor
          # must be a Float: with Integer division the result was always 0,
          # so the workers never slept and spun in a busy loop.
          sleep(Random.rand(100) / 100.0)
        end
      end
    end
    begin
      yield(self)
    ensure
      @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}")
      @pool.shutdown
      # Give the workers a chance to finish before killing them.
      if @pool.wait_for_termination(10)
        @log.info('Async entrance terminated peacefully')
      else
        @pool.kill
        @log.info('Async entrance was killed')
      end
    end
  end
end
#to_json ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/zold/node/async_entrance.rb', line 54
#
# Reports the state of this entrance merged into the report of the
# wrapped entrance.
#
# @return [Object] the result of merging these statistics into
#   +@entrance.to_json+: the queue size, the pool length, whether the pool
#   is running, and the age of the first queued item (0 if the queue is
#   empty)
def to_json
  # NOTE(review): the three keys of this literal are Symbols, while
  # 'queue_age' below is a String key -- kept as is.
  stats = {
    'queue': queue.count,
    'pool.length': @pool.length,
    'pool.running': @pool.running?
  }
  # NOTE(review): `queue` is defined outside this listing; its first item
  # is used as a file name inside @dir.
  pending = queue
  stats['queue_age'] =
    if pending.empty?
      0
    else
      Time.now - File.mtime(File.join(@dir, pending[0]))
    end
  @entrance.to_json.merge(stats)
end