Class: DStream::Buffer

Inherits:
Abstract show all
Defined in:
lib/d-stream.rb

Instance Method Summary collapse

Methods inherited from Abstract

#inspect

Constructor Details

#initialize(size) ⇒ Buffer

Returns a new instance of Buffer.



116
117
118
# File 'lib/d-stream.rb', line 116

# Creates a buffer with the given capacity.
#
# @param size [Integer] capacity later passed to SizedQueue.new by #apply
def initialize(size)
  @size = size
end

Instance Method Details

#apply(s) ⇒ Enumerator::Lazy



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/d-stream.rb', line 120

# Wraps the stream +s+ in a bounded buffer that is filled by a background
# thread.
#
# A producer thread is started immediately; it iterates +s+ and pushes each
# element onto a SizedQueue created with capacity +@size+, blocking while the
# queue is full. The returned enumerator pops elements off that queue in the
# order they were pushed.
#
# The queue and the producer thread are created once per call, so the
# returned enumerator is meant to be consumed a single time.
#
# @param s [#each] the upstream stream to read from
# @return [Enumerator::Lazy] the elements of +s+, in their original order
def apply(s)
  q = SizedQueue.new(@size)
  # Unique end-of-stream marker; it is compared by identity so that nil,
  # false or any other element value passes through the buffer untouched.
  stop = Object.new

  t =
    Thread.new do
      # An error raised while reading +s+ must not vanish with this thread.
      Thread.current.abort_on_exception = true
      s.each { |e| q << e }
      q << stop
    end

  Enumerator.new do |y|
    begin
      loop do
        e = q.pop
        break if stop.equal?(e)
        y << e
      end
      t.join
    ensure
      # If the consumer stops early (e.g. +first(n)+) or raises, the producer
      # would otherwise stay blocked on the full queue forever. Killing a
      # thread that has already finished is a no-op, so the normal path is
      # unaffected.
      t.kill
    end
  end.lazy
end