Class: Contender::Pool::PoolExecutor
Overview
Executor that uses a thread pool to execute tasks asynchronously
Made idiomatic by Ian Unruh. The original author is Doug Lea, from the JDK.
Instance Attribute Summary collapse
State management
collapse
Pool size management
collapse
Work queue management
collapse
Instance Method Summary
collapse
#future_for, #invoke_all, #submit
Constructor Details
#initialize(core_size, maximum_size, work_timeout, queue, thread_factory) ⇒ undefined
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/contender/pool/pool_executor.rb', line 48
# Builds a pool executor and initializes its bookkeeping state.
#
# The rejection policy is set to a new AbortPolicy, core timeout is disabled,
# the worker set starts empty, and the control value is built from RUNNING
# and a count of 0.
#
# @param core_size [Integer] must not be negative
# @param maximum_size [Integer] must be positive and not smaller than +core_size+
# @param work_timeout timeout value; must not be negative (units are not
#   visible from this method)
# @param queue work queue stored for later use
# @param thread_factory factory stored for later use; it is not invoked here
# @raise [ArgumentError] if any of the numeric constraints above is violated
def initialize(core_size, maximum_size, work_timeout, queue, thread_factory)
  # Each failure names the offending argument so callers can tell them apart.
  raise ArgumentError, "core_size must not be negative" if core_size < 0
  if maximum_size <= 0 || maximum_size < core_size
    raise ArgumentError, "maximum_size must be positive and at least core_size"
  end
  raise ArgumentError, "work_timeout must not be negative" if work_timeout < 0

  @queue = queue
  @thread_factory = thread_factory
  @rejection_policy = AbortPolicy.new

  @core_size = core_size
  @maximum_size = maximum_size
  @allow_core_timeout = false
  @work_timeout = work_timeout

  @control = Atomic.new(control_for(RUNNING, 0))

  # The termination condition is bound to the monitor created just above.
  @monitor = Monitor.new
  @termination = @monitor.new_cond

  @workers = Set.new
  @largest_size = 0
  @completed_task_count = 0
end
|
Instance Attribute Details
#allow_core_timeout ⇒ Boolean
18
19
20
|
# File 'lib/contender/pool/pool_executor.rb', line 18
# Reader for the @allow_core_timeout flag (the constructor sets it to false).
#
# @return [Boolean]
def allow_core_timeout
@allow_core_timeout
end
|
#core_size ⇒ Integer
9
10
11
|
# File 'lib/contender/pool/pool_executor.rb', line 9
# Reader for @core_size, the core size the executor was constructed with.
#
# @return [Integer]
def core_size
@core_size
end
|
#largest_size ⇒ Integer
15
16
17
|
# File 'lib/contender/pool/pool_executor.rb', line 15
# Reader for @largest_size (the constructor initializes it to 0).
# NOTE(review): presumably the peak worker count; the code that updates it is
# not visible here.
#
# @return [Integer]
def largest_size
@largest_size
end
|
#maximum_size ⇒ Integer
12
13
14
|
# File 'lib/contender/pool/pool_executor.rb', line 12
# Reader for @maximum_size, the maximum size the executor was constructed with.
#
# @return [Integer]
def maximum_size
@maximum_size
end
|
27
28
29
|
# File 'lib/contender/pool/pool_executor.rb', line 27
# Reader for @queue, the work queue passed to the constructor.
#
# @return the queue object as stored
def queue
@queue
end
|
30
31
32
|
# File 'lib/contender/pool/pool_executor.rb', line 30
# Reader for @rejection_policy (the constructor assigns a new AbortPolicy).
#
# @return the current rejection policy object
def rejection_policy
@rejection_policy
end
|
24
25
26
|
# File 'lib/contender/pool/pool_executor.rb', line 24
# Reader for @thread_factory, the factory passed to the constructor.
#
# @return the thread factory object as stored
def thread_factory
@thread_factory
end
|
#work_timeout ⇒ Float
21
22
23
|
# File 'lib/contender/pool/pool_executor.rb', line 21
# Reader for @work_timeout, the non-negative timeout given to the constructor.
# NOTE(review): units are not visible from this file section — confirm.
#
# @return [Float]
def work_timeout
@work_timeout
end
|
Instance Method Details
#active_count ⇒ Integer
289
290
291
292
293
294
295
|
# File 'lib/contender/pool/pool_executor.rb', line 289
# Counts the workers in @workers that answer true to +locked?+.
# The count is taken inside +synchronize+.
#
# @return [Integer] number of locked workers
def active_count
  synchronize { @workers.count(&:locked?) }
end
|
#await_termination(timeout) ⇒ Boolean
132
133
134
135
136
137
138
139
|
# File 'lib/contender/pool/pool_executor.rb', line 132
# Waits on the termination condition unless the executor is already
# terminated, then reports the state observed after the wait.
#
# The wait is performed once; the result is whatever +terminated?+ returns
# afterwards.
#
# @param timeout passed straight to the condition's +wait+
# @return [Boolean] true if terminated, false otherwise
def await_termination(timeout)
  synchronize do
    if terminated?
      true
    else
      @termination.wait(timeout)
      terminated?
    end
  end
end
|
#backlog ⇒ Integer
274
275
276
|
# File 'lib/contender/pool/pool_executor.rb', line 274
# Reports the size of the work queue by delegating to @queue.size.
#
# @return [Integer]
def backlog
@queue.size
end
|
#completed_tasks ⇒ Integer
305
306
307
308
309
310
311
312
313
314
315
|
# File 'lib/contender/pool/pool_executor.rb', line 305
# Sums @completed_task_count with the +completed_task_count+ of every worker
# currently in @workers. The sum is computed inside +synchronize+.
#
# @return [Integer] combined completed-task total
def completed_tasks
  synchronize do
    @workers.inject(@completed_task_count) do |sum, worker|
      sum + worker.completed_task_count
    end
  end
end
|
#current_size ⇒ Integer
280
281
282
283
284
285
|
# File 'lib/contender/pool/pool_executor.rb', line 280
# Reports the number of workers, or 0 once the control state is past STOP.
# The check and the read both happen inside +synchronize+.
#
# @return [Integer]
def current_size
  synchronize do
    current_control.state > STOP ? 0 : @workers.size
  end
end
|
#execute(task = nil, &block) ⇒ undefined
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/contender/pool/pool_executor.rb', line 78
# Hands a task to the executor.
#
# Steps, in order:
# 1. +try_core_worker+ is attempted; on success nothing else happens.
# 2. If the control state is RUNNING and the queue accepts the task via
#    +offer+, +after_task_enqueue+ is called.
# 3. Otherwise +add_worker(false, task)+ is attempted and, if that fails,
#    the task is passed to +reject+.
#
# @param task object to execute; when nil, the given block is used instead
# @raise [ArgumentError] if neither a task nor a block is supplied
# @return undefined
def execute(task = nil, &block)
  task ||= block
  raise ArgumentError, "a task or a block is required" unless task

  return if try_core_worker(task)

  if current_control.state == RUNNING && @queue.offer(task)
    after_task_enqueue(task)
    return
  end

  # Last resort: a non-core worker; rejection only if that also fails.
  reject(task) unless add_worker(false, task)
end
|
#prestart ⇒ Boolean
166
167
168
|
# File 'lib/contender/pool/pool_executor.rb', line 166
# Attempts to add one core worker, but only while the control's worker count
# is below @core_size.
#
# @return [Boolean] false when the worker count is already at or above the
#   core size; otherwise the result of +add_worker(true)+
def prestart
  below_core = current_control.worker_count < @core_size
  below_core && add_worker(true)
end
|
#prestart! ⇒ Integer
172
173
174
175
176
177
178
179
180
|
# File 'lib/contender/pool/pool_executor.rb', line 172
# Keeps calling +add_worker(true)+ until it returns a falsy value.
#
# @return [Integer] how many calls succeeded
def prestart!
  started = 0
  started += 1 while add_worker(true)
  started
end
|
#remove(task) ⇒ Boolean
262
263
264
265
266
|
# File 'lib/contender/pool/pool_executor.rb', line 262
# Deletes the task from the work queue and then calls +try_terminate+.
#
# @param task task to delete from @queue
# @return whatever @queue.delete returned (documented as a Boolean)
def remove(task)
  @queue.delete(task).tap { try_terminate }
end
|
#shutdown ⇒ undefined
103
104
105
106
107
108
109
110
111
|
# File 'lib/contender/pool/pool_executor.rb', line 103
# Starts a shutdown of the executor.
#
# Inside +synchronize+ it advances the state to SHUTDOWN, interrupts idle
# workers and calls the +on_shutdown+ hook, in that order. +try_terminate+ is
# called afterwards, outside the synchronized section.
#
# @return undefined (the value of +try_terminate+ falls through)
def shutdown
synchronize do
advance_state_to SHUTDOWN
interrupt_idle_workers
on_shutdown
end
# Termination is attempted only after the synchronized section has ended.
try_terminate
end
|
#shutdown! ⇒ Array
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/contender/pool/pool_executor.rb', line 115
# Stops the executor and hands back the tasks that were still queued.
#
# Inside +synchronize+ it advances the state to STOP, interrupts workers and
# drains @queue into a fresh array. +try_terminate+ is called afterwards,
# outside the synchronized section.
#
# @return [Array] the tasks drained from the queue
def shutdown!
  [].tap do |drained|
    synchronize do
      advance_state_to(STOP)
      interrupt_workers
      @queue.drain_to(drained)
    end
    try_terminate
  end
end
|
#shutdown? ⇒ Boolean
143
144
145
|
# File 'lib/contender/pool/pool_executor.rb', line 143
# Tells whether the control state is anything other than RUNNING.
#
# @return [Boolean]
def shutdown?
  state = current_control.state
  state != RUNNING
end
|
#terminated? ⇒ Boolean
156
157
158
|
# File 'lib/contender/pool/pool_executor.rb', line 156
# Tells whether the control state equals TERMINATED.
#
# @return [Boolean]
def terminated?
  state = current_control.state
  state == TERMINATED
end
|
#terminating? ⇒ Boolean
149
150
151
152
|
# File 'lib/contender/pool/pool_executor.rb', line 149
# Tells whether the control state lies strictly between RUNNING and
# TERMINATED. The state is read once so both comparisons see the same value.
#
# @return [Boolean]
def terminating?
  phase = current_control.state
  past_running = phase > RUNNING
  past_running && phase < TERMINATED
end
|
#to_s ⇒ String
321
322
323
324
325
326
327
328
329
330
331
332
333
334
|
# File 'lib/contender/pool/pool_executor.rb', line 321
# Renders a one-line summary: state label, pool size, active threads, queued
# tasks and completed tasks.
#
# The label is "Running", "Shutting down" (strictly between RUNNING and
# TERMINATED) or "Terminated"; it is empty if the state matches none of them.
#
# @return [String]
def to_s
  state = current_control.state

  # Checked from the last rule to the first so that, as with sequential
  # overwriting assignments, the latest matching rule decides the label.
  label =
    if state == TERMINATED
      "Terminated"
    elsif state > RUNNING && state < TERMINATED
      "Shutting down"
    elsif state == RUNNING
      "Running"
    end

  "{#{label}" \
    ", pool size = #{current_size}" \
    ", active threads = #{active_count}" \
    ", queued tasks = #{backlog}" \
    ", completed tasks = #{completed_tasks}" \
    "}"
end
|
#total_tasks ⇒ Integer
299
300
301
|
# File 'lib/contender/pool/pool_executor.rb', line 299
# Adds the completed-task total to the current backlog.
#
# @return [Integer] +completed_tasks+ plus +backlog+
def total_tasks
  finished = completed_tasks
  waiting = backlog
  finished + waiting
end
|