Class: CloudCrowd::Job

Inherits:
ActiveRecord::Base
Includes:
ModelStatus
Defined in:
lib/cloud_crowd/models/job.rb

Overview

A chunk of work that will be farmed out into many WorkUnits to be processed in parallel by each active CloudCrowd::Worker. Jobs are defined by a list of inputs (usually public URLs to files), an action (the name of a script that CloudCrowd knows how to run), and, eventually, a corresponding list of outputs.

Class Method Summary

Instance Method Summary

Methods included from ModelStatus

#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?

Class Method Details

.create_from_request(h) ⇒ Object

Create a Job from an incoming JSON request, and add it to the queue.



# File 'lib/cloud_crowd/models/job.rb', line 19

def self.create_from_request(h)
  self.create(
    :inputs       => h['inputs'].to_json,
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json,
    :email        => h['email'],
    :callback_url => h['callback_url']
  )
end
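To show what create_from_request actually stores, here is a minimal, database-free sketch. The helper name job_attributes_from_request and the "word_count" action are hypothetical; the helper returns the same attribute hash the method passes to ActiveRecord's create, so you can see that inputs and options are stored as JSON strings and that missing options default to "{}".

```ruby
require 'json'

# Hypothetical helper: builds the attribute hash that create_from_request
# passes to create, without touching a database.
def job_attributes_from_request(h)
  {
    :inputs       => h['inputs'].to_json,          # stored as a JSON string
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json, # missing options become "{}"
    :email        => h['email'],                   # nil when absent
    :callback_url => h['callback_url']
  }
end

request = JSON.parse('{"inputs": ["http://example.com/a.pdf"], "action": "word_count"}')
attrs = job_attributes_from_request(request)
puts attrs[:inputs]   # ["http://example.com/a.pdf"]
puts attrs[:options]  # {}
```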

Instance Method Details

#action_class ⇒ Object

Retrieve the class for this Job’s Action.



# File 'lib/cloud_crowd/models/job.rb', line 103

def action_class
  @action_class ||= CloudCrowd.actions[self.action]
  return @action_class if @action_class
  raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
end
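The lookup-and-raise pattern in action_class can be tried in isolation. In this sketch the ACTIONS registry, the WordCount class, FakeJob and ActionNotFound are hypothetical stand-ins for CloudCrowd.actions and Error::ActionNotFound. Note that `||=` only caches a truthy result, so an unknown action name is looked up (and raises) again on every call.

```ruby
# Hypothetical stand-ins for CloudCrowd.actions and Error::ActionNotFound.
class ActionNotFound < StandardError; end

class WordCount; end
ACTIONS = { 'word_count' => WordCount }

class FakeJob
  attr_reader :action

  def initialize(action)
    @action = action
  end

  def action_class
    # ||= memoizes only a truthy lookup; nil is never cached.
    @action_class ||= ACTIONS[action]
    return @action_class if @action_class
    raise ActionNotFound, "no action named: '#{action}' could be found"
  end
end

puts FakeJob.new('word_count').action_class  # WordCount
begin
  FakeJob.new('missing').action_class
rescue ActionNotFound => e
  puts e.message  # no action named: 'missing' could be found
end
```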

#all_work_units_complete? ⇒ Boolean

Have all of the WorkUnits finished?

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/models/job.rb', line 78

def all_work_units_complete?
  self.work_units.incomplete.count <= 0
end

#any_work_units_failed? ⇒ Boolean

Have any of the WorkUnits failed?

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/models/job.rb', line 83

def any_work_units_failed?
  self.work_units.failed.count > 0
end

#check_for_completion ⇒ Object

After work units are marked successful, we check whether all of them have finished. If so, the job continues on to its next phase.



# File 'lib/cloud_crowd/models/job.rb', line 31

def check_for_completion
  return unless all_work_units_complete?
  set_next_status    
  outs = gather_outputs_from_work_units
  return queue_for_workers(outs) if merging?
  if complete?
    update_attributes(:outputs => outs, :time => time_taken)
    fire_callback if callback_url
  end
  self
end
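The branching in check_for_completion can be summarized as a pure function. This is a hypothetical, database-free model: next_status stands in for whatever set_next_status writes, the returned symbols are illustrative labels, and it assumes (as ModelStatus appears to) that a job is complete? once it has succeeded or failed. Only a merging job is re-queued, and only a complete job saves its outputs and fires the callback.

```ruby
# Assumption: complete? means succeeded or failed.
COMPLETE_STATUSES = [:succeeded, :failed]

# Hypothetical model of the decisions check_for_completion makes.
def completion_action(all_units_complete, next_status, callback_url)
  return :wait unless all_units_complete          # units still outstanding
  return :queue_merge if next_status == :merging  # gathered outputs feed the merge
  return :no_further_action unless COMPLETE_STATUSES.include?(next_status)
  callback_url ? :save_outputs_and_callback : :save_outputs
end

p completion_action(true, :merging, nil)  # :queue_merge
```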

#cleanup_assets ⇒ Object

Cleaning up after a job will remove all of its files from S3 or the filesystem. Destroying a Job will cleanup_assets first. Run this in a separate thread to get out of the transaction’s way. TODO: Convert this into a ‘cleanup’ work unit that gets run by a worker.



# File 'lib/cloud_crowd/models/job.rb', line 73

def cleanup_assets
  AssetStore.new.cleanup(self)
end

#color ⇒ Object

Generate a stable hex color code (six hex digits, 8 bits per channel), based on the Job’s id.



# File 'lib/cloud_crowd/models/job.rb', line 129

def color
  @color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
end
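The slice `[-7...-1]` takes characters 25 through 30 of the 32-character MD5 hex digest, which yields six hex digits, the same shape as an `#rrggbb` color. This sketch reproduces the expression outside the model (job_color is a hypothetical name) so you can check its length and stability:

```ruby
require 'digest/md5'

# Same expression as Job#color: the exclusive range -7...-1 drops the last
# hex digit and keeps the six before it.
def job_color(id)
  Digest::MD5.hexdigest(id.to_s)[-7...-1]
end

c = job_color(42)
puts c.length  # 6
puts "##{c}"   # a CSS-style color string
```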

#done_splitting? ⇒ Boolean

This job is done splitting if it’s finished with its splitting work units.

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/models/job.rb', line 93

def done_splitting?
  splittable? && work_units.splitting.count <= 0
end

#fire_callback ⇒ Object

If a callback_url is defined, post the Job’s JSON to it upon completion. The callback_url may include HTTP basic authentication, if you like:

http://user:password@example.com/job_complete

If the callback_url is successfully pinged, we destroy the job, which cleans up its assets. TODO: This should be moved into a Work Unit…



# File 'lib/cloud_crowd/models/job.rb', line 60

def fire_callback
  begin
    RestClient.post(callback_url, {:job => self.to_json})
    self.destroy
  rescue RestClient::Exception => e
    puts "Failed to fire job callback. Hmmm, what should happen here?"
  end
end
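To see how credentials embedded in a callback_url turn into HTTP Basic authentication, here is a sketch that builds (but does not send) a comparable POST with Ruby's standard Net::HTTP instead of RestClient. The helper name build_callback_request and the sample job JSON are hypothetical; the form field is named "job", matching the payload fire_callback posts.

```ruby
require 'net/http'
require 'uri'
require 'json'

# Hypothetical helper: builds a form-encoded POST like fire_callback's.
# user:password in the URL becomes an Authorization: Basic header.
def build_callback_request(callback_url, job_json)
  uri = URI.parse(callback_url)
  request = Net::HTTP::Post.new(uri.request_uri)
  request.basic_auth(uri.user, uri.password) if uri.user
  request.set_form_data('job' => job_json)
  [uri, request]
end

job_json = { 'id' => 1, 'status' => 'succeeded' }.to_json
uri, req = build_callback_request('http://user:password@example.com/job_complete', job_json)
puts uri.host  # example.com
puts req.path  # /job_complete
```

Sending it would then be a matter of `Net::HTTP.start(uri.host, uri.port) { |http| http.request(req) }`.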

#mergeable? ⇒ Boolean

This job is mergeable if its Action has a merge method.

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/models/job.rb', line 98

def mergeable?
  self.processing? && self.action_class.public_method_defined?(:merge)
end
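Checks like mergeable? and splittable? depend on how Ruby reports method names. On Ruby 1.9 and later, public_instance_methods returns Symbols, so comparing against the String 'merge' never matches, while Module#public_method_defined? accepts either form and also sees inherited public methods. The two action classes below are hypothetical stand-ins for CloudCrowd actions.

```ruby
# Hypothetical actions: one defines split/process/merge, one only process.
class SplitAndMerge
  def split; end
  def process; end
  def merge; end
end

class ProcessOnly
  def process; end
end

# Ruby 1.9+: method names come back as Symbols.
puts SplitAndMerge.public_instance_methods.include?('merge')  # false
puts SplitAndMerge.public_instance_methods.include?(:merge)   # true

# Version-independent check.
puts SplitAndMerge.public_method_defined?(:merge)  # true
puts ProcessOnly.public_method_defined?(:merge)    # false
```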

#percent_complete ⇒ Object

How complete is this Job? Unfortunately, with the current processing sequence, percent_complete can pull a fast one and go backwards. This happens when a single large input takes a long time to split; when it finally does, it creates a whole swarm of new work units. This seems unavoidable.



# File 'lib/cloud_crowd/models/job.rb', line 114

def percent_complete
  return 99  if merging?
  return 100 if complete?
  unit_count = work_units.count
  return 100 if unit_count <= 0
  (work_units.complete.count / unit_count.to_f * 100).round
end
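The "going backwards" effect falls straight out of the arithmetic: the denominator is the current number of work units, so a split that adds units lowers the ratio. This database-free sketch takes the counts and the merging/complete flags as arguments; the counts used below (1 of 2 units done, then 20 more units appear) are hypothetical.

```ruby
# Same arithmetic as Job#percent_complete, with state passed in explicitly.
def percent_complete(unit_count, complete_count, merging: false, complete: false)
  return 99  if merging
  return 100 if complete
  return 100 if unit_count <= 0
  (complete_count / unit_count.to_f * 100).round
end

puts percent_complete(2, 1)   # 50
# A large split adds 20 units; still only 1 is done: 1/22 rounds to 5%.
puts percent_complete(22, 1)  # 5
```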

#set_next_status ⇒ Object

Transition this Job’s current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.



# File 'lib/cloud_crowd/models/job.rb', line 45

def set_next_status
  update_attribute(:status,
    any_work_units_failed? ? FAILED     :
    self.splitting?        ? PROCESSING :
    self.mergeable?        ? MERGING    :
                             SUCCEEDED
  )
end
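The nested ternary in set_next_status encodes a small state machine. Here it is as a plain if/elsif chain, with mergeable? folded in as "currently processing and the action defines merge" (mirroring Job#mergeable?). Statuses are represented as Symbols purely for illustration. Walking a job that is currently splitting, with an action that defines merge, shows the full path; because mergeable? requires processing?, a merging job moves on to succeeded rather than merging again.

```ruby
# Hypothetical, database-free model of set_next_status.
def next_status(current, any_failed, action_has_merge)
  if any_failed
    :failed
  elsif current == :splitting
    :processing
  elsif current == :processing && action_has_merge
    :merging
  else
    :succeeded
  end
end

status = :splitting
path = [status]
3.times do
  status = next_status(status, false, true)
  path << status
end
p path  # [:splitting, :processing, :merging, :succeeded]
```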

#splittable? ⇒ Boolean

This job is splittable if its Action has a split method.

Returns:

  • (Boolean)


# File 'lib/cloud_crowd/models/job.rb', line 88

def splittable?
  self.action_class.public_method_defined?(:split)
end

#time_taken ⇒ Object

How long has this Job taken?



# File 'lib/cloud_crowd/models/job.rb', line 123

def time_taken
  return self.time if self.time
  Time.now - self.created_at
end

#to_json(opts = {}) ⇒ Object

A JSON representation of this job includes its status, percent complete, time taken, and the number of its component WorkUnits, as well as any completed outputs and the notification email, when present.



# File 'lib/cloud_crowd/models/job.rb', line 135

def to_json(opts={})
  atts = {
    'id'                => id,
    'color'             => color,
    'status'            => display_status, 
    'percent_complete'  => percent_complete,
    'work_units'        => work_units.count,
    'time_taken'        => time_taken
  }
  atts['outputs'] = JSON.parse(outputs) if outputs
  atts['email']   = email               if email
  atts.to_json
end
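Because outputs is stored as a JSON string, to_json parses it before embedding it, so consumers receive a real JSON array rather than a doubly encoded string; email is only included when set, and 'work_units' holds a count. The JobSnapshot struct and its field values below are hypothetical stand-ins for a Job record.

```ruby
require 'json'

# Hypothetical stand-in exposing the fields Job#to_json reads.
JobSnapshot = Struct.new(:id, :color, :display_status, :percent_complete,
                         :work_unit_count, :time_taken, :outputs, :email) do
  def to_json(*_args)
    atts = {
      'id'               => id,
      'color'            => color,
      'status'           => display_status,
      'percent_complete' => percent_complete,
      'work_units'       => work_unit_count,  # a count, not a list
      'time_taken'       => time_taken
    }
    atts['outputs'] = JSON.parse(outputs) if outputs  # decode stored JSON
    atts['email']   = email               if email
    atts.to_json
  end
end

job = JobSnapshot.new(7, 'a1b2c3', 'succeeded', 100, 4, 12.5,
                      '["http://example.com/out.txt"]', nil)
parsed = JSON.parse(job.to_json)
puts parsed['outputs'].first  # http://example.com/out.txt
puts parsed.key?('email')     # false
```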