Class: Gel::WorkPool

Inherits:
Object
  • Object
show all
Defined in:
lib/gel/work_pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(concurrency, monitor: Monitor.new, name: nil, collect_errors: false) ⇒ WorkPool

Returns a new instance of WorkPool.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/gel/work_pool.rb', line 11

def initialize(concurrency, monitor: Monitor.new, name: nil, collect_errors: false)
  @monitor = monitor
  @name = name

  @queue_order = nil

  @concurrency = concurrency
  @workers = []
  @shutdown = false

  @work_cond = @monitor.new_cond
  @idle_cond = @monitor.new_cond

  @queue = []
  @count = 0
  @errors = collect_errors ? [] : nil

  if block_given?
    begin
      result = yield self
      join
      result
    ensure
      stop
    end
  end
end

Instance Attribute Details

#countObject (readonly)

Returns the value of attribute count.



8
9
10
# File 'lib/gel/work_pool.rb', line 8

def count
  @count
end

#errorsObject (readonly)

Returns the value of attribute errors.



9
10
11
# File 'lib/gel/work_pool.rb', line 9

def errors
  @errors
end

#queue_orderObject

Returns the value of attribute queue_order.



6
7
8
# File 'lib/gel/work_pool.rb', line 6

def queue_order
  @queue_order
end

Instance Method Details

#idle?Boolean

Returns:

  • (Boolean)


88
89
90
91
92
# File 'lib/gel/work_pool.rb', line 88

def idle?
  @monitor.synchronize do
    @queue.empty? && @workers.none? { |w| w[:active] }
  end
end

#joinObject



110
111
112
113
114
115
116
117
# File 'lib/gel/work_pool.rb', line 110

def join
  wait
  @monitor.synchronize do
    if @errors && e = @errors.first
      raise e.last
    end
  end
end

#queue(job = nil, label, &block) ⇒ Object

Raises:

  • (ArgumentError)


125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/gel/work_pool.rb', line 125

def queue(job = nil, label, &block)
  raise ArgumentError if job && block
  job ||= block
  label ||= job

  @monitor.synchronize do
    @queue << [job, label]
    @count += 1
    reorder_queue!
    @work_cond.signal
  end
end

#reorder_queue!Object



138
139
140
141
142
# File 'lib/gel/work_pool.rb', line 138

def reorder_queue!
  @monitor.synchronize do
    @queue.sort_by!(&@queue_order)
  end if @queue_order
end

#startObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/gel/work_pool.rb', line 39

def start
  @monitor.synchronize do
    while @workers.size < @concurrency
      @workers << Thread.new do
        Thread.current.name = @name if @name && Thread.current.respond_to?(:name=)
        Thread.current.abort_on_exception = true

        catch(:stop) do
          loop do
            current_job = nil
            @monitor.synchronize do
              Thread.current[:active] = nil
              @work_cond.wait_until do
                @idle_cond.broadcast
                @shutdown || @queue.first
              end
              throw :stop if @shutdown
              current_job = @queue.shift
              Thread.current[:active] = current_job[1]
              @idle_cond.broadcast
            end

            begin
              current_job[0].call
            rescue Exception => ex
              if @errors
                $stderr.puts ex if $DEBUG
                @monitor.synchronize do
                  @errors << [current_job, ex]
                end
              else
                $stderr.puts "Unhandled exception in work pool #{@name.inspect} for job #{current_job[1].inspect}:\n#{ex.inspect}\n#{ex.backtrace.map { |s| s.sub(/^/, "   ") }.join("\n")}"
              end
            end
          end
        end
      end
    end
  end
end

#statusObject



119
120
121
122
123
# File 'lib/gel/work_pool.rb', line 119

def status
  @monitor.synchronize do
    { active: @workers.map { |w| w[:active] }.compact, queued: @queue.size }
  end
end

#stopObject



80
81
82
83
84
85
86
# File 'lib/gel/work_pool.rb', line 80

def stop
  @monitor.synchronize do
    @shutdown = true
    @work_cond.broadcast
  end
  @workers.each(&:join).clear
end

#tick!Object



94
95
96
97
98
# File 'lib/gel/work_pool.rb', line 94

def tick!
  @monitor.synchronize do
    @idle_cond.broadcast
  end
end

#waitObject



100
101
102
103
104
105
106
107
108
# File 'lib/gel/work_pool.rb', line 100

def wait
  @monitor.synchronize do
    start if @workers.empty?

    @idle_cond.wait_until do
      (!block_given? || yield) && idle?
    end
  end
end