Module: Parallel

Extended by:
ProcessorCount
Defined in:
lib/parallel.rb,
lib/parallel/version.rb,
lib/parallel/processor_count.rb

Defined Under Namespace

Modules: ProcessorCount Classes: Break, DeadWorker, ExceptionWrapper, JobFactory, Kill, UndumpableException, UserInterruptHandler, Worker

Constant Summary collapse

Stop =
Object.new
VERSION =
Version = '1.11.2'

Class Method Summary collapse

Methods included from ProcessorCount

physical_processor_count, processor_count

Class Method Details

.all?(*args, &block) ⇒ Boolean

Returns:

  • (Boolean)


225
226
227
228
# File 'lib/parallel.rb', line 225

def all?(*args, &block)
  raise "You must provide a block when calling #all?" if block.nil?
  !!each(*args) { |*args| raise Parallel::Kill unless block.call(*args) }
end

.any?(*args, &block) ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
223
# File 'lib/parallel.rb', line 220

def any?(*args, &block)
  raise "You must provide a block when calling #any?" if block.nil?
  !each(*args) { |*args| raise Parallel::Kill if block.call(*args) }
end

.each(array, options = {}, &block) ⇒ Object



216
217
218
# File 'lib/parallel.rb', line 216

def each(array, options={}, &block)
  map(array, options.merge(:preserve_results => false), &block)
end

.each_with_index(array, options = {}, &block) ⇒ Object



230
231
232
# File 'lib/parallel.rb', line 230

def each_with_index(array, options={}, &block)
  each(array, options.merge(:with_index => true), &block)
end

.in_processes(options = {}, &block) ⇒ Object



210
211
212
213
214
# File 'lib/parallel.rb', line 210

def in_processes(options = {}, &block)
  count, options = extract_count_from_options(options)
  count ||= processor_count
  map(0...count, options.merge(:in_processes => count), &block)
end

.in_threads(options = {:count => 2}) ⇒ Object



203
204
205
206
207
208
# File 'lib/parallel.rb', line 203

def in_threads(options={:count => 2})
  count, _ = extract_count_from_options(options)
  Array.new(count) do |i|
    Thread.new { yield(i) }
  end.map!(&:value)
end

.map(source, options = {}, &block) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/parallel.rb', line 234

def map(source, options = {}, &block)
  options[:mutex] = Mutex.new

  if RUBY_PLATFORM =~ /java/ and not options[:in_processes]
    method = :in_threads
    size = options[method] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[method]
  else
    method = :in_processes
    if Process.respond_to?(:fork)
      size = options[method] || processor_count
    else
      warn "Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  job_factory = JobFactory.new(source, options[:mutex])
  size = [job_factory.size, size].min

  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(job_factory, options)

  results = if size == 0
    work_direct(job_factory, options, &block)
  elsif method == :in_threads
    work_in_threads(job_factory, options.merge(:count => size), &block)
  else
    work_in_processes(job_factory, options.merge(:count => size), &block)
  end
  if results
    options[:return_results] ? results : source
  end
end

.map_with_index(array, options = {}, &block) ⇒ Object



271
272
273
# File 'lib/parallel.rb', line 271

def map_with_index(array, options={}, &block)
  map(array, options.merge(:with_index => true), &block)
end

.worker_numberObject



275
276
277
# File 'lib/parallel.rb', line 275

def worker_number
  Thread.current[:parallel_worker_number]
end

.worker_number=(worker_num) ⇒ Object



279
280
281
# File 'lib/parallel.rb', line 279

def worker_number=(worker_num)
  Thread.current[:parallel_worker_number] = worker_num
end