Class: Zold::AsyncEntrance
- Inherits:
-
Object
- Object
- Zold::AsyncEntrance
- Defined in:
- lib/zold/node/async_entrance.rb
Overview
The entrance
Constant Summary collapse
- THREADS =
How many threads to use for processing
Concurrent.processor_count * 8
- MAX_QUEUE =
Max items in the queue. If there will be more, push() requests will be rejected.
128
Instance Method Summary collapse
-
#initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance
constructor
A new instance of AsyncEntrance.
-
#push(id, body) ⇒ Object
Always returns an array with a single ID of the pushed wallet.
- #start ⇒ Object
- #to_json ⇒ Object
Constructor Details
#initialize(entrance, dir, log: Log::Quiet.new) ⇒ AsyncEntrance
Returns a new instance of AsyncEntrance.
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/zold/node/async_entrance.rb', line 45 def initialize(entrance, dir, log: Log::Quiet.new) raise 'Entrance can\'t be nil' if entrance.nil? @entrance = entrance raise 'Directory can\'t be nil' if dir.nil? raise 'Directory must be of type String' unless dir.is_a?(String) @dir = dir raise 'Log can\'t be nil' if log.nil? @log = log @mutex = Mutex.new end |
Instance Method Details
#push(id, body) ⇒ Object
Always returns an array with a single ID of the pushed wallet
99 100 101 102 103 104 105 |
# File 'lib/zold/node/async_entrance.rb', line 99 def push(id, body) raise "Queue is too long (#{queue.count} wallets), try again later" if queue.count > AsyncEntrance::MAX_QUEUE @mutex.synchronize do AtomicFile.new(File.join(@dir, id.to_s)).write(body) end [id] end |
#start ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/zold/node/async_entrance.rb', line 67 def start @entrance.start do FileUtils.mkdir_p(@dir) @pool = Concurrent::FixedThreadPool.new( AsyncEntrance::THREADS, max_queue: AsyncEntrance::MAX_QUEUE, fallback_policy: :abort ) AsyncEntrance::THREADS.times do |t| @pool.post do Thread.current.name = "async-e##{t}" loop do VerboseThread.new(@log).run(true) { take } break if @pool.shuttingdown? sleep Random.rand(100) / 100 end end end begin yield(self) ensure @log.info("Stopping async entrance, pool length is #{@pool.length}, queue length is #{@pool.queue_length}") @pool.shutdown if @pool.wait_for_termination(10) @log.info('Async entrance terminated peacefully') else @pool.kill @log.info('Async entrance was killed') end end end end |
#to_json ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/zold/node/async_entrance.rb', line 56 def to_json json = { 'queue': queue.count, 'pool.length': @pool.length, 'pool.running': @pool.running? } opts = queue json['queue_age'] = opts.empty? ? 0 : Time.now - File.mtime(File.join(@dir, opts[0])) @entrance.to_json.merge(json) end |