Class: RJR::ThreadPool

Inherits:
Object show all
Defined in:
lib/rjr/thread_pool.rb

Overview

Utility to launches a specified number of threads on instantiation, assigning work to them in order as it arrives.

Supports optional timeout which allows the user to kill and restart threads if a job is taking too long to run.

Config options (must be set before first node is instantiated) collapse

Instance Method Summary collapse

Class Attribute Details

.num_threadsObject

Number of threads to instantiate in local worker pool



86
87
88
# File 'lib/rjr/thread_pool.rb', line 86

def num_threads
  @num_threads
end

.timeoutObject

Timeout after which worker threads are killed



89
90
91
# File 'lib/rjr/thread_pool.rb', line 89

def timeout
  @timeout
end

Instance Method Details

#<<(work) ⇒ Object

Add work to the pool

Parameters:

  • work (ThreadPoolJob)

    job to execute in first available thread

Returns:

  • self



204
205
206
207
208
# File 'lib/rjr/thread_pool.rb', line 204

def <<(work)
  # TODO option to increase worker threads if work queue gets saturated
  @work_queue.push work
  self
end

#joinObject

Block until all worker threads have finished executing

Returns:

  • self



229
230
231
232
# File 'lib/rjr/thread_pool.rb', line 229

def join
  @manager_thread.join if @manager_thread
  self
end

#running?Boolean

Return boolean indicating if thread pool is running

Returns:

  • (Boolean)


196
197
198
199
# File 'lib/rjr/thread_pool.rb', line 196

def running?
  !@manager_thread.nil? &&
  ['sleep', 'run'].include?(@manager_thread.status)
end

#startObject

Start the thread pool



187
188
189
190
191
192
193
# File 'lib/rjr/thread_pool.rb', line 187

def start
  return self unless @terminate
  @terminate = false
  0.upto(@num_threads-1) { |i| launch_worker }
  launch_manager
  self
end

#stopObject

Terminate the thread pool, stopping all worker threads

Returns:

  • self



213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/rjr/thread_pool.rb', line 213

def stop
  @terminate = true

  # this will wake up on it's own, but we can
  # speed things up if we manually wake it up,
  # surround w/ block incase thread cleans up on its own
  begin
    @manager_thread.wakeup if @manager_thread
  rescue
  end
  self
end