Class: CloudCrowd::WorkUnit

Inherits: ActiveRecord::Base < Object
Includes: ModelStatus
Defined in: lib/cloud_crowd/models/work_unit.rb

Overview

A WorkUnit is an atomic chunk of work from a job, processing a single input through a single action. WorkUnits run in parallel, with each worker daemon processing one at a time. The splitting and merging stages of a job also run as WorkUnits: each splitting WorkUnit handles one input, and the merge runs as a single WorkUnit.


Methods included from ModelStatus

#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?

Class Method Details

.cancel_all_reservations ⇒ Object

Cancels all outstanding WorkUnit reservations for all processes. (Useful in the console for debugging.)



# File 'lib/cloud_crowd/models/work_unit.rb', line 59

def self.cancel_all_reservations
  WorkUnit.update_all('reservation = null')
end

.cancel_reservations ⇒ Object

Cancels all outstanding WorkUnit reservations for this process.



# File 'lib/cloud_crowd/models/work_unit.rb', line 53

def self.cancel_reservations
  WorkUnit.reserved.update_all('reservation = null')
end

.distribute_to_nodes ⇒ Object

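Attempt to send a list of WorkUnits to nodes with available capacity. The central server process reserves WorkUnits before distributing them, which stops the same WorkUnit from being sent to multiple nodes. The algorithm is intended to be lock-free.

We loop over the WorkUnits reserved by this process and try to match each one to a Node that is capable of handling its action. A WorkUnit leaves the queue once it has been sent successfully; otherwise it is pushed back onto the end of the queue. A Node stays in rotation only while it has spare capacity: it is dropped when it becomes busy, when it does not support the action of the unit it was paired with, or when sending fails. The loop ends as soon as either list runs out, and the ensure clause then cancels whatever reservations remain.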



# File 'lib/cloud_crowd/models/work_unit.rb', line 29

def self.distribute_to_nodes
  return unless WorkUnit.reserve_available
  work_units = WorkUnit.reserved
  available_nodes = NodeRecord.available
  while node = available_nodes.shift and unit = work_units.shift do
    if node.actions.include? unit.action
      if node.send_work_unit(unit)
        available_nodes.push(node) unless node.busy?
        next
      end
    end
    work_units.push(unit)
  end
ensure
  WorkUnit.cancel_reservations
end
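
The matching loop can be exercised without a database. The sketch below is a hypothetical in-memory model: plain Ruby Structs (`Node`, `Unit`) stand in for NodeRecord and WorkUnit, and an assumed per-node capacity stands in for the real busy? check. It reproduces only the shift-and-requeue logic, not CloudCrowd's networking or reservations.

```ruby
# Hypothetical in-memory stand-ins for NodeRecord and WorkUnit.
Node = Struct.new(:host, :actions, :capacity, :received) do
  # Pretend to ship a unit to this node; refuse when the node is full.
  def send_work_unit(unit)
    return false if busy?
    received << unit
    true
  end

  # Assumed notion of "busy": the node has reached its capacity.
  def busy?
    received.size >= capacity
  end
end

Unit = Struct.new(:id, :action)

# The same shift/requeue loop as WorkUnit.distribute_to_nodes.
# Returns the units that could not be sent; in CloudCrowd their
# reservations are released by the ensure clause.
def distribute(work_units, available_nodes)
  work_units = work_units.dup
  available_nodes = available_nodes.dup
  while (node = available_nodes.shift) && (unit = work_units.shift)
    if node.actions.include?(unit.action) && node.send_work_unit(unit)
      # Sent: keep the node in rotation only if it still has room.
      available_nodes.push(node) unless node.busy?
      next
    end
    # Not sent: requeue the unit; this node drops out of the round.
    work_units.push(unit)
  end
  work_units
end

a = Node.new('a', ['word_count'], 2, [])
b = Node.new('b', ['pdf_split'], 1, [])
units = [Unit.new(1, 'word_count'), Unit.new(2, 'pdf_split'),
         Unit.new(3, 'word_count'), Unit.new(4, 'word_count')]
leftover = distribute(units, [a, b])
# a receives units 1 and 3, b receives unit 2, and unit 4 is left over
# because both nodes are busy by the time it comes up.
```

Because a node that cannot run a unit's action is dropped rather than requeued, a single pass may leave units unsent even when some other node could have taken them; those units simply wait for the next distribution round.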

.find_by_worker_name(name) ⇒ Object

Look up a WorkUnit by the name of the worker currently processing it, given in the form pid@host. Returns nil if the host or the worker is unknown.



# File 'lib/cloud_crowd/models/work_unit.rb', line 65

def self.find_by_worker_name(name)
  pid, host = name.split('@')
  node = NodeRecord.find_by_host(host)
  node && node.work_units.find_by_worker_pid(pid)
end
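
A minimal sketch of the same lookup, assuming a hypothetical hash (`NODES`) that maps host names to worker pids and WorkUnit ids in place of the NodeRecord and WorkUnit tables:

```ruby
# Hypothetical registry in place of the NodeRecord and WorkUnit tables:
# host => { worker pid => id of the WorkUnit that worker is processing }.
NODES = {
  'worker1.example.com' => { '4021' => 17, '4022' => 18 }
}.freeze

# Same steps as WorkUnit.find_by_worker_name: split "pid@host", look up the
# node by host, then the unit by the worker's pid on that node.
def find_by_worker_name(name)
  pid, host = name.split('@')
  node = NODES[host]
  node && node[pid]
end

find_by_worker_name('4022@worker1.example.com') # => 18
find_by_worker_name('4022@unknown.example.com') # => nil
```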

.reserve_available ⇒ Object

Reserves all available WorkUnits for this process by writing its PID ($$) into their reservation column. Returns true if at least one WorkUnit was reserved, and false if none were available.



# File 'lib/cloud_crowd/models/work_unit.rb', line 48

def self.reserve_available
  WorkUnit.available.update_all("reservation = #{$$}") > 0
end
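
Reservation works by tagging rows with the reserving process's PID (`$$` is Ruby's current process id, the same value as `Process.pid`). The sketch below is a hypothetical in-memory table in which a Mutex stands in for the atomicity of a single SQL UPDATE. It treats every unreserved row as available; the real `available` scope is not shown on this page and may apply further conditions. It also shows how .cancel_reservations differs from .cancel_all_reservations.

```ruby
# Hypothetical row: `reservation` holds the PID of the reserving process, or nil.
Row = Struct.new(:id, :reservation)

class Table
  def initialize(n)
    @rows = (1..n).map { |i| Row.new(i, nil) }
    @mutex = Mutex.new # stands in for the atomicity of one UPDATE statement
  end

  # Like WorkUnit.available.update_all("reservation = #{$$}") > 0
  def reserve_available(pid)
    @mutex.synchronize do
      free = @rows.select { |r| r.reservation.nil? }
      free.each { |r| r.reservation = pid }
      free.size > 0
    end
  end

  # Like WorkUnit.reserved.update_all('reservation = null'): this pid only.
  def cancel_reservations(pid)
    @mutex.synchronize do
      @rows.each { |r| r.reservation = nil if r.reservation == pid }
    end
  end

  # Like WorkUnit.update_all('reservation = null'): every row.
  def cancel_all_reservations
    @mutex.synchronize { @rows.each { |r| r.reservation = nil } }
  end

  def reserved(pid)
    @rows.select { |r| r.reservation == pid }.map(&:id)
  end
end

t = Table.new(3)
t.reserve_available(101) # => true: rows 1-3 now belong to pid 101
t.reserve_available(202) # => false: nothing is left for pid 202
t.cancel_reservations(101)
t.reserve_available(202) # => true
t.reserved(202)          # => [1, 2, 3]
```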

.start(job, action, input, status) ⇒ Object

Convenience method for creating (and saving) a new WorkUnit for the given job, action, input, and status.



# File 'lib/cloud_crowd/models/work_unit.rb', line 72

def self.start(job, action, input, status)
  self.create(:job => job, :action => action, :input => input, :status => status)
end

Instance Method Details

#assign_to(node_record, worker_pid) ⇒ Object

When a Node checks out a WorkUnit, associate the WorkUnit with that NodeRecord and record the pid of the worker processing it. Because it uses update_attributes!, this raises if the record cannot be saved.



# File 'lib/cloud_crowd/models/work_unit.rb', line 126

def assign_to(node_record, worker_pid)
  update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end

#fail(output, time_taken) ⇒ Object

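Mark this unit as having failed. If the attempt count, including this attempt, is still below CloudCrowd.config[:work_unit_retries], the unit is handed to #try_again instead. Otherwise it is marked FAILED, detached from its node and worker, and its job is checked for completion.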



# File 'lib/cloud_crowd/models/work_unit.rb', line 101

def fail(output, time_taken)
  tries = self.attempts + 1
  return try_again if tries < CloudCrowd.config[:work_unit_retries]
  update_attributes({
    :status         => FAILED,
    :node_record    => nil,
    :worker_pid     => nil,
    :attempts       => tries,
    :output         => output,
    :time           => time_taken
  })
  job && job.check_for_completion
end
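
The retry arithmetic in #fail and #try_again can be checked in isolation. This hypothetical sketch assumes a limit of 3 for `CloudCrowd.config[:work_unit_retries]` and uses a Struct in place of the WorkUnit record; it counts how many times a unit that fails on every run is executed before it is marked failed.

```ruby
# Hypothetical stand-in for a WorkUnit row: only the fields fail/try_again touch.
RetryUnit = Struct.new(:attempts, :status)

# Assumed value; the real limit comes from CloudCrowd.config[:work_unit_retries].
WORK_UNIT_RETRIES = 3

# Same decision as WorkUnit#fail. Both branches record the attempt; only the
# final one changes the status (try_again leaves the status untouched).
def fail_unit(unit, retries = WORK_UNIT_RETRIES)
  tries = unit.attempts + 1
  unit.attempts = tries
  unit.status = :failed unless tries < retries
  unit
end

# Simulate a unit that fails every time it runs; returns [runs, attempts].
def runs_until_failed(retries = WORK_UNIT_RETRIES)
  unit = RetryUnit.new(0, :processing)
  runs = 0
  until unit.status == :failed
    runs += 1
    fail_unit(unit, retries)
  end
  [runs, unit.attempts]
end

runs_until_failed # => [3, 3]
```

With a limit of 3 the unit runs three times and ends with attempts == 3; the third failure is the one that sets FAILED.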

#finish(result, time_taken) ⇒ Object

Mark this unit as having finished successfully. Splitting WorkUnits are handled differently, as an optimization: they immediately start a processing WorkUnit for each of their resulting inputs, without waiting for the rest of their splitting cousins to complete. The splitting unit itself is then destroyed, and once the job is done splitting it moves on to its next status.



# File 'lib/cloud_crowd/models/work_unit.rb', line 80

def finish(result, time_taken)
  if splitting?
    [JSON.parse(parsed_output(result))].flatten.each do |new_input|
      WorkUnit.start(job, action, new_input, PROCESSING)
    end
    self.destroy
    job.set_next_status if job && job.done_splitting?
  else
    update_attributes({
      :status         => SUCCEEDED,
      :node_record    => nil,
      :worker_pid     => nil,
      :attempts       => attempts + 1,
      :output         => result,
      :time           => time_taken
    })
    job && job.check_for_completion
  end
end
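
The splitting branch expects the result's 'output' value to be a JSON-encoded string of new inputs, since it calls JSON.parse on what #parsed_output returns. The hypothetical helper `split_inputs` below reproduces just that expression; wrapping the parsed value in an array and flattening it turns either a single value or an array of values into a flat Array.

```ruby
require 'json'

# Hypothetical helper reproducing the splitting branch of WorkUnit#finish.
def split_inputs(result)
  encoded = JSON.parse(result)['output'] # what parsed_output(result) returns
  [JSON.parse(encoded)].flatten          # always a flat Array of new inputs
end

# A splitting result whose output is a JSON-encoded list of two inputs.
result = JSON.generate({ 'output' => JSON.generate(['a.pdf', 'b.pdf']) })
split_inputs(result) # => ["a.pdf", "b.pdf"]
```

Array#flatten is recursive, so nested arrays in the split output are also flattened into individual inputs.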

#parsed_output(out = self.output) ⇒ Object

All output is wrapped in a JSON object of the form {"output": ...} for consistency, because JSON.parse (in json gem versions before 2.0) only accepts an object or array at the top level. This convenience method parses the wrapper and returns the value stored under 'output'.



# File 'lib/cloud_crowd/models/work_unit.rb', line 133

def parsed_output(out = self.output)
  JSON.parse(out)['output']
end
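
A minimal round trip through the wrapper, assuming a hypothetical `wrap_output` helper for the producing side (this page does not show where results are wrapped):

```ruby
require 'json'

# Hypothetical producer: wrapping every result as {"output": ...} keeps the
# top-level JSON value an object, whatever the result itself is.
def wrap_output(value)
  JSON.generate({ 'output' => value })
end

# Same expression as WorkUnit#parsed_output.
def parsed_output(out)
  JSON.parse(out)['output']
end

wrapped = wrap_output(42) # => "{\"output\":42}"
parsed_output(wrapped)    # => 42
```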

#to_json ⇒ Object

The JSON representation of a WorkUnit includes the parsed options of its Job, so every cousin WorkUnit of the same job carries the same options.



# File 'lib/cloud_crowd/models/work_unit.rb', line 139

def to_json
  {
    'id'        => self.id,
    'job_id'    => self.job_id,
    'input'     => self.input,
    'attempts'  => self.attempts,
    'action'    => self.action,
    'options'   => JSON.parse(self.job.options),
    'status'    => self.status
  }.to_json
end
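
A sketch of the serialized shape, using hypothetical Structs for Job and WorkUnit, with the job's options stored as a JSON string (as the JSON.parse call in #to_json implies). Two units of the same job serialize with identical options:

```ruby
require 'json'

# Hypothetical stand-ins; in CloudCrowd these are ActiveRecord models.
Job = Struct.new(:id, :options) # options held as a JSON string
Unit = Struct.new(:id, :job, :input, :attempts, :action, :status)

# Same fields as WorkUnit#to_json; the options come from the unit's job.
def unit_json(unit)
  {
    'id'       => unit.id,
    'job_id'   => unit.job.id,
    'input'    => unit.input,
    'attempts' => unit.attempts,
    'action'   => unit.action,
    'options'  => JSON.parse(unit.job.options),
    'status'   => unit.status
  }.to_json
end

job = Job.new(7, '{"lang":"en"}')
unit_a = Unit.new(1, job, 'a.txt', 0, 'word_count', 'processing')
unit_b = Unit.new(2, job, 'b.txt', 0, 'word_count', 'processing')
# Both units carry the job's options, but each keeps its own input.
```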

#try_again ⇒ Object

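Ever tried. Ever failed. No matter. Try again. Fail again. Fail better. (Samuel Beckett)

Detaches this unit from its node and worker and increments its attempt count, so that the unit can be distributed again.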



# File 'lib/cloud_crowd/models/work_unit.rb', line 116

def try_again
  update_attributes({
    :node_record  => nil,
    :worker_pid   => nil,
    :attempts     => self.attempts + 1
  })
end