Class: ThreadPool
- Inherits:
-
Object
show all
- Defined in:
- lib/threadpool.rb
Defined Under Namespace
Classes: PoolStopped, Task
Instance Method Summary
collapse
Constructor Details
#initialize(thread_size = 10, queue_size = 100) ⇒ ThreadPool
Returns a new instance of ThreadPool.
8
9
10
11
12
13
14
15
16
|
# File 'lib/threadpool.rb', line 8
def initialize(thread_size=10, queue_size=100)
@mutex = Monitor.new
@cv = @mutex.new_cond
@queue = []
@max_queue_size = queue_size
@threads = []
@stopped = false
thread_size.times { @threads << Thread.new { start_worker } }
end
|
Instance Method Details
#add_work(*args, &callback) ⇒ Object
18
19
20
|
# File 'lib/threadpool.rb', line 18
def add_work(*args, &callback)
push_task(Task.new(*args, &callback))
end
|
#pop_task ⇒ Object
32
33
34
35
36
37
38
39
40
|
# File 'lib/threadpool.rb', line 32
def pop_task
task = nil
@mutex.synchronize do
@cv.wait_while { @queue.size == 0 }
task = @queue.shift
@cv.broadcast
end
task
end
|
#push_task(task) ⇒ Object
22
23
24
25
26
27
28
29
30
|
# File 'lib/threadpool.rb', line 22
def push_task(task)
@mutex.synchronize do
raise PoolStopped.new if @stopped
@cv.wait_while { @max_queue_size > 0 && @queue.size >= @max_queue_size }
@queue.push(task)
@cv.broadcast
end
task
end
|
#shutdown ⇒ Object
42
43
44
45
46
47
48
49
|
# File 'lib/threadpool.rb', line 42
def shutdown
@mutex.synchronize do
@stopped = true
@threads.each { @queue.push(:stop) }
@cv.broadcast
end
@threads.each { |thread| thread.join }
end
|
#start_worker ⇒ Object
51
52
53
54
55
56
57
|
# File 'lib/threadpool.rb', line 51
def start_worker
while true
task = pop_task
return if task == :stop
task.execute
end
end
|
#sync ⇒ Object
wait for current work to complete
60
61
62
63
|
# File 'lib/threadpool.rb', line 60
def sync
tasks = @mutex.synchronize { @queue.dup }
tasks.each { |task| task.join }
end
|