Class: Workhorse::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/workhorse/pool.rb

Overview

Abstraction layer of a simple thread pool implementation used by the worker.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ Pool

Returns a new instance of Pool.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/workhorse/pool.rb', line 7

# Builds a pool that runs at most `size` work units concurrently.
#
# @param size [Integer] upper bound for the executor's thread count; also
#   the value `idle` is computed against
def initialize(size)
  @size = size

  # Bookkeeping that lives outside the executor: a lock, a counter of work
  # units currently running, and an optional idle callback (unset for now).
  @mutex = Mutex.new
  @active_threads = Concurrent::AtomicFixnum.new(0)
  @on_idle = nil

  # Threads are created on demand (min 0) up to `size`. Rejected tasks raise
  # (`:abort`), and the executor is not auto-terminated.
  executor_options = {
    min_threads:     0,
    max_threads:     size,
    max_queue:       0,
    fallback_policy: :abort,
    auto_terminate:  false
  }
  @executor = Concurrent::ThreadPoolExecutor.new(**executor_options)
end

Instance Attribute Details

#active_threads ⇒ Object (readonly)

Returns the value of attribute active_threads.



5
6
7
# File 'lib/workhorse/pool.rb', line 5

# Reader for the counter of work units currently running.
#
# @return [Concurrent::AtomicFixnum] the counter object itself (not its
#   integer value), as created in the constructor
def active_threads
  @active_threads
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



4
5
6
# File 'lib/workhorse/pool.rb', line 4

# Reader for the pool's lock.
#
# @return [Mutex] the mutex created in the constructor; `post` holds it
#   while checking for an idle thread and handing work to the executor
def mutex
  @mutex
end

Instance Method Details

#idle ⇒ Object

Returns the number of idle threads.



46
47
48
# File 'lib/workhorse/pool.rb', line 46

# Reports how many of the pool's threads are currently free.
#
# @return [Integer] the configured pool size minus the number of work units
#   currently counted as active
def idle
  busy = @active_threads.value
  @size - busy
end

#on_idle(&block) ⇒ Object



21
22
23
# File 'lib/workhorse/pool.rb', line 21

# Registers a callback by storing the given block in @on_idle, replacing any
# block stored earlier. `post` invokes the stored block each time a work
# unit finishes.
#
# @yield the callback to store
# @return [Proc, nil] the stored block (nil when called without a block)
def on_idle(&block)
  @on_idle = block
end

#post ⇒ Object

Posts a new work unit to the pool.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/workhorse/pool.rb', line 26

# Posts a new work unit to the pool.
#
# The given block is handed to the executor and run on one of its threads.
# Work is never accepted on behalf of a busy pool: when no thread is idle the
# call fails immediately. The whole check-and-submit sequence runs under the
# pool's mutex so concurrent callers cannot both claim the last idle thread.
#
# @yield the work unit to execute
# @return [Object] whatever the executor's `post` returns
# @raise [RuntimeError] if all threads are busy
# @raise [Concurrent::RejectedExecutionError] if the executor refuses the
#   task (presumably when it is no longer running, e.g. after shutdown)
def post
  mutex.synchronize do
    fail 'All threads are busy.' if idle.zero?

    # Keep a local reference so the block below works on the same counter
    # object that was incremented here.
    active_threads = @active_threads
    active_threads.increment

    begin
      @executor.post do
        yield
      ensure
        active_threads.decrement
        # @on_idle is either a Proc or nil, so safe navigation is enough and
        # avoids relying on ActiveSupport's `Object#try`.
        @on_idle&.call
      end
    rescue Concurrent::RejectedExecutionError
      # The task was never accepted, so the `ensure` above will not run.
      # Undo the increment here, otherwise this slot stays marked busy.
      active_threads.decrement
      raise
    end
  end
end

#shutdown ⇒ Object

Shuts down the pool and waits for termination.



64
65
66
67
# File 'lib/workhorse/pool.rb', line 64

# Shuts down the pool and blocks until the shutdown has completed.
#
# @return [nil] presumably nil, since the result comes from `wait`, which
#   ends by breaking out of a loop without a value
def shutdown
  # Ask the executor to shut down first; `wait` would otherwise poll forever
  # unless another thread triggers the shutdown.
  @executor.shutdown
  # Block until the executor reports `shutdown?`.
  wait
end

#wait ⇒ Object

Waits until the pool is shut down. This will wait forever unless you eventually call `shutdown` (either before calling `wait` or after it in another thread).



53
54
55
56
57
58
59
60
61
# File 'lib/workhorse/pool.rb', line 53

# Blocks the calling thread until the executor reports that it has shut
# down, checking every 0.1 seconds. Returns immediately if the executor is
# already shut down.
#
# @return [nil]
def wait
  # Polling is deliberate: ThreadPoolExecutor's `wait_for_termination` is
  # avoided here. See issue #21 for more information.
  sleep 0.1 until @executor.shutdown?
end