Class: Rmega::Pool

Inherits:
Object
Defined in:
lib/rmega/pool.rb

Constant Summary

MAX = 5

Instance Method Summary

Constructor Details

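#initialize(max) ⇒ Pool

Returns a new instance of Pool with max thread slots. When max is nil, MAX is used instead. The constructor also sets Thread.abort_on_exception to true, which applies to every thread in the process.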



# File 'lib/rmega/pool.rb', line 7

def initialize(max)
  max ||= MAX
  Thread.abort_on_exception = true
  @mutex = Mutex.new
  @threads = Array.new(max)
end
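The fallback to MAX can be sketched with a small stand-in class. SlotTable and its slots reader are hypothetical names used only for illustration; they mirror the two relevant constructor lines above and are not part of Rmega.

```ruby
# Hypothetical stand-in mirroring the constructor above (not part of Rmega).
class SlotTable
  MAX = 5

  attr_reader :slots

  def initialize(max)
    max ||= MAX              # nil (or false) falls back to the default size
    @slots = Array.new(max)  # every slot starts out empty (nil)
  end
end

SlotTable.new(nil).slots.size # => 5
SlotTable.new(2).slots        # => [nil, nil]
```

Because max is a required positional parameter, calling new with no argument at all raises ArgumentError; the default only applies when nil is passed explicitly.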

Instance Method Details

#available_slot ⇒ Object

Returns the index of the first pool slot in which a thread could be started (a slot that is empty or whose thread is no longer alive), or nil if every slot is busy.



# File 'lib/rmega/pool.rb', line 16

def available_slot
  @threads.each_with_index do |thread, index|
    return index if thread.nil? or !thread.alive?
  end
  nil
end
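To illustrate which slots count as available, here is the same scan as a free-standing method. first_free_slot is a hypothetical name, and it takes the slot array as an argument instead of reading @threads.

```ruby
# Hypothetical free-standing version of the scan shown above.
def first_free_slot(threads)
  threads.each_with_index do |thread, index|
    return index if thread.nil? || !thread.alive?
  end
  nil
end

busy = Thread.new { sleep }              # alive until it is killed
finished = Thread.new { :done }.join     # join returns the finished thread

first_free_slot([busy, finished, nil])   # => 1 (a finished thread frees its slot)
first_free_slot([busy])                  # => nil (every slot is busy)

busy.kill
```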

#defer(&block) ⇒ Object

Blocking. Starts a new thread with the given block as soon as a pool slot becomes available.



# File 'lib/rmega/pool.rb', line 58

def defer(&block)
  index = wait_available_slot
  @threads[index].kill if @threads[index].respond_to?(:kill)
  @threads[index] = Thread.new(&block)
end

#done? ⇒ Boolean

Returns true if all the threads are finished, false otherwise.

Returns:

  • (Boolean)


# File 'lib/rmega/pool.rb', line 29

def done?
  @threads.each { |thread| return false if thread and thread.alive? }
  true
end

#shutdown ⇒ Object

Kills all the threads in the pool (via Thread#kill) and empties every slot.



# File 'lib/rmega/pool.rb', line 51

def shutdown
  @threads.each { |thread| thread.kill if thread.respond_to?(:kill) }
  @threads.map! { nil }
end
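A small sketch of the same kill-and-clear step on a plain array may help show its effect. kill_all is a hypothetical helper name; the body follows the two lines of shutdown above.

```ruby
# Hypothetical helper applying the shutdown logic to any slot array.
def kill_all(threads)
  # nil does not respond to :kill, so empty slots are skipped
  threads.each { |thread| thread.kill if thread.respond_to?(:kill) }
  threads.map! { nil }   # clear every slot in place
end

workers = [Thread.new { sleep }, nil, Thread.new { sleep }]
kill_all(workers)        # workers is now [nil, nil, nil]
```

Since the slots are cleared, the caller no longer holds references to the killed threads through the pool; any result a thread was still computing is discarded.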

#synchronize(&block) ⇒ Object

Runs the given block while holding the pool's mutex.



# File 'lib/rmega/pool.rb', line 23

def synchronize(&block)
  @mutex.synchronize(&block)
end
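A typical reason to wrap a block in a mutex is to protect state shared between worker threads. The sketch below uses a plain Mutex the same way synchronize delegates to @mutex; count_with_mutex is a hypothetical helper, not part of Rmega.

```ruby
# Hypothetical helper: several threads increment one counter under a Mutex.
def count_with_mutex(workers, increments)
  mutex = Mutex.new
  total = 0
  threads = Array.new(workers) do
    Thread.new do
      # Only one thread at a time may run the read-modify-write on total.
      increments.times { mutex.synchronize { total += 1 } }
    end
  end
  threads.each(&:join)
  total
end

count_with_mutex(4, 1_000) # => 4000
```

Ruby's Mutex is not reentrant, so calling synchronize again from inside a block that already holds the same mutex raises ThreadError.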

#wait_available_slot ⇒ Object

Blocking. Waits until a pool slot becomes available, polling every 0.01 seconds, and returns its index. TODO: raise an error on wait timeout.



# File 'lib/rmega/pool.rb', line 42

def wait_available_slot
  while true
    index = available_slot
    return index if index
    sleep 0.01
  end
end
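The timeout mentioned in the TODO is not implemented in the source above. One possible way to add it, shown purely as a sketch, is to compare a monotonic clock against a deadline inside the polling loop. SlotTimeout, wait_for_slot, and the timeout: and interval: parameters are all hypothetical names, not part of Rmega.

```ruby
# Hypothetical error class and helper; neither exists in Rmega.
class SlotTimeout < StandardError; end

def wait_for_slot(threads, timeout:, interval: 0.01)
  # A monotonic clock is unaffected by wall-clock adjustments.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    index = threads.index { |thread| thread.nil? || !thread.alive? }
    return index if index
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise SlotTimeout, "no free slot after #{timeout}s"
    end
    sleep interval
  end
end
```

With this shape, a caller can rescue SlotTimeout and decide whether to retry, shut the pool down, or report the failure.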

#wait_done ⇒ Object

Blocking. Waits until all the threads are finished.



# File 'lib/rmega/pool.rb', line 35

def wait_done
  sleep 0.01 until done?
end
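Putting defer, synchronize, and wait_done together, a usage sketch might look like the following. To stay runnable without the rmega gem, it uses MiniPool, a hypothetical stand-in that condenses the method bodies listed on this page; squares_via_pool is likewise a hypothetical helper.

```ruby
# Hypothetical stand-in condensing the methods documented above.
class MiniPool
  def initialize(max)
    @mutex = Mutex.new
    @threads = Array.new(max || 5)
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end

  def available_slot
    @threads.index { |thread| thread.nil? || !thread.alive? }
  end

  def defer(&block)
    index = nil
    sleep 0.01 until (index = available_slot)  # block until a slot frees up
    @threads[index] = Thread.new(&block)
  end

  def done?
    @threads.none? { |thread| thread && thread.alive? }
  end

  def wait_done
    sleep 0.01 until done?
  end
end

# Squares each number on the pool and collects the results.
def squares_via_pool(numbers, max)
  pool = MiniPool.new(max)
  results = []
  numbers.each do |n|
    # synchronize guards the shared array against concurrent appends
    pool.defer { pool.synchronize { results << n * n } }
  end
  pool.wait_done
  results.sort
end

squares_via_pool([1, 2, 3, 4, 5, 6], 2) # => [1, 4, 9, 16, 25, 36]
```

Note that defer itself does not take the mutex, so calling it from several threads at once could hand the same slot to two callers; the sketch calls it from a single thread only.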