Class: Sbmt::Outbox::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/sbmt/outbox/thread_pool.rb

Constant Summary collapse

# Sentinel a task source returns to signal that no more work should be
# fetched; `next_task` marks the pool as stopped when it sees this value.
BREAK = Object.new.freeze
# Sentinel value; not referenced by the methods shown here.
# NOTE(review): presumably returned by a worker block that skipped its item — confirm.
SKIPPED = Object.new.freeze
# Sentinel value; not referenced by the methods shown here.
# NOTE(review): presumably returned by a worker block that handled its item — confirm.
PROCESSED = Object.new.freeze

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ ThreadPool

Returns a new instance of ThreadPool.



12
13
14
15
16
# File 'lib/sbmt/outbox/thread_pool.rb', line 12

# Builds a pool around a task source block.
#
# The pool is created in the stopped state; `start` flips it to running.
#
# @yield called with no arguments by `next_task` each time a new item is
#   needed; returning BREAK tells the pool to stop fetching
def initialize(&block)
  self.stopped = true
  # Guards every call to the task source made through `next_task`.
  self.task_mutex = Mutex.new
  self.task_source = block
end

Instance Method Details

#next_task ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/sbmt/outbox/thread_pool.rb', line 18

# Fetches the next item from the task source.
#
# The whole fetch runs while holding `task_mutex`, so the task source is
# never invoked by two callers at once. Once the pool is stopped the task
# source is no longer called.
#
# @return [Object, nil] the item produced by the task source, or nil when
#   the pool is stopped or the task source returned BREAK
def next_task
  task_mutex.synchronize do
    next if stopped

    item = task_source.call

    # Compare by identity: BREAK is a unique sentinel object, and `item == BREAK`
    # would dispatch to the item's own `==`, which a task object may override
    # (raising on foreign types, or matching everything).
    next item unless BREAK.equal?(item)

    self.stopped = true
    nil
  end
end

#start(concurrency:) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/sbmt/outbox/thread_pool.rb', line 32

# Marks the pool as running and hands every item to the caller's block.
#
# The pool is flagged as stopped again when this method exits, whether it
# returns normally or raises.
#
# @param concurrency [Integer] forwarded to `run_threads` as `count:`
#   (presumably the number of worker threads — `run_threads` is defined
#   elsewhere; confirm)
# @yieldparam worker_number [Object] value of `worker_number` at the time
#   the item is yielded
# @yieldparam item [Object] the item passed in by `run_threads`
# @return [nil]
# @raise [Exception] the object returned by `run_threads`, when that object
#   is an Exception
def start(concurrency:)
  self.stopped = false

  outcome = run_threads(count: concurrency) { |task| yield worker_number, task }
  # run_threads reports a failure by returning the exception object.
  raise outcome if outcome.is_a?(Exception)

  nil
ensure
  self.stopped = true
end

#stop ⇒ Object



44
45
46
# File 'lib/sbmt/outbox/thread_pool.rb', line 44

# Flags the pool as stopped. After this, `next_task` returns nil without
# calling the task source.
#
# @return [true]
def stop
  self.stopped = true
end

#worker_number ⇒ Object



48
49
50
# File 'lib/sbmt/outbox/thread_pool.rb', line 48

# Looks up the worker number recorded for the calling thread.
#
# The storage key embeds this pool's `object_id`, so values stored for
# different pool instances do not collide.
#
# @return [Object, nil] whatever is stored under this pool's key in
#   `Thread.current`, or nil when nothing has been stored
#   (NOTE(review): presumably an Integer assigned by the code that spawns
#   the workers — that code is not shown here; confirm)
def worker_number
  storage_key = "thread_pool_worker_number:#{object_id}"
  Thread.current[storage_key]
end