Class: Griffin::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/griffin/thread_pool.rb

Constant Summary collapse

DEFAULT_POOL_SIZE =
20
DEFAULT_QUEUE_SIZE =
512

Instance Method Summary collapse

Constructor Details

#initialize(pool_size = DEFAULT_POOL_SIZE, queue_size: DEFAULT_QUEUE_SIZE, &block) ⇒ ThreadPool

Returns a new instance of ThreadPool.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/griffin/thread_pool.rb', line 10

def initialize(pool_size = DEFAULT_POOL_SIZE, queue_size: DEFAULT_QUEUE_SIZE, &block)
  @pool_size = pool_size
  @queue_size = queue_size
  @block = block
  @shutdown = false
  @semaphore = Griffin::CountingSemaphore.new(queue_size)
  @tasks = Queue.new

  @spawned = 0
  @workers = []
  @mutex = Mutex.new

  @pool_size.times { spawn_thread }
end

Instance Method Details

#schedule(task, &block) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/griffin/thread_pool.rb', line 25

def schedule(task, &block)
  if task.nil?
    return
  end

  if @shutdown
    raise "scheduling new task isn't allowed during shutdown"
  end

  # TODO: blocking now..
  @semaphore.wait
  @tasks.push(block || task)

  @mutex.synchronize do
    if @spawned < @pool_size
      spawn_thread
    end
  end
end

#shutdownObject



45
46
47
48
49
# File 'lib/griffin/thread_pool.rb', line 45

def shutdown
  @shutdown = true
  @pool_size.times { @tasks.push(nil) }
  sleep 1 until @workers.empty?
end