Class: Pacer::Pipes::StreamSortPipe
- Defined in:
- lib/pacer/pipe/stream_sort_pipe.rb
Overview
has 3 states:
-
starts in rebalancing mode to build up a working set which stores elements in 4 silos
-
if the first silo becomes empty, it goes into clearing mode
-
in clearing mode, it will empty all elements out of the first non-empty silo
-
it then goes back into rebalancing mode
Number of silos is 4 by default, a number I pulled out of my ass without any testing.
Instance Attribute Summary
Attributes inherited from RubyPipe
Instance Method Summary collapse
-
#initialize(queue_length = 100, silo_size = 10) ⇒ StreamSortPipe
constructor
A new instance of StreamSortPipe.
- #processNextStart ⇒ Object
- #setSiloSize(n) ⇒ Object
Methods inherited from RubyPipe
#enablePath, #reset, #setStarts
Constructor Details
#initialize(queue_length = 100, silo_size = 10) ⇒ StreamSortPipe
Returns a new instance of StreamSortPipe.
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/pacer/pipe/stream_sort_pipe.rb', line 11 def initialize(queue_length = 100, silo_size = 10) super() @queue_length = queue_length @rebalancing = [] @rebal_length = 0 @first_silo = [] @second_silo = [] @third_silo = [] @clearing = [] @silo_size = silo_size end |
Instance Method Details
#processNextStart ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/pacer/pipe/stream_sort_pipe.rb', line 27 def processNextStart if @clearing.any? return @clearing.shift end while @rebal_length < @queue_length and @starts.hasNext @rebalancing << @starts.next @rebal_length += 1 end if @rebalancing.any? if @starts.hasNext @rebalancing.sort! @first_silo = @rebalancing.slice(0, @silo_size) @second_silo = @rebalancing.slice(@silo_size, @silo_size * 2) || [] @third_silo = @rebalancing.slice(@silo_size * 2..-1) || [] @rebalancing = [] else @clearing = @rebalancing.sort @rebalancing = [] return processNextStart end end if @starts.hasNext if @first_silo.any? element = @starts.next begin if element < @first_silo.last @first_silo << element @first_silo.sort! elsif @second_silo.none? or element < @second_silo.last @second_silo.unshift element else @third_silo << element end rescue => e @first_silo.unshift element end return @first_silo.shift else @clearing = @second_silo @clearing.sort! @rebalancing = @third_silo @rebal_length = @rebalancing.length return @rebalancing.shift end else @clearing = @first_silo + @second_silo.sort! + @third_silo.sort! return processNextStart if @clearing.any? end raise EmptyPipe.instance end |
#setSiloSize(n) ⇒ Object
23 24 25 |
# File 'lib/pacer/pipe/stream_sort_pipe.rb', line 23 def setSiloSize(n) @silo_size = n end |