Class: CloudCrowd::Worker

Inherits:
Object
Defined in:
lib/cloud_crowd/worker.rb

Overview

The Worker, forked off from the Node when a new WorkUnit is received, launches an Action for processing. Workers only ever receive WorkUnits that they are able to handle, that is, units for which they have a corresponding action in their actions directory. If communication with the central server is interrupted, the Worker repeatedly attempts to report its unit's result, once every Worker::RETRY_WAIT seconds. Any exception raised during the course of the Action causes the Worker to mark the WorkUnit as failed. When finished, the Worker’s process exits, minimizing the potential for memory leaks.
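
The fork-per-unit lifecycle described above can be sketched in plain Ruby. This is only an illustration, not the Node's actual forking code: `run_in_fork` is a hypothetical helper, and the sketch assumes a Unix-like platform where `Process.fork` is available.

```ruby
# Hypothetical helper (not part of CloudCrowd): run a block in a forked
# child process and hand its result back to the parent over a pipe.
def run_in_fork
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    writer.write(yield.to_s)   # do the work inside the child
    writer.close
    Process.exit!(0)           # leave immediately, skipping at_exit handlers
  end
  writer.close
  output = reader.read         # blocks until the child closes its end
  reader.close
  Process.wait(pid)            # reap the child so it does not linger as a zombie
  output
end

puts run_in_fork { "processed unit in pid #{Process.pid}" }
```

Because the work happens in a short-lived child, any memory it allocates is returned to the operating system when the child exits, which is the leak-avoidance property the overview mentions.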

Constant Summary

RETRY_WAIT = 5

Wait five seconds before retrying after internal communication errors.

Constructor Details

#initialize(node, unit) ⇒ Worker

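A new Worker customizes itself to its WorkUnit at instantiation: it records its start time, PID, node, unit, and status, and sets the process title to identify the action and unit it is handling.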



# File 'lib/cloud_crowd/worker.rb', line 22

def initialize(node, unit)
  @start_time = Time.now
  @pid        = $$
  @node       = node
  @unit       = unit
  @status     = @unit['status']
  @retry_wait = RETRY_WAIT
  $0 = "#{unit['action']} (#{unit['id']}) [cloud-crowd-worker]"
end

Instance Attribute Details

#node ⇒ Object (readonly)

Returns the value of attribute node.



# File 'lib/cloud_crowd/worker.rb', line 19

def node
  @node
end

#pid ⇒ Object (readonly)

Returns the value of attribute pid.



# File 'lib/cloud_crowd/worker.rb', line 19

def pid
  @pid
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



# File 'lib/cloud_crowd/worker.rb', line 19

def status
  @status
end

#unit ⇒ Object (readonly)

Returns the value of attribute unit.



# File 'lib/cloud_crowd/worker.rb', line 19

def unit
  @unit
end

Instance Method Details

#base_params ⇒ Object (private)

Common parameters to send back to central upon unit completion, regardless of success or failure.



# File 'lib/cloud_crowd/worker.rb', line 131

def base_params
  { :pid  => @pid,
    :id   => @unit['id'],
    :time => time_taken }
end

#complete_work_unit(result) ⇒ Object

Return output to the central server, marking the WorkUnit done.



# File 'lib/cloud_crowd/worker.rb', line 33

def complete_work_unit(result)
  keep_trying_to "complete work unit" do
    data = base_params.merge({:status => 'succeeded', :output => result})
    @node.central["/work/#{data[:id]}"].put(data)
    log "finished #{display_work_unit} in #{data[:time]} seconds"
  end
end

#display_work_unit ⇒ Object

Loggable details describing what the Worker is up to.



# File 'lib/cloud_crowd/worker.rb', line 68

def display_work_unit
  "unit ##{@unit['id']} (#{@unit['action']}/#{CloudCrowd.display_status(@status)})"
end

#enhanced_unit_options ⇒ Object

Some attributes of the WorkUnit are potentially important to the Action, so we pass them in, in case it needs to know them. The job_id, work_unit_id, and attempts keys are always made available in the options hash.



# File 'lib/cloud_crowd/worker.rb', line 113

def enhanced_unit_options
  @unit['options'].merge({
    'job_id'        => @unit['job_id'],
    'work_unit_id'  => @unit['id'],
    'attempts'      => @unit['attempts']
  })
end
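
To make the merge concrete, here is a stand-alone sketch that applies the same `Hash#merge` to made-up WorkUnit data. `enhanced_options_for` is a hypothetical helper that takes the unit as an argument instead of reading `@unit`, and the sample values are invented for illustration.

```ruby
# Hypothetical stand-alone version of the merge, taking the unit as an argument.
def enhanced_options_for(unit)
  unit['options'].merge(
    'job_id'       => unit['job_id'],
    'work_unit_id' => unit['id'],
    'attempts'     => unit['attempts']
  )
end

# Made-up WorkUnit data; note the user-supplied 'job_id' option.
unit = {
  'id' => 17, 'job_id' => 3, 'attempts' => 0,
  'options' => { 'quality' => 80, 'job_id' => 'user-supplied' }
}

options = enhanced_options_for(unit)
puts options['job_id']            # the unit's own job_id wins over the option
puts unit['options']['job_id']    # the original options hash is left untouched
```

Two properties of `Hash#merge` matter here: keys from the argument override same-named keys in the receiver, and the receiver itself is not modified.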

#fail_work_unit(exception) ⇒ Object

Mark the WorkUnit failed, returning the exception to central.



# File 'lib/cloud_crowd/worker.rb', line 42

def fail_work_unit(exception)
  keep_trying_to "mark work unit as failed" do
    data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
    @node.central["/work/#{data[:id]}"].put(data)
    log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
  end
end
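
The success and failure payloads differ only in `:status` and in what is wrapped under the `'output'` key. The sketch below rebuilds both shapes with plain hashes. `success_payload` and `failure_payload` are hypothetical helpers, the base values are made up, and nothing is sent over HTTP. In the sketch the success helper performs the JSON wrapping itself, whereas in the Worker that wrapping happens in `run_work_unit` before `complete_work_unit` is called.

```ruby
require 'json'

# Hypothetical helper: the shape sent when a unit succeeds.
def success_payload(base, result)
  base.merge(:status => 'succeeded', :output => { 'output' => result }.to_json)
end

# Hypothetical helper: the shape sent when a unit fails. The exception
# message takes the place of the result.
def failure_payload(base, exception)
  base.merge(:status => 'failed', :output => { 'output' => exception.message }.to_json)
end

base = { :pid => 4242, :id => 17, :time => 1.5 }   # made-up stand-in for base_params
puts success_payload(base, 'thumb.png')[:output]
puts failure_payload(base, RuntimeError.new('disk full'))[:output]
```

Either way, `:output` is a JSON string rather than a nested hash, so the receiving side can parse it the same way regardless of status.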

#keep_trying_to(title) ⇒ Object

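We expect and require internal communication between the central server and the workers to succeed. If it fails for any reason, log it, and then keep trying the same request. The one exception is a RestClient::ResourceNotFound response: the work unit no longer exists on the central server, so it is logged and discarded rather than retried.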



# File 'lib/cloud_crowd/worker.rb', line 53

def keep_trying_to(title)
  begin
    yield
  rescue RestClient::ResourceNotFound => e
    log "work unit ##{@unit['id']} doesn't exist. discarding..."
  rescue Exception => e
    log "failed to #{title} -- retry in #{@retry_wait} seconds"
    log e.message
    log e.backtrace
    sleep @retry_wait
    retry
  end
end
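
The `begin`/`rescue`/`retry` pattern used above can be tried in isolation. The sketch below is a hypothetical variant, not the Worker's method: `retry_until_success` rescues `StandardError` instead of `Exception`, takes the wait as an argument so the example finishes immediately, and is exercised with a made-up operation that fails twice before succeeding.

```ruby
# Hypothetical stand-alone retry loop in the style of keep_trying_to.
def retry_until_success(title, wait)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError => e
    puts "failed to #{title} (#{e.message}) -- retry in #{wait} seconds"
    sleep wait
    retry                      # re-runs the begin block from the top
  end
end

# A made-up flaky operation: raises on the first two attempts.
outcome = retry_until_success('complete work unit', 0) do |attempt|
  raise IOError, 'connection refused' if attempt < 3
  "succeeded on attempt #{attempt}"
end
puts outcome
```

As in the original, there is no upper bound on the number of retries; a caller that needs one would have to add its own counter check before `retry`.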

#log(message) ⇒ Object (private)

Log a message to the daemon log. Includes PID for identification.



# File 'lib/cloud_crowd/worker.rb', line 138

def log(message)
  puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test'
end

#run ⇒ Object

Run this worker inside of a fork. Attempts to exit cleanly. Wraps run_work_unit to benchmark the execution time if the unit's 'benchmark' option is set.



# File 'lib/cloud_crowd/worker.rb', line 99

def run
  trap_signals
  log "starting #{display_work_unit}"
  if @unit['options']['benchmark']
    log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s)
  else
    run_work_unit
  end
  Process.exit!
end
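
The optional timing in `run` relies on `Benchmark.measure` from Ruby's standard library, which returns a `Benchmark::Tms` object whose `to_s` lists user, system, total, and real time. A minimal sketch of the same branch follows; `run_with_optional_benchmark` is a hypothetical helper that returns the log line instead of printing it.

```ruby
require 'benchmark'

# Hypothetical helper: run a block, optionally wrapped in Benchmark.measure,
# and return the line that would be logged.
def run_with_optional_benchmark(label, benchmark)
  if benchmark
    timing = Benchmark.measure { yield }
    "ran #{label} in #{timing.to_s.strip}"
  else
    yield
    "ran #{label}"
  end
end

puts run_with_optional_benchmark('unit #17', true)  { sleep 0.01 }
puts run_with_optional_benchmark('unit #17', false) { sleep 0.01 }
```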

#run_work_unit ⇒ Object

Executes the WorkUnit by running the Action, catching all exceptions as failures. We capture the thread so that we can kill it from the outside, when exiting.



# File 'lib/cloud_crowd/worker.rb', line 75

def run_work_unit
  begin
    result = nil
    action_class = CloudCrowd.actions[@unit['action']]
    action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store)
    Dir.chdir(action.work_directory) do
      result = case @status
      when PROCESSING then action.process
      when SPLITTING  then action.split
      when MERGING    then action.merge
      else raise Error::StatusUnspecified, "work units must specify their status"
      end
    end
    action.cleanup_work_directory if action
    complete_work_unit({'output' => result}.to_json)
  rescue Exception => e
    action.cleanup_work_directory if action
    fail_work_unit(e)
  end
  @node.resolve_work(@unit['id'])
end
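
The dispatch-then-report flow above can be exercised without CloudCrowd. In this sketch everything is hypothetical: `SketchAction` stands in for a real Action, the status symbols replace CloudCrowd's status constants, and `run_sketch` returns the status and JSON output instead of sending them to the central server. It also uses `ensure` for cleanup, where the method above calls `cleanup_work_directory` explicitly in both branches.

```ruby
require 'json'

# Hypothetical Action stand-in with the three entry points and a cleanup hook.
class SketchAction
  attr_reader :cleaned_up

  def initialize(input)
    @input = input
    @cleaned_up = false
  end

  def process
    @input.upcase
  end

  def split
    @input.split(',')
  end

  def merge
    @input.join(',')
  end

  def cleanup
    @cleaned_up = true
  end
end

# Dispatch on a made-up status symbol. Returns [status, json_output];
# any error becomes a 'failed' result, and cleanup runs on both paths.
def run_sketch(action, status)
  result = case status
           when :processing then action.process
           when :splitting  then action.split
           when :merging    then action.merge
           else raise ArgumentError, 'work units must specify their status'
           end
  ['succeeded', { 'output' => result }.to_json]
rescue StandardError => e
  ['failed', { 'output' => e.message }.to_json]
ensure
  action.cleanup
end

p run_sketch(SketchAction.new('a,b'), :splitting)
p run_sketch(SketchAction.new('a,b'), :unknown)
```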

#time_taken ⇒ Object

How long has this worker been running for?



# File 'lib/cloud_crowd/worker.rb', line 122

def time_taken
  Time.now - @start_time
end

#trap_signals ⇒ Object (private)

When signaled to exit, make sure that the Worker shuts down without firing the Node’s at_exit callbacks.



# File 'lib/cloud_crowd/worker.rb', line 144

def trap_signals
  Signal.trap('QUIT') { Process.exit! }
  Signal.trap('INT')  { Process.exit! }
  Signal.trap('TERM') { Process.exit! }
end
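
The reason for `Process.exit!` rather than `exit` is that a forked child inherits its parent's `at_exit` handlers, and `exit!` terminates without running any of them. The sketch below illustrates this on a Unix-like system. `signal_child_and_collect` is a hypothetical helper: it forks a child that registers an `at_exit` hook and a TERM trap, signals it, and returns whatever the child wrote after reporting that it was ready.

```ruby
# Hypothetical helper: returns the text written by the child's at_exit hook,
# which should be empty if the hard exit skipped the hook.
def signal_child_and_collect
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    at_exit { writer.write('at_exit ran') }    # should never fire
    Signal.trap('TERM') { Process.exit!(0) }   # hard exit, as in trap_signals
    writer.write("ready\n")                    # tell the parent the trap is installed
    sleep                                      # wait for the signal
  end
  writer.close
  reader.gets                                  # wait for the "ready" line
  Process.kill('TERM', pid)
  leftover = reader.read                       # reads until the child's pipe end closes
  reader.close
  Process.wait(pid)
  leftover
end

puts signal_child_and_collect.inspect
```

The "ready" handshake is there only to avoid a race in the sketch: without it the parent could send TERM before the child has installed its trap.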