Class: Concurrent::RubyThreadPoolExecutor

Inherits:
Object
  • Object
show all
Includes:
RubyExecutor
Defined in:
lib/concurrent/executor/ruby_thread_pool_executor.rb

Overview

Note:

When running on the JVM (JRuby) this class will inherit from ‘JavaThreadPoolExecutor`. On all other platforms it will inherit from `RubyThreadPoolExecutor`.

An abstraction composed of one or more threads and a task queue. Tasks (blocks or ‘proc` objects) are submit to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received. When there are more tasks queued than there are threads to execute them the pool will create new threads, up to the configured maximum. Similarly, threads that are idle for too long will be garbage collected, down to the configured minimum options. Should a thread crash it, too, will be garbage collected.

‘ThreadPoolExecutor` is based on the Java class of the same name. From the official Java documentationa;

> Thread pools address two different problems: they usually provide > improved performance when executing large numbers of asynchronous tasks, > due to reduced per-task invocation overhead, and they provide a means > of bounding and managing the resources, including threads, consumed > when executing a collection of tasks. Each ThreadPoolExecutor also > maintains some basic statistics, such as the number of completed tasks. > > To be useful across a wide range of contexts, this class provides many > adjustable parameters and extensibility hooks. However, programmers are > urged to use the more convenient Executors factory methods > [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation), > [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single > background thread), that preconfigure settings for the most common usage > scenarios.

Thread pools support several configuration options:

  • ‘max_threads`: The maximum number of threads that may be created in the pool.

  • ‘min_threads`: The minimum number of threads that may be retained in the pool.

  • ‘idletime`: The number of seconds that a thread may be idle before being reclaimed.

  • ‘max_queue`: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue` subsequent tasks will be rejected in accordance with the configured `overflow_policy`.

  • ‘overflow_policy`: The policy defining how rejected tasks are handled. #

Three overflow policies are supported:

  • ‘:abort`: Raise a `RejectedExecutionError` exception and discard the task.

  • ‘:discard`: Silently discard the task and return `nil` as the task result.

  • ‘:caller_runs`: Execute the task on the calling thread.

Constant Summary collapse

DEFAULT_MAX_POOL_SIZE =

Default maximum number of threads that will be created in the pool.

2**15
DEFAULT_MIN_POOL_SIZE =

Default minimum number of threads that will be retained in the pool.

0
DEFAULT_MAX_QUEUE_SIZE =

Default maximum number of tasks that may be added to the task queue.

0
DEFAULT_THREAD_IDLETIMEOUT =

Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.

60
OVERFLOW_POLICIES =

The set of possible overflow policies that may be set at thread pool creation.

[:abort, :discard, :caller_runs]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RubyExecutor

#<<, #kill, #post, #running?, #shutdown, #shutdown?, #shuttingdown?, #wait_for_termination

Methods included from Logging

#log

Methods included from Executor

#serialized?

Constructor Details

#initialize(opts = {}) ⇒ RubyThreadPoolExecutor

Create a new thread pool.

Parameters:

  • opts (Hash) (defaults to: {})

    the options which configure the thread pool

Options Hash (opts):

  • :max_threads (Integer) — default: DEFAULT_MAX_POOL_SIZE

    the maximum number of threads to be created

  • :min_threads (Integer) — default: DEFAULT_MIN_POOL_SIZE

    the minimum number of threads to be retained

  • :idletime (Integer) — default: DEFAULT_THREAD_IDLETIMEOUT

    the maximum number of seconds a thread may be idle before being reclaimed

  • :max_queue (Integer) — default: DEFAULT_MAX_QUEUE_SIZE

    the maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bounnd

  • :overflow_policy (Symbol) — default: :abort

    the policy for handling new tasks that are received when the queue size has reached ‘max_queue`

Raises:

  • (ArgumentError)

    if ‘:max_threads` is less than one

  • (ArgumentError)

    if ‘:min_threads` is less than zero

  • (ArgumentError)

    if ‘:overflow_policy` is not one of the values specified in `OVERFLOW_POLICIES`

See Also:



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 79

def initialize(opts = {})
  @min_length      = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length      = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime        = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @overflow_policy = opts.fetch(:overflow_policy, :abort)

  raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
  raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
  raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
  raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length

  init_executor

  @pool                 = []
  @queue                = Queue.new
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0

  @gc_interval  = opts.fetch(:gc_interval, 1).to_i # undocumented
  @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
end

Instance Attribute Details

#completed_task_countObject (readonly)

The number of tasks that have been completed by the pool since construction.



42
43
44
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 42

def completed_task_count
  @completed_task_count
end

#idletimeObject (readonly)

The number of seconds that a thread may be idle before being reclaimed.



45
46
47
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 45

def idletime
  @idletime
end

#largest_lengthObject (readonly)

The largest number of threads that have been created in the pool since construction.



36
37
38
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 36

def largest_length
  @largest_length
end

#max_lengthObject (readonly)

The maximum number of threads that may be created in the pool.



30
31
32
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 30

def max_length
  @max_length
end

#max_queueObject (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches ‘max_queue` subsequent tasks will be rejected in accordance with the configured `overflow_policy`.



50
51
52
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 50

def max_queue
  @max_queue
end

#min_lengthObject (readonly)

The minimum number of threads that may be retained in the pool.



33
34
35
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 33

def min_length
  @min_length
end

#overflow_policyObject (readonly)

The policy defining how rejected tasks (tasks received once the queue size reaches the configured ‘max_queue`) are handled. Must be one of the values specified in `OVERFLOW_POLICIES`.



55
56
57
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 55

def overflow_policy
  @overflow_policy
end

#scheduled_task_countObject (readonly)

The number of tasks that have been scheduled for execution on the pool since construction.



39
40
41
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 39

def scheduled_task_count
  @scheduled_task_count
end

Instance Method Details

#can_overflow?Boolean

Does the task queue have a maximum size?

Returns:

  • (Boolean)

    True if the task queue has a maximum size else false.



104
105
106
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 104

def can_overflow?
  @max_queue != 0
end

#lengthInteger Also known as: current_length

The number of threads currently in the pool.

Returns:

  • (Integer)

    the length



111
112
113
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 111

def length
  mutex.synchronize { running? ? @pool.length : 0 }
end

#queue_lengthInteger

The number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    the queue_length



120
121
122
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 120

def queue_length
  mutex.synchronize { running? ? @queue.length : 0 }
end

#remaining_capacityInteger

Number of tasks that may be enqueued before reaching ‘max_queue` and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

Returns:

  • (Integer)

    the remaining_capacity



128
129
130
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 128

def remaining_capacity
  mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
end

#statusObject

Returns an array with the status of each thread in the pool

This method is deprecated and will be removed soon.



135
136
137
138
# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 135

def status
  warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
  mutex.synchronize { @pool.collect { |worker| worker.status } }
end