Class: Lobster::ThreadPool

Inherits:
Object
Defined in:
lib/lobster/thread_pool.rb

Overview

Note:

Exceptions raised by tasks running in the thread pool are silently consumed. This prevents threads in the pool from repeatedly dying and being replaced by new ones. To capture exceptions, rescue them inside the block passed to #queue.

Collection of threads available to perform arbitrary tasks. The pool will automatically add and remove threads as they are needed.
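
Because the pool discards task exceptions, a task that needs to report a failure has to rescue inside its own block. The following is a minimal sketch of that pattern. `SketchPool` is a hypothetical stand-in, not the Lobster::ThreadPool implementation: it mimics only #queue and #join so the example runs without the library. The `errors` queue is likewise an assumption made for illustration.

```ruby
# Hypothetical stand-in that mimics only #queue and #join of a pool that
# swallows task exceptions. It is NOT the Lobster::ThreadPool implementation.
class SketchPool
  def initialize
    @threads = []
  end

  def queue(*args, &block)
    raise ArgumentError unless block_given?

    @threads << Thread.new do
      begin
        block.call(*args)
      rescue StandardError
        # Silently consumed, like the behavior described in the note above.
      end
    end
    self
  end

  def join
    @threads.each(&:join)
    nil
  end
end

errors = Queue.new # Thread-safe place to collect failures.
pool   = SketchPool.new

pool.queue(10, 0) do |a, b|
  begin
    a / b
  rescue ZeroDivisionError => e
    errors << e # Captured here; without this rescue the pool would discard it.
  end
end

pool.join

captured = []
captured << errors.pop until errors.empty?
puts captured.map(&:class).inspect
```

The rescue sits inside the task block, so the failure is recorded before the pool has a chance to swallow it.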

Defined Under Namespace

Classes: Task

Constant Summary

TOLERANCE = 4

Number of hits/misses before removing or adding a thread.
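
The page does not show how the pool counts hits and misses, so the following is only one plausible reading, sketched under stated assumptions: a "miss" means a task arrived with no idle thread, a "hit" means an idle thread was available, and a counter drifts in one direction or the other until it reaches the tolerance. The `assess` helper, its arguments, and the action symbols are all hypothetical.

```ruby
TOLERANCE = 4 # Same value as the documented constant.

# Hypothetical helper, not the library's code. Returns [new_slider, action].
# Assumption: hits push the slider up, misses push it down, and reaching
# +/- TOLERANCE triggers a resize and resets the slider to zero.
def assess(slider, idle_thread_available)
  slider += idle_thread_available ? 1 : -1

  if slider >= TOLERANCE
    [0, :remove_thread] # Threads keep sitting idle: shrink the pool.
  elsif slider <= -TOLERANCE
    [0, :add_thread]    # Tasks keep finding no idle thread: grow the pool.
  else
    [slider, :none]
  end
end

slider  = 0
actions = []
4.times do
  slider, action = assess(slider, false) # Four consecutive misses.
  actions << action
end
puts actions.inspect
```

Under these assumptions the pool reacts only to a sustained trend, so a single busy or idle moment does not cause a thread to be created or destroyed.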

Constructor Details

#initialize(thread_count = 0) ⇒ ThreadPool

Creates the thread pool.

Parameters:

  • thread_count (Fixnum) (defaults to: 0)

    Initial number of threads.



# File 'lib/lobster/thread_pool.rb', line 26

def initialize(thread_count = 0)
  fail ArgumentError unless(thread_count.is_a?(Fixnum) && thread_count >= 0)

  @queue  = Queue.new
  @group  = ThreadGroup.new
  @slider = 0

  # Mutexes used to safely access the task queue and thread group.
  @queue_mutex = Mutex.new
  @group_mutex = Mutex.new

  thread_count.times { spawn_thread }
end

Instance Method Details

#join ⇒ nil

Waits for all tasks to complete and threads to exit. All remaining tasks will be processed before this method returns. Any new tasks added with #queue will not start until this method returns.

Returns:

  • (nil)


# File 'lib/lobster/thread_pool.rb', line 92

def join
  @group_mutex.synchronize do
    threads = @group.list

    # Start tearing down threads.
    # The threads will continue running until their current task completes.
    threads.each do
      teardown_thread
    end

    # Wait for each thread to exit.
    threads.each do |thread|
      thread.join
    end

    # The two loops above can't be merged.
    # If they were, then the thread being stopped probably won't be the same one being joined.
  end

  nil # Hide previous return value.
end
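
The comment about not merging the two loops can be illustrated with a self-contained sketch. It assumes, since teardown_thread is not shown here, that tearing down a thread means placing a stop marker on a shared queue that any worker may pick up. The `stop` marker, `workers`, and `results` names are hypothetical.

```ruby
stop    = Object.new # Hypothetical stop marker shared by all workers.
queue   = Queue.new
results = Queue.new

workers = Array.new(3) do
  Thread.new do
    # Each worker runs tasks until it pulls a stop marker off the shared queue.
    until (task = queue.pop).equal?(stop)
      task.call
    end
  end
end

5.times { |i| queue << -> { results << i * i } }

# Loop 1: request one exit per worker. Any worker may consume any marker.
workers.each { queue << stop }

# Loop 2: only now is it safe to wait on each specific thread.
workers.each(&:join)

puts results.size
```

If the loops were merged in this sketch (push one marker, then join the first worker), a different worker could consume the marker and the join on the first worker would never return.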

#queue(*args, &block) ⇒ ThreadPool

Adds a new task for the thread pool to process.

Parameters:

  • args (Array)

    Arguments to pass into the block of code.

  • block (Proc)

    Block of code to execute in the thread pool.

Returns:

  • (ThreadPool)

    The pool itself, so calls can be chained.

Raises:

  • (ArgumentError)

    If no block is given.


# File 'lib/lobster/thread_pool.rb', line 61

def queue(*args, &block)
  raise ArgumentError unless block_given?

  # Create a task.
  task       = Task.new
  task.args  = args
  task.block = block

  @group_mutex.synchronize do
    assess_thread_count
    @queue_mutex.synchronize do
      push(task)
    end
  end

  self # Return self for chaining.
end
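
The contract visible in the listing above (a block is required, args are handed to the block, and self is returned) can be exercised with a small sketch. `InlinePool` is a hypothetical stand-in that runs each task immediately on the calling thread. It mirrors only that contract and none of the real pool's threading.

```ruby
# Hypothetical stand-in mirroring only the documented contract of #queue.
# Tasks run inline here; the real pool hands them to worker threads.
class InlinePool
  def queue(*args, &block)
    raise ArgumentError unless block_given?

    block.call(*args)
    self # Return self for chaining, as in the listing above.
  end
end

log  = []
pool = InlinePool.new

# Chained calls: each #queue returns the pool.
pool.queue(1, 2) { |a, b| log << a + b }
    .queue("x")  { |s| log << s * 3 }

begin
  pool.queue(1) # No block given.
rescue ArgumentError
  log << :rejected
end

puts log.inspect
```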

#queue_length ⇒ Fixnum

Number of tasks waiting to be assigned to a thread, that is, the number of tasks that have not started yet.

Returns:

  • (Fixnum)


# File 'lib/lobster/thread_pool.rb', line 51

def queue_length
  @queue_mutex.synchronize do
    @queue.length
  end
end

#thread_count ⇒ Fixnum

Number of threads currently allocated to the pool.

Returns:

  • (Fixnum)


# File 'lib/lobster/thread_pool.rb', line 42

def thread_count
  @group_mutex.synchronize do
    @group.list.size
  end
end