Class: Concurrent::JavaThreadPoolExecutor

Inherits:
Object
  • Object
show all
Includes:
JavaExecutor
Defined in:
lib/concurrent/executor/java_thread_pool_executor.rb

Overview

Note:

When running on the JVM (JRuby) this class will inherit from ‘JavaThreadPoolExecutor`. On all other platforms it will inherit from `RubyThreadPoolExecutor`.

An abstraction composed of one or more threads and a task queue. Tasks (blocks or ‘proc` objects) are submit to the pool and added to the queue. The threads in the pool remove the tasks and execute them in the order they were received. When there are more tasks queued than there are threads to execute them the pool will create new threads, up to the configured maximum. Similarly, threads that are idle for too long will be garbage collected, down to the configured minimum options. Should a thread crash it, too, will be garbage collected.

‘ThreadPoolExecutor` is based on the Java class of the same name. From the official Java documentationa;

> Thread pools address two different problems: they usually provide > improved performance when executing large numbers of asynchronous tasks, > due to reduced per-task invocation overhead, and they provide a means > of bounding and managing the resources, including threads, consumed > when executing a collection of tasks. Each ThreadPoolExecutor also > maintains some basic statistics, such as the number of completed tasks. > > To be useful across a wide range of contexts, this class provides many > adjustable parameters and extensibility hooks. However, programmers are > urged to use the more convenient Executors factory methods > [CachedThreadPool] (unbounded thread pool, with automatic thread reclamation), > [FixedThreadPool] (fixed size thread pool) and [SingleThreadExecutor] (single > background thread), that preconfigure settings for the most common usage > scenarios.

Thread pools support several configuration options:

  • ‘max_threads`: The maximum number of threads that may be created in the pool.

  • ‘min_threads`: The minimum number of threads that may be retained in the pool.

  • ‘idletime`: The number of seconds that a thread may be idle before being reclaimed.

  • ‘max_queue`: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue` subsequent tasks will be rejected in accordance with the configured `fallback_policy`.

  • ‘fallback_policy`: The policy defining how rejected tasks are handled. #

Three fallback policies are supported:

  • ‘:abort`: Raise a `RejectedExecutionError` exception and discard the task.

  • ‘:discard`: Discard the task and return false.

  • ‘:caller_runs`: Execute the task on the calling thread.

Direct Known Subclasses

JavaCachedThreadPool, JavaFixedThreadPool

Constant Summary collapse

DEFAULT_MAX_POOL_SIZE =

Default maximum number of threads that will be created in the pool.

java.lang.Integer::MAX_VALUE
DEFAULT_MIN_POOL_SIZE =

Default minimum number of threads that will be retained in the pool.

0
DEFAULT_MAX_QUEUE_SIZE =

Default maximum number of tasks that may be added to the task queue.

0
DEFAULT_THREAD_IDLETIMEOUT =

Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.

60

Constants included from JavaExecutor

Concurrent::JavaExecutor::FALLBACK_POLICIES

Instance Attribute Summary collapse

Attributes included from Executor

#fallback_policy

Instance Method Summary collapse

Methods included from JavaExecutor

#<<, #kill, #post, #shutdown, #shutdown?, #shuttingdown?, #wait_for_termination

Methods included from Executor

#serialized?

Constructor Details

#initialize(opts = {}) ⇒ JavaThreadPoolExecutor

Create a new thread pool.

Parameters:

  • opts (Hash) (defaults to: {})

    the options which configure the thread pool

Options Hash (opts):

  • :max_threads (Integer) — default: DEFAULT_MAX_POOL_SIZE

    the maximum number of threads to be created

  • :min_threads (Integer) — default: DEFAULT_MIN_POOL_SIZE

    the minimum number of threads to be retained

  • :idletime (Integer) — default: DEFAULT_THREAD_IDLETIMEOUT

    the maximum number of seconds a thread may be idle before being reclaimed

  • :max_queue (Integer) — default: DEFAULT_MAX_QUEUE_SIZE

    the maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bound

  • :fallback_policy (Symbol) — default: :abort

    the policy for handling new tasks that are received when the queue size has reached ‘max_queue` or the executir has shut down

Raises:

  • (ArgumentError)

    if ‘:max_threads` is less than one

  • (ArgumentError)

    if ‘:min_threads` is less than zero

  • (ArgumentError)

    if ‘:fallback_policy` is not one of the values specified in `FALLBACK_POLICIES`

See Also:



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 54

def initialize(opts = {})
  min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
  @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
  warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)

  raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
  raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
  raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
  raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)

  if @max_queue == 0
    queue = java.util.concurrent.LinkedBlockingQueue.new
  else
    queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
  end

  @executor = java.util.concurrent.ThreadPoolExecutor.new(
    min_length, max_length,
    idletime, java.util.concurrent.TimeUnit::SECONDS,
    queue, FALLBACK_POLICIES[@fallback_policy].new)

  set_shutdown_hook
end

Instance Attribute Details

#max_lengthInteger (readonly)

The maximum number of threads that may be created in the pool.

Returns:

  • (Integer)

    the max_length



24
25
26
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 24

def max_length
  @max_length
end

#max_queueObject (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches ‘max_queue` subsequent tasks will be rejected in accordance with the configured `fallback_policy`.



29
30
31
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 29

def max_queue
  @max_queue
end

Instance Method Details

#can_overflow?Boolean

Does the task queue have a maximum size?

Returns:

  • (Boolean)

    True if the task queue has a maximum size else false.



82
83
84
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 82

def can_overflow?
  @max_queue != 0
end

#completed_task_countInteger

The number of tasks that have been completed by the pool since construction.

Returns:

  • (Integer)

    the completed_task_count



125
126
127
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 125

def completed_task_count
  @executor.getCompletedTaskCount
end

#idletimeInteger

The number of seconds that a thread may be idle before being reclaimed.

Returns:

  • (Integer)

    the idletime



132
133
134
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 132

def idletime
  @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
end

#largest_lengthInteger

The largest number of threads that have been created in the pool since construction.

Returns:

  • (Integer)

    the largest_length



111
112
113
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 111

def largest_length
  @executor.getLargestPoolSize
end

#lengthInteger Also known as: current_length

The number of threads currently in the pool.

Returns:

  • (Integer)

    the length



103
104
105
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 103

def length
  @executor.getPoolSize
end

#min_lengthInteger

The minimum number of threads that may be retained in the pool.

Returns:

  • (Integer)

    the min_length



89
90
91
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 89

def min_length
  @executor.getCorePoolSize
end

#queue_lengthInteger

The number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    the queue_length



139
140
141
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 139

def queue_length
  @executor.getQueue.size
end

#remaining_capacityInteger

Number of tasks that may be enqueued before reaching ‘max_queue` and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

Returns:

  • (Integer)

    the remaining_capacity



147
148
149
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 147

def remaining_capacity
  @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
end

#running?Boolean

Is the thread pool running?

Returns:

  • (Boolean)

    ‘true` when running, `false` when shutting down or shutdown



163
164
165
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 163

def running?
  super && !@executor.isTerminating
end

#scheduled_task_countInteger

The number of tasks that have been scheduled for execution on the pool since construction.

Returns:

  • (Integer)

    the scheduled_task_count



118
119
120
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 118

def scheduled_task_count
  @executor.getTaskCount
end

#statusObject

This method is deprecated and will be removed soon. This method is supost to return the threads status, but Java API doesn’t provide a way to get the thread status. So we return an empty Array instead.



154
155
156
157
158
# File 'lib/concurrent/executor/java_thread_pool_executor.rb', line 154

def status
  warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
  warn "Calls to `status` return an empty Array. Java ThreadPoolExecutor does not provide thread's status."
  []
end