5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# File 'lib/mapahead.rb', line 5
def stream_map enumerable, work_ahead=1, &blk
return enumerable if blk.nil?
Enumerator.new do |y|
pool = Concurrent::FixedThreadPool.new(work_ahead)
results = Queue.new
work_ahead.times do
work = enumerable.next
pool.post do
results << blk.call(work)
end
end
while !pool.shutdown? || !results.empty?
begin
y << results.pop(true)
work = enumerable.next
pool.post do
results << blk.call(work)
end
rescue ThreadError
nil
rescue StopIteration
pool.shutdown
end
end
end
end
|