Class: Myreplicator::Parallelizer
- Inherits: Object (Object > Myreplicator::Parallelizer)
- Defined in: lib/transporter/parallelizer.rb
Instance Attribute Summary
- #queue ⇒ Object
  Returns the value of attribute queue.
Instance Method Summary
- #done? ⇒ Boolean
  Returns true when all jobs are processed and no thread is running.
- #initialize(*args) ⇒ Parallelizer (constructor)
  A new instance of Parallelizer.
- #manage_threads ⇒ Object
  Clears dead threads and frees the thread pool for more jobs. Exits when no more threads are left.
- #run ⇒ Object
  Runs while there are jobs in the queue. When the pool is full, waits one second and checks again for available threads. Once all jobs are allocated to threads, waits for them to finish before exiting.
Constructor Details
#initialize(*args) ⇒ Parallelizer
Returns a new instance of Parallelizer.
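# File 'lib/transporter/parallelizer.rb', line 16
def initialize *args
  # The options local and the extract_options! call are assumed:
  # the extracted listing lost these identifiers.
  options = args.extract_options!
  @queue = Queue.new
  @threads = []
  @max_threads = options[:max_threads].nil? ? 10 : options[:max_threads]
  @klass = options[:klass].constantize
end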
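The option handling in the constructor can be sketched in plain Ruby, without ActiveSupport. This is an illustration only: `parse_options` is a hypothetical helper that is not part of Myreplicator, and `Object.const_get` stands in for `String#constantize`. It assumes, as the listing suggests, that `:max_threads` and `:klass` arrive in a trailing options hash and that `:max_threads` falls back to 10.

```ruby
# Hypothetical helper mirroring the constructor's option handling.
# Not part of Myreplicator; names are chosen for illustration.
def parse_options(*args)
  # Take the trailing hash if one was passed, otherwise use an empty one.
  options = args.last.is_a?(Hash) ? args.pop : {}
  {
    # Same defaulting rule as the listing: nil means "use 10".
    max_threads: options[:max_threads].nil? ? 10 : options[:max_threads],
    # Stand-in for options[:klass].constantize
    klass: Object.const_get(options[:klass])
  }
end

defaults = parse_options(klass: "String")
custom   = parse_options(klass: "Array", max_threads: 4)
```

Note that `:klass` is given as a class name string, not as the class itself, because the constructor resolves it by name.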
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
# File 'lib/transporter/parallelizer.rb', line 14
def queue
  @queue
end
Instance Method Details
#done? ⇒ Boolean
Returns true when all jobs are processed and no thread is running.
# File 'lib/transporter/parallelizer.rb', line 86
def done?
  if @queue.size == 0 && @threads.size == 0
    return true
  end
  return false
end
#manage_threads ⇒ Object
Clears dead threads and frees the thread pool for more jobs. Exits when no more threads are left.
# File 'lib/transporter/parallelizer.rb', line 61
def manage_threads
  Thread.new do
    while(@threads.size > 0)
      done = []
      @threads.each do |t|
        done << t if t.stop?
      end
      done.each{|d| @threads.delete(d)} # Clear dead threads

      # If no more jobs are left, mark done
      if done?
        @done = true
      else
        sleep 2 # Wait for more threads to spawn
      end
    end
  end
end
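The listing above selects threads to remove with `Thread#stop?`, which Ruby documents as true for a thread that is dead or sleeping. The sketch below only illustrates that distinction with core `Thread` methods; the thread and variable names are made up for the example, and the `alive?`-based filter is an alternative shown for comparison, not the library's behavior.

```ruby
sleeper  = Thread.new { sleep }   # parks itself until killed
finished = Thread.new { :ok }
finished.join                     # make sure this one is dead

# Wait until the sleeper has actually gone to sleep.
sleep 0.01 until sleeper.status == "sleep"

sleeper_stopped  = sleeper.stop?    # true: sleeping counts as stopped
sleeper_alive    = sleeper.alive?   # true: it has not finished
finished_stopped = finished.stop?   # true: dead counts as stopped
finished_alive   = finished.alive?  # false

# Filtering on alive? removes only threads that have finished.
pool = [sleeper, finished]
pool.reject! { |t| !t.alive? }

# Clean up the parked thread so the script can exit.
sleeper.kill
sleeper.join
```

Under this reading, a worker that is sleeping when the reaper runs may be dropped from `@threads` while still unfinished, which is worth keeping in mind when sizing `max_threads`.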
#run ⇒ Object
Runs while there are jobs in the queue. When the pool is full, waits one second and checks again for available threads. Once all jobs are allocated to threads, waits for them to finish before exiting.
# File 'lib/transporter/parallelizer.rb', line 29
def run
  @done = false
  @manager_running = false
  reaper = nil

  while @queue.size > 0
    if @threads.size <= @max_threads
      @threads << Thread.new(@queue.pop) do |proc|
        @klass.new.instance_exec(proc[:params], &proc[:block])
      end
    else
      unless @manager_running
        reaper = manage_threads
        @manager_running = true
      end
      sleep 1
    end
  end

  # Run manager if thread size never reached max
  reaper = manage_threads unless @manager_running

  # Waits until all threads are completed
  # Before exiting
  reaper.join
end
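As the listing shows, each queued job is a hash with `:params` and `:block`, and the block is evaluated with `instance_exec` on a fresh instance of the configured class. The following is a minimal standalone sketch of that pattern, not the library's implementation: `DemoWorker`, `square`, and the local variables are hypothetical, and the pool is reaped inline with `alive?` rather than by a separate manager thread.

```ruby
require "thread"

# Hypothetical worker class used only for this sketch.
class DemoWorker
  def square(n)
    n * n
  end
end

jobs    = Queue.new
results = Queue.new

# Each job mirrors the shape read by #run: a hash with :params and :block.
# Inside the block, self is a DemoWorker instance, so square resolves there.
(1..5).each do |i|
  jobs << { params: i, block: proc { |n| results << square(n) } }
end

max_threads = 2
threads = []

until jobs.empty?
  threads.select!(&:alive?)   # drop workers that have finished
  if threads.size < max_threads
    threads << Thread.new(jobs.pop) do |job|
      DemoWorker.new.instance_exec(job[:params], &job[:block])
    end
  else
    sleep 0.01                # pool is full; wait briefly and retry
  end
end

threads.each(&:join)          # wait for the remaining workers
squares = Array.new(results.size) { results.pop }.sort
```

Because the block runs through `instance_exec`, it can call the worker's instance methods directly while still closing over locals such as `results`.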