Class: ThreadLimiter

Inherits:
Object
Defined in:
lib/threadlimiter/threadlimiter.rb

Overview

Fork threads like Thread.fork, but limit the number of concurrently running threads.

ThreadLimiter isn’t a thread pool. Each fork really starts a new thread.
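
To make the distinction from a thread pool concrete, here is a minimal sketch of the same idea: every call starts a fresh thread, but the caller is held back while the limit is reached. It deliberately uses a different technique from ThreadLimiter (a SizedQueue acting as a counting semaphore rather than a Mutex and a ConditionVariable). The names limited_fork, LIMIT and SLOTS are hypothetical and are not part of the library.

```ruby
# Illustrative only: NOT ThreadLimiter's implementation.
# A SizedQueue with LIMIT slots serves as a counting semaphore.
LIMIT = 2
SLOTS = SizedQueue.new(LIMIT)

# Hypothetical helper: starts a new thread per call, like Thread.fork,
# but blocks the caller while LIMIT threads are still running.
def limited_fork(*args)
  SLOTS.push(true)                # Blocks when all slots are taken.
  Thread.new(*args) do |*thread_args|
    begin
      yield(*thread_args)
    ensure
      SLOTS.pop                   # Free the slot, even if the block raised.
    end
  end
end

lock    = Mutex.new
running = 0
peak    = 0

threads = (1..6).map do |n|
  limited_fork(n) do |i|
    lock.synchronize { running += 1; peak = [peak, running].max }
    sleep 0.05
    lock.synchronize { running -= 1 }
    i * i
  end
end

results = threads.map(&:value)
p results                         # => [1, 4, 9, 16, 25, 36]
puts "peak never exceeded LIMIT: #{peak <= LIMIT}"
```

Six threads are created in total, one per call, which is what separates this approach from a pool that reuses a fixed set of worker threads.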

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(limit = -1) ⇒ ThreadLimiter

Initialize the ThreadLimiter. The optional parameter limit is the maximum number of concurrently running threads. Any value of 0 or less (the default is -1) disables limiting, so threads are forked without restricting how many run concurrently.



# File 'lib/threadlimiter/threadlimiter.rb', line 11

def initialize(limit=-1)
  @limit	= limit	# The maximum number of concurrently running threads.
  @running	= 0	# The number of currently running threads.

  @mutex	= Mutex.new
  @cv		= ConditionVariable.new
end

Class Method Details

.handle_clusters(enumeration, number_of_clusters, method_name, &block) ⇒ Object

Distribute the objects of enumeration round-robin over at most number_of_clusters clusters, invoke method_name on the array of clusters (with -1 as its argument) so that the block is applied to every object of each cluster, and return the results in the original order of enumeration. If the block takes more than one parameter and an object is Enumerable, the object is expanded into separate block arguments.



# File 'lib/threadlimiter/threadlimiter.rb', line 53

def self.handle_clusters(enumeration, number_of_clusters, method_name, &block)
  clusters	= []	# One cluster per fork.
  last_pos	= -1
  res	= []

  enumeration.each do |object|
    last_pos += 1

    (clusters[last_pos%number_of_clusters] ||= []) << object
  end

  clusters.__send__(method_name, -1) do |cluster|
    cluster.collect do |object|
      if block.arity > 1 and object.kind_of?(Enumerable)
        yield(*object.to_a)
      else
        yield(object)
      end
    end
  end.collect do |cluster|
    cluster + (cluster.length == clusters[0].length ? [] : [nil])	# Add padding nil, in order to be able to transpose
  end.transpose.each do |array|
    res.concat(array)
  end

  res[0..last_pos]	# Remove padding nil.
end
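
The round-robin split and the pad-and-transpose reassembly can be followed more easily without any threads involved. The sketch below mirrors the logic of handle_clusters, but a plain sequential collect stands in for the method named by method_name. The helper name clustered_collect is hypothetical and not part of the library.

```ruby
# Hypothetical helper mirroring handle_clusters; the clusters are processed
# sequentially here instead of by the method named in method_name.
def clustered_collect(enumeration, number_of_clusters)
  clusters = []
  count    = 0

  # Deal the objects out round-robin, one cluster per would-be thread.
  enumeration.each do |object|
    (clusters[count % number_of_clusters] ||= []) << object
    count += 1
  end

  results = clusters.collect do |cluster|
    cluster.collect { |object| yield(object) }
  end

  # Later clusters may be one element short; pad them so transpose works.
  padded = results.collect do |cluster|
    cluster + (cluster.length == results[0].length ? [] : [nil])
  end

  # Reading the transposed rows in order restores the original sequence;
  # first(count) drops the padding, which always ends up at the tail.
  padded.transpose.flatten(1).first(count)
end

# 1..7 in 3 clusters: [[1, 4, 7], [2, 5], [3, 6]]
p clustered_collect(1..7, 3) { |n| n * 10 }
# => [10, 20, 30, 40, 50, 60, 70]
```

Because the objects are dealt out round-robin, cluster lengths differ by at most one, so a single nil of padding per short cluster is enough.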

Instance Method Details

#fork(*args, &block) ⇒ Object

Fork a thread and run the given block within it. The method behaves like Thread.fork: it invokes Thread.fork, passes the list of arguments on to the block, and returns the resulting Thread. If limit threads are already running, the call blocks until one of them finishes.



# File 'lib/threadlimiter/threadlimiter.rb', line 25

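def fork(*args, &block)
  if @limit <= 0
    # No limit configured: delegate straight to Thread.fork.
    Thread.fork(*args, &block)
  else
    # Block the caller until fewer than @limit threads are running,
    # then claim a slot.
    @mutex.synchronize do
      while @running >= @limit
        @cv.wait(@mutex)
      end

      @running += 1
    end

    Thread.fork do
      begin
        res = yield(*args)
      ensure
        # Release the slot even if the block raised, then wake one waiting caller.
        @mutex.synchronize do
          @running -= 1
        end

        @cv.signal if @limit > 0
      end

      res
    end
  end
end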
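
A usage sketch may help here. So that the snippet runs on its own, it repeats the class in condensed form, taken from the listings on this page; with the library installed one would load it instead. The counters running and peak are hypothetical instrumentation, added only to observe that the limit holds.

```ruby
# Condensed from the listings above so the example is self-contained.
class ThreadLimiter
  def initialize(limit = -1)
    @limit   = limit
    @running = 0
    @mutex   = Mutex.new
    @cv      = ConditionVariable.new
  end

  def fork(*args, &block)
    return Thread.fork(*args, &block) if @limit <= 0

    @mutex.synchronize do
      @cv.wait(@mutex) while @running >= @limit
      @running += 1
    end

    Thread.fork do
      begin
        yield(*args)
      ensure
        @mutex.synchronize { @running -= 1 }
        @cv.signal
      end
    end
  end
end

limiter = ThreadLimiter.new(3)    # At most 3 threads run at the same time.
lock    = Mutex.new
running = 0                       # Instrumentation only (hypothetical).
peak    = 0

threads = (1..10).map do |n|
  # fork blocks here whenever 3 threads are already running.
  limiter.fork(n) do |i|
    lock.synchronize { running += 1; peak = [peak, running].max }
    sleep 0.02
    lock.synchronize { running -= 1 }
    i * 2
  end
end

values = threads.map(&:value)     # Each fork returned an ordinary Thread.
p values                          # => [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
puts "peak within limit: #{peak <= 3}"
```

Since fork returns the Thread created by Thread.fork, the usual Thread methods such as value and join remain available to the caller.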