Class: ThreadPuddle
- Inherits:
- Object
- Object
- ThreadPuddle
- Defined in:
- lib/thread_puddle.rb
Instance Attribute Summary
- #size ⇒ Object (readonly)
  Number of worker threads in the pool.
- #state ⇒ Object (readonly)
  Lifecycle state of the pool, initially :running.
Instance Method Summary
- #initialize(size = 24) ⇒ ThreadPuddle (constructor)
  A new instance of ThreadPuddle.
- #perform(*args, &block) ⇒ Object
- #shutdown(timeout = nil) ⇒ Object
- #status ⇒ Object
- #submit(*args, &block) ⇒ Object
Constructor Details
#initialize(size = 24) ⇒ ThreadPuddle
Returns a new instance of ThreadPuddle.
    # File 'lib/thread_puddle.rb', line 7
    def initialize size = 24
      @size = size
      @state = :running
      @queue = Queue.new

      # Start `size` workers; each runs jobs until one of them throws :exit.
      @pool = size.times.map do
        Thread.new do
          catch :exit do
            loop do
              job, args, result_queue = queue.deq

              if result_queue
                # sync: hand the return value back to the caller waiting in #perform
                result_queue.enq job.call(*args)
              else
                # async
                job.call(*args)
              end
            end
          end
        end
      end
    end
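The worker loop in the constructor combines a blocking queue with `catch`/`throw` as the exit mechanism. The following standalone sketch reproduces that pattern using only Ruby's built-in `Queue` and `Thread`; the names `jobs`, `results`, and `workers` are illustrative and are not part of ThreadPuddle.

```ruby
require "thread" # Queue is built in on modern Rubies; the require is harmless there

jobs    = Queue.new
results = Queue.new # used only to observe what the workers did

# Two workers, each running jobs until one of them throws :exit.
workers = 2.times.map do
  Thread.new do
    catch :exit do
      loop do
        job, args = jobs.deq # blocks while the queue is empty
        job.call(*args)
      end
    end
  end
end

# Enqueue three ordinary jobs as [callable, arguments] pairs.
3.times { |i| jobs.enq [proc { |n| results.enq n * n }, [i]] }

# One exit job per worker: throw unwinds to the catch in whichever
# thread happens to run the job.
workers.size.times { jobs.enq [proc { throw :exit }, []] }

workers.each(&:join)

squares = []
squares << results.deq until results.empty?
```

Because the queue is first-in, first-out, the exit jobs are only dequeued after every earlier job has been picked up, so all three squares are computed before the workers stop.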
Instance Attribute Details
#size ⇒ Object (readonly)
Returns the number of worker threads the pool was created with.
    # File 'lib/thread_puddle.rb', line 5
    def size
      @size
    end
#state ⇒ Object (readonly)
Returns the lifecycle state of the pool, which is :running after construction.
    # File 'lib/thread_puddle.rb', line 5
    def state
      @state
    end
Instance Method Details
#perform(*args, &block) ⇒ Object
    # File 'lib/thread_puddle.rb', line 36
    # Runs the block on a worker thread and blocks until its return value is available.
    def perform *args, &block
      if state == :running
        result_queue = Queue.new
        queue.enq [block, args, result_queue]
        result_queue.deq
      else
        raise "thread pool has been shutdown"
      end
    end
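#perform gets its synchronous behaviour from a per-call reply queue: the caller enqueues the job together with a fresh `Queue` and then blocks on that queue until the worker pushes the result. A minimal standalone sketch of the hand-off follows; `run_and_wait` and the `nil` stop signal are hypothetical helpers for this example, not part of ThreadPuddle.

```ruby
jobs = Queue.new

# A single worker that replies on whatever queue accompanies the job.
worker = Thread.new do
  loop do
    job, args, reply = jobs.deq
    break if job.nil? # nil is this sketch's stop signal, not the library's
    reply.enq job.call(*args)
  end
end

# Hypothetical helper with the same shape as #perform.
def run_and_wait(jobs, *args, &block)
  reply = Queue.new
  jobs.enq [block, args, reply]
  reply.deq # blocks until the worker pushes the return value
end

sum = run_and_wait(jobs, 2, 3) { |a, b| a + b }

jobs.enq [nil, [], nil] # stop the worker
worker.join
```

One consequence of this design is worth keeping in mind: if the job raises, the worker thread terminates before anything is pushed to the reply queue, so the caller would keep waiting. Rescuing inside the block is one way to avoid that.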
#shutdown(timeout = nil) ⇒ Object
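    # File 'lib/thread_puddle.rb', line 47
    def shutdown timeout = nil
      # Assign the instance variables directly; a bare `state = ...` inside
      # a method only creates a local variable and leaves @state untouched.
      @state = :shutting_down

      # Enqueue one exit job per worker. This bypasses #submit, which
      # rejects jobs once the state is no longer :running.
      size.times do
        queue.enq [proc { throw :exit }, []]
      end

      if timeout
        shutdown_with_timeout timeout
      else
        pool.each &:join
      end

      queue.clear

      @state = :terminated
      @size = 0
    end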
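#shutdown delegates to `shutdown_with_timeout` when a timeout is given, but the body of that method is not shown on this page. One plausible shape, offered purely as an assumption, is to join every worker against a shared deadline using `Thread#join(limit)`, which returns `nil` when the limit expires. The helper name `join_all_within` is hypothetical.

```ruby
# Hypothetical stand-in for shutdown_with_timeout; the real method is not shown.
# Returns the threads that had not finished when the deadline passed.
def join_all_within(threads, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  threads.reject do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Thread#join(limit) returns the thread on success and nil on timeout.
    remaining > 0 ? thread.join(remaining) : !thread.alive?
  end
end

quick = Thread.new { :done }
stuck = Thread.new { sleep } # sleeps until killed

stragglers = join_all_within([quick, stuck], 0.2)

# Killing the stragglers is one possible policy; whether ThreadPuddle
# does this is not known from this page.
stragglers.each(&:kill)
stragglers.each(&:join)
```

Sharing one deadline across all joins keeps the total wait bounded by `timeout`, rather than allowing up to `timeout` seconds per thread.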
#status ⇒ Object
    # File 'lib/thread_puddle.rb', line 67
    # Snapshot of the pool: configured size, lifecycle state, each worker's
    # Thread#status, and the job queue's length and number of waiting consumers.
    def status
      {
        :size => size,
        :state => state,
        :threads => pool.map(&:status),
        :queue => {
          :size => queue.size,
          :consumers => queue.num_waiting,
        },
      }
    end
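The values inside the #status hash come straight from Ruby's `Thread#status`, `Queue#size`, and `Queue#num_waiting`. The standalone sketch below shows what those calls report for idle and finished workers; it builds its own queue and threads rather than using ThreadPuddle, and the variable names are illustrative.

```ruby
queue   = Queue.new
workers = 2.times.map { Thread.new { queue.deq } }

# Wait until both workers are parked inside deq.
sleep 0.01 until queue.num_waiting == 2 &&
                 workers.all? { |t| t.status == "sleep" }

idle = {
  :threads => workers.map(&:status), # "sleep" for a thread blocked on the queue
  :queue   => {
    :size      => queue.size,        # jobs waiting to be picked up
    :consumers => queue.num_waiting, # threads blocked waiting for a job
  },
}

# Release both workers and look at their status again.
2.times { queue.enq :go }
workers.each(&:join)

finished = workers.map(&:status) # false for a thread that ended normally
```

A `:consumers` count equal to the pool size therefore suggests that every worker is idle, while a thread status of `false` or `nil` indicates a worker that has exited.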
#submit(*args, &block) ⇒ Object
    # File 'lib/thread_puddle.rb', line 28
    # Queues the block for asynchronous execution and returns without waiting for it.
    def submit *args, &block
      if state == :running
        queue.enq [block, args]
      else
        raise "thread pool has been shutdown"
      end
    end