Class: Garcon::ThreadPoolExecutor

Inherits:
Object
  • Object
show all
Includes:
RubyExecutor
Defined in:
lib/garcon/task/thread_pool/executor.rb

Direct Known Subclasses

CachedThreadPool, FixedThreadPool

Constant Summary collapse

DEFAULT_MAX_POOL_SIZE =

Default maximum number of threads that will be created in the pool.

2**15
DEFAULT_MIN_POOL_SIZE =

Default minimum number of threads that will be retained in the pool.

0
DEFAULT_MAX_QUEUE_SIZE =

Default maximum number of tasks that may be added to the task queue.

0
DEFAULT_THREAD_IDLETIMEOUT =

Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.

60

Constants included from RubyExecutor

RubyExecutor::FALLBACK_POLICY

Instance Attribute Summary collapse

Attributes included from Executor

#fallback_policy

Instance Method Summary collapse

Methods included from RubyExecutor

#<<, #kill, #post, #running?, #shutdown, #shutdown?, #shuttingdown?, #wait_for_termination

Methods included from Executor

#auto_terminate?, #serialized?

Constructor Details

#initialize(opts = {}) ⇒ ThreadPoolExecutor

Create a new thread pool.

Parameters:

  • opts (Hash) (defaults to: {})

    The options which configure the thread pool.

Options Hash (opts):

  • :max_threads (Integer) — default: DEFAULT_MAX_POOL_SIZE

    The maximum number of threads to be created.

  • :min_threads (Integer) — default: DEFAULT_MIN_POOL_SIZE

    The minimum number of threads to be retained.

  • :idletime (Integer) — default: DEFAULT_THREAD_IDLETIMEOUT

    Maximum number of seconds a thread may be idle before being reclaimed.

  • :max_queue (Integer) — default: DEFAULT_MAX_QUEUE_SIZE

    The maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bound.

  • :fallback (Symbol) — default: :abort

    The policy for handling new tasks that are received when the queue size has reached `max_queue` or the executor has shut down.

Raises:

  • (ArgumentError)

    if `:max_threads` is less than one

  • (ArgumentError)

    if `:min_threads` is less than zero

  • (ArgumentError)

    if `:min_threads` is greater than `:max_threads`

  • (ArgumentError)

    if `:fallback` is not one of the values specified in `FALLBACK_POLICY`



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/garcon/task/thread_pool/executor.rb', line 99

# Create a new thread pool.
#
# @param [Hash] opts
#   The options which configure the thread pool.
#
# @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE)
#   The maximum number of threads to be created.
#
# @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE)
#   The minimum number of threads to be retained.
#
# @option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT)
#   Maximum number of seconds a thread may be idle before being reclaimed.
#
# @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE)
#   The maximum number of tasks allowed in the work queue at any one time;
#   a value of zero means the queue may grow without bound.
#
# @option opts [Symbol] :fallback (:abort)
#   The policy for handling new tasks that are received when the queue size
#   has reached `max_queue` or the executor has shut down.
#
# @option opts [Integer] :gc_interval (1)
#   Stored as `@gc_interval` and used to backdate `@last_gc_time`.
#   NOTE(review): presumably the number of seconds between worker
#   housekeeping passes -- confirm against the code that reads it.
#
# @raise [ArgumentError] if `:max_threads` is less than one
# @raise [ArgumentError] if `:min_threads` is less than zero
# @raise [ArgumentError] if `:min_threads` is greater than `:max_threads`
# @raise [ArgumentError] if `:fallback` is not one of the values specified
#   in `FALLBACK_POLICY`
def initialize(opts = {})
  @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime   = opts.fetch(:idletime,    DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue  = opts.fetch(:max_queue,   DEFAULT_MAX_QUEUE_SIZE).to_i
  @fallback   = opts.fetch(:fallback,    :abort)

  # Validation order is significant: the first failing check wins.
  if @max_length <= 0
    raise ArgumentError, 'max_threads must be greater than zero'
  elsif @min_length < 0
    raise ArgumentError, 'min_threads cannot be less than zero'
  elsif min_length > max_length
    raise ArgumentError, 'min_threads cannot be more than max_threads'
  elsif !FALLBACK_POLICY.include?(@fallback)
    # Interpolate the ivar that was just validated; no `fallback` reader is
    # visible on this class, so calling one here would raise NameError
    # instead of the intended ArgumentError.
    raise ArgumentError, "#{@fallback} is not a valid fallback policy"
  end

  init_executor
  enable_at_exit_handler!(opts)

  @pool                 = []
  @queue                = Queue.new
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0

  @gc_interval  = opts.fetch(:gc_interval, 1).to_i
  # Backdate the last GC timestamp by at least one second (or twice the
  # configured interval, whichever is larger).
  @last_gc_time = Garcon.monotonic_time - [1.0, (@gc_interval * 2.0)].max
end

Instance Attribute Details

#completed_task_countObject (readonly)

The number of tasks that have been completed by the pool since construction.



60
61
62
# File 'lib/garcon/task/thread_pool/executor.rb', line 60

# The number of tasks that have been completed by the pool since
# construction.
#
# @return [Integer] the current value of `@completed_task_count`
def completed_task_count
  @completed_task_count
end

#idletimeObject (readonly)

The number of seconds that a thread may be idle before being reclaimed.



63
64
65
# File 'lib/garcon/task/thread_pool/executor.rb', line 63

# The number of seconds that a thread may be idle before being reclaimed.
#
# @return [Integer] the current value of `@idletime`
def idletime
  @idletime
end

#largest_lengthObject (readonly)

The largest number of threads that have been created in the pool since construction.



52
53
54
# File 'lib/garcon/task/thread_pool/executor.rb', line 52

# The largest number of threads that have been created in the pool since
# construction.
#
# @return [Integer] the current value of `@largest_length`
def largest_length
  @largest_length
end

#max_lengthObject (readonly)

The maximum number of threads that may be created in the pool.



45
46
47
# File 'lib/garcon/task/thread_pool/executor.rb', line 45

# The maximum number of threads that may be created in the pool.
#
# @return [Integer] the current value of `@max_length`
def max_length
  @max_length
end

#max_queueObject (readonly)

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue` subsequent tasks will be rejected in accordance with the configured `fallback`.



68
69
70
# File 'lib/garcon/task/thread_pool/executor.rb', line 68

# The maximum number of tasks that may be waiting in the work queue at any
# one time.
#
# @return [Integer] the current value of `@max_queue`
def max_queue
  @max_queue
end

#min_lengthObject (readonly)

The minimum number of threads that may be retained in the pool.



48
49
50
# File 'lib/garcon/task/thread_pool/executor.rb', line 48

# The minimum number of threads that may be retained in the pool.
#
# @return [Integer] the current value of `@min_length`
def min_length
  @min_length
end

#scheduled_task_countObject (readonly)

The number of tasks that have been scheduled for execution on the pool since construction.



56
57
58
# File 'lib/garcon/task/thread_pool/executor.rb', line 56

# The number of tasks that have been scheduled for execution on the pool
# since construction.
#
# @return [Integer] the current value of `@scheduled_task_count`
def scheduled_task_count
  @scheduled_task_count
end

Instance Method Details

#can_overflow?Boolean

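Returns:

  • (Boolean)

    true if `max_queue` is non-zero (the queue is bounded and can therefore fill up), false otherwise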



130
131
132
# File 'lib/garcon/task/thread_pool/executor.rb', line 130

# Whether the work queue has an upper bound.
#
# @return [Boolean]
#   true when `@max_queue` is anything other than zero
def can_overflow?
  !@max_queue.zero?
end

#lengthInteger Also known as: current_length

The number of threads currently in the pool.

Returns:

  • (Integer)

    the length



137
138
139
# File 'lib/garcon/task/thread_pool/executor.rb', line 137

# The number of threads currently in the pool.
#
# @return [Integer]
#   the size of `@pool` while the executor is running, otherwise 0
def length
  mutex.synchronize do
    next 0 unless running?

    @pool.length
  end
end

#queue_lengthInteger

The number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    the queue_length



146
147
148
# File 'lib/garcon/task/thread_pool/executor.rb', line 146

# The number of tasks in the queue awaiting execution.
#
# @return [Integer]
#   the size of `@queue` while the executor is running, otherwise 0
def queue_length
  # A stopped executor reports an empty queue, consistent with #length
  # reporting 0 threads in the same state.
  mutex.synchronize { running? ? @queue.length : 0 }
end

#remaining_capacityInteger

Number of tasks that may be enqueued before reaching `max_queue` and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

Returns:

  • (Integer)

    the remaining_capacity



161
162
163
# File 'lib/garcon/task/thread_pool/executor.rb', line 161

# Number of tasks that may still be enqueued before `@max_queue` is reached.
#
# @return [Integer]
#   -1 when `@max_queue` is zero (no upper bound), otherwise `@max_queue`
#   minus the current queue length
def remaining_capacity
  mutex.synchronize do
    if @max_queue == 0
      -1
    else
      @max_queue - @queue.length
    end
  end
end

#statusObject

Returns an array with the status of each thread in the pool



152
153
154
# File 'lib/garcon/task/thread_pool/executor.rb', line 152

# Snapshot of the status of every worker currently in the pool.
#
# @return [Array]
#   one entry per worker, each being whatever that worker's `status` returns
def status
  mutex.synchronize do
    @pool.map(&:status)
  end
end