Module: Parallel
- Extended by:
- ProcessorCount
- Defined in:
- lib/parallel.rb,
lib/parallel/version.rb,
lib/parallel/processor_count.rb
Defined Under Namespace
Modules: ProcessorCount
Classes: Break, DeadWorker, ExceptionWrapper, JobFactory, Kill, UndumpableException, UserInterruptHandler, Worker
Constant Summary
collapse
- Stop =
Object.new.freeze
- VERSION =
Version = '1.19.2'
Class Method Summary
collapse
-
.all?(*args, &block) ⇒ Boolean
-
.any?(*args, &block) ⇒ Boolean
-
.each(array, options = {}, &block) ⇒ Object
-
.each_with_index(array, options = {}, &block) ⇒ Object
-
.flat_map(*args, &block) ⇒ Object
-
.in_processes(options = {}, &block) ⇒ Object
-
.in_threads(options = {:count => 2}) ⇒ Object
-
.map(source, options = {}, &block) ⇒ Object
-
.map_with_index(array, options = {}, &block) ⇒ Object
-
.worker_number ⇒ Object
-
.worker_number=(worker_num) ⇒ Object
TODO: this does not work when doing threads in forks, so should remove and yield the number instead if needed.
Methods included from ProcessorCount: physical_processor_count, processor_count
Class Method Details
.all?(*args, &block) ⇒ Boolean
236
237
238
239
|
# File 'lib/parallel.rb', line 236
# Reports whether the block holds for every item handed to +each+.
#
# The block passed on to +each+ raises Parallel::Kill as soon as the caller's
# block returns a falsy value; the truthiness of what +each+ returns is then
# converted to a strict boolean.
#
# @param args [Array] forwarded unchanged to +each+
# @return [Boolean] true when +each+ returned a truthy value, false otherwise
# @raise [RuntimeError] when no block is given
def all?(*args, &block)
  raise "You must provide a block when calling #all?" unless block

  outcome = each(*args) do |*item|
    raise Parallel::Kill unless block.call(*item)
  end
  outcome ? true : false
end
|
.any?(*args, &block) ⇒ Boolean
231
232
233
234
|
# File 'lib/parallel.rb', line 231
# Reports whether the block holds for at least one item handed to +each+.
#
# The block passed on to +each+ raises Parallel::Kill as soon as the caller's
# block returns a truthy value; the result is the negated truthiness of what
# +each+ returns.
#
# @param args [Array] forwarded unchanged to +each+
# @return [Boolean] true when +each+ returned a falsy value, false otherwise
# @raise [RuntimeError] when no block is given
def any?(*args, &block)
  raise "You must provide a block when calling #any?" unless block

  outcome = each(*args) do |*item|
    raise Parallel::Kill if block.call(*item)
  end
  outcome ? false : true
end
|
.each(array, options = {}, &block) ⇒ Object
227
228
229
|
# File 'lib/parallel.rb', line 227
# Delegates to +map+ with :preserve_results forced to false.
#
# The caller's +options+ Hash is not modified; a merged copy is passed on.
#
# @param array [Object] work items, forwarded to +map+
# @param options [Hash] extra options, forwarded to +map+
# @return [Object] whatever +map+ returns
def each(array, options={}, &block)
  without_results = options.merge(:preserve_results => false)
  map(array, without_results, &block)
end
|
.each_with_index(array, options = {}, &block) ⇒ Object
241
242
243
|
# File 'lib/parallel.rb', line 241
# Delegates to +each+ with :with_index set to true.
#
# The caller's +options+ Hash is not modified; a merged copy is passed on.
#
# @param array [Object] work items, forwarded to +each+
# @param options [Hash] extra options, forwarded to +each+
# @return [Object] whatever +each+ returns
def each_with_index(array, options={}, &block)
  indexed = options.merge(:with_index => true)
  each(array, indexed, &block)
end
|
.flat_map(*args, &block) ⇒ Object
289
290
291
|
# File 'lib/parallel.rb', line 289
# Calls +map+ with the given arguments and flattens its result by one level.
#
# @param args [Array] forwarded unchanged to +map+
# @return [Array] the result of +map+ with one level of nesting removed
def flat_map(*args, &block)
  nested = map(*args, &block)
  nested.flatten(1)
end
|
.in_processes(options = {}, &block) ⇒ Object
221
222
223
224
225
|
# File 'lib/parallel.rb', line 221
# Runs +block+ through +map+ over the range 0...count with :in_processes set
# to that count.
#
# @param options [Hash, Integer] either an options Hash whose :count entry is
#   the number of workers, or the bare number of workers
# @return [Object] whatever +map+ returns
def in_processes(options = {}, &block)
  # Split the argument into a worker count and an options Hash. Plain
  # destructuring of a Hash would leave the options side nil and make the
  # +merge+ below fail.
  if options.is_a?(Hash)
    count = options[:count]
  else
    count = options
    options = {}
  end
  count ||= processor_count
  map(0...count, options.merge(:in_processes => count), &block)
end
|
.in_threads(options = {:count => 2}) ⇒ Object
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
|
# File 'lib/parallel.rb', line 203
# Starts +count+ threads, yields each thread's index to the given block, and
# returns the block results in index order.
#
# @param options [Hash, Integer] either an options Hash whose :count entry is
#   the number of threads, or the bare number of threads
# @yieldparam i [Integer] zero-based index of the thread
# @return [Array] the value of each thread, in creation order
def in_threads(options={:count => 2})
  threads = []
  # Pull the thread count out of the argument; destructuring the Hash itself
  # would leave +count+ holding the Hash and break +count.times+.
  count = options.is_a?(Hash) ? options[:count] : options
  # Interrupts are masked around the begin/ensure so the cleanup below is not
  # cut short; they are re-enabled only while threads are spawned and joined.
  Thread.handle_interrupt(Exception => :never) do
    begin
      Thread.handle_interrupt(Exception => :immediate) do
        count.times do |i|
          threads << Thread.new { yield(i) }
        end
        threads.map(&:value)
      end
    ensure
      # Kill whatever is still running, e.g. after one thread raised.
      threads.each(&:kill)
    end
  end
end
|
.map(source, options = {}, &block) ⇒ Object
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
# File 'lib/parallel.rb', line 245
# Runs +block+ over the jobs built from +source+, choosing between threads,
# processes and direct execution based on +options+ and the platform.
#
# The caller's +options+ Hash is duplicated before :mutex and :return_results
# are written into it, so the original Hash is left untouched.
#
# @param source [Object] work items; handed to JobFactory.new
# @param options [Hash] keys read here: :in_processes, :in_threads,
#   :preserve_results, :finish
# @return [Object, nil] the worker results when :return_results ends up true,
#   +source+ when it ends up false, or nil when the worker call returned a
#   falsy value
# @raise [ArgumentError] when both :in_processes and :in_threads are given
def map(source, options = {}, &block)
  options = options.dup
  options[:mutex] = Mutex.new

  if options[:in_processes] && options[:in_threads]
    raise ArgumentError, "Please specify only one of `in_processes` or `in_threads`."
  end

  # Pick the execution method and the requested number of workers.
  on_java = RUBY_PLATFORM =~ /java/
  if on_java && !options[:in_processes]
    method = :in_threads
    size = options[:in_threads] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[:in_threads]
  elsif Process.respond_to?(:fork)
    method = :in_processes
    size = options[:in_processes] || processor_count
  else
    method = :in_processes
    warn "Process.fork is not supported by this Ruby"
    size = 0
  end

  job_factory = JobFactory.new(source, options[:mutex])
  # Never use more workers than the job factory reports as its size.
  size = [job_factory.size, size].min

  keep_results = options[:preserve_results] != false
  options[:return_results] = keep_results || !!options[:finish]
  add_progress_bar!(job_factory, options)

  results =
    if size.zero?
      work_direct(job_factory, options, &block)
    else
      sized_options = options.merge(:count => size)
      case method
      when :in_threads then work_in_threads(job_factory, sized_options, &block)
      else work_in_processes(job_factory, sized_options, &block)
      end
    end

  return unless results

  options[:return_results] ? results : source
end
|
.map_with_index(array, options = {}, &block) ⇒ Object
285
286
287
|
# File 'lib/parallel.rb', line 285
# Delegates to +map+ with :with_index set to true.
#
# The caller's +options+ Hash is not modified; a merged copy is passed on.
#
# @param array [Object] work items, forwarded to +map+
# @param options [Hash] extra options, forwarded to +map+
# @return [Object] whatever +map+ returns
def map_with_index(array, options={}, &block)
  indexed = options.merge(:with_index => true)
  map(array, indexed, &block)
end
|
.worker_number ⇒ Object
293
294
295
|
# File 'lib/parallel.rb', line 293
# Reads the worker number stored under :parallel_worker_number on the
# current thread.
#
# @return [Object, nil] the stored value, or nil when nothing was stored
def worker_number
  current = Thread.current
  current[:parallel_worker_number]
end
|
.worker_number=(worker_num) ⇒ Object
TODO: this does not work when doing threads in forks, so should remove and yield the number instead if needed
298
299
300
|
# File 'lib/parallel.rb', line 298
# Stores +worker_num+ under :parallel_worker_number on the current thread.
#
# TODO: this does not work when doing threads in forks, so should remove and
# yield the number instead if needed.
#
# @param worker_num [Object] value to store
# @return [Object] the stored value
def worker_number=(worker_num)
  current = Thread.current
  current[:parallel_worker_number] = worker_num
end
|