Class: TRuby::ParallelProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/t_ruby/cache.rb

Overview

Parallel file processor

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(thread_count: nil) ⇒ ParallelProcessor



381
382
383
# File 'lib/t_ruby/cache.rb', line 381

# Creates a processor and fixes the number of worker threads it will use.
#
# @param thread_count [Integer, nil] explicit thread count; when it is nil
#   (or false) the value returned by #determine_thread_count is used instead
def initialize(thread_count: nil)
  @thread_count = thread_count
  # Only consult the helper when the caller did not supply a usable value.
  @thread_count ||= determine_thread_count
end

Instance Attribute Details

#thread_count ⇒ Object (readonly)

Returns the value of attribute thread_count.



379
380
381
# File 'lib/t_ruby/cache.rb', line 379

# Reader for the @thread_count instance variable assigned in the constructor.
#
# @return [Object] the stored thread count
def thread_count
  @thread_count
end

Instance Method Details

#process_files(file_paths, &block) ⇒ Object

Process files in parallel



386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# File 'lib/t_ruby/cache.rb', line 386

# Processes files in parallel, one thread per batch.
#
# The paths are split into consecutive batches whose size is given by
# #batch_size, and every batch is handled by its own thread.
#
# @param file_paths [Array] paths to process
# @yieldparam file [Object] a single entry of +file_paths+
# @yieldreturn [Object] the result for that entry
# @return [Array] the block results, in the same order as +file_paths+
# @raise [StandardError] re-raises the exception of the first batch thread
#   (in batch order) that terminated with an error
def process_files(file_paths, &block)
  return [] if file_paths.empty?

  # Start every batch thread before waiting on any of them.
  workers = file_paths.each_slice(batch_size(file_paths.length)).map do |batch|
    Thread.new { batch.map { |file| block.call(file) } }
  end

  # Thread#value joins the thread and returns its batch results. Collecting
  # them in batch order keeps the output aligned with the input order no
  # matter which thread finishes first, and needs no shared mutable state.
  workers.flat_map(&:value)
end

#process_with_queue(file_paths, &block) ⇒ Object

Process files through a shared work queue: each worker thread takes the next pending file as soon as it is free



407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
# File 'lib/t_ruby/cache.rb', line 407

# Processes files through a shared work queue: every worker thread repeatedly
# takes the next pending file, so workers that finish early pick up more work.
#
# @param file_paths [Enumerable] paths to process
# @yieldparam file [Object] a single entry of +file_paths+
# @yieldreturn [Object] the result for that entry
# @return [Array] the block results, in the same order as +file_paths+
# @raise [StandardError] re-raises the exception of the first worker thread
#   that terminated with an error
def process_with_queue(file_paths, &block)
  queue = Queue.new
  # Tag every file with its position so results can be stored in input order.
  file_paths.each_with_index { |file, index| queue << [index, file] }
  return [] if queue.empty?

  # Once closed, a blocking pop returns nil as soon as the queue is drained,
  # which ends each worker loop without using an exception for control flow.
  queue.close

  results = Array.new(queue.size)
  mutex = Mutex.new
  # Never start more workers than there are files, and always at least one so
  # that a zero thread count cannot silently drop the work.
  worker_count = @thread_count.clamp(1, queue.size)

  workers = Array.new(worker_count) do
    Thread.new do
      while (job = queue.pop)
        index, file = job
        result = block.call(file)
        mutex.synchronize { results[index] = result }
      end
    end
  end

  workers.each(&:join)
  results
end