Module: Parallel

Extended by:
ProcessorCount
Defined in:
lib/parallel.rb,
lib/parallel/version.rb,
lib/parallel/processor_count.rb

Defined Under Namespace

Modules: ProcessorCount Classes: Break, DeadWorker, ExceptionWrapper, JobFactory, Kill, UndumpableException, UserInterruptHandler, Worker

Constant Summary collapse

Stop =
Object.new.freeze
VERSION =
Version = '1.19.2'

Class Method Summary collapse

Methods included from ProcessorCount

physical_processor_count, processor_count

Class Method Details

.all?(*args, &block) ⇒ Boolean

Returns:

  • (Boolean)


236
237
238
239
# File 'lib/parallel.rb', line 236

# Reports whether the block returns a truthy value for every item.
#
# All arguments are forwarded unchanged to `each`. As soon as the block
# returns a falsy value for an item, Parallel::Kill is raised inside the
# worker block (presumably aborting the run and making `each` return a falsy
# value -- confirm against the worker implementation). The result is the
# truthiness of whatever `each` returns, coerced to a strict boolean.
#
# @return [Boolean]
# @raise [RuntimeError] when called without a block
def all?(*args, &block)
  raise "You must provide a block when calling #all?" unless block

  outcome = each(*args) do |*items|
    raise Parallel::Kill unless block.call(*items)
  end
  outcome ? true : false
end

.any?(*args, &block) ⇒ Boolean

Returns:

  • (Boolean)


231
232
233
234
# File 'lib/parallel.rb', line 231

# Reports whether the block returns a truthy value for at least one item.
#
# All arguments are forwarded unchanged to `each`. As soon as the block
# returns a truthy value, Parallel::Kill is raised inside the worker block
# (presumably aborting the run and making `each` return a falsy value --
# confirm against the worker implementation). The result is the negated
# truthiness of whatever `each` returns.
#
# @return [Boolean]
# @raise [RuntimeError] when called without a block
def any?(*args, &block)
  raise "You must provide a block when calling #any?" unless block

  completed = each(*args) do |*items|
    raise Parallel::Kill if block.call(*items)
  end
  completed ? false : true
end

.each(array, options = {}, &block) ⇒ Object



227
228
229
# File 'lib/parallel.rb', line 227

# Runs the block for every item via `map`, with result collection switched off.
#
# The caller's options are copied with :preserve_results forced to false
# (overriding any value the caller supplied); the caller's hash is not
# modified. The return value is whatever `map` returns for those options.
#
# @param array   the work source, passed straight through to `map`
# @param options [Hash] options forwarded to `map`
def each(array, options = {}, &block)
  each_options = options.merge(preserve_results: false)
  map(array, each_options, &block)
end

.each_with_index(array, options = {}, &block) ⇒ Object



241
242
243
# File 'lib/parallel.rb', line 241

# Same as `each`, but with :with_index => true added to the options.
#
# The caller's hash is not modified; a merged copy is passed on. How the
# index reaches the block is decided downstream of `each` and is not visible
# here.
#
# @param array   the work source, passed straight through to `each`
# @param options [Hash] options forwarded to `each`
def each_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  each(array, indexed_options, &block)
end

.flat_map(*args, &block) ⇒ Object



289
290
291
# File 'lib/parallel.rb', line 289

# Maps every item through the block via `map`, then flattens the collected
# results by exactly one level.
#
# `map` returns nil when the underlying worker run produces no result
# (presumably after the run was aborted, e.g. by a Break/Kill -- confirm).
# In that case nil is passed through instead of failing with NoMethodError
# on `nil.flatten`.
#
# @return [Array, nil] the one-level-flattened results, or nil when `map`
#   returned nil
def flat_map(*args, &block)
  mapped = map(*args, &block)
  mapped.flatten(1) if mapped
end

.in_processes(options = {}, &block) ⇒ Object



221
222
223
224
225
# File 'lib/parallel.rb', line 221

# Runs the block once per worker process, mapping over the range 0...count.
#
# The count and the remaining options come from extract_count_from_options
# (defined elsewhere); when it yields no count, `processor_count` is used.
# The chosen count is also passed to `map` as :in_processes.
#
# @param options forwarded to extract_count_from_options
# @return whatever `map` returns for the range 0...count
def in_processes(options = {}, &block)
  requested, remaining = extract_count_from_options(options)
  worker_total = requested || processor_count
  map(0...worker_total, remaining.merge(in_processes: worker_total), &block)
end

.in_threads(options = {:count => 2}) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/parallel.rb', line 203

# Runs the given block in `count` threads and returns every thread's value.
#
# The block is yielded the zero-based index of the thread it runs in. The
# count is taken from `options` by extract_count_from_options (defined
# elsewhere; presumably accepts an Integer or a hash with :count -- confirm).
# When called without arguments, `options` defaults to `{:count => 2}`.
#
# @yield [i] index of the thread executing the block
# @return [Array] the `Thread#value` of each thread, in creation order
def in_threads(options={:count => 2})
  threads = []
  count, _ = extract_count_from_options(options)

  # Defer asynchronous interrupts for the whole section, presumably so the
  # `ensure` cleanup below cannot itself be cut short.
  Thread.handle_interrupt(Exception => :never) do
    begin
      # Allow interrupts again only while threads are being spawned and joined.
      Thread.handle_interrupt(Exception => :immediate) do
        count.times do |i|
          # Appended one by one so that threads started before an interrupt
          # are already tracked for the cleanup in `ensure`.
          threads << Thread.new { yield(i) }
        end
        # Thread#value joins the thread and re-raises an exception it died with.
        threads.map(&:value)
      end
    ensure
      # Always kill the tracked threads, including those still running after
      # an exception or interrupt.
      threads.each(&:kill)
    end
  end
end

.map(source, options = {}, &block) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/parallel.rb', line 245

# Runs the block for every item of `source` and returns the outcome.
#
# Works on a copy of `options`, so the caller's hash is not modified. The
# copy gains a fresh Mutex under :mutex and a boolean under :return_results.
#
# Execution strategy:
# * threads when :in_threads is given, or when running on a java platform
#   without :in_processes;
# * processes otherwise, provided `Process.fork` is available -- if it is
#   not, a warning is printed and the work runs directly in the caller;
# * the worker count defaults to `processor_count` and never exceeds the
#   number of jobs; a count of 0 also runs the work directly.
#
# @param source  the work source handed to JobFactory
# @param options [Hash] reads :in_processes, :in_threads, :preserve_results
#   and :finish here; everything is forwarded to the work_* helpers
# @return the worker results when results are kept (:preserve_results is not
#   false, or :finish is set), `source` when they are not, or nil when the
#   work_* helper returned a falsy value
# @raise [ArgumentError] when both :in_processes and :in_threads are given
def map(source, options = {}, &block)
  options = options.dup
  options[:mutex] = Mutex.new

  process_count = options[:in_processes]
  thread_count = options[:in_threads]
  if process_count && thread_count
    raise ArgumentError, "Please specify only one of `in_processes` or `in_threads`."
  end

  if thread_count || (RUBY_PLATFORM =~ /java/ && !process_count)
    strategy = :in_threads
    size = thread_count || processor_count
  else
    strategy = :in_processes
    if Process.respond_to?(:fork)
      size = process_count || processor_count
    else
      warn "Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  job_factory = JobFactory.new(source, options[:mutex])
  # Never start more workers than there are jobs.
  size = [job_factory.size, size].min

  keep_results = options[:preserve_results] != false
  options[:return_results] = keep_results || (options[:finish] ? true : false)
  add_progress_bar!(job_factory, options)

  results =
    if size == 0
      work_direct(job_factory, options, &block)
    elsif strategy == :in_threads
      work_in_threads(job_factory, options.merge(count: size), &block)
    else
      work_in_processes(job_factory, options.merge(count: size), &block)
    end

  return unless results

  options[:return_results] ? results : source
end

.map_with_index(array, options = {}, &block) ⇒ Object



285
286
287
# File 'lib/parallel.rb', line 285

# Same as `map`, but with :with_index => true added to the options.
#
# The caller's hash is not modified; a merged copy is passed on. How the
# index reaches the block is decided downstream of `map` and is not visible
# here.
#
# @param array   the work source, passed straight through to `map`
# @param options [Hash] options forwarded to `map`
def map_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  map(array, indexed_options, &block)
end

.worker_number ⇒ Object



293
294
295
# File 'lib/parallel.rb', line 293

# Reads the worker number stored for the current thread.
#
# The value lives under the :parallel_worker_number key of `Thread.current`
# and is nil when nothing has been stored there.
#
# @return the stored worker number, or nil
def worker_number
  current = Thread.current
  current[:parallel_worker_number]
end

.worker_number=(worker_num) ⇒ Object

TODO: This does not work when threads are used inside forked processes, so it should be removed and the worker number yielded to the block instead, if needed.



298
299
300
# File 'lib/parallel.rb', line 298

# Stores the worker number for the current thread.
#
# The value is written under the :parallel_worker_number key of
# `Thread.current`, where `worker_number` reads it back.
#
# @param worker_num the number to store (nil clears it)
# @return the value that was stored
def worker_number=(worker_num)
  current = Thread.current
  current[:parallel_worker_number] = worker_num
end