Class: SimpleThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_thread_pool.rb

Overview

Simple thread pool for executing blocks in parallel in a controlled manner. Threads are not re-used by the pool to prevent any thread local variables from leaking out.

Instance Method Summary collapse

Constructor Details

#initialize(max_threads) ⇒ SimpleThreadPool

Returns a new instance of SimpleThreadPool.



11
12
13
14
15
16
# File 'lib/simple_thread_pool.rb', line 11

def initialize(max_threads)
  @max_threads = max_threads
  @lock = Mutex.new
  @threads = []
  @processing_ids = []
end

Instance Method Details

#execute(id = nil, &block) ⇒ Object

Call this method to spawn a thread to run the block. If the thread pool is already full, this method will block until a thread is free. The block is responsible for handling any exceptions that could be raised.

The optional id argument can be used to provide an identifier for a block. If one is provided, processing will be blocked if the same id is already being processed. This ensures that each unique id is executed one at a time sequentially.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/simple_thread_pool.rb', line 26

def execute(id = nil, &block)
  loop do
    # Check if a new thread can be added without blocking.
    while !can_add_thread?(id)
      sleep(0.001)
    end
    
    @lock.synchronize do
      # Check again inside a synchronized block if the thread can still be added.
      if can_add_thread?(id)
        @processing_ids << id unless id.nil?
        add_thread(id, block)
        return
      end
    end
  end
end

#finishObject

Call this method to block until all current threads have finished executing.



45
46
47
48
49
# File 'lib/simple_thread_pool.rb', line 45

def finish
  active_threads = @lock.synchronize { @threads.select(&:alive?) }
  active_threads.each(&:join)
  nil
end

#synchronize(&block) ⇒ Object

Synchronize data access across the thread pool. This method will block waiting on the same internal Mutex the thread pool uses to manage scheduling threads.



54
55
56
# File 'lib/simple_thread_pool.rb', line 54

def synchronize(&block)
  @lock.synchronize(&block)
end