Class: Concurrent::RubyThreadPoolExecutor

Inherits:
Object
  • Object
Includes:
RubyExecutor
Defined in:
lib/concurrent/executor/ruby_thread_pool_executor.rb

Overview

Note:

When running on the JVM (JRuby) this class will inherit from `JavaThreadPoolExecutor`. On all other platforms it will inherit from `RubyThreadPoolExecutor`.

An abstraction composed of one or more threads and a task queue. Tasks (blocks or `proc` objects) are submitted to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received. When there are more tasks queued than there are threads to execute them, the pool will create new threads, up to the configured maximum. Similarly, threads that are idle for too long will be garbage collected, down to the configured minimum. Should a thread crash, it too will be garbage collected.
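The queue-and-workers arrangement described above can be illustrated with a minimal sketch built only on Ruby's standard `Queue` and `Thread`. The `TinyPool` class and its `:stop` marker are hypothetical names used for illustration; this is not the library's implementation, and it omits dynamic growth, idle reclamation, and fallback policies.

```ruby
require 'thread'

# Hypothetical illustration only; not how RubyThreadPoolExecutor is written.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Each worker removes tasks in FIFO order until it sees the stop marker.
        while (task = @queue.pop) != :stop
          task.call
        end
      end
    end
  end

  # Add a block to the task queue.
  def post(&task)
    @queue << task
    true
  end

  # Send one stop marker per worker, then wait for all workers to finish.
  def shutdown
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end
end

results = Queue.new
pool = TinyPool.new(2)
5.times { |i| pool.post { results << i * i } }
pool.shutdown

# Collect the results; sorting removes any dependence on thread scheduling.
squares = Array.new(results.size) { results.pop }.sort
```

Tasks are dequeued in submission order, but with more than one worker their completion order is not guaranteed, which is why the sketch sorts the results before inspecting them.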

`ThreadPoolExecutor` is based on the Java class of the same name. From the official Java documentation:

> Thread pools address two different problems: they usually provide
> improved performance when executing large numbers of asynchronous tasks,
> due to reduced per-task invocation overhead, and they provide a means
> of bounding and managing the resources, including threads, consumed
> when executing a collection of tasks. Each ThreadPoolExecutor also
> maintains some basic statistics, such as the number of completed tasks.
>
> To be useful across a wide range of contexts, this class provides many
> adjustable parameters and extensibility hooks. However, programmers are
> urged to use the more convenient Executors factory methods
> [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation),
> [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single
> background thread), that preconfigure settings for the most common usage
> scenarios.

Thread pools support several configuration options:

  • `max_threads`: The maximum number of threads that may be created in the pool.

  • `min_threads`: The minimum number of threads that may be retained in the pool.

  • `idletime`: The number of seconds that a thread may be idle before being reclaimed.

  • `max_queue`: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue`, subsequent tasks will be rejected in accordance with the configured `fallback_policy`.

  • `fallback_policy`: The policy defining how rejected tasks are handled.

Three fallback policies are supported:

  • `:abort`: Raise a `RejectedExecutionError` exception and discard the task.

  • `:discard`: Discard the task and return false.

  • `:caller_runs`: Execute the task on the calling thread.
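The three policies can be sketched with a small stand-alone helper. Everything here is hypothetical: `submit`, `RejectedError`, and the plain array standing in for the work queue are illustrative names, not part of the library, and the `true` returned from the `:caller_runs` branch is an assumption rather than documented behaviour.

```ruby
# Hypothetical stand-in for the library's RejectedExecutionError.
class RejectedError < StandardError; end

# Hypothetical helper: enqueue the task, or apply the fallback policy
# when the queue has already reached max_queue.
def submit(queue, max_queue, policy, &task)
  # A max_queue of zero means the queue may grow without bound.
  if max_queue.zero? || queue.length < max_queue
    queue << task
    return true
  end

  case policy
  when :abort
    raise RejectedError, 'queue is full'
  when :discard
    false
  when :caller_runs
    task.call # runs on the submitting thread
    true      # assumed return value, for illustration only
  else
    raise ArgumentError, "#{policy} is not a valid fallback policy"
  end
end

queue = []
ran_inline = []

submit(queue, 1, :discard) { :first }                    # accepted, queue is now full
discarded = submit(queue, 1, :discard) { :second }       # rejected and dropped
submit(queue, 1, :caller_runs) { ran_inline << :third }  # rejected, run by the caller

aborted = begin
  submit(queue, 1, :abort) { :fourth }
  false
rescue RejectedError
  true
end
```

With a queue limit of one, only the first task is enqueued; the remaining three submissions each exercise one of the policies.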

Constant Summary

DEFAULT_MAX_POOL_SIZE = 2**15

Default maximum number of threads that will be created in the pool.

DEFAULT_MIN_POOL_SIZE = 0

Default minimum number of threads that will be retained in the pool.

DEFAULT_MAX_QUEUE_SIZE = 0

Default maximum number of tasks that may be added to the task queue. A value of zero means the queue may grow without bound.

DEFAULT_THREAD_IDLETIMEOUT = 60

Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.

Constants included from RubyExecutor

Concurrent::RubyExecutor::FALLBACK_POLICIES

Instance Attribute Summary

Attributes included from Executor

#fallback_policy

Instance Method Summary

Methods included from RubyExecutor

#<<, #kill, #post, #running?, #shutdown, #shutdown?, #shuttingdown?, #wait_for_termination

Methods included from Logging

#log

Methods included from Executor

#serialized?

Constructor Details

#initialize(opts = {}) ⇒ RubyThreadPoolExecutor

Create a new thread pool.

Parameters:

  • opts (Hash) (defaults to: {})

    the options which configure the thread pool

Options Hash (opts):

  • :max_threads (Integer) — default: DEFAULT_MAX_POOL_SIZE

    the maximum number of threads to be created

  • :min_threads (Integer) — default: DEFAULT_MIN_POOL_SIZE

    the minimum number of threads to be retained

  • :idletime (Integer) — default: DEFAULT_THREAD_IDLETIMEOUT

    the maximum number of seconds a thread may be idle before being reclaimed

  • :max_queue (Integer) — default: DEFAULT_MAX_QUEUE_SIZE

    the maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bound

  • :fallback_policy (Symbol) — default: :abort

    the policy for handling new tasks that are received when the queue size has reached `max_queue` or the executor has shut down

Raises:

  • (ArgumentError)

    if `:max_threads` is less than one

  • (ArgumentError)

    if `:min_threads` is less than zero

  • (ArgumentError)

    if `:fallback_policy` is not one of the values specified in `FALLBACK_POLICIES`

  • (ArgumentError)

    if `:min_threads` is greater than `:max_threads`

# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 72

def initialize(opts = {})
  @min_length      = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length      = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime        = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
  warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)

  raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
  raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
  raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
  raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length

  init_executor

  @pool                 = []
  @queue                = Queue.new
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0

  @gc_interval  = opts.fetch(:gc_interval, 1).to_i # undocumented
  @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
end
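The nested `fetch` in the constructor determines how the deprecated `:overflow_policy` key interacts with `:fallback_policy`. The lookup can be isolated in a small sketch; `resolve_fallback_policy` is a hypothetical helper name, and only the `fetch` expression itself is taken from the source above.

```ruby
# Hypothetical helper mirroring the option lookup in the constructor above.
def resolve_fallback_policy(opts)
  # :fallback_policy wins; :overflow_policy is consulted only as a default,
  # and :abort is used when neither key is present.
  opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
end

default_policy = resolve_fallback_policy({})
legacy_policy  = resolve_fallback_policy(overflow_policy: :discard)
both_policy    = resolve_fallback_policy(fallback_policy: :caller_runs,
                                         overflow_policy: :discard)

# Numeric options are coerced with to_i, so a numeric string is accepted.
max_threads = { max_threads: '8' }.fetch(:max_threads, 2**15).to_i
```

Here `default_policy` is `:abort`, `legacy_policy` is `:discard`, `both_policy` is `:caller_runs`, and `max_threads` is `8`. Note that in the constructor the deprecation warning is emitted whenever `:overflow_policy` is present, even if `:fallback_policy` overrides it.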

Instance Attribute Details

#completed_task_count ⇒ Object (readonly)

The number of tasks that have been completed by the pool since construction.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 39

def completed_task_count
  @completed_task_count
end

#idletime ⇒ Object (readonly)

The number of seconds that a thread may be idle before being reclaimed.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 42

def idletime
  @idletime
end

#largest_length ⇒ Object (readonly)

The largest number of threads that have been created in the pool since construction.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 33

def largest_length
  @largest_length
end

#max_length ⇒ Object (readonly)

The maximum number of threads that may be created in the pool.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 27

def max_length
  @max_length
end

#max_queue ⇒ Object (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue`, subsequent tasks will be rejected in accordance with the configured `fallback_policy`.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 47

def max_queue
  @max_queue
end

#min_length ⇒ Object (readonly)

The minimum number of threads that may be retained in the pool.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 30

def min_length
  @min_length
end

#scheduled_task_count ⇒ Object (readonly)

The number of tasks that have been scheduled for execution on the pool since construction.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 36

def scheduled_task_count
  @scheduled_task_count
end

Instance Method Details

#can_overflow? ⇒ Boolean

Does the task queue have a maximum size?

Returns:

  • (Boolean)

    `true` if the task queue has a maximum size, otherwise `false`.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 98

def can_overflow?
  @max_queue != 0
end

#length ⇒ Integer Also known as: current_length

The number of threads currently in the pool.

Returns:

  • (Integer)

    the length



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 105

def length
  mutex.synchronize { running? ? @pool.length : 0 }
end

#queue_length ⇒ Integer

The number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    the queue_length



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 114

def queue_length
  mutex.synchronize { running? ? @queue.length : 0 }
end

#remaining_capacity ⇒ Integer

Number of tasks that may be enqueued before reaching `max_queue` and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

Returns:

  • (Integer)

    the remaining_capacity



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 122

def remaining_capacity
  mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
end
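The `-1` convention is easiest to see with concrete numbers. The two free functions below are hypothetical stand-ins that take the queue limit and current queue length as plain arguments, so the arithmetic from `can_overflow?` and `remaining_capacity` can be tried without constructing a pool.

```ruby
# Hypothetical stand-ins for the instance methods of the same names.
def can_overflow?(max_queue)
  max_queue != 0
end

def remaining_capacity(max_queue, queued)
  # Zero means "no limit", which is reported as -1 rather than a count.
  max_queue == 0 ? -1 : max_queue - queued
end

unbounded = remaining_capacity(0, 250) # no limit configured
bounded   = remaining_capacity(10, 4)  # limit of 10 with 4 tasks waiting
```

Here `unbounded` is `-1` and `bounded` is `6`. Callers that treat the result as a count should check `can_overflow?` first, since `-1` is a sentinel rather than a capacity.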

#status ⇒ Object

Returns an array with the status of each thread in the pool.

This method is deprecated and will be removed soon.



# File 'lib/concurrent/executor/ruby_thread_pool_executor.rb', line 129

def status
  warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
  mutex.synchronize { @pool.collect { |worker| worker.status } }
end