Class: DStream::Trickle
Instance Method Summary collapse
- #apply(s) ⇒ Object
-
#initialize(rate) ⇒ Trickle
constructor
A new instance of Trickle.
Methods inherited from Abstract
Constructor Details
#initialize(rate) ⇒ Trickle
Returns a new instance of Trickle.
53 54 55 |
# File 'lib/d-stream.rb', line 53 def initialize(rate) @rate = rate end |
Instance Method Details
#apply(s) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/d-stream.rb', line 57 def apply(s) q = SizedQueue.new(1) stop = Object.new t = Thread.new do Thread.current.abort_on_exception = true s.each do |e| q << e sleep(1.0 / @rate) end q << stop end Enumerator.new do |y| loop do e = q.pop break if stop.equal?(e) y << e end t.join end.lazy end |