Class: ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/threadpool.rb

Defined Under Namespace

Classes: PoolStopped, Task

Instance Method Summary collapse

Constructor Details

#initialize(thread_size = 10, queue_size = 100) ⇒ ThreadPool

Returns a new instance of ThreadPool.



8
9
10
11
12
13
14
15
16
# File 'lib/threadpool.rb', line 8

def initialize(thread_size=10, queue_size=100)
  @mutex = Monitor.new
  @cv = @mutex.new_cond
  @queue = []
  @max_queue_size = queue_size
  @threads = []
  @stopped = false
  thread_size.times { @threads << Thread.new { start_worker } }
end

Instance Method Details

#add_work(*args, &callback) ⇒ Object



18
19
20
# File 'lib/threadpool.rb', line 18

def add_work(*args, &callback)
  push_task(Task.new(*args, &callback))
end

#pop_taskObject



32
33
34
35
36
37
38
39
40
# File 'lib/threadpool.rb', line 32

def pop_task
  task = nil
  @mutex.synchronize do
    @cv.wait_while { @queue.size == 0 }
    task = @queue.shift
    @cv.broadcast
  end
  task
end

#push_task(task) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/threadpool.rb', line 22

def push_task(task)
  @mutex.synchronize do
    raise PoolStopped.new if @stopped
    @cv.wait_while { @max_queue_size > 0 && @queue.size >= @max_queue_size }
    @queue.push(task)
    @cv.broadcast
  end
  task
end

#shutdownObject



42
43
44
45
46
47
48
49
# File 'lib/threadpool.rb', line 42

def shutdown
  @mutex.synchronize do
    @stopped = true
    @threads.each { @queue.push(:stop) }
    @cv.broadcast
  end
  @threads.each { |thread| thread.join }
end

#start_workerObject



51
52
53
54
55
56
57
# File 'lib/threadpool.rb', line 51

def start_worker
  while true
    task = pop_task
    return if task == :stop
    task.execute
  end
end

#syncObject

wait for current work to complete



60
61
62
63
# File 'lib/threadpool.rb', line 60

def sync
  tasks = @mutex.synchronize { @queue.dup }
  tasks.each { |task| task.join }
end