Class: CloudCrowd::WorkUnit

Inherits:
ActiveRecord::Base
  • Object
Includes:
ModelStatus
Defined in:
lib/cloud_crowd/models/work_unit.rb

Overview

A WorkUnit is an atomic chunk of work from a job, processing a single input through a single action. The WorkUnits are run in parallel, with each worker daemon processing one at a time. The splitting and merging stages of a job are each run as a single WorkUnit.

Defined Under Namespace

Classes: Serializer

Constant Summary

MAX_RESERVATION =

We reserve work units by tagging them with a random number in the range 0...MAX_RESERVATION. The value is the largest signed 32-bit integer, which is the maximum for a signed INT column in MySQL; SQLite has no such limit.

2147483647
RESERVATION_LIMIT =

We only reserve a limited number of WorkUnits in a single pass, to avoid reserving the entire table.

25
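
As a minimal sketch of how a reservation number in that range can be drawn, assuming only Ruby's standard SecureRandom module (the helper name new_reservation_token is hypothetical and not part of CloudCrowd):

```ruby
require 'securerandom'

# Largest signed 32-bit integer, mirroring the constant documented above.
MAX_RESERVATION = 2147483647

# Hypothetical helper: draw a random token in 0...MAX_RESERVATION.
# SecureRandom.random_number(n) returns an integer x with 0 <= x < n.
def new_reservation_token
  SecureRandom.random_number(MAX_RESERVATION)
end

token = new_reservation_token
puts (0...MAX_RESERVATION).cover?(token)  # prints true
```

Because the token space is large, two processes drawing tokens independently are unlikely to collide, although nothing in this sketch rules a collision out.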

Class Method Summary

Instance Method Summary

Methods included from ModelStatus

#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?

Class Method Details

.cancel_all_reservations ⇒ Object

Cancels all outstanding WorkUnit reservations for all processes. (Useful in the console for debugging.)



# File 'lib/cloud_crowd/models/work_unit.rb', line 95

def self.cancel_all_reservations
  WorkUnit.update_all('reservation = null')
end

.cancel_reservations(reservation) ⇒ Object

Cancels all outstanding WorkUnit reservations held under the given reservation number, that is, the ones made by this process.



# File 'lib/cloud_crowd/models/work_unit.rb', line 89

def self.cancel_reservations(reservation)
  WorkUnit.reserved(reservation).update_all('reservation = null')
end

.distribute_to_nodes ⇒ Object

Attempt to send a list of WorkUnits to nodes with available capacity. A single central server process stops the same WorkUnit from being distributed to multiple nodes by reserving it first. The algorithm used should be lock-free.

We reserve WorkUnits for this process in chunks of RESERVATION_LIMIT size, and try to match them to Nodes that are capable of handling the Action. WorkUnits get removed from the availability list when they are successfully sent, and Nodes get removed when they are busy or have the action in question disabled.



# File 'lib/cloud_crowd/models/work_unit.rb', line 38

def self.distribute_to_nodes
  reservation = nil
  loop do

    # Find the available nodes, and determine what actions we're capable
    # of running at the moment.
    available_nodes   = NodeRecord.available.to_a
    available_actions = NodeRecord.available_actions

    filter            = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"
    
    # If there aren't any available nodes or actions don't bother doing anything.
    return if available_nodes.empty? or available_actions.empty?

    # Reserve a handful of available work units.
    WorkUnit.cancel_reservations(reservation) if reservation
    return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
    work_units = WorkUnit.reserved(reservation).to_a

    # Round robin through the nodes and units, sending the unit if the node
    # is able to process it.
    while (unit = work_units.shift) and available_nodes.any? do
      while node = available_nodes.shift do
        if node.actions.include?(unit.action) and node.send_work_unit(unit)
          available_nodes.push(node) unless node.busy?
          break
        end
      end
      work_units.push(unit) unless unit.assigned?
    end

    # If we still have units at this point, or we're fresh out of nodes,
    # that means we're done.
    return if work_units.any? || available_nodes.empty?
  end
ensure
  WorkUnit.cancel_reservations(reservation) if reservation
end
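
The matching loop in the method above can be sketched without a database. This is an illustration only: the Node and Unit structs, their capacity field, and the distribute helper are hypothetical in-memory stand-ins for NodeRecord and WorkUnit, and the reservation step is left out entirely.

```ruby
# Hypothetical in-memory stand-in for NodeRecord.
Node = Struct.new(:name, :actions, :capacity, :assigned) do
  def busy?
    assigned.size >= capacity
  end

  # Accept the unit if there is room; returns true on success.
  def send_work_unit(unit)
    return false if busy?
    assigned << unit
    unit.node = self
    true
  end
end

# Hypothetical in-memory stand-in for WorkUnit.
Unit = Struct.new(:id, :action, :node) do
  def assigned?
    !node.nil?
  end
end

# One pass of the round-robin matching. Returns the units left unassigned.
def distribute(units, nodes)
  work_units      = units.dup
  available_nodes = nodes.dup
  while (unit = work_units.shift) && available_nodes.any?
    while (node = available_nodes.shift)
      if node.actions.include?(unit.action) && node.send_work_unit(unit)
        # A node that still has room goes to the back of the queue.
        available_nodes.push(node) unless node.busy?
        break
      end
      # A node that could not take this unit is dropped for the whole pass.
    end
    work_units.push(unit) unless unit.assigned?
  end
  units.reject(&:assigned?)
end

a = Node.new('a', ['resize'], 1, [])
b = Node.new('b', ['resize', 'ocr'], 2, [])
units = [Unit.new(1, 'resize'), Unit.new(2, 'ocr'),
         Unit.new(3, 'resize'), Unit.new(4, 'ocr')]

leftover = distribute(units, [a, b])
puts leftover.map(&:id).inspect  # prints [4]
```

In this run node a takes unit 1 and becomes busy, node b takes units 2 and 3, and unit 4 is left over because no node has capacity. In the real method such leftovers are released again by the ensure clause.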

.find_by_worker_name(name) ⇒ Object

Look up a WorkUnit by the name of the worker that is currently processing it. Worker names take the form pid@host.



# File 'lib/cloud_crowd/models/work_unit.rb', line 101

def self.find_by_worker_name(name)
  pid, host = name.split('@')
  node = NodeRecord.find_by_host(host)
  node && node.work_units.find_by_worker_pid(pid)
end
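
A small sketch of how a pid@host worker name splits into its two parts. The helper parse_worker_name and the example host are hypothetical; only the String#split call is taken from the method above.

```ruby
# Hypothetical helper mirroring the split performed by find_by_worker_name.
def parse_worker_name(name)
  pid, host = name.split('@')
  { pid: pid, host: host }
end

parts = parse_worker_name('4242@node1.example.com')
puts parts[:pid]   # prints 4242
puts parts[:host]  # prints node1.example.com
```

Note that the pid comes back as a String in this sketch, since String#split never converts its pieces.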

.reserve_available(options = {}) ⇒ Object

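Reserves available WorkUnits for this process, up to options[:limit] of them when a limit is given, and returns the reservation number. Returns false if none were available. The options[:conditions] SQL fragment is interpolated directly into the query, so callers are expected to supply one.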



# File 'lib/cloud_crowd/models/work_unit.rb', line 79

def self.reserve_available(options={})
  reservation = SecureRandom.random_number(MAX_RESERVATION)
  conditions = "reservation is null and node_record_id is null and status in (#{INCOMPLETE.join(',')}) and #{options[:conditions]}"
  query = WorkUnit.where(conditions)
  query = query.limit(options[:limit]) if options[:limit]
  any = query.update_all("reservation = #{reservation}") > 0
  any && reservation
end
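
The reservation idea can be sketched against an in-memory array. This is an assumption-laden illustration, not the library's code: the Row struct and the keyword arguments are hypothetical, and where the real method issues a single UPDATE statement, this sketch makes no attempt at concurrency safety.

```ruby
require 'securerandom'

MAX_RESERVATION = 2147483647

# Hypothetical in-memory rows standing in for the work_units table.
Row = Struct.new(:id, :action, :reservation, :node_record_id)

# Tag up to `limit` unreserved, unassigned rows whose action is in `actions`
# with one random token. Returns the token, or false if nothing was reserved.
def reserve_available(rows, actions:, limit: nil)
  token = SecureRandom.random_number(MAX_RESERVATION)
  candidates = rows.select do |r|
    r.reservation.nil? && r.node_record_id.nil? && actions.include?(r.action)
  end
  candidates = candidates.first(limit) if limit
  candidates.each { |r| r.reservation = token }
  candidates.empty? ? false : token
end

rows = [Row.new(1, 'resize'), Row.new(2, 'ocr'), Row.new(3, 'resize')]
token = reserve_available(rows, actions: ['resize'], limit: 1)
puts rows.count { |r| r.reservation == token }  # prints 1
```

Returning the token rather than the rows keeps the caller's next step simple: it selects everything carrying that token, as .distribute_to_nodes does with WorkUnit.reserved(reservation).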

.start(job, action, input, status) ⇒ Object

Convenience method for starting a new WorkUnit.



# File 'lib/cloud_crowd/models/work_unit.rb', line 108

def self.start(job, action, input, status)
  input = input.to_json unless input.is_a? String
  self.create(:job => job, :action => action, :input => input, :status => status)
end
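
The input handling in .start can be illustrated on its own. The helper normalize_input is hypothetical; it only reproduces the first line of the method, using Ruby's standard json library.

```ruby
require 'json'

# Hypothetical helper: Strings pass through, anything else is serialized.
def normalize_input(input)
  input.is_a?(String) ? input : input.to_json
end

puts normalize_input('http://example.com/a.png')  # prints http://example.com/a.png
puts normalize_input({ 'page' => 3 })             # prints {"page":3}
```

One consequence is that a String input is stored verbatim and is never JSON-encoded a second time.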

Instance Method Details

#active_model_serializer ⇒ Object



# File 'lib/cloud_crowd/models/work_unit.rb', line 191

def active_model_serializer; Serializer; end

#assign_to(node_record, worker_pid) ⇒ Object

When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.



# File 'lib/cloud_crowd/models/work_unit.rb', line 168

def assign_to(node_record, worker_pid)
  update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end

#assigned? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/models/work_unit.rb', line 172

def assigned?
  !!(node_record_id && worker_pid)
end

#cancel_reservation ⇒ Object

If the node can’t process the unit, cancel its reservation.



# File 'lib/cloud_crowd/models/work_unit.rb', line 162

def cancel_reservation
  update_attributes!(:reservation => nil)
end

#fail(output, time_taken) ⇒ Object

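Mark this unit as having failed. If the attempt count is still below CloudCrowd.config[:work_unit_retries], the unit is released for another try instead; otherwise it is marked FAILED and the job is checked for completion.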



# File 'lib/cloud_crowd/models/work_unit.rb', line 138

def fail(output, time_taken)
  tries = self.attempts + 1
  return try_again if tries < CloudCrowd.config[:work_unit_retries]
  update_attributes({
    :status         => FAILED,
    :node_record    => nil,
    :worker_pid     => nil,
    :attempts       => tries,
    :output         => output,
    :time           => time_taken
  })
  job && job.check_for_completion
end
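
The retry decision made by #fail reduces to one comparison. The helper below is a hypothetical restatement of it, with the configured limit passed in as a plain argument instead of being read from CloudCrowd.config.

```ruby
# Hypothetical helper: what happens to a unit that has just failed, given
# the attempts recorded so far and the configured retry limit.
def outcome_after_failure(attempts, work_unit_retries)
  tries = attempts + 1
  tries < work_unit_retries ? :try_again : :failed
end

puts outcome_after_failure(0, 3)  # prints try_again
puts outcome_after_failure(1, 3)  # prints try_again
puts outcome_after_failure(2, 3)  # prints failed
```

With a limit of 3, a unit is therefore run at most three times in total before it is marked as failed.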

#finish(result, time_taken) ⇒ Object

Mark this unit as having finished successfully. Splitting work units are handled differently (an optimization) – they immediately fire off all of their resulting WorkUnits for processing, without waiting for the rest of their splitting cousins to complete.



# File 'lib/cloud_crowd/models/work_unit.rb', line 117

def finish(result, time_taken)
  if splitting?
    [parsed_output(result)].flatten.each do |new_input|
      WorkUnit.start(job, action, new_input, PROCESSING)
    end
    self.destroy
    job.set_next_status if job && job.done_splitting?
  else
    update_attributes({
      :status         => SUCCEEDED,
      :node_record    => nil,
      :worker_pid     => nil,
      :attempts       => attempts + 1,
      :output         => result,
      :time           => time_taken
    })
    job && job.check_for_completion
  end
end
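
The splitting branch turns one result into a list of new inputs. The following sketch shows just that step; the helper inputs_from_split and the file names are hypothetical, and the JSON shape is the 'output' wrapper that #parsed_output expects.

```ruby
require 'json'

# Hypothetical helper: extract the inputs that a splitting unit produced.
# Wrapping in an array and flattening accepts a single value or a list.
def inputs_from_split(result)
  [JSON.parse(result)['output']].flatten
end

puts inputs_from_split('{"output":["a.pdf","b.pdf"]}').inspect  # prints ["a.pdf", "b.pdf"]
puts inputs_from_split('{"output":"only.pdf"}').inspect         # prints ["only.pdf"]
```

Each element of the returned array would become the input of one new WorkUnit in the PROCESSING state.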

#parsed_output(out = self.output) ⇒ Object

All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array). Convenience method to provide the parsed version.



# File 'lib/cloud_crowd/models/work_unit.rb', line 179

def parsed_output(out = self.output)
  JSON.parse(out)['output']
end
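
A round trip through the wrapper looks like this. The helper names wrap_output and unwrap_output are hypothetical; only the 'output' key and the JSON.parse call come from the method above.

```ruby
require 'json'

# Hypothetical helper: wrap any JSON-serializable value in an object.
def wrap_output(value)
  { 'output' => value }.to_json
end

# Hypothetical helper: the same unwrapping that #parsed_output performs.
def unwrap_output(json)
  JSON.parse(json)['output']
end

wrapped = wrap_output(42)
puts wrapped                 # prints {"output":42}
puts unwrap_output(wrapped)  # prints 42
```

Wrapping lets a bare number or string travel as a JSON object, so the stored output always has the same top-level shape.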

#to_json ⇒ Object



# File 'lib/cloud_crowd/models/work_unit.rb', line 192

def to_json; Serializer.new(self).to_json; end

#try_again ⇒ Object

Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.



# File 'lib/cloud_crowd/models/work_unit.rb', line 153

def try_again
  update_attributes({
    :node_record  => nil,
    :worker_pid   => nil,
    :attempts     => self.attempts + 1
  })
end