Class: Contender::Pool::PoolExecutor

Inherits:
ExecutorService show all
Defined in:
lib/contender/pool/pool_executor.rb

Overview

Executor that uses a thread pool to execute tasks asynchronously

Made idiomatic by Ian Unruh. Original author is Doug Lea, from the JDK.

Instance Attribute Summary collapse

State management collapse

Pool size management collapse

Work queue management collapse

Statistics collapse

Instance Method Summary collapse

Methods inherited from ExecutorService

#future_for, #invoke_all, #submit

Constructor Details

#initialize(core_size, maximum_size, work_timeout, queue, thread_factory) ⇒ undefined

Parameters:

  • core_size (Integer)
  • maximum_size (Integer)
  • work_timeout (Float)
  • queue (Queue)

    Used to hold tasks that will be executed by the thread pool

  • thread_factory (ThreadFactory)

    Used to create threads for new pool workers

Raises:

  • (ArgumentError)

    If the core size is less than zero

  • (ArgumentError)

    If the maximum size is less than one or less than the core size

  • (ArgumentError)

    If the work timeout is less than zero



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/contender/pool/pool_executor.rb', line 48

# Validates the sizing arguments and sets up the executor's initial state:
# an empty worker set, zeroed statistics, an AbortPolicy rejection policy and
# a control value built from RUNNING and a count of 0.
#
# @param core_size [Integer] must not be negative
# @param maximum_size [Integer] must be positive and at least +core_size+
# @param work_timeout [Float] must not be negative
# @param queue [Queue] used to hold tasks that will be executed by the thread pool
# @param thread_factory [ThreadFactory] used to create threads for new pool workers
# @raise [ArgumentError] if the core size is less than zero
# @raise [ArgumentError] if the maximum size is less than one or less than the core size
# @raise [ArgumentError] if the work timeout is less than zero
# @return [undefined]
def initialize(core_size, maximum_size, work_timeout, queue, thread_factory)
  # Every rejection names the offending argument so callers can tell them apart.
  raise ArgumentError, "core size must not be negative (got #{core_size})" if core_size < 0
  raise ArgumentError, "maximum size must be positive (got #{maximum_size})" if maximum_size <= 0
  if maximum_size < core_size
    raise ArgumentError, "maximum size (#{maximum_size}) must not be less than core size (#{core_size})"
  end
  raise ArgumentError, "work timeout must not be negative (got #{work_timeout})" if work_timeout < 0

  @queue = queue
  @thread_factory = thread_factory
  @rejection_policy = AbortPolicy.new

  @core_size = core_size
  @maximum_size = maximum_size
  @allow_core_timeout = false
  @work_timeout = work_timeout

  @control = Atomic.new(control_for(RUNNING, 0))

  @monitor = Monitor.new
  @termination = @monitor.new_cond

  # The following instance variables are guarded by the monitor
  @workers = Set.new

  @largest_size = 0
  @completed_task_count = 0
end

Instance Attribute Details

#allow_core_timeout ⇒ Boolean

Returns:

  • (Boolean)


18
19
20
# File 'lib/contender/pool/pool_executor.rb', line 18

# Reader for the allow-core-timeout flag, which the constructor sets to false.
# NOTE(review): presumably controls whether core workers may time out while
# idle -- confirm against the worker loop.
#
# @return [Boolean]
def allow_core_timeout; @allow_core_timeout; end

#core_size ⇒ Integer

Returns:

  • (Integer)


9
10
11
# File 'lib/contender/pool/pool_executor.rb', line 9

# Reader for the core size. The constructor rejects negative values.
#
# @return [Integer]
def core_size; @core_size; end

#largest_size ⇒ Integer (readonly)

Returns:

  • (Integer)


15
16
17
# File 'lib/contender/pool/pool_executor.rb', line 15

# Reader for the largest-size statistic, which starts at 0.
# NOTE(review): presumably the high-water mark of the worker count -- confirm
# where it is updated.
#
# @return [Integer]
def largest_size; @largest_size; end

#maximum_size ⇒ Integer

Returns:

  • (Integer)


12
13
14
# File 'lib/contender/pool/pool_executor.rb', line 12

# Reader for the maximum size. The constructor requires it to be positive and
# not less than the core size.
#
# @return [Integer]
def maximum_size; @maximum_size; end

#queue ⇒ Queue (readonly)

Returns:



27
28
29
# File 'lib/contender/pool/pool_executor.rb', line 27

# Reader for the queue handed to the constructor, which holds tasks that will
# be executed by the thread pool.
#
# @return [Queue]
def queue; @queue; end

#rejection_policy ⇒ RejectionPolicy

Returns:



30
31
32
# File 'lib/contender/pool/pool_executor.rb', line 30

# Reader for the rejection policy. The constructor installs an AbortPolicy
# instance.
# NOTE(review): presumably consulted when a task is rejected -- confirm in
# the reject helper.
#
# @return [RejectionPolicy]
def rejection_policy; @rejection_policy; end

#thread_factory ⇒ ThreadFactory (readonly)

Returns:



24
25
26
# File 'lib/contender/pool/pool_executor.rb', line 24

# Reader for the factory handed to the constructor, which is used to create
# threads for new pool workers.
#
# @return [ThreadFactory]
def thread_factory; @thread_factory; end

#work_timeout ⇒ Float

Returns:

  • (Float)


21
22
23
# File 'lib/contender/pool/pool_executor.rb', line 21

# Reader for the work timeout. The constructor rejects negative values.
#
# @return [Float]
def work_timeout; @work_timeout; end

Instance Method Details

#active_count ⇒ Integer

Returns:

  • (Integer)


289
290
291
292
293
294
295
# File 'lib/contender/pool/pool_executor.rb', line 289

# Counts the workers whose +locked?+ predicate is currently true. The worker
# set is only read inside +synchronize+.
# NOTE(review): presumably a worker holds its lock while running a task --
# confirm against the worker class.
#
# @return [Integer]
def active_count
  synchronize { @workers.count(&:locked?) }
end

#await_termination(timeout) ⇒ Boolean

Parameters:

  • timeout (Float)

Returns:

  • (Boolean)


132
133
134
135
136
137
138
139
# File 'lib/contender/pool/pool_executor.rb', line 132

# Blocks until the executor reports +terminated?+ or the timeout elapses.
#
# A condition wait may return before the pool has terminated (early or
# spurious wakeup), so the wait is repeated against a monotonic deadline
# rather than performed once.
#
# @param timeout [Float, nil] maximum number of seconds to wait; nil waits
#   until the pool terminates
# @return [Boolean] true if the pool is terminated, false if the timeout
#   elapsed first
def await_termination(timeout)
  deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  synchronize do
    until terminated?
      if deadline
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0

        # Only wait for what is left of the budget, so repeated wakeups
        # cannot stretch the total wait beyond the requested timeout.
        @termination.wait remaining
      else
        @termination.wait
      end
    end

    true
  end
end

#backlog ⇒ Integer

Returns:

  • (Integer)


274
275
276
# File 'lib/contender/pool/pool_executor.rb', line 274

# Reports the current size of the work queue.
#
# @return [Integer]
def backlog; @queue.size; end

#completed_tasks ⇒ Integer

Returns:

  • (Integer)


305
306
307
308
309
310
311
312
313
314
315
# File 'lib/contender/pool/pool_executor.rb', line 305

# Adds the pool-level completed-task counter to the per-worker counters of
# every worker currently in the worker set. Both are read inside
# +synchronize+.
#
# @return [Integer]
def completed_tasks
  synchronize do
    @workers.inject(@completed_task_count) do |sum, worker|
      sum + worker.completed_task_count
    end
  end
end

#current_size ⇒ Integer

Returns:

  • (Integer)


280
281
282
283
284
285
# File 'lib/contender/pool/pool_executor.rb', line 280

# Reports the number of workers in the worker set, or 0 once the control
# state has advanced past STOP. Evaluated inside +synchronize+.
#
# @return [Integer]
def current_size
  synchronize do
    current_control.state > STOP ? 0 : @workers.size
  end
end

#execute(task = nil, &block) ⇒ undefined

Parameters:

  • task (Object) (defaults to: nil)

Returns:

  • (undefined)

Raises:



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/contender/pool/pool_executor.rb', line 78

# Hands a task to the pool, trying three routes in order: a core worker, then
# the work queue (only while the state is RUNNING), then an extra worker via
# add_worker(false, ...). The task is passed to +reject+ if all three fail.
#
# @param task [Object] the task to run; the block is used when this is
#   nil or false
# @raise [ArgumentError] if neither a task nor a block is given
# @return [undefined]
def execute(task = nil, &block)
  work = task || block
  raise ArgumentError unless work

  return if try_core_worker(work)

  # No core worker took the task; try to park it on the queue instead.
  queued = current_control.state == RUNNING && @queue.offer(work)
  if queued
    after_task_enqueue(work)
    return
  end

  # Either the pool is no longer running or the queue refused the task.
  reject(work) unless add_worker(false, work)
end

#prestart ⇒ Boolean

Returns:

  • (Boolean)


166
167
168
# File 'lib/contender/pool/pool_executor.rb', line 166

# Calls add_worker(true) once, but only while the control's worker count is
# below the core size.
#
# @return [Boolean] false when the worker count already reaches the core
#   size; otherwise whatever add_worker returned
def prestart
  return false unless current_control.worker_count < @core_size

  add_worker(true)
end

#prestart! ⇒ Integer

Returns:

  • (Integer)


172
173
174
175
176
177
178
179
180
# File 'lib/contender/pool/pool_executor.rb', line 172

# Calls add_worker(true) repeatedly until it returns a falsy value.
#
# @return [Integer] the number of add_worker calls that succeeded
def prestart!
  started = 0
  started += 1 while add_worker(true)
  started
end

#remove(task) ⇒ Boolean

Parameters:

  • task (Object)

Returns:

  • (Boolean)


262
263
264
265
266
# File 'lib/contender/pool/pool_executor.rb', line 262

# Deletes the task from the work queue, then calls try_terminate.
#
# @param task [Object]
# @return [Boolean] whatever the queue's +delete+ returned
def remove(task)
  @queue.delete(task).tap { try_terminate }
end

#shutdown ⇒ undefined

Returns:

  • (undefined)


103
104
105
106
107
108
109
110
111
# File 'lib/contender/pool/pool_executor.rb', line 103

# Begins shutting the pool down: inside +synchronize+ the state is advanced
# to SHUTDOWN, idle workers are interrupted and the +on_shutdown+ hook runs;
# +try_terminate+ is then called outside the synchronized section.
#
# NOTE(review): presumably tasks already queued are still executed, as in the
# JDK executor this class is derived from -- confirm against the worker loop.
#
# @return [undefined]
def shutdown
  synchronize do
    advance_state_to SHUTDOWN
    interrupt_idle_workers
    on_shutdown
  end

  # Called after the synchronized section has been left.
  try_terminate
end

#shutdown! ⇒ Array

Returns:

  • (Array)


115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/contender/pool/pool_executor.rb', line 115

# Stops the pool: inside +synchronize+ the state is advanced to STOP, workers
# are interrupted and the work queue is drained into a fresh array;
# +try_terminate+ is then called outside the synchronized section.
#
# @return [Array] the tasks drained from the work queue
def shutdown!
  drained = []

  synchronize do
    advance_state_to(STOP)
    interrupt_workers
    @queue.drain_to(drained)
  end

  try_terminate
  drained
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
# File 'lib/contender/pool/pool_executor.rb', line 143

# True once the control state is anything other than RUNNING.
#
# @return [Boolean]
def shutdown?
  state = current_control.state
  state != RUNNING
end

#terminated? ⇒ Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/contender/pool/pool_executor.rb', line 156

# True when the control state equals TERMINATED.
#
# @return [Boolean]
def terminated?
  state = current_control.state
  state == TERMINATED
end

#terminating? ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
152
# File 'lib/contender/pool/pool_executor.rb', line 149

# True while the control state lies strictly between RUNNING and TERMINATED.
#
# @return [Boolean]
def terminating?
  state = current_control.state
  return false unless state > RUNNING

  state < TERMINATED
end

#to_s ⇒ String

Returns:

  • (String)


321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/contender/pool/pool_executor.rb', line 321

# Builds a one-line summary of the pool: a state label followed by the pool
# size, active thread count, queued task count and completed task count.
#
# The label is "Running" for RUNNING, "Terminated" for TERMINATED and
# "Shutting down" for any state strictly between the two; it is left empty
# for a state matching none of those.
#
# @return [String]
def to_s
  state = current_control.state

  label =
    if state == TERMINATED
      "Terminated"
    elsif state > RUNNING && state < TERMINATED
      "Shutting down"
    elsif state == RUNNING
      "Running"
    end

  "{#{label}, pool size = #{current_size}, active threads = #{active_count}, " \
    "queued tasks = #{backlog}, completed tasks = #{completed_tasks}}"
end

#total_tasks ⇒ Integer

Returns:

  • (Integer)


299
300
301
# File 'lib/contender/pool/pool_executor.rb', line 299

# Approximate number of tasks the pool has been given so far: completed
# tasks, tasks currently being executed and tasks still waiting in the queue.
#
# In-flight tasks are included because a task a worker has taken off the
# queue is otherwise counted neither as completed nor as backlog, which
# would make the total drop whenever a task starts running.
#
# The three figures are sampled one after another, so the sum is only a
# snapshot-quality estimate while the pool is busy.
#
# @return [Integer]
def total_tasks
  completed_tasks + active_count + backlog
end