Class: DStream::Trickle

Inherits:
Abstract show all
Defined in:
lib/d-stream.rb

Instance Method Summary collapse

Methods inherited from Abstract

#inspect

Constructor Details

#initialize(rate) ⇒ Trickle

Returns a new instance of Trickle.



53
54
55
# File 'lib/d-stream.rb', line 53

def initialize(rate)
  @rate = rate
end

Instance Method Details

#apply(s) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/d-stream.rb', line 57

def apply(s)
  q = SizedQueue.new(1)
  stop = Object.new

  t =
    Thread.new do
      Thread.current.abort_on_exception = true
      s.each do |e|
        q << e
        sleep(1.0 / @rate)
      end
      q << stop
    end

  Enumerator.new do |y|
    loop do
      e = q.pop
      break if stop.equal?(e)
      y << e
    end
    t.join
  end.lazy
end