Class: CloudCrowd::WorkUnit
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - CloudCrowd::WorkUnit
- Includes: ModelStatus
- Defined in: lib/cloud_crowd/models/work_unit.rb
Overview
A WorkUnit is an atomic chunk of work from a job, processing a single input through a single action. The WorkUnits are run in parallel, with each worker daemon processing one at a time. The splitting and merging stages of a job are each run as a single WorkUnit.
Defined Under Namespace
Classes: Serializer
Constant Summary
- MAX_RESERVATION = 2147483647
  We use a random number in 0...MAX_RESERVATION to reserve work units. The value is the largest number a signed INT column can hold in MySQL; SQLite has no such limit.
- RESERVATION_LIMIT = 25
  We only reserve a certain number of WorkUnits in a single go, to avoid reserving the entire table.
Class Method Summary
- .cancel_all_reservations ⇒ Object
  Cancels all outstanding WorkUnit reservations for all processes.
- .cancel_reservations(reservation) ⇒ Object
  Cancels all outstanding WorkUnit reservations for this process.
- .distribute_to_nodes ⇒ Object
  Attempt to send a list of WorkUnits to nodes with available capacity.
- .find_by_worker_name(name) ⇒ Object
  Look up a WorkUnit by the worker that’s currently processing it.
- .reserve_available(options = {}) ⇒ Object
  Reserves all available WorkUnits for this process.
- .start(job, action, input, status) ⇒ Object
  Convenience method for starting a new WorkUnit.
Instance Method Summary
- #active_model_serializer ⇒ Object
  Returns the Serializer class used to serialize this model.
- #assign_to(node_record, worker_pid) ⇒ Object
  When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.
- #assigned? ⇒ Boolean
  Whether the unit has been assigned to both a node and a worker.
- #cancel_reservation ⇒ Object
  If the node can’t process the unit, cancel its reservation.
- #fail(output, time_taken) ⇒ Object
  Mark this unit as having failed.
- #finish(result, time_taken) ⇒ Object
  Mark this unit as having finished successfully.
- #parsed_output(out = self.output) ⇒ Object
  All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array).
- #to_json ⇒ Object
  Serializes the unit to JSON using Serializer.
- #try_again ⇒ Object
  Release the unit from its node and worker and count the attempt, so it can be retried.
Methods included from ModelStatus
#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?
Class Method Details
.cancel_all_reservations ⇒ Object
Cancels all outstanding WorkUnit reservations for all processes. (Useful in the console for debugging.)
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 91
def self.cancel_all_reservations
  WorkUnit.update_all('reservation = null')
end
```
.cancel_reservations(reservation) ⇒ Object
Cancels all outstanding WorkUnit reservations for this process.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 85
def self.cancel_reservations(reservation)
  WorkUnit.reserved(reservation).update_all('reservation = null')
end
```
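The two cancellation methods differ only in scope. As a minimal sketch, assuming a plain Ruby `Struct` in place of the `work_units` table (the `Unit` struct and both helper functions are hypothetical, not CloudCrowd API), the scoped version releases only rows tagged with one process's reservation number, while the global one releases everything:

```ruby
# Hypothetical in-memory stand-in for a row of the work_units table.
Unit = Struct.new(:id, :reservation)

# Mirrors WorkUnit.reserved(reservation).update_all('reservation = null'):
# only rows carrying this process's reservation number are released.
def cancel_reservations(units, reservation)
  units.select { |u| u.reservation == reservation }.each { |u| u.reservation = nil }
end

# Mirrors WorkUnit.update_all('reservation = null'): every row is released.
def cancel_all_reservations(units)
  units.each { |u| u.reservation = nil }
end

units = [Unit.new(1, 42), Unit.new(2, 42), Unit.new(3, 7)]
cancel_reservations(units, 42)
p units.map(&:reservation) # => [nil, nil, 7]
cancel_all_reservations(units)
p units.map(&:reservation) # => [nil, nil, nil]
```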
.distribute_to_nodes ⇒ Object
Attempt to send a list of WorkUnits to nodes with available capacity. A single central server process stops the same WorkUnit from being distributed to multiple nodes by reserving it first. The algorithm used should be lock-free.
We reserve WorkUnits for this process in chunks of RESERVATION_LIMIT size, and try to match them to Nodes that are capable of handling the Action. WorkUnits get removed from the availability list when they are successfully sent, and Nodes get removed when they are busy or have the action in question disabled.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 38
def self.distribute_to_nodes
  reservation = nil
  loop do
    # Find the available nodes, and determine what actions we're capable
    # of running at the moment.
    available_nodes   = NodeRecord.available.to_a
    available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
    filter            = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"

    # Reserve a handful of available work units.
    WorkUnit.cancel_reservations(reservation) if reservation
    return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
    work_units = WorkUnit.reserved(reservation).to_a

    # Round robin through the nodes and units, sending the unit if the node
    # is able to process it.
    while (unit = work_units.shift) and available_nodes.any? do
      while node = available_nodes.shift do
        if node.actions.include?(unit.action) and node.send_work_unit(unit)
          available_nodes.push(node) unless node.busy?
          break
        end
      end
      work_units.push(unit) unless unit.assigned?
    end

    # If we still have units at this point, or we're fresh out of nodes,
    # that means we're done.
    return if work_units.any? || available_nodes.empty?
  end
ensure
  WorkUnit.cancel_reservations(reservation) if reservation
end
```
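The matching step can be exercised without a database. This is a minimal in-memory sketch under stated assumptions: `FakeNode` and `FakeUnit` are hypothetical stand-ins for NodeRecord and WorkUnit, `capacity` is an assumed model of `busy?` and `send_work_unit`, and the reservation round-trip is left out. Only the round-robin loop follows the method above.

```ruby
# Hypothetical stand-in for NodeRecord: `capacity` models free worker slots.
FakeNode = Struct.new(:host, :actions, :capacity) do
  def busy?
    capacity.zero?
  end

  # Stand-in for NodeRecord#send_work_unit: accept the unit if a slot is free.
  def send_work_unit(unit)
    return false if busy?
    self.capacity -= 1
    unit.node = host
    true
  end
end

# Hypothetical stand-in for WorkUnit, reduced to what the matcher reads.
FakeUnit = Struct.new(:action, :node) do
  def assigned?
    !node.nil?
  end
end

# The matching loop from distribute_to_nodes, without the database or the
# reservation step. Both arrays are copied, so only the structs are mutated.
def round_robin(work_units, available_nodes)
  work_units = work_units.dup
  available_nodes = available_nodes.dup
  while (unit = work_units.shift) && available_nodes.any?
    while (node = available_nodes.shift)
      if node.actions.include?(unit.action) && node.send_work_unit(unit)
        # A node with spare capacity goes to the back of the queue.
        available_nodes.push(node) unless node.busy?
        break
      end
      # A node that cannot take this unit is dropped for the rest of the pass.
    end
    work_units.push(unit) unless unit.assigned?
  end
end

nodes = [FakeNode.new("a", ["ocr"], 2), FakeNode.new("b", ["resize"], 1)]
units = %w[ocr resize ocr ocr].map { |action| FakeUnit.new(action, nil) }
round_robin(units, nodes)
p units.map(&:node) # => ["a", "b", "a", nil]
```

In this run node `a` takes two OCR units and node `b` the resize unit. Once every node is busy the last unit stays unassigned; in the real method that ends the loop, and the `ensure` clause releases the reservation.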
.find_by_worker_name(name) ⇒ Object
Look up a WorkUnit by the worker that’s currently processing it. The worker name has the form pid@host.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 97
def self.find_by_worker_name(name)
  pid, host = name.split('@')
  node = NodeRecord.find_by_host(host)
  node && node.work_units.find_by_worker_pid(pid)
end
```
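The lookup is a two-step walk: host to node, then pid to unit. A minimal sketch, assuming a hypothetical `NODES` hash in place of the NodeRecord and WorkUnit tables (pids are string keys here, since `split` yields strings):

```ruby
# Hypothetical in-memory index: host => { worker_pid => work unit id }.
NODES = {
  "worker1.example.com" => { "4021" => 17, "4022" => 18 }
}

# Mirrors find_by_worker_name: split "pid@host", find the node, then the unit.
# Returns nil when either the host or the pid is unknown.
def find_by_worker_name(name)
  pid, host = name.split('@')
  units = NODES[host]
  units && units[pid]
end

p find_by_worker_name("4022@worker1.example.com") # => 18
p find_by_worker_name("4022@unknown.example.com") # => nil
```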
.reserve_available(options = {}) ⇒ Object
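Reserves all available WorkUnits for this process. Returns the reservation number, or false if there were none available.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 75
def self.reserve_available(options = {})
  reservation = SecureRandom.random_number(MAX_RESERVATION)
  conditions = "reservation is null and node_record_id is null and status in (#{INCOMPLETE.join(',')})"
  # Only append the caller's filter when one is given; a trailing "and"
  # with nothing after it would be invalid SQL.
  conditions += " and #{options[:conditions]}" if options[:conditions]
  query = WorkUnit.where(conditions)
  # Relation#limit returns a new relation, so the result must be reassigned.
  query = query.limit(options[:limit]) if options[:limit]
  any = query.update_all("reservation = #{reservation}") > 0
  any && reservation
end
```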
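The reservation trick can be modelled in memory. This is a sketch under stated assumptions: `Row` stands in for a WorkUnit row, `INCOMPLETE` is an assumed list of status symbols rather than the real ModelStatus codes, and the `actions:` keyword replaces the raw SQL `:conditions` string. It shows one random number being stamped on at most `limit` free rows and handed back to the caller.

```ruby
require 'securerandom'

MAX_RESERVATION = 2147483647
# Assumed stand-ins for ModelStatus::INCOMPLETE; the real constant holds status codes.
INCOMPLETE = [:processing, :splitting, :merging]

Row = Struct.new(:action, :status, :reservation, :node_record_id)

# In-memory analogue of reserve_available: tag up to `limit` free rows with a
# single random reservation number, then return that number (or false).
def reserve_available(rows, limit: nil, actions: nil)
  reservation = SecureRandom.random_number(MAX_RESERVATION)
  free = rows.select do |r|
    r.reservation.nil? && r.node_record_id.nil? &&
      INCOMPLETE.include?(r.status) &&
      (actions.nil? || actions.include?(r.action))
  end
  free = free.first(limit) if limit
  free.each { |r| r.reservation = reservation }
  free.any? && reservation
end

rows = Array.new(30) { Row.new("ocr", :processing, nil, nil) }
res = reserve_available(rows, limit: 25, actions: ["ocr"])
p rows.count { |r| r.reservation == res }    # => 25
p reserve_available(rows, actions: ["resize"]) # => false
```

In the database the filter and the stamping happen in one UPDATE statement, which is what lets several processes reserve concurrently without application-level locking: each then loads only the rows carrying its own number through `WorkUnit.reserved(reservation)`. The in-memory version is not atomic and only illustrates the bookkeeping.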
.start(job, action, input, status) ⇒ Object
Convenience method for starting a new WorkUnit.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 104
def self.start(job, action, input, status)
  input = input.to_json unless input.is_a? String
  self.create(:job => job, :action => action, :input => input, :status => status)
end
```
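The first line of `start` normalizes the input before it is stored. Isolated as a hypothetical `normalize_input` helper, anything that is not already a String is serialized to JSON and Strings pass through unchanged:

```ruby
require 'json'

# Mirrors the first line of WorkUnit.start: anything that is not already a
# String is serialized to JSON before being stored in the input column.
def normalize_input(input)
  input.is_a?(String) ? input : input.to_json
end

p normalize_input([1, 2, 3])          # => "[1,2,3]"
p normalize_input({ "page" => 1 })    # => "{\"page\":1}"
p normalize_input("already a string") # => "already a string"
```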
Instance Method Details
#active_model_serializer ⇒ Object
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 187
def active_model_serializer; Serializer; end
```
#assign_to(node_record, worker_pid) ⇒ Object
When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 164
def assign_to(node_record, worker_pid)
  update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end
```
#assigned? ⇒ Boolean
Returns true once the unit has both a NodeRecord and a worker PID.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 168
def assigned?
  !!(node_record_id && worker_pid)
end
```
#cancel_reservation ⇒ Object
If the node can’t process the unit, cancel its reservation.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 158
def cancel_reservation
  update_attributes!(:reservation => nil)
end
```
#fail(output, time_taken) ⇒ Object
Mark this unit as having failed. If the number of attempts, counting this one, is still below CloudCrowd.config[:work_unit_retries], the unit is released for another try instead.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 134
def fail(output, time_taken)
  tries = self.attempts + 1
  return try_again if tries < CloudCrowd.config[:work_unit_retries]
  update_attributes({
    :status       => FAILED,
    :node_record  => nil,
    :worker_pid   => nil,
    :attempts     => tries,
    :output       => output,
    :time         => time_taken
  })
  job && job.check_for_completion
end
```
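The retry rule in `fail` and `try_again` can be traced without ActiveRecord. A minimal sketch, assuming a hypothetical `RetryUnit` struct and a stand-in `WORK_UNIT_RETRIES` of 3 in place of `CloudCrowd.config[:work_unit_retries]`; the method is named `fail_unit` here so it does not shadow `Kernel#fail` at the top level:

```ruby
# Hypothetical in-memory work unit carrying just the fields fail/try_again touch.
RetryUnit = Struct.new(:status, :attempts, :node, :worker_pid)

WORK_UNIT_RETRIES = 3 # assumed stand-in for CloudCrowd.config[:work_unit_retries]

# Mirrors try_again: release the unit and count the attempt.
def try_again(unit)
  unit.node = nil
  unit.worker_pid = nil
  unit.attempts += 1
end

# Mirrors fail: retry while attempts + 1 is below the limit, else mark failed.
def fail_unit(unit)
  tries = unit.attempts + 1
  return try_again(unit) if tries < WORK_UNIT_RETRIES
  unit.status = :failed
  unit.node = nil
  unit.worker_pid = nil
  unit.attempts = tries
end

unit = RetryUnit.new(:processing, 0, "node1", 4021)
3.times { fail_unit(unit) }
p [unit.status, unit.attempts] # => [:failed, 3]
```

As the comparison is written, `work_unit_retries` bounds the total number of attempts rather than the retries after the first: with a value of 3, the third failure is final.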
#finish(result, time_taken) ⇒ Object
Mark this unit as having finished successfully. Splitting work units are handled differently (an optimization) – they immediately fire off all of their resulting WorkUnits for processing, without waiting for the rest of their splitting cousins to complete.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 113
def finish(result, time_taken)
  if splitting?
    [parsed_output(result)].flatten.each do |new_input|
      WorkUnit.start(job, action, new_input, PROCESSING)
    end
    self.destroy
    job.set_next_status if job && job.done_splitting?
  else
    update_attributes({
      :status       => SUCCEEDED,
      :node_record  => nil,
      :worker_pid   => nil,
      :attempts     => attempts + 1,
      :output       => result,
      :time         => time_taken
    })
    job && job.check_for_completion
  end
end
```
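The splitting branch of `finish` fans one result out into many new units. A minimal in-memory sketch, assuming a hypothetical `SplitUnit` struct in place of WorkUnit rows and omitting the job status update:

```ruby
require 'json'

# Hypothetical stand-in for a WorkUnit row belonging to one job.
SplitUnit = Struct.new(:action, :input, :status)

# Mirrors the splitting branch of finish: unwrap {"output": ...}, flatten so a
# single value and an array are handled alike, start one processing unit per
# element, then discard the splitting unit itself.
def finish_split(units, splitting_unit, result)
  outputs = [JSON.parse(result)['output']].flatten
  outputs.each do |new_input|
    # Same normalization as WorkUnit.start.
    input = new_input.is_a?(String) ? new_input : new_input.to_json
    units << SplitUnit.new(splitting_unit.action, input, :processing)
  end
  units.delete_if { |u| u.equal?(splitting_unit) }
end

units = [SplitUnit.new("ocr", '{"url":"doc.pdf"}', :splitting)]
finish_split(units, units.first, '{"output": ["p1.pdf", "p2.pdf"]}')
p units.map(&:input) # => ["p1.pdf", "p2.pdf"]
```

Because `Array#flatten` is recursive, nested arrays in a splitting result are also spread into separate units.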
#parsed_output(out = self.output) ⇒ Object
All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array). Convenience method to provide the parsed version.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 175
def parsed_output(out = self.output)
  JSON.parse(out)['output']
end
```
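The envelope convention is easiest to see in isolation. A minimal sketch, assuming the producing side wraps every result as `{"output": value}`; `wrap_output` is a hypothetical helper for that side, and only `parsed_output` mirrors the method above:

```ruby
require 'json'

# Hypothetical producer-side helper: any value, including a bare string or
# number, is nested under an "output" key so the top level is always an object.
def wrap_output(value)
  JSON.generate('output' => value)
end

# Mirrors parsed_output: parse the envelope and pull the value back out.
def parsed_output(out)
  JSON.parse(out)['output']
end

p wrap_output("done")                # => "{\"output\":\"done\"}"
p parsed_output(wrap_output("done")) # => "done"
p parsed_output(wrap_output([1, 2])) # => [1, 2]
```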
#to_json ⇒ Object
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 188
def to_json; Serializer.new(self).to_json; end
```
#try_again ⇒ Object
Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.
Detaches the unit from its node and worker and increments its attempt count, leaving it free to be picked up again.
```ruby
# File 'lib/cloud_crowd/models/work_unit.rb', line 149
def try_again
  update_attributes({
    :node_record  => nil,
    :worker_pid   => nil,
    :attempts     => self.attempts + 1
  })
end
```