Class: DStream::Burst

Inherits:
Abstract show all
Defined in:
lib/d-stream.rb

Instance Method Summary collapse

Methods inherited from Abstract

#inspect

Constructor Details

#initialize(delay, size) ⇒ Burst

Returns a new instance of Burst.



83
84
85
86
# File 'lib/d-stream.rb', line 83

def initialize(delay, size)
  @delay = delay
  @size = size
end

Instance Method Details

#apply(s) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/d-stream.rb', line 88

def apply(s)
  q = SizedQueue.new(1)
  stop = Object.new

  t =
    Thread.new do
      Thread.current.abort_on_exception = true
      i = 0
      s.each do |e|
        sleep(@delay) if (i % @size).zero?
        q << e
        i += 1
      end
      q << stop
    end

  Enumerator.new do |y|
    loop do
      e = q.pop
      break if stop.equal?(e)
      y << e
    end
    t.join
  end.lazy
end