Class: DStream::Burst
Instance Method Summary collapse
- #apply(s) ⇒ Object
-
#initialize(delay, size) ⇒ Burst
constructor
A new instance of Burst.
Methods inherited from Abstract
Constructor Details
#initialize(delay, size) ⇒ Burst
Returns a new instance of Burst.
83 84 85 86 |
# File 'lib/d-stream.rb', line 83 def initialize(delay, size) @delay = delay @size = size end |
Instance Method Details
#apply(s) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/d-stream.rb', line 88 def apply(s) q = SizedQueue.new(1) stop = Object.new t = Thread.new do Thread.current.abort_on_exception = true i = 0 s.each do |e| sleep(@delay) if (i % @size).zero? q << e i += 1 end q << stop end Enumerator.new do |y| loop do e = q.pop break if stop.equal?(e) y << e end t.join end.lazy end |