Class: DStream::Buffer
Instance Method Summary
- #apply(s) ⇒ Object
- #initialize(size) ⇒ Buffer (constructor): A new instance of Buffer.
Methods inherited from Abstract
Constructor Details
#initialize(size) ⇒ Buffer
Returns a new instance of Buffer.
    # File 'lib/d-stream.rb', line 116
    def initialize(size)
      @size = size
    end
Instance Method Details
#apply(s) ⇒ Object
    # File 'lib/d-stream.rb', line 120
    def apply(s)
      q = SizedQueue.new(@size)
      stop = Object.new
      t = Thread.new do
        Thread.current.abort_on_exception = true
        s.each { |e| q << e }
        q << stop
      end

      Enumerator.new do |y|
        loop do
          e = q.pop
          break if stop.equal?(e)
          y << e
        end
        t.join
      end.lazy
    end
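Usage sketch. The following is a minimal, self-contained illustration of how #apply behaves, not code taken from the library. It copies the two method bodies from the listings on this page into a standalone class named BufferSketch (a hypothetical name, used so the example runs without the d-stream gem or the Abstract superclass). The source enumerable and the buffer size of 2 are also assumptions chosen for the example.

```ruby
# Standalone copy of the #initialize and #apply bodies documented above.
# BufferSketch is a hypothetical name; the real class is DStream::Buffer.
class BufferSketch
  def initialize(size)
    @size = size
  end

  def apply(s)
    q = SizedQueue.new(@size)   # bounded queue between producer and consumer
    stop = Object.new           # unique sentinel marking the end of the stream
    t = Thread.new do
      Thread.current.abort_on_exception = true
      s.each { |e| q << e }     # blocks whenever the queue already holds @size items
      q << stop
    end

    Enumerator.new do |y|
      loop do
        e = q.pop               # blocks until the producer has pushed something
        break if stop.equal?(e)
        y << e
      end
      t.join
    end.lazy
  end
end

# Assumed input: a lazy stream that records which elements were produced.
produced = []
source = (1..5).lazy.map { |n| produced << n; n * 10 }

buffered = BufferSketch.new(2).apply(source)
result = buffered.to_a

p result
```

Because SizedQueue#<< blocks while the queue is full, the producer thread can run at most `size` queued elements ahead of the consumer. Elements come out in their original order, since a single thread pushes into a single queue.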