Class: CloudCrowd::WorkUnit
- Inherits:
- ActiveRecord::Base (Object → ActiveRecord::Base → CloudCrowd::WorkUnit)
- Includes:
- ModelStatus
- Defined in:
- lib/cloud_crowd/models/work_unit.rb
Overview
A WorkUnit is an atomic chunk of work from a job, processing a single input through a single action. The WorkUnits are run in parallel, with each worker daemon processing one at a time. The splitting and merging stages of a job are each run as a single WorkUnit.
Class Method Summary
- .cancel_all_reservations ⇒ Object
  Cancels all outstanding WorkUnit reservations for all processes.
- .cancel_reservations ⇒ Object
  Cancels all outstanding WorkUnit reservations for this process.
- .distribute_to_nodes ⇒ Object
  Attempt to send a list of WorkUnits to nodes with available capacity.
- .find_by_worker_name(name) ⇒ Object
  Look up a WorkUnit by the worker that’s currently processing it.
- .reserve_available ⇒ Object
  Reserves all available WorkUnits for this process.
- .start(job, action, input, status) ⇒ Object
  Convenience method for starting a new WorkUnit.
Instance Method Summary
- #assign_to(node_record, worker_pid) ⇒ Object
  When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.
- #fail(output, time_taken) ⇒ Object
  Mark this unit as having failed.
- #finish(result, time_taken) ⇒ Object
  Mark this unit as having finished successfully.
- #parsed_output(out = self.output) ⇒ Object
  All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array).
- #to_json ⇒ Object
  The JSON representation of a WorkUnit shares the Job’s options with all its cousin WorkUnits.
- #try_again ⇒ Object
  Ever tried.
Methods included from ModelStatus
#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?
Class Method Details
.cancel_all_reservations ⇒ Object
Cancels all outstanding WorkUnit reservations for all processes. (Useful in the console for debugging.)
    # File 'lib/cloud_crowd/models/work_unit.rb', line 59
    def self.cancel_all_reservations
      WorkUnit.update_all('reservation = null')
    end
.cancel_reservations ⇒ Object
Cancels all outstanding WorkUnit reservations for this process.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 53
    def self.cancel_reservations
      WorkUnit.reserved.update_all('reservation = null')
    end
.distribute_to_nodes ⇒ Object
Attempt to send a list of WorkUnits to nodes with available capacity. A single central server process stops the same WorkUnit from being distributed to multiple nodes by reserving it first. The algorithm used should be lock-free.
We loop over the WorkUnits reserved by this process and try to match them to Nodes that are capable of handling the Action. WorkUnits get removed from the availability list when they are successfully sent, and Nodes get removed when they are busy or have the action in question disabled.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 29
    def self.distribute_to_nodes
      return unless WorkUnit.reserve_available
      work_units = WorkUnit.reserved
      available_nodes = NodeRecord.available
      while node = available_nodes.shift and unit = work_units.shift do
        if node.actions.include? unit.action
          if node.send_work_unit(unit)
            available_nodes.push(node) unless node.busy?
            next
          end
        end
        work_units.push(unit)
      end
    ensure
      WorkUnit.cancel_reservations
    end
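The matching loop above can be tried outside of ActiveRecord. The following is a minimal sketch, assuming in-memory stand-ins: `SketchNode`, `SketchUnit`, and `distribute` are hypothetical names invented for illustration, and the `capacity` field is an assumed way to model a node becoming busy. It only mirrors the shape of the loop, not the real database-backed behaviour.

```ruby
# Hypothetical stand-ins for NodeRecord and WorkUnit (not the CloudCrowd classes).
SketchNode = Struct.new(:name, :actions, :capacity, :sent) do
  # Accept a unit while there is spare capacity; returns true on success.
  def send_work_unit(unit)
    return false if busy?
    sent << unit
    true
  end

  # A node is busy once it holds as many units as its (assumed) capacity.
  def busy?
    sent.size >= capacity
  end
end

SketchUnit = Struct.new(:id, :action)

# Same control flow as the loop in distribute_to_nodes: a sent unit leaves
# the list, a node that is busy or cannot run the action leaves the list.
def distribute(work_units, available_nodes)
  work_units = work_units.dup
  available_nodes = available_nodes.dup
  while (node = available_nodes.shift) && (unit = work_units.shift)
    if node.actions.include?(unit.action) && node.send_work_unit(unit)
      available_nodes.push(node) unless node.busy?
      next
    end
    work_units.push(unit) # not sent: requeue the unit, drop the node
  end
  work_units # units that could not be placed
end
```

Note that a node is dropped as soon as it is paired with a unit it cannot run, even if a later unit would have suited it; each pass therefore removes either a unit or a node, which is what guarantees that the loop terminates.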
.find_by_worker_name(name) ⇒ Object
Look up a WorkUnit by the worker that’s currently processing it. Specified by pid@host.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 65
    def self.find_by_worker_name(name)
      pid, host = name.split('@')
      node = NodeRecord.find_by_host(host)
      node && node.work_units.find_by_worker_pid(pid)
    end
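As a small illustration of the pid@host format, the sketch below isolates the parsing step. `split_worker_name` is a hypothetical helper and the host name is made up; only the `split('@')` call comes from the method above.

```ruby
# Split a worker name of the form "pid@host" into its two parts.
# Both parts come back as Strings; the pid is not converted to an Integer.
def split_worker_name(name)
  pid, host = name.split('@')
  [pid, host]
end

split_worker_name('1234@node1.example.com') # => ["1234", "node1.example.com"]
```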
.reserve_available ⇒ Object
Reserves all available WorkUnits for this process. Returns false if there were none available.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 48
    def self.reserve_available
      WorkUnit.available.update_all("reservation = #{$$}")  > 0
    end
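The reservation scheme stamps rows with the current process ID (`$$` in Ruby, equivalent to `Process.pid`). Here is a rough in-memory sketch of the idea. `SketchRow` and both helper methods are hypothetical, and it assumes that "available" simply means "not reserved by anyone"; the real `available` scope is not shown on this page and may apply further conditions.

```ruby
# Hypothetical in-memory row; the real model stores this in a database column.
SketchRow = Struct.new(:id, :reservation)

# Stamp every unreserved row with the given PID.
# Returns false if there was nothing to reserve.
def reserve_available(rows, pid = Process.pid)
  available = rows.select { |row| row.reservation.nil? }
  available.each { |row| row.reservation = pid }
  available.size > 0
end

# Release only the rows reserved by the given PID.
def cancel_reservations(rows, pid = Process.pid)
  rows.each { |row| row.reservation = nil if row.reservation == pid }
end
```

In the real method the whole reservation is a single `update_all` statement, so the database applies it in one step; the sketch makes no attempt to model concurrent processes.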
.start(job, action, input, status) ⇒ Object
Convenience method for starting a new WorkUnit.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 72
    def self.start(job, action, input, status)
      self.create(:job => job, :action => action, :input => input, :status => status)
    end
Instance Method Details
#assign_to(node_record, worker_pid) ⇒ Object
When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 126
    def assign_to(node_record, worker_pid)
      update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
    end
#fail(output, time_taken) ⇒ Object
Mark this unit as having failed. May attempt a retry.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 101
    def fail(output, time_taken)
      tries = self.attempts + 1
      return try_again if tries < CloudCrowd.config[:work_unit_retries]
      update_attributes({
        :status => FAILED,
        :node_record => nil,
        :worker_pid => nil,
        :attempts => tries,
        :output => output,
        :time => time_taken
      })
      job && job.check_for_completion
    end
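The retry guard in #fail can be read as a small pure function. The sketch below is an illustration only: `outcome_after_failure` is a hypothetical name, and the limit of 3 in the examples is an assumed value for `work_unit_retries`, not one stated on this page.

```ruby
# Decide what happens after a failure, given the attempts already recorded.
# Mirrors the guard in #fail: retry while attempts + 1 is below the limit.
def outcome_after_failure(attempts, max_retries)
  tries = attempts + 1
  tries < max_retries ? :retry : :failed
end

outcome_after_failure(0, 3) # => :retry
outcome_after_failure(1, 3) # => :retry
outcome_after_failure(2, 3) # => :failed
```

Under this reading, a limit of 3 means a unit is run at most three times in total before it is marked as failed.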
#finish(result, time_taken) ⇒ Object
Mark this unit as having finished successfully. Splitting work units are handled differently (an optimization) – they immediately fire off all of their resulting WorkUnits for processing, without waiting for the rest of their splitting cousins to complete.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 80
    def finish(result, time_taken)
      if splitting?
        [JSON.parse(parsed_output(result))].flatten.each do |new_input|
          WorkUnit.start(job, action, new_input, PROCESSING)
        end
        self.destroy
        job.set_next_status if job && job.done_splitting?
      else
        update_attributes({
          :status => SUCCEEDED,
          :node_record => nil,
          :worker_pid => nil,
          :attempts => attempts + 1,
          :output => result,
          :time => time_taken
        })
        job && job.check_for_completion
      end
    end
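The splitting branch parses the result twice: once to unwrap the 'output' key, and once more on the unwrapped value. That suggests the split result carries a JSON-encoded string inside the wrapper. The sketch below illustrates that reading; `split_inputs` and the file names are hypothetical, and the double encoding is an inference from the code above rather than documented behaviour.

```ruby
require 'json'

# Unwrap {"output": "<json string>"} and return the list of new inputs.
# Wrapping in an array and flattening lets a single value and an array
# of values be handled the same way, as in #finish.
def split_inputs(result)
  inner = JSON.parse(result)['output']
  [JSON.parse(inner)].flatten
end

# An assumed split result: an array of inputs, JSON-encoded, then wrapped.
result = { 'output' => ['page1.pdf', 'page2.pdf'].to_json }.to_json
split_inputs(result) # => ["page1.pdf", "page2.pdf"]
```

Each element of the returned list would then become the input of a new WorkUnit.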
#parsed_output(out = self.output) ⇒ Object
All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array). Convenience method to provide the parsed version.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 133
    def parsed_output(out = self.output)
      JSON.parse(out)['output']
    end
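To make the wrapping convention concrete, here is a minimal round trip. `wrap_output` is a hypothetical helper standing in for whatever code produces the stored output; the standalone `parsed_output` takes the string explicitly instead of defaulting to `self.output`.

```ruby
require 'json'

# Wrap any JSON-representable value in an object under the 'output' key.
def wrap_output(value)
  { 'output' => value }.to_json
end

# Standalone version of #parsed_output: unwrap the 'output' key.
def parsed_output(out)
  JSON.parse(out)['output']
end

wrap_output('done')               # => "{\"output\":\"done\"}"
parsed_output(wrap_output('done')) # => "done"
```

Because the top level is always an object, scalar results such as a bare string or number can be stored and parsed uniformly.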
#to_json ⇒ Object
The JSON representation of a WorkUnit shares the Job’s options with all its cousin WorkUnits.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 139
    def to_json
      {
        'id' => self.id,
        'job_id' => self.job_id,
        'input' => self.input,
        'attempts' => self.attempts,
        'action' => self.action,
        'options' => JSON.parse(self.job.),
        'status' => self.status
      }.to_json
    end
#try_again ⇒ Object
Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.
    # File 'lib/cloud_crowd/models/work_unit.rb', line 116
    def try_again
      update_attributes({
        :node_record => nil,
        :worker_pid => nil,
        :attempts => self.attempts + 1
      })
    end