Class: Gel::WorkPool
- Inherits:
-
Object
- Object
- Gel::WorkPool
- Defined in:
- lib/gel/work_pool.rb
Instance Attribute Summary collapse
-
#count ⇒ Object
readonly
Returns the value of attribute count.
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
-
#queue_order ⇒ Object
Returns the value of attribute queue_order.
Instance Method Summary collapse
- #idle? ⇒ Boolean
-
#initialize(concurrency, monitor: Monitor.new, name: nil, collect_errors: false) ⇒ WorkPool
constructor
A new instance of WorkPool.
- #join ⇒ Object
- #queue(job = nil, label, &block) ⇒ Object
- #reorder_queue! ⇒ Object
- #start ⇒ Object
- #status ⇒ Object
- #stop ⇒ Object
- #tick! ⇒ Object
- #wait ⇒ Object
Constructor Details
#initialize(concurrency, monitor: Monitor.new, name: nil, collect_errors: false) ⇒ WorkPool
Returns a new instance of WorkPool.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/gel/work_pool.rb', line 11 def initialize(concurrency, monitor: Monitor.new, name: nil, collect_errors: false) @monitor = monitor @name = name @queue_order = nil @concurrency = concurrency @workers = [] @shutdown = false @work_cond = @monitor.new_cond @idle_cond = @monitor.new_cond @queue = [] @count = 0 @errors = collect_errors ? [] : nil if block_given? begin result = yield self join result ensure stop end end end |
Instance Attribute Details
#count ⇒ Object (readonly)
Returns the value of attribute count.
8 9 10 |
# File 'lib/gel/work_pool.rb', line 8 def count @count end |
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
9 10 11 |
# File 'lib/gel/work_pool.rb', line 9 def errors @errors end |
#queue_order ⇒ Object
Returns the value of attribute queue_order.
6 7 8 |
# File 'lib/gel/work_pool.rb', line 6 def queue_order @queue_order end |
Instance Method Details
#idle? ⇒ Boolean
88 89 90 91 92 |
# File 'lib/gel/work_pool.rb', line 88 def idle? @monitor.synchronize do @queue.empty? && @workers.none? { |w| w[:active] } end end |
#join ⇒ Object
110 111 112 113 114 115 116 117 |
# File 'lib/gel/work_pool.rb', line 110 def join wait @monitor.synchronize do if @errors && e = @errors.first raise e.last end end end |
#queue(job = nil, label, &block) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/gel/work_pool.rb', line 125 def queue(job = nil, label, &block) raise ArgumentError if job && block job ||= block label ||= job @monitor.synchronize do @queue << [job, label] @count += 1 reorder_queue! @work_cond.signal end end |
#reorder_queue! ⇒ Object
138 139 140 141 142 |
# File 'lib/gel/work_pool.rb', line 138 def reorder_queue! @monitor.synchronize do @queue.sort_by!(&@queue_order) end if @queue_order end |
#start ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/gel/work_pool.rb', line 39 def start @monitor.synchronize do while @workers.size < @concurrency @workers << Thread.new do Thread.current.name = @name if @name && Thread.current.respond_to?(:name=) Thread.current.abort_on_exception = true catch(:stop) do loop do current_job = nil @monitor.synchronize do Thread.current[:active] = nil @work_cond.wait_until do @idle_cond.broadcast @shutdown || @queue.first end throw :stop if @shutdown current_job = @queue.shift Thread.current[:active] = current_job[1] @idle_cond.broadcast end begin current_job[0].call rescue Exception => ex if @errors $stderr.puts ex if $DEBUG @monitor.synchronize do @errors << [current_job, ex] end else $stderr.puts "Unhandled exception in work pool #{@name.inspect} for job #{current_job[1].inspect}:\n#{ex.inspect}\n#{ex.backtrace.map { |s| s.sub(/^/, " ") }.join("\n")}" end end end end end end end end |
#status ⇒ Object
119 120 121 122 123 |
# File 'lib/gel/work_pool.rb', line 119 def status @monitor.synchronize do { active: @workers.map { |w| w[:active] }.compact, queued: @queue.size } end end |
#stop ⇒ Object
80 81 82 83 84 85 86 |
# File 'lib/gel/work_pool.rb', line 80 def stop @monitor.synchronize do @shutdown = true @work_cond.broadcast end @workers.each(&:join).clear end |
#tick! ⇒ Object
94 95 96 97 98 |
# File 'lib/gel/work_pool.rb', line 94 def tick! @monitor.synchronize do @idle_cond.broadcast end end |
#wait ⇒ Object
100 101 102 103 104 105 106 107 108 |
# File 'lib/gel/work_pool.rb', line 100 def wait @monitor.synchronize do start if @workers.empty? @idle_cond.wait_until do (!block_given? || yield) && idle? end end end |