Class: Enumerator::Async

Inherits:
Enumerator show all
Defined in:
lib/enumerator/async.rb

Defined Under Namespace

Classes: Lockset

Instance Method Summary collapse

Constructor Details

#initialize(enum, pool_size = nil) ⇒ Async

Returns a new instance of Async.



23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/enumerator/async.rb', line 23

def initialize(enum, pool_size = nil)
  if pool_size
    unless pool_size >= 1
      message = "Thread pool size is invalid! Expected a positive integer but got: #{pool_size}"
      raise ArgumentError, message
    end
    pool_size = pool_size.to_i
  end

  @enum = enum
  @pool_size = pool_size
  @lockset = Lockset.new
end

Instance Method Details

#each(&work) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/enumerator/async.rb', line 51

def each(&work)
  raise_error('each') unless block_given?
  
  if @pool_size
    queue = SizedQueue.new @pool_size
    
    threads = @pool_size.times.map do
      Thread.new do
        loop do
          item = queue.pop
          item != EOQ ? evaluate(item, &work) : break
        end
      end
    end
    
    begin
      loop { queue.push @enum.next }
    rescue StopIteration
    ensure
      @pool_size.times { queue.push EOQ }
    end

    threads.each(&:join)
    @enum.rewind
  else
    unlimited_threads(&work).each(&:join)
  end
  self
end

#map(&work) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/enumerator/async.rb', line 99

def map(&work)
  raise_error('map') unless block_given?
  
  if @pool_size
    outs = []
    with_index do |item, index|
      outs[index] = evaluate(item, &work)
    end
    outs
  else
    unlimited_threads(&work).map(&:value)
  end
end

#sizeObject



47
48
49
# File 'lib/enumerator/async.rb', line 47

def size
  @enum.size
end

#syncObject Also known as: to_enum



41
42
43
# File 'lib/enumerator/async.rb', line 41

def sync
  @enum
end

#to_aObject



37
38
39
# File 'lib/enumerator/async.rb', line 37

def to_a
  @enum.to_a
end

#with_index(start = 0, &work) ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/enumerator/async.rb', line 81

def with_index(start = 0, &work)
  @enum = @enum.with_index(start)
  if block_given?
    each(&work)
  else
    self
  end
end

#with_object(obj, &work) ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/enumerator/async.rb', line 90

def with_object(obj, &work)
  @enum = @enum.with_object(obj)
  if block_given?
    each(&work); obj
  else
    self
  end
end