Class: Enumerator::Async
- Inherits: Enumerator
- Ancestor chain: Object → Enumerator → Enumerator::Async
- Defined in:
- lib/enumerator/async.rb
Defined Under Namespace
Classes: Lockset
Instance Method Summary
- #each(&work) ⇒ Object
- #initialize(enum, pool_size = nil) ⇒ Async (constructor): returns a new instance of Async.
- #map(&work) ⇒ Object
- #size ⇒ Object
- #sync ⇒ Object (also: #to_enum)
- #to_a ⇒ Object
- #with_index(start = 0, &work) ⇒ Object
- #with_object(obj, &work) ⇒ Object
Constructor Details
#initialize(enum, pool_size = nil) ⇒ Async
Returns a new instance of Async.
23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/enumerator/async.rb', line 23

# Stores the enumerator to iterate and the optional worker-pool size.
#
# @param enum the enumerator kept in +@enum+; the other methods call
#   +next+, +rewind+, +size+, +to_a+, +with_index+ and +with_object+ on it
# @param pool_size [Numeric, nil] number of worker threads; when +nil+
#   (the default) +@pool_size+ stays +nil+ and #each / #map take their
#   non-pooled branch
# @raise [ArgumentError] if +pool_size+ is given but is less than 1
def initialize(enum, pool_size = nil)
  if pool_size
    unless pool_size >= 1
      message = "Thread pool size is invalid! Expected a positive integer but got: #{pool_size}"
      raise ArgumentError, message
    end

    # Coerce only after the range check, so e.g. 0.5 is rejected rather
    # than silently truncated to 0.
    pool_size = pool_size.to_i
  end

  @enum = enum
  @pool_size = pool_size
  @lockset = Lockset.new
end
Instance Method Details
#each(&work) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/enumerator/async.rb', line 51

# Runs +work+ for every element of +@enum+ on background threads and waits
# for all of them to finish.
#
# With a pool size, +@pool_size+ worker threads pull items from a bounded
# queue and pass each one to +evaluate+ together with the block. Without a
# pool size, the threads returned by +unlimited_threads+ are joined
# (presumably one thread per element — confirm against that helper).
#
# @yield each item taken from the enumerator (forwarded via +evaluate+)
# @return [self]
# @raise whatever +raise_error+ raises when no block is given; exceptions
#   raised inside a worker surface here when that worker is joined
def each(&work)
  raise_error('each') unless block_given?

  if @pool_size
    queue = SizedQueue.new(@pool_size)
    workers = Array.new(@pool_size) do
      Thread.new do
        loop do
          item = queue.pop
          break if item == EOQ # sentinel: nothing more to process

          evaluate(item, &work)
        end
      end
    end

    begin
      begin
        # Kernel#loop already stops quietly on StopIteration, so no explicit
        # rescue is needed once the enumerator is exhausted.
        loop { queue.push(@enum.next) }
      ensure
        # One sentinel per worker, even if feeding the queue failed.
        @pool_size.times { queue.push(EOQ) }
      end

      workers.each(&:join)
    ensure
      # Reset the external-iteration cursor even when a worker's exception
      # is re-raised by join, so the next call starts from the first item.
      @enum.rewind
    end
  else
    unlimited_threads(&work).each(&:join)
  end

  self
end
#map(&work) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/enumerator/async.rb', line 99 def map(&work) raise_error('map') unless block_given? if @pool_size outs = [] with_index do |item, index| outs[index] = evaluate(item, &work) end outs else unlimited_threads(&work).map(&:value) end end |
#size ⇒ Object
47 48 49 |
# File 'lib/enumerator/async.rb', line 47

# Reports the size of the wrapped enumerator.
#
# @return whatever +@enum.size+ returns
def size
  @enum.size
end
#sync ⇒ Object Also known as: to_enum
41 42 43 |
# File 'lib/enumerator/async.rb', line 41

# Returns the wrapped enumerator itself, for ordinary synchronous use.
# The method summary lists #to_enum as an alias; the alias declaration is
# outside this method's source.
#
# @return the object stored in +@enum+
def sync
  @enum
end
#to_a ⇒ Object
37 38 39 |
# File 'lib/enumerator/async.rb', line 37

# Materializes the wrapped enumerator into an array.
#
# @return whatever +@enum.to_a+ returns
def to_a
  @enum.to_a
end
#with_index(start = 0, &work) ⇒ Object
81 82 83 84 85 86 87 88 |
# File 'lib/enumerator/async.rb', line 81

# Pairs every element with a running index.
#
# Without a block, the wrapped enumerator is replaced by its indexed form
# and +self+ is returned so that another call (e.g. #each) can be chained.
# With a block, the indexed elements are iterated through #each right away.
#
# @param start [Integer] index given to the first element (default 0)
# @yield each element together with its index
# @return the result of #each when a block is given, otherwise +self+
def with_index(start = 0, &work)
  plain = @enum
  @enum = plain.with_index(start)
  return self unless block_given?

  begin
    each(&work)
  ensure
    # The block form is a one-shot iteration: put the un-indexed enumerator
    # back so later calls do not see (or re-wrap) [item, index] pairs.
    @enum = plain
  end
end
#with_object(obj, &work) ⇒ Object
90 91 92 93 94 95 96 97 |
# File 'lib/enumerator/async.rb', line 90

# Passes +obj+ alongside every element.
#
# Without a block, the wrapped enumerator is replaced by its with_object
# form and +self+ is returned so that another call can be chained. With a
# block, the elements are iterated through #each right away.
#
# @param obj the object handed to the block with each element
# @yield each element together with +obj+
# @return +obj+ when a block is given, otherwise +self+
def with_object(obj, &work)
  plain = @enum
  @enum = plain.with_object(obj)
  return self unless block_given?

  begin
    each(&work)
  ensure
    # The block form is a one-shot iteration: put the plain enumerator back
    # so later calls do not see (or re-wrap) [item, obj] pairs.
    @enum = plain
  end
  obj
end