Class: DatWorkerPool::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-worker-pool/runner.rb

Defined Under Namespace

Modules: OptionalTimeout
Classes: LoggerProxy, NullLoggerProxy

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Runner

Returns a new instance of Runner.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/dat-worker-pool/runner.rb', line 13

# Sets up a runner from an options object that is read with `[]`.
#
# @param args [Hash] options; the keys read are :num_workers, :queue,
#   :worker_class, :worker_params and :logger (presumably a Hash -- only
#   `[]` is called on it here)
def initialize(args)
  # A truthy :logger gets wrapped in a LoggerProxy; otherwise the null
  # proxy stands in for it.
  logger        = args[:logger]
  @logger_proxy = logger ? LoggerProxy.new(logger) : NullLoggerProxy.new

  @worker_params = args[:worker_params]
  @worker_class  = args[:worker_class]
  @queue         = args[:queue]
  @num_workers   = args[:num_workers]

  # Fresh containers backing #workers and the availability bookkeeping.
  @available_workers = LockedSet.new
  @workers           = LockedArray.new
end

Instance Attribute Details

#logger_proxy ⇒ Object (readonly)

Returns the value of attribute logger_proxy.



11
12
13
# File 'lib/dat-worker-pool/runner.rb', line 11

# Reader for the logger proxy chosen by the constructor: a LoggerProxy
# wrapping the supplied logger, or a NullLoggerProxy when no logger was
# given.
#
# @return [LoggerProxy, NullLoggerProxy]
def logger_proxy
  @logger_proxy
end

#num_workers ⇒ Object (readonly)

Returns the value of attribute num_workers.



10
11
12
# File 'lib/dat-worker-pool/runner.rb', line 10

# Reader for the value passed to the constructor as args[:num_workers].
#
# @return [Object] presumably an Integer, since #start calls `times` on
#   it -- confirm against callers
def num_workers
  @num_workers
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



11
12
13
# File 'lib/dat-worker-pool/runner.rb', line 11

# Reader for the queue passed to the constructor as args[:queue].
#
# @return [Object] the queue; this class calls dwp_start,
#   dwp_signal_shutdown and dwp_shutdown on it
def queue
  @queue
end

#worker_class ⇒ Object (readonly)

Returns the value of attribute worker_class.



10
11
12
# File 'lib/dat-worker-pool/runner.rb', line 10

# Reader for the value passed to the constructor as args[:worker_class].
#
# @return [Object] presumably the class used to build workers -- confirm
def worker_class
  @worker_class
end

#worker_params ⇒ Object (readonly)

Returns the value of attribute worker_params.



10
11
12
# File 'lib/dat-worker-pool/runner.rb', line 10

# Reader for the value passed to the constructor as args[:worker_params].
#
# @return [Object] stored as given; its shape is not visible here
def worker_params
  @worker_params
end

Instance Method Details

#available_worker_count ⇒ Object



70
71
72
# File 'lib/dat-worker-pool/runner.rb', line 70

# Reports how many entries are currently in the available-workers set.
#
# @return [Object] whatever @available_workers.size returns (presumably
#   an Integer; #worker_available? compares it with 0)
def available_worker_count
  @available_workers.size
end

#log(&message_block) ⇒ Object



86
87
88
# File 'lib/dat-worker-pool/runner.rb', line 86

# Forwards the given block to the logger proxy's runner_log.
#
# @yield handed through unchanged; callers in this class pass blocks
#   that return the message string
# @return [Object] whatever runner_log returns
def log(&message_block)
  @logger_proxy.runner_log(&message_block)
end

#make_worker_available(worker) ⇒ Object



78
79
80
# File 'lib/dat-worker-pool/runner.rb', line 78

# Records a worker as available by adding its object_id to the
# available-workers set.
#
# @param worker [Object] only its object_id is used
# @return [Object] whatever @available_workers.add returns
def make_worker_available(worker)
  @available_workers.add(worker.object_id)
end

#make_worker_unavailable(worker) ⇒ Object



82
83
84
# File 'lib/dat-worker-pool/runner.rb', line 82

# Records a worker as no longer available by removing its object_id from
# the available-workers set.
#
# @param worker [Object] only its object_id is used
# @return [Object] whatever @available_workers.remove returns
def make_worker_unavailable(worker)
  @available_workers.remove(worker.object_id)
end

#shutdown(timeout = nil, backtrace = nil) ⇒ Object

The workers should be told to shut down before the queue, because the queue shutdown will wake them up. A worker popping on a shut-down queue will always get nil back and will loop as fast as allowed until its shutdown flag is flipped, so shutting down the workers and then the queue keeps them from looping as fast as possible. If any kind of standard error or the expected timeout error (assuming the workers take too long to shut down) is raised, force a shutdown; this ensures we shut down as best as possible instead of letting Ruby kill the threads when the process exits. Non-timeout errors will be re-raised so they can be caught and handled (or shown when the process exits).



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/dat-worker-pool/runner.rb', line 49

# Shuts the pool down, optionally bounded by a timeout.
#
# Workers are signalled first, then the queue; the queue shutdown and the
# wait for workers run inside OptionalTimeout. If that raises, the
# workers are forced to shut down: the timeout error is swallowed, any
# other standard error is re-raised afterwards.
#
# @param timeout [Object, nil] handed to OptionalTimeout and to
#   force_workers_to_shutdown; logged as "<timeout> second(s)", or "none"
#   when nil/false
# @param backtrace [Object, nil] passed through untouched to
#   force_workers_to_shutdown
# @raise [StandardError] any non-timeout standard error raised while
#   shutting down, after the forced shutdown has run
def shutdown(timeout = nil, backtrace = nil)
  log do
    timeout_message = timeout ? "#{timeout} second(s)" : "none"
    "Shutting down worker pool (timeout: #{timeout_message})"
  end
  begin
    # Signal the workers before the queue so they are already flagged
    # when the queue wakes them up.
    @workers.with_lock{ |_, workers| workers.each(&:dwp_signal_shutdown) }
    @queue.dwp_signal_shutdown
    OptionalTimeout.new(timeout) do
      @queue.dwp_shutdown
      wait_for_workers_to_shutdown
    end
  rescue TimeoutInterruptError => exception
    # Listed before StandardError so the timeout is swallowed here no
    # matter where TimeoutInterruptError sits in the exception hierarchy.
    force_workers_to_shutdown(exception, timeout, backtrace)
  rescue StandardError => exception
    force_workers_to_shutdown(exception, timeout, backtrace)
    raise exception
  end
  log{ "Finished shutting down" }
end

#start ⇒ Object



33
34
35
36
37
# File 'lib/dat-worker-pool/runner.rb', line 33

# Starts the pool: logs the configured worker count, starts the queue,
# then calls build_worker once per worker with 1-based numbers.
#
# @return [Object] the result of `@num_workers.times`
def start
  log do
    "Starting worker pool with #{@num_workers} worker(s)"
  end
  @queue.dwp_start
  @num_workers.times do |index|
    build_worker(index + 1)
  end
end

#worker_available? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/dat-worker-pool/runner.rb', line 74

# Tells whether at least one worker is currently counted as available.
#
# @return [Boolean] true when #available_worker_count is greater than 0
def worker_available?
  available_worker_count > 0
end

#worker_log(worker, &message_block) ⇒ Object



90
91
92
# File 'lib/dat-worker-pool/runner.rb', line 90

# Forwards a worker and the given block to the logger proxy's worker_log.
#
# @param worker [Object] passed through unchanged
# @yield handed through unchanged to the logger proxy
# @return [Object] whatever the proxy's worker_log returns
def worker_log(worker, &message_block)
  @logger_proxy.worker_log(worker, &message_block)
end

#workers ⇒ Object



29
30
31
# File 'lib/dat-worker-pool/runner.rb', line 29

# Exposes the contents of the internal LockedArray of workers.
#
# @return [Object] whatever @workers.values returns (presumably the
#   array of worker objects -- confirm against LockedArray)
def workers
  @workers.values
end