Class: Concurrent::FixedThreadPool

Inherits:
ThreadPoolExecutor show all
Defined in:
lib/concurrent/executor/fixed_thread_pool.rb

Overview

Note:

Failure to properly shutdown a thread pool can lead to unpredictable results. Please read *Shutting Down Thread Pools* for more information.

A thread pool that reuses a fixed number of threads operating off an unbounded queue. At any point, at most ‘num_threads` will be active processing tasks. When all threads are busy new tasks `#post` to the thread pool are enqueued until a thread becomes available. Should a thread crash for any reason the thread will immediately be removed from the pool and replaced.

The API and behavior of this class are based on Java’s ‘FixedThreadPool`

**Thread Pool Options**

Thread pools support several configuration options:

  • ‘idletime`: The number of seconds that a thread may be idle before being reclaimed.

  • ‘max_queue`: The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue` and no new threads can be created, subsequent tasks will be rejected in accordance with the configured `fallback_policy`.

  • ‘auto_terminate`: When true (default) an `at_exit` handler will be registered which will stop the thread pool when the application exits. See below for more information on shutting down thread pools.

  • ‘fallback_policy`: The policy defining how rejected tasks are handled.

Three fallback policies are supported:

  • ‘:abort`: Raise a `RejectedExecutionError` exception and discard the task.

  • ‘:discard`: Discard the task and return false.

  • ‘:caller_runs`: Execute the task on the calling thread.

**Shutting Down Thread Pools**

Killing a thread pool while tasks are still being processed, either by calling the ‘#kill` method or at application exit, will have unpredictable results. There is no way for the thread pool to know what resources are being used by the in-progress tasks. When those tasks are killed the impact on those resources cannot be predicted. The best practice is to explicitly shutdown all thread pools using the provided methods:

  • Call ‘#shutdown` to initiate an orderly termination of all in-progress tasks

  • Call ‘#wait_for_termination` with an appropriate timeout interval an allow the orderly shutdown to complete

  • Call ‘#kill` *only when* the thread pool fails to shutdown in the allotted time

On some runtime platforms (most notably the JVM) the application will not exit until all thread pools have been shutdown. To prevent applications from “hanging” on exit all thread pools include an ‘at_exit` handler that will stop the thread pool when the application exits. This handler uses a brute force method to stop the pool and makes no guarantees regarding resources being used by any tasks still running. Registration of this `at_exit` handler can be prevented by setting the thread pool’s constructor ‘:auto_terminate` option to `false` when the thread pool is created. All thread pools support this option.

“‘ruby pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # prevent `at_exit` handler registration “`

Instance Attribute Summary

Attributes inherited from ThreadPoolExecutor

#completed_task_count, #fallback_policy, #idletime, #largest_length, #length, #max_length, #max_queue, #min_length, #queue_length, #remaining_capacity, #scheduled_task_count

Instance Method Summary collapse

Methods inherited from ThreadPoolExecutor

#<<, #auto_terminate=, #auto_terminate?, #can_overflow?, #kill, #post, #running?, #serialized?, #shutdown, #shutdown?, #shuttingdown?, #wait_for_termination

Constructor Details

#initialize(num_threads, opts = {}) ⇒ FixedThreadPool

Create a new thread pool.

Parameters:

  • num_threads (Integer)

    the number of threads to allocate

  • opts (Hash) (defaults to: {})

    the options defining pool behavior.

Options Hash (opts):

  • :fallback_policy (Symbol) — default: `:abort`

    the fallback policy

Raises:

  • (ArgumentError)

    if ‘num_threads` is less than or equal to zero

  • (ArgumentError)

    if ‘fallback_policy` is not a known policy

See Also:



197
198
199
200
201
202
203
204
# File 'lib/concurrent/executor/fixed_thread_pool.rb', line 197

def initialize(num_threads, opts = {})
  raise ArgumentError.new('number of threads must be greater than zero') if num_threads.to_i < 1
  defaults  = { max_queue:   DEFAULT_MAX_QUEUE_SIZE,
                idletime:    DEFAULT_THREAD_IDLETIMEOUT }
  overrides = { min_threads: num_threads,
                max_threads: num_threads }
  super(defaults.merge(opts).merge(overrides))
end