Module: Parallel

Extended by:
ProcessorCount
Defined in:
lib/gpack/core/parallel.rb,
lib/gpack/core/parallel.rb,
lib/gpack/core/parallel.rb

Defined Under Namespace

Modules: ProcessorCount Classes: Break, DeadWorker, ExceptionWrapper, JobFactory, Kill, UndumpableException, UserInterruptHandler, Worker

Constant Summary collapse

VERSION =
Version = '1.10.0'
Stop =
Object.new

Class Method Summary collapse

Methods included from ProcessorCount

physical_processor_count, processor_count

Class Method Details

.each(array, options = {}, &block) ⇒ Object



305
306
307
308
# File 'lib/gpack/core/parallel.rb', line 305

def each(array, options={}, &block)
  map(array, options.merge(:preserve_results => false), &block)
  array
end

.each_with_index(array, options = {}, &block) ⇒ Object



310
311
312
# File 'lib/gpack/core/parallel.rb', line 310

def each_with_index(array, options={}, &block)
  each(array, options.merge(:with_index => true), &block)
end

.in_processes(options = {}, &block) ⇒ Object



299
300
301
302
303
# File 'lib/gpack/core/parallel.rb', line 299

def in_processes(options = {}, &block)
  count, options = extract_count_from_options(options)
  count ||= processor_count
  map(0...count, options.merge(:in_processes => count), &block)
end

.in_threads(options = {:count => 2}) ⇒ Object



292
293
294
295
296
297
# File 'lib/gpack/core/parallel.rb', line 292

def in_threads(options={:count => 2})
  count, _ = extract_count_from_options(options)
  Array.new(count) do |i|
    Thread.new { yield(i) }
  end.map!(&:value)
end

.map(source, options = {}, &block) ⇒ Object



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
# File 'lib/gpack/core/parallel.rb', line 314

def map(source, options = {}, &block)
  options[:mutex] = Mutex.new

  if RUBY_PLATFORM =~ /java/ and not options[:in_processes]
    method = :in_threads
    size = options[method] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[method]
  else
    method = :in_processes
    if Process.respond_to?(:fork)
      size = options[method] || processor_count
    else
      warn "Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  job_factory = JobFactory.new(source, options[:mutex])
  size = [job_factory.size, size].min

  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(job_factory, options)

  if size == 0
    work_direct(job_factory, options, &block)
  elsif method == :in_threads
    work_in_threads(job_factory, options.merge(:count => size), &block)
  else
    work_in_processes(job_factory, options.merge(:count => size), &block)
  end
end

.map_with_index(array, options = {}, &block) ⇒ Object



348
349
350
# File 'lib/gpack/core/parallel.rb', line 348

def map_with_index(array, options={}, &block)
  map(array, options.merge(:with_index => true), &block)
end

.worker_numberObject



352
353
354
# File 'lib/gpack/core/parallel.rb', line 352

def worker_number
  Thread.current[:parallel_worker_number]
end

.worker_number=(worker_num) ⇒ Object



356
357
358
# File 'lib/gpack/core/parallel.rb', line 356

def worker_number=(worker_num)
  Thread.current[:parallel_worker_number] = worker_num
end