Module: Enumerable

Defined in:
lib/threadlimiter/enumerable.rb

Instance Method Summary collapse

Instance Method Details

#clustered_threaded_collect(number_of_clusters = -1,, &block) ⇒ Object Also known as: clustered_threaded_map

Like Enumerable#collect(), but all blocks are clustered. Each cluster is run concurrently in a thread, using ThreadLimiter.new(number_of_clusters) and its fork(). Set number_of_clusters to -1 to skip clustering.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/threadlimiter/enumerable.rb', line 27

def clustered_threaded_collect(number_of_clusters=-1, &block)
  if number_of_clusters <= 0
    threaded_collect(number_of_clusters, &block)
  else
    clusters	= []	# One cluster per thread.
    last_pos	= nil
    res	= []

    self.each_with_index do |object, pos|
      (clusters[pos%number_of_clusters] ||= []) << object

      last_pos	= pos
    end

    clusters.threaded_collect(-1) do |cluster|
      cluster.collect do |object|
        if block.arity > 1 and object.kind_of?(Enumerable)
          yield(*object.to_a)
        else
          yield(object)
        end
      end + (cluster.length == clusters[0].length ? [] : [nil])	# Add padding nil, in order to be able to transpose
    end.transpose.each do |array|
      res.concat(array)
    end

    res[0..last_pos]	# Remove padding nils.
  end
end

#clustered_threaded_each(number_of_clusters = -1,, &block) ⇒ Object

Like Enumerable#each(), but all blocks are clustered. Each cluster is run concurrently in a thread, using ThreadLimiter.new(number_of_clusters) and its fork(). Set number_of_clusters to -1 to skip clustering.



112
113
114
115
116
# File 'lib/threadlimiter/enumerable.rb', line 112

def clustered_threaded_each(number_of_clusters=-1, &block)
  clustered_threaded_collect(number_of_clusters=-1, &block)

  self
end

#clustered_threaded_reject(number_of_clusters = -1,, &block) ⇒ Object

Like Enumerable#reject(), but all blocks are clustered. Each cluster is run concurrently in a thread, using ThreadLimiter.new(number_of_clusters) and its fork(). Set number_of_clusters to -1 to skip clustering.



104
105
106
# File 'lib/threadlimiter/enumerable.rb', line 104

def clustered_threaded_reject(number_of_clusters=-1, &block)
  self.zip(self.clustered_threaded_collect(number_of_clusters=-1, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
end

#clustered_threaded_select(number_of_clusters = -1,, &block) ⇒ Object

Like Enumerable#select(), but all blocks are clustered. Each cluster is run concurrently in a thread, using ThreadLimiter.new(number_of_clusters) and its fork(). Set number_of_clusters to -1 to skip clustering.



96
97
98
# File 'lib/threadlimiter/enumerable.rb', line 96

def clustered_threaded_select(number_of_clusters=-1, &block)
  self.zip(self.clustered_threaded_collect(number_of_clusters=-1, &block)).inject([]){|r, (o, b)| r << o if b ; r}
end

#threaded_collect(limit = -1,, &block) ⇒ Object Also known as: threaded_map

Like Enumerable#collect(), but each block is run concurrently in a thread, using ThreadLimiter.new(limit) and its fork(). Set limit to 0 to use plain old collect() without any threading.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/threadlimiter/enumerable.rb', line 5

def threaded_collect(limit=-1, &block)
  if limit == 0
    self.collect(&block)
  else
    thread_limiter	= ThreadLimiter.new(limit)

    self.collect do |object|
      if block.arity > 1 and object.kind_of?(Enumerable)
        thread_limiter.fork(*object.to_a, &block)
      else
        thread_limiter.fork(object, &block)
      end
    end.collect do |thread|
      thread.value
    end
  end
end

#threaded_each(limit = -1,, &block) ⇒ Object

Like Enumerable#each(), but each block is run concurrently in a thread, using ThreadLimiter.new(limit) and its fork(). Set limit to 0 to use plain old each() without any threading.



82
83
84
85
86
87
88
89
90
# File 'lib/threadlimiter/enumerable.rb', line 82

def threaded_each(limit=-1, &block)
  if limit == 0
    self.each(&block)
  else
    threaded_collect(limit=-1, &block)

    self
  end
end

#threaded_reject(limit = -1,, &block) ⇒ Object

Like Enumerable#reject(), but each block is run concurrently in a thread, using ThreadLimiter.new(limit) and its fork(). Set limit to 0 to use plain old reject() without any threading.



71
72
73
74
75
76
77
# File 'lib/threadlimiter/enumerable.rb', line 71

def threaded_reject(limit=-1, &block)
  if limit == 0
    self.reject(&block)
  else
    self.zip(self.threaded_collect(limit=-1, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
  end
end

#threaded_select(limit = -1,, &block) ⇒ Object

Like Enumerable#select(), but each block is run concurrently in a thread, using ThreadLimiter.new(limit) and its fork(). Set limit to 0 to use plain old select() without any threading.



60
61
62
63
64
65
66
# File 'lib/threadlimiter/enumerable.rb', line 60

def threaded_select(limit=-1, &block)
  if limit == 0
    self.select(&block)
  else
    self.zip(self.threaded_collect(limit=-1, &block)).inject([]){|r, (o, b)| r << o if b ; r}
  end
end