Class: VCAP::Concurrency::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/vcap/concurrency/thread_pool.rb

Constant Summary collapse

STOP_SENTINEL =
:stop
STATE_CREATED =
0
STATE_STARTED =
1
STATE_STOPPED =
2

Instance Method Summary collapse

Constructor Details

#initialize(num_threads) ⇒ ThreadPool

Returns a new instance of ThreadPool.



18
19
20
21
22
23
24
25
# File 'lib/vcap/concurrency/thread_pool.rb', line 18

def initialize(num_threads)
  @num_threads = num_threads
  @threads     = []
  @work_queue  = Queue.new
  @state       = STATE_CREATED
  @pool_lock   = Mutex.new
  @num_active_tasks = VCAP::Concurrency::AtomicVar.new(0)
end

Instance Method Details

#enqueue(&blk) ⇒ VCAP::Concurrent::Promise

Adds a block that will be executed by a worker thread.

Parameters:

  • blk (Block)

    The block to be executed by a worker thread.

Returns:

  • (VCAP::Concurrent::Promise)

    The caller of enqueue() may wait for the result of blk by calling resolve()



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/vcap/concurrency/thread_pool.rb', line 50

def enqueue(&blk)
  @pool_lock.synchronize do
    assert_state_in(STATE_CREATED, STATE_STARTED)

    promise = VCAP::Concurrency::Promise.new

    @work_queue.enq([blk, promise])

    promise
  end
end

#joinObject

Waits for all worker threads to finish executing.



74
75
76
77
78
79
80
81
82
# File 'lib/vcap/concurrency/thread_pool.rb', line 74

def join
  @pool_lock.synchronize do
    assert_state_in(STATE_STARTED, STATE_STOPPED)
  end

  @threads.each { |t| t.join }

  nil
end

#num_active_tasksInteger

Returns the number of tasks that are currently running. This is equivalent to the number of active threads.

Returns:

  • (Integer)


97
98
99
# File 'lib/vcap/concurrency/thread_pool.rb', line 97

def num_active_tasks
  @num_active_tasks.value
end

#num_queued_tasksInteger

Returns the number of tasks waiting to be processed

NB: While technically correct, this will include the number of unprocessed

sentinel tasks after stop() is called.

Returns:

  • (Integer)


107
108
109
# File 'lib/vcap/concurrency/thread_pool.rb', line 107

def num_queued_tasks
  @work_queue.length
end

#shutdownObject

Queues up sentinel values to notify workers to stop, then waits for them to finish.



86
87
88
89
90
91
# File 'lib/vcap/concurrency/thread_pool.rb', line 86

def shutdown
  stop
  join

  nil
end

#startObject

Creates all threads in the pool and starts them. Tasks that were enqueued prior to starting the pool will be processed immediately.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/vcap/concurrency/thread_pool.rb', line 29

def start
  @pool_lock.synchronize do

    assert_state_in(STATE_CREATED)

    @num_threads.times do
      @threads << create_worker_thread
    end

    @state = STATE_STARTED
  end

  nil
end

#stopObject

Stops the thread pool politely, allowing existing work to be completed.



63
64
65
66
67
68
69
70
71
# File 'lib/vcap/concurrency/thread_pool.rb', line 63

def stop
  @pool_lock.synchronize do
    @num_threads.times { @work_queue.enq(STOP_SENTINEL) }

    @state = STATE_STOPPED
  end

  nil
end