Class: CloudCrowd::Job
- Inherits: ActiveRecord::Base
  - Object
    - ActiveRecord::Base
      - CloudCrowd::Job
- Includes: ModelStatus
- Defined in: lib/cloud_crowd/models/job.rb
Overview
A chunk of work that will be farmed out into many WorkUnits to be processed in parallel by each active CloudCrowd::Worker. Jobs are defined by a list of inputs (usually public URLs to files), an action (the name of a script that CloudCrowd knows how to run), and, eventually, a corresponding list of outputs.
Constant Summary
- CLEANUP_GRACE_PERIOD = 7
  Default age, in days, past which cleanup_all destroys completed Jobs. That's a week.
Class Method Summary
- .cleanup_all(opts = {}) ⇒ Object
  Clean up all jobs beyond a certain age.
- .create_from_request(h) ⇒ Object
  Create a Job from an incoming JSON request, and add it to the queue.
Instance Method Summary
- #action_class ⇒ Object
  Retrieve the class for this Job's Action.
- #all_work_units_complete? ⇒ Boolean
  Have all of the WorkUnits finished?
- #any_work_units_failed? ⇒ Boolean
  Have any of the WorkUnits failed?
- #as_json(opts = {}) ⇒ Object
  A JSON representation of this job: its id, color, status, percent complete, current WorkUnit count, and time taken, plus its outputs and email address when present.
- #check_for_completion ⇒ Object
  After work units are marked successful, check whether all of them have finished; if so, continue on to the next phase of the job.
- #cleanup_assets ⇒ Object
  Cleaning up after a job will remove all of its files from S3 or the filesystem.
- #color ⇒ Object
  Generate a stable six-digit hex color code (8 bits per RGB channel), based on the Job's id.
- #done_splitting? ⇒ Boolean
  This job is done splitting if it's finished with its splitting work units.
- #fire_callback ⇒ Object
  If a callback_url is defined, post the Job's JSON to it upon completion.
- #gather_outputs_from_work_units ⇒ Object (private)
  When the WorkUnits are all finished, gather all their outputs together before removing them from the database entirely.
- #mergeable? ⇒ Boolean
  This job is mergeable if it is currently processing and its Action has a merge method.
- #percent_complete ⇒ Object
  How complete is this Job? Unfortunately, with the current processing sequence, the percent_complete can pull a fast one and go backwards.
- #queue_for_workers(input = nil) ⇒ Object (private)
  When starting a new job, or moving to a new stage, split up the inputs into WorkUnits, and queue them.
- #restart ⇒ Object
  Reset the Job to its first stage and re-queue its inputs. This occurs often enough that it's time just to add a restart.
- #set_next_status ⇒ Object
  Transition this Job's current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.
- #splittable? ⇒ Boolean
  This job is splittable if its Action has a split method.
- #time_taken ⇒ Object
  How long has this Job taken?
Methods included from ModelStatus
#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?
Class Method Details
.cleanup_all(opts = {}) ⇒ Object
Clean up all completed jobs older than opts[:days] days (default: CLEANUP_GRACE_PERIOD), destroying them in batches of 100.
# File 'lib/cloud_crowd/models/job.rb', line 42
def self.cleanup_all(opts = {})
  days = opts[:days] || CLEANUP_GRACE_PERIOD
  self.complete.older_than(days).find_in_batches(:batch_size => 100) do |jobs|
    jobs.each {|job| job.destroy }
  end
end
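To make the selection rule concrete, here is a minimal in-memory sketch of cleanup_all with no database. FakeJob, GRACE_PERIOD_DAYS, and cleanup_all_sketch are hypothetical names. It also assumes that the older_than scope compares against updated_at; that scope is not shown on this page.

```ruby
# Hypothetical stand-in for a Job row: just the fields this sketch needs.
FakeJob = Struct.new(:id, :complete, :updated_at)

GRACE_PERIOD_DAYS = 7 # mirrors Job::CLEANUP_GRACE_PERIOD
SECONDS_PER_DAY = 24 * 60 * 60

# In-memory approximation of Job.cleanup_all. ASSUMPTION: older_than(days)
# compares updated_at against the cutoff. Returns the ids that would be
# destroyed, walking them in batches of 100 like find_in_batches.
def cleanup_all_sketch(jobs, opts = {}, now = Time.now)
  days = opts[:days] || GRACE_PERIOD_DAYS
  cutoff = now - days * SECONDS_PER_DAY
  stale = jobs.select { |job| job.complete && job.updated_at < cutoff }
  destroyed = []
  stale.each_slice(100) do |batch|
    batch.each { |job| destroyed << job.id } # job.destroy in the real model
  end
  destroyed
end

now = Time.at(1_700_000_000)
jobs = [
  FakeJob.new(1, true,  now - 8 * SECONDS_PER_DAY),  # complete, 8 days old
  FakeJob.new(2, true,  now - 1 * SECONDS_PER_DAY),  # complete, 1 day old
  FakeJob.new(3, false, now - 30 * SECONDS_PER_DAY)  # still running
]
cleanup_all_sketch(jobs, {}, now)             # => [1]
cleanup_all_sketch(jobs, { :days => 0 }, now) # => [1, 2]
```

Incomplete jobs are never selected, however old they are, because the query starts from the complete scope.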
.create_from_request(h) ⇒ Object
Create a Job from an incoming JSON request, and add it to the queue.
# File 'lib/cloud_crowd/models/job.rb', line 31
def self.create_from_request(h)
  self.create(
    :inputs => h['inputs'].to_json,
    :action => h['action'],
    :options => (h['options'] || {}).to_json,
    :email => h['email'],
    :callback_url => h['callback_url']
  )
end
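The method stores inputs and options as JSON strings rather than as structured columns. The following sketch builds the same attribute hash without a database, so you can see what ends up in each column. job_attributes_from_request is a hypothetical helper, and the "word_count" action and input URL are illustrative values only.

```ruby
require 'json'

# Builds the attribute hash that Job.create_from_request hands to Job.create,
# without a database. `inputs` and `options` are stored as JSON strings.
def job_attributes_from_request(h)
  {
    :inputs       => h['inputs'].to_json,
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json,
    :email        => h['email'],
    :callback_url => h['callback_url']
  }
end

# "word_count" and the input URL are hypothetical example values.
request = JSON.parse('{"action": "word_count", "inputs": ["http://example.com/a.txt"]}')
attrs = job_attributes_from_request(request)
attrs[:inputs]  # => "[\"http://example.com/a.txt\"]"
attrs[:options] # => "{}" (missing options default to an empty object)
attrs[:email]   # => nil
```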
Instance Method Details
#action_class ⇒ Object
Retrieve the class for this Job’s Action.
# File 'lib/cloud_crowd/models/job.rb', line 132
def action_class
  @action_class ||= CloudCrowd.actions[self.action]
  return @action_class if @action_class
  raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
end
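The memoize-then-raise pattern can be run standalone. In this sketch a plain hash stands in for CloudCrowd.actions and a local exception class stands in for Error::ActionNotFound. ACTIONS, WordCount, and JobSketch are hypothetical names.

```ruby
# Hypothetical stand-ins for CloudCrowd.actions and Error::ActionNotFound.
class ActionNotFound < StandardError; end
class WordCount; end
ACTIONS = { 'word_count' => WordCount }

class JobSketch
  attr_reader :action

  def initialize(action)
    @action = action
  end

  # Same memoize-then-raise pattern as Job#action_class.
  def action_class
    @action_class ||= ACTIONS[action]
    return @action_class if @action_class
    raise ActionNotFound, "no action named: '#{action}' could be found"
  end
end

JobSketch.new('word_count').action_class # => WordCount
begin
  JobSketch.new('ocr').action_class
rescue ActionNotFound => e
  e.message # => "no action named: 'ocr' could be found"
end
```

Because `||=` never stores nil, a missing action is not cached. Each call repeats the lookup and raises again.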
#all_work_units_complete? ⇒ Boolean
Have all of the WorkUnits finished?
# File 'lib/cloud_crowd/models/job.rb', line 107
def all_work_units_complete?
  self.work_units.incomplete.count <= 0
end
#any_work_units_failed? ⇒ Boolean
Have any of the WorkUnits failed?
# File 'lib/cloud_crowd/models/job.rb', line 112
def any_work_units_failed?
  self.work_units.failed.count > 0
end
#as_json(opts = {}) ⇒ Object
A JSON representation of this job: its id, color, status, percent complete, current WorkUnit count, and time taken, plus its outputs and email address when present.
# File 'lib/cloud_crowd/models/job.rb', line 164
def as_json(opts={})
  atts = {
    'id' => id,
    'color' => color,
    'status' => display_status,
    'percent_complete' => percent_complete,
    'work_units' => work_units.count,
    'time_taken' => time_taken
  }
  atts['outputs'] = JSON.parse(outputs) if outputs
  atts['email'] = email if email
  atts
end
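For API clients, the useful question is what the serialized document looks like. The following sketch rebuilds the same hash from plain arguments. job_json_sketch is a hypothetical helper, and every field value (including the 'succeeded' status string) is illustrative, not real output from CloudCrowd.

```ruby
require 'json'

# Mirrors the hash built by Job#as_json, using hypothetical field values.
# `outputs` arrives as a JSON string, as it is stored on the Job.
def job_json_sketch(id:, color:, status:, percent:, units:, time_taken:, outputs: nil, email: nil)
  atts = {
    'id'               => id,
    'color'            => color,
    'status'           => status,
    'percent_complete' => percent,
    'work_units'       => units,
    'time_taken'       => time_taken
  }
  atts['outputs'] = JSON.parse(outputs) if outputs # decoded, not re-embedded as a string
  atts['email'] = email if email                   # key omitted when absent
  atts
end

h = job_json_sketch(id: 42, color: 'a1b2c3', status: 'succeeded', percent: 100,
                    units: 0, time_taken: 12.5, outputs: '["out-1", "out-2"]')
JSON.generate(h)
# => '{"id":42,"color":"a1b2c3","status":"succeeded","percent_complete":100,"work_units":0,"time_taken":12.5,"outputs":["out-1","out-2"]}'
```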
#check_for_completion ⇒ Object
After work units are marked successful, check whether all of them have finished; if so, continue on to the next phase of the job.
# File 'lib/cloud_crowd/models/job.rb', line 51
def check_for_completion
  return unless all_work_units_complete?
  set_next_status
  outs = gather_outputs_from_work_units
  return queue_for_workers([outs]) if merging?
  if complete?
    update_attributes(:outputs => outs, :time => time_taken)
    CloudCrowd.log "Job ##{id} (#{action}) #{display_status}." unless ENV['RACK_ENV'] == 'test'
    CloudCrowd.defer { fire_callback } if callback_url
  end
  self
end
#cleanup_assets ⇒ Object
Cleaning up after a job will remove all of its files from S3 or the filesystem. Destroying a Job will cleanup_assets first. Run this in a separate thread to get out of the transaction's way. TODO: Convert this into a ‘cleanup’ work unit that gets run by a worker. Note that the method body is currently commented out, so in this version cleanup_assets does nothing.
# File 'lib/cloud_crowd/models/job.rb', line 102
def cleanup_assets
  # AssetStore.new.cleanup(self)
end
#color ⇒ Object
Generate a stable six-digit hex color code (8 bits per RGB channel), based on the Job's id.
# File 'lib/cloud_crowd/models/job.rb', line 158
def color
  @color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
end
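The slice `[-7...-1]` is easy to misread. It uses an exclusive range, so it takes the six characters just before the last one of the 32-character MD5 digest. The result is a 24-bit RGB value. The following sketch uses the same expression as a standalone function; job_color is a hypothetical name.

```ruby
require 'digest/md5'

# Same expression as Job#color: hex-digest the id, then take [-7...-1],
# i.e. characters -7 through -2 (the exclusive range drops the last one).
def job_color(id)
  Digest::MD5.hexdigest(id.to_s)[-7...-1]
end

c = job_color(42)
c.length            # => 6
css = "##{c}"       # prefixed with "#" it can be used as a CSS color
job_color(42) == c  # => true: the same id always yields the same color
```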
#done_splitting? ⇒ Boolean
This job is done splitting if it’s finished with its splitting work units.
# File 'lib/cloud_crowd/models/job.rb', line 122
def done_splitting?
  splittable? && work_units.splitting.count <= 0
end
#fire_callback ⇒ Object
If a callback_url is defined, post the Job's JSON to it upon completion. The callback_url may include HTTP basic authentication, if you like:
http://user:password@example.com/job_complete
If the callback URL returns a ‘201 Created’ HTTP status code, CloudCrowd will assume that the resource has been successfully created, and the Job will be cleaned up.
# File 'lib/cloud_crowd/models/job.rb', line 89
def fire_callback
  begin
    response = RestClient.post(callback_url, {:job => self.to_json})
    CloudCrowd.defer { self.destroy } if response && response.code == 201
  rescue RestClient::Exception => e
    CloudCrowd.log "Job ##{id} (#{action}) failed to fire callback: #{callback_url}\n#{e.backtrace}"
  end
end
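The following sketch shows what reaches the callback endpoint. It uses Ruby's standard Net::HTTP in place of RestClient, which is a swapped-in library and not what CloudCrowd itself calls. The credentials in the URL become a basic-auth header, and the Job JSON is sent as a form field named `job`. The request is only built, never sent. build_callback_request and callback_accepted? are hypothetical helpers.

```ruby
require 'json'
require 'net/http'
require 'uri'

# Build (but do not send) the callback POST using Net::HTTP instead of
# RestClient. URL userinfo becomes an HTTP basic-auth header.
def build_callback_request(callback_url, job_json)
  uri = URI.parse(callback_url)
  req = Net::HTTP::Post.new(uri.request_uri)
  req.basic_auth(uri.user, uri.password) if uri.user
  req.set_form_data('job' => job_json) # form field "job", as in the original
  req
end

# Only a 201 Created response lets CloudCrowd destroy the Job.
def callback_accepted?(status_code)
  status_code.to_i == 201 # Net::HTTP reports codes as strings, e.g. "201"
end

job_json = { 'id' => 42, 'status' => 'succeeded' }.to_json
req = build_callback_request('http://user:password@example.com/job_complete', job_json)
req['authorization'] # => "Basic dXNlcjpwYXNzd29yZA=="
req.content_type     # => "application/x-www-form-urlencoded"
callback_accepted?('201') # => true
callback_accepted?(200)   # => false
```

Any status other than 201, including 200 OK, leaves the Job in the database.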
#gather_outputs_from_work_units ⇒ Object (private)
When the WorkUnits are all finished, gather all their outputs together before removing them from the database entirely. Returns their merged JSON.
# File 'lib/cloud_crowd/models/job.rb', line 183
def gather_outputs_from_work_units
  units = self.work_units.complete
  outs = self.work_units.complete.map {|u| u.parsed_output }
  self.work_units.complete.destroy_all
  outs.to_json
end
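"Their merged JSON" here means a single JSON array with one element per completed unit. The following sketch shows that shape in memory. It also shows why merging produces exactly one WorkUnit: check_for_completion wraps the result as `[outs]` before re-queuing it. UnitSketch and gather_outputs_sketch are hypothetical names, and the `words` outputs are made-up values.

```ruby
require 'json'

# Hypothetical work-unit stand-in; parsed_output is already-decoded JSON.
UnitSketch = Struct.new(:complete, :parsed_output)

# Collect the outputs of completed units into one JSON array, as
# gather_outputs_from_work_units does (the real method also deletes the rows).
def gather_outputs_sketch(units)
  units.select(&:complete).map(&:parsed_output).to_json
end

units = [UnitSketch.new(true, { 'words' => 10 }), UnitSketch.new(true, { 'words' => 7 })]
outs = gather_outputs_sketch(units) # => '[{"words":10},{"words":7}]'

# While merging, check_for_completion re-queues [outs]: a one-element input
# list, so a single merge WorkUnit receives every output at once.
[outs].length # => 1
```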
#mergeable? ⇒ Boolean
This job is mergeable if it is currently processing and its Action has a merge method.
# File 'lib/cloud_crowd/models/job.rb', line 127
def mergeable?
  self.processing? && self.action_class.public_instance_methods.map {|m| m.to_sym }.include?(:merge)
end
#percent_complete ⇒ Object
How complete is this Job? Unfortunately, with the current processing sequence, the percent_complete can pull a fast one and go backwards. This happens when there’s a single large input that takes a long time to split, and when it finally does it creates a whole swarm of work units. This seems unavoidable.
# File 'lib/cloud_crowd/models/job.rb', line 143
def percent_complete
  return 99 if merging?
  return 100 if complete?
  unit_count = work_units.count
  return 100 if unit_count <= 0
  (work_units.complete.count / unit_count.to_f * 100).round
end
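The "going backwards" effect falls straight out of the arithmetic. The following sketch isolates it as a pure function. Here `status` is a hypothetical symbol standing in for the ModelStatus predicates, and the unit counts are made up for illustration.

```ruby
# Pure-function version of Job#percent_complete's arithmetic.
# `status` is a hypothetical symbol standing in for the ModelStatus predicates.
def percent_complete_sketch(status, complete_units, total_units)
  return 99 if status == :merging
  return 100 if status == :complete
  return 100 if total_units <= 0
  (complete_units / total_units.to_f * 100).round
end

# Splitting three inputs: two of the three split units are done.
percent_complete_sketch(:splitting, 2, 3)   # => 67
# Once the last split finishes, those units are removed and replaced by,
# say, 50 processing units with none done yet, so the figure falls back.
percent_complete_sketch(:processing, 0, 50) # => 0
percent_complete_sketch(:merging, 0, 1)     # => 99
```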
#queue_for_workers(input = nil) ⇒ Object (private)
When starting a new job, or moving to a new stage, split up the inputs into WorkUnits, and queue them. Workers will start picking them up right away.
# File 'lib/cloud_crowd/models/job.rb', line 193
def queue_for_workers(input=nil)
  input ||= JSON.parse(self.inputs)
  input.each {|i| WorkUnit.start(self, action, i, status) }
  self
end
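The one-unit-per-input rule can be sketched without a database. Here a Struct records what WorkUnit.start would receive. QueuedUnit and queue_for_workers_sketch are hypothetical, as are the 'process_pdfs' action name and the URLs.

```ruby
require 'json'

# Hypothetical record of the arguments WorkUnit.start would receive.
QueuedUnit = Struct.new(:action, :input, :status)

# Mirrors Job#queue_for_workers: default to the Job's stored JSON inputs,
# then start one work unit per input element.
def queue_for_workers_sketch(job_inputs_json, action, status, input = nil)
  input ||= JSON.parse(job_inputs_json)
  input.map { |i| QueuedUnit.new(action, i, status) }
end

inputs = '["http://example.com/a.pdf", "http://example.com/b.pdf"]'
queue_for_workers_sketch(inputs, 'process_pdfs', :splitting).length # => 2

# The merge stage passes an explicit one-element list instead.
outs = '[{"pages":3}]'
queue_for_workers_sketch(inputs, 'process_pdfs', :merging, [outs]).length # => 1
```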
#restart ⇒ Object
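Reset the Job's status to SPLITTING (if its Action is splittable) or PROCESSING, save it, and re-queue its original inputs as new WorkUnits. This occurs often enough that it's time just to add a restart.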
# File 'lib/cloud_crowd/models/job.rb', line 76
def restart
  self.status = self.splittable? ? CloudCrowd::SPLITTING : CloudCrowd::PROCESSING
  self.save
  self.send(:queue_for_workers)
end
#set_next_status ⇒ Object
Transition this Job’s current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.
# File 'lib/cloud_crowd/models/job.rb', line 66
def set_next_status
  update_attribute(:status,
    any_work_units_failed? ? FAILED :
    self.splitting? ? PROCESSING :
    self.mergeable? ? MERGING :
    SUCCEEDED
  )
end
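The nested ternary is a small state machine. The following sketch unrolls it into guard clauses and walks a job whose Action defines both split and merge through every stage. Lowercase symbols stand in for CloudCrowd's status constants, and next_status is a hypothetical name.

```ruby
# Pure-function version of the nested ternary in Job#set_next_status.
# Lowercase symbols are hypothetical stand-ins for CloudCrowd's constants.
def next_status(any_failed:, splitting:, mergeable:)
  return :failed if any_failed
  return :processing if splitting
  return :merging if mergeable
  :succeeded
end

# Walk a job whose Action defines both split and merge, with no failures.
status = :splitting
trail = [status]
until [:succeeded, :failed].include?(status)
  status = next_status(any_failed: false,
                       splitting: status == :splitting,
                       mergeable: status == :processing) # mergeable? requires processing?
  trail << status
end
trail # => [:splitting, :processing, :merging, :succeeded]
```

Because mergeable? also checks processing?, a job that is already merging falls through to SUCCEEDED on the next transition instead of merging again.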
#splittable? ⇒ Boolean
This job is splittable if its Action has a split method.
# File 'lib/cloud_crowd/models/job.rb', line 117
def splittable?
  self.action_class.public_instance_methods.map {|m| m.to_sym }.include? :split
end
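Both splittable? and mergeable? use the same test: does the Action class expose a public instance method with that name? The following sketch applies that check to two hypothetical Action classes. It leaves out the extra processing? condition in mergeable?.

```ruby
# Hypothetical Action classes: one defines split and merge, one defines neither.
class SplitMergeAction
  def split; end
  def process; end
  def merge; end
end

class PlainAction
  def process; end
end

# Same check as Job#splittable? (and Job#mergeable?, minus its processing?
# condition): is `name` among the class's public instance methods?
def defines_public?(klass, name)
  klass.public_instance_methods.map { |m| m.to_sym }.include?(name)
end

defines_public?(SplitMergeAction, :split) # => true
defines_public?(PlainAction, :split)      # => false
defines_public?(PlainAction, :merge)      # => false
```

The `to_sym` mapping matters on Ruby 1.8, where public_instance_methods returns strings. On later versions it already returns symbols, and `klass.public_method_defined?(name)` answers the same question more directly. Inherited public methods count too, so a split defined on a superclass would also make a job splittable.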
#time_taken ⇒ Object
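How long has this Job taken, in seconds? Once the Job is complete, this is the stored time; until then, it is the time elapsed since the Job was created.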
# File 'lib/cloud_crowd/models/job.rb', line 152
def time_taken
  return self.time if self.time
  Time.now - self.created_at
end