Class: Hydra::Worker

Inherits:
Object
  • Object
show all
Includes:
Messages::Worker
Defined in:
lib/hydra/worker.rb

Overview

Hydra class responsible to dispatching runners and communicating with the master.

The Worker is never run directly by a user. Workers are created by a Master to delegate to Runners.

The general convention is to have one Worker per machine on a distributed network.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Worker

Create a new worker.

  • io: The IO object to use to communicate with the master

  • num_runners: The number of runners to launch



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/hydra/worker.rb', line 17

def initialize(opts = {})
  @verbose = opts.fetch(:verbose) { false }
  @io = opts.fetch(:io) { raise "No IO Object" }
  @runners = []
  @listeners = []
  @options = opts.fetch(:options) { "" }

  load_worker_initializer

  @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil })
  @runner_event_listeners.select{|l| l.is_a? String}.each do |l|
    @runner_event_listeners.delete_at(@runner_event_listeners.index(l))
    listener = eval(l)
    @runner_event_listeners << listener if listener.is_a?(Hydra::RunnerListener::Abstract)
  end
  @runner_log_file = opts.fetch(:runner_log_file) { nil }

  boot_runners(opts.fetch(:runners) { 1 })
  @io.write(Hydra::Messages::Worker::WorkerBegin.new)

  process_messages

  @runners.each{|r| Process.wait r[:pid] }
end

Instance Attribute Details

#runnersObject (readonly)

Returns the value of attribute runners.



13
14
15
# File 'lib/hydra/worker.rb', line 13

def runners
  @runners
end

Instance Method Details

#delegate_file(message) ⇒ Object

When the master sends a file down to the worker, it hits this method. Then the worker delegates the file down to a runner.



62
63
64
65
66
# File 'lib/hydra/worker.rb', line 62

def delegate_file(message)
  runner = idle_runner
  runner[:idle] = false
  runner[:io].write(RunFile.new(eval(message.serialize)))
end

#load_worker_initializerObject



42
43
44
45
46
47
48
49
# File 'lib/hydra/worker.rb', line 42

def load_worker_initializer
  if File.exist?('./hydra_worker_init.rb')
    trace('Requiring hydra_worker_init.rb')
    require 'hydra_worker_init'
  else
    trace('hydra_worker_init.rb not present')
  end
end

#relay_results(message, runner) ⇒ Object

When a runner finishes, it sends the results up to the worker. Then the worker sends the results up to the master.



70
71
72
73
# File 'lib/hydra/worker.rb', line 70

def relay_results(message, runner)
  runner[:idle] = true
  @io.write(Results.new(eval(message.serialize)))
end

#request_file(message, runner) ⇒ Object

When a runner wants a file, it hits this method with a message. Then the worker bubbles the file request up to the master.



55
56
57
58
# File 'lib/hydra/worker.rb', line 55

def request_file(message, runner)
  @io.write(RequestFile.new)
  runner[:idle] = true
end

#shutdownObject

When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.



77
78
79
80
81
82
83
84
85
86
# File 'lib/hydra/worker.rb', line 77

def shutdown
  @running = false
  trace "Notifying #{@runners.size} Runners of Shutdown"
  @runners.each do |r|
    trace "Sending Shutdown to Runner"
    trace "\t#{r.inspect}"
    r[:io].write(Shutdown.new)
  end
  Thread.exit
end