Class: TRuby::ParallelProcessor
- Inherits: Object
- Hierarchy: Object > TRuby::ParallelProcessor
- Defined in: lib/t_ruby/cache.rb
Overview
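Parallel file processor. Runs a caller-supplied block over a list of file paths on multiple threads, either in fixed batches (#process_files) or from a shared queue (#process_with_queue).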
Instance Attribute Summary
- #thread_count ⇒ Object (readonly): Returns the value of attribute thread_count.
Instance Method Summary
- #initialize(thread_count: nil) ⇒ ParallelProcessor (constructor): A new instance of ParallelProcessor.
- #process_files(file_paths, &block) ⇒ Object: Process files in parallel, one thread per batch.
- #process_with_queue(file_paths, &block) ⇒ Object: Process files from a shared queue that worker threads pull from.
Constructor Details
#initialize(thread_count: nil) ⇒ ParallelProcessor
# File 'lib/t_ruby/cache.rb', line 381
def initialize(thread_count: nil)
  @thread_count = thread_count || determine_thread_count
end
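The constructor falls back to the private helper determine_thread_count when no thread_count is given, but that helper's body is not shown on this page. The following is a minimal sketch of the fallback pattern only. The class name SketchProcessor is hypothetical, and using Etc.nprocessors as the default is an assumption, not the library's confirmed behavior.

```ruby
require "etc"

# Hypothetical stand-in for TRuby::ParallelProcessor; only the constructor
# logic documented above is reproduced here.
class SketchProcessor
  attr_reader :thread_count

  def initialize(thread_count: nil)
    @thread_count = thread_count || determine_thread_count
  end

  private

  # Assumption: the real helper is not shown in the source. This version
  # uses the number of online processors, with a floor of one thread.
  def determine_thread_count
    [Etc.nprocessors, 1].max
  end
end

explicit = SketchProcessor.new(thread_count: 4)
default  = SketchProcessor.new

puts explicit.thread_count # the explicit value is used as given
puts default.thread_count  # machine-dependent fallback
```

Passing thread_count explicitly keeps runs reproducible across machines; omitting it lets the fallback decide.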
Instance Attribute Details
#thread_count ⇒ Object (readonly)
Returns the value of attribute thread_count.
# File 'lib/t_ruby/cache.rb', line 379
def thread_count
  @thread_count
end
Instance Method Details
#process_files(file_paths, &block) ⇒ Object
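Process files in parallel. The paths are split into batches and each batch runs on its own thread. Returns an empty array when file_paths is empty. Results are appended as each batch finishes, so their order may differ from the order of file_paths.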
# File 'lib/t_ruby/cache.rb', line 386
def process_files(file_paths, &block)
  return [] if file_paths.empty?

  # Split into batches
  batches = file_paths.each_slice(batch_size(file_paths.length)).to_a
  results = []
  mutex = Mutex.new

  threads = batches.map do |batch|
    Thread.new do
      batch_results = batch.map { |file| block.call(file) }
      mutex.synchronize { results.concat(batch_results) }
    end
  end

  threads.each(&:join)
  results
end
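Because each batch appends its results when its thread finishes, a caller that needs to match results to inputs may want the block to return the path alongside the result. The sketch below reproduces the batching logic in a hypothetical BatchSketch class. Its batch_size helper is an assumption (ceiling division by the thread count), since the real private helper is not shown here.

```ruby
# Hypothetical stand-in reproducing the batching logic documented above.
class BatchSketch
  def initialize(thread_count:)
    @thread_count = thread_count
  end

  def process_files(file_paths, &block)
    return [] if file_paths.empty?

    batches = file_paths.each_slice(batch_size(file_paths.length)).to_a
    results = []
    mutex = Mutex.new

    threads = batches.map do |batch|
      Thread.new do
        batch_results = batch.map { |file| block.call(file) }
        # Guard the shared array while appending this batch's results.
        mutex.synchronize { results.concat(batch_results) }
      end
    end

    threads.each(&:join)
    results
  end

  private

  # Assumption: the real batch_size is not shown; this uses ceiling division
  # so that at most @thread_count batches are created.
  def batch_size(count)
    [(count.to_f / @thread_count).ceil, 1].max
  end
end

paths = %w[a.rb b.rb c.rb d.rb e.rb]

# Return [path, result] pairs so results can be looked up by input path,
# regardless of the order in which batches complete.
pairs = BatchSketch.new(thread_count: 2).process_files(paths) do |path|
  [path, path.upcase]
end

by_path = pairs.to_h
puts by_path["c.rb"]
```

Keying results by path avoids relying on array position, which the batch-completion ordering does not guarantee.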
#process_with_queue(file_paths, &block) ⇒ Object
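Process files from a shared queue. Each of the thread_count worker threads repeatedly takes the next pending path with a non-blocking pop and exits once the queue is empty, so faster workers take on more files. Result order is not guaranteed to match file_paths.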
# File 'lib/t_ruby/cache.rb', line 407
def process_with_queue(file_paths, &block)
  queue = Queue.new
  file_paths.each { |f| queue << f }

  results = []
  mutex = Mutex.new

  threads = @thread_count.times.map do
    Thread.new do
      loop do
        file = begin
          queue.pop(true)
        rescue StandardError
          break
        end

        result = block.call(file)
        mutex.synchronize { results << result }
      end
    end
  end

  threads.each(&:join)
  results
end
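The queue-draining loop above can be sketched as a standalone method. The name drain_queue and its workers: parameter are hypothetical and not part of TRuby. This variant rescues ThreadError specifically, which is the exception a non-blocking Queue#pop raises on an empty queue, rather than the broader StandardError used in the listing.

```ruby
# Hypothetical helper illustrating the shared-queue pattern; not TRuby API.
def drain_queue(items, workers:)
  queue = Queue.new
  items.each { |item| queue << item }

  results = []
  mutex = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      loop do
        item = begin
          queue.pop(true) # non-blocking pop
        rescue ThreadError
          break           # queue is empty, so this worker is done
        end

        value = yield(item)
        # Guard the shared array while appending a single result.
        mutex.synchronize { results << value }
      end
    end
  end

  threads.each(&:join)
  results
end

squares = drain_queue((1..10).to_a, workers: 3) { |n| n * n }

# Completion order varies between runs, so sort before comparing or printing.
puts squares.sort.inspect
```

Since all items are enqueued before any worker starts, an empty queue reliably means there is no work left, which is what makes the non-blocking pop a safe termination signal in this pattern.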