Class: CloudCrowd::Action

Inherits:
Object
Defined in:
lib/cloud_crowd/action.rb

Overview

As you write your custom actions, have them inherit from CloudCrowd::Action. All actions must implement a process method, which should return a JSON-serializable object that will be used as the output for the work unit. See the default actions for examples.

Optionally, actions may define split and merge methods to do mapping and reducing around the input. split should return an array of URLs, each of which is mapped into its own WorkUnit and processed in parallel. In the merge step, input is an array of all the outputs that process returned.
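The split/process/merge contract above can be sketched end to end. To stay runnable without the cloud-crowd gem, the block defines a minimal stand-in for CloudCrowd::Action that only mimics the four-argument constructor and the input/options readers (do not load it alongside the real gem, since it reopens the same class). WordCount, run_locally, and the :splitting/:processing/:merging status values are hypothetical. run_locally plays the role of the server and workers in a single process, and it round-trips each process result through JSON to mirror the requirement that outputs be JSON-serializable.

```ruby
require 'json'

# Minimal stand-in for CloudCrowd::Action (assumption: only the constructor
# signature and the input/options readers are mimicked).
module CloudCrowd
  class Action
    attr_reader :input, :options

    def initialize(status, input, options, store)
      @status, @input, @options, @store = status, input, options, store
    end
  end
end

# Hypothetical action that counts words across several texts.
class WordCount < CloudCrowd::Action
  # Map: break one input into many. Real actions return URLs here;
  # plain strings keep the sketch offline.
  def split
    input['texts']
  end

  # Runs once per piece; the return value must be JSON-serializable.
  def process
    { 'words' => input.split.size }
  end

  # Reduce: input is the array of every process output.
  def merge
    input.sum { |output| output['words'] }
  end
end

# Hypothetical driver standing in for the CloudCrowd server and workers:
# split -> process (once per piece) -> merge, all in one Ruby process.
def run_locally(action_class, input)
  pieces  = action_class.new(:splitting, input, {}, nil).split
  outputs = pieces.map do |piece|
    result = action_class.new(:processing, piece, {}, nil).process
    JSON.parse(JSON.generate(result))  # outputs travel as JSON
  end
  action_class.new(:merging, outputs, {}, nil).merge
end

puts run_locally(WordCount, { 'texts' => ['a b c', 'd e'] })  # prints 5
```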

Each action gets its own work_directory for scratch files and spends its whole run inside it, so relative paths work well.

Note that Actions inherit a backticks (`) method that raises Error::CommandFailed if the external command exits with a non-zero status.

Constant Summary

FILE_URL =
/\Afile:\/\//
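FILE_URL is what download uses to tell local file:// URLs apart from remote ones, and to strip the scheme before copying. A quick check of both operations (the example URLs are made up):

```ruby
FILE_URL = /\Afile:\/\//

local  = 'file:///tmp/input.pdf'
remote = 'http://example.com/input.pdf'

puts local.match?(FILE_URL)   # prints true
puts remote.match?(FILE_URL)  # prints false
puts local.sub(FILE_URL, '')  # prints /tmp/input.pdf
```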

Constructor Details

#initialize(status, input, options, store) ⇒ Action

Initializing an Action sets up the read-only variables that form the bulk of the API for action subclasses (the paths to read from and write to). It creates the work_directory if it does not already exist and parses JSON input. If the input is a single URL (rather than an array of results to merge), it then downloads that file into the work_directory before processing starts.

# File 'lib/cloud_crowd/action.rb', line 29

def initialize(status, input, options, store)
  @input, @options, @store = input, options, store
  @job_id, @work_unit_id = options['job_id'], options['work_unit_id']
  @work_directory = File.expand_path(File.join(@store.temp_storage_path, local_storage_prefix))
  FileUtils.mkdir_p(@work_directory) unless File.exist?(@work_directory)
  parse_input
  download_input
end
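The constructor builds work_directory by joining the store's temp_storage_path with local_storage_prefix. The sketch below reproduces that step with hypothetical values (a fresh temporary directory and the prefix word_count/unit_42). It also shows that FileUtils.mkdir_p is idempotent, so the existence guard is optional; note that File.exists? (with an s) was removed in Ruby 3.2, so code targeting current Ruby should use File.exist? or drop the guard.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical stand-ins for store.temp_storage_path and local_storage_prefix.
temp_storage_path = Dir.mktmpdir
prefix            = 'word_count/unit_42'

work_directory = File.expand_path(File.join(temp_storage_path, prefix))

# mkdir_p creates intermediate directories and does nothing if the
# directory already exists, so calling it twice is safe.
FileUtils.mkdir_p(work_directory)
FileUtils.mkdir_p(work_directory)

puts File.directory?(work_directory)  # prints true
```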

Instance Attribute Details

#file_name ⇒ Object (readonly)

Returns the value of attribute file_name.

# File 'lib/cloud_crowd/action.rb', line 22

def file_name
  @file_name
end

#input ⇒ Object (readonly)

Returns the value of attribute input.

# File 'lib/cloud_crowd/action.rb', line 22

def input
  @input
end

#input_path ⇒ Object (readonly)

Returns the value of attribute input_path.

# File 'lib/cloud_crowd/action.rb', line 22

def input_path
  @input_path
end

#options ⇒ Object (readonly)

Returns the value of attribute options.

# File 'lib/cloud_crowd/action.rb', line 22

def options
  @options
end

#work_directory ⇒ Object (readonly)

Returns the value of attribute work_directory.

# File 'lib/cloud_crowd/action.rb', line 22

def work_directory
  @work_directory
end

Instance Method Details

#`(command) ⇒ Object

Actions have a backticks command that raises a CommandFailed exception on failure, so that processing doesn’t just blithely continue.

# File 'lib/cloud_crowd/action.rb', line 74

def `(command)
  result    = super(command)
  exit_code = $?.to_i
  raise Error::CommandFailed.new(result, exit_code) unless exit_code == 0
  result
end
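The same backticks override can be tried outside CloudCrowd as a mixin. In this sketch CommandFailed, StrictBackticks, and Runner are hypothetical names; CommandFailed stands in for Error::CommandFailed. It checks $?.success? and reports $?.exitstatus instead of comparing $?.to_i to zero, because to_i is the raw wait status (the exit code shifted left by 8 bits on Unix), while exitstatus is the conventional exit code.

```ruby
# Hypothetical stand-in for CloudCrowd's Error::CommandFailed.
class CommandFailed < StandardError
  attr_reader :output, :exit_code

  def initialize(output, exit_code)
    @output, @exit_code = output, exit_code
    super("command exited with status #{exit_code.inspect}")
  end
end

# Mixin that overrides backticks the way Action does: run the command via
# Kernel#`, then raise unless the child process exited with status 0.
module StrictBackticks
  def `(command)
    result = super(command)
    status = $?
    raise CommandFailed.new(result, status.exitstatus) unless status.success?
    result
  end
end

class Runner
  include StrictBackticks

  # A backtick literal calls self.`, so this goes through StrictBackticks.
  def run(command)
    `#{command}`
  end
end

runner = Runner.new
puts runner.run('echo hello')  # prints hello

begin
  runner.run('false')
rescue CommandFailed => e
  puts "failed: #{e.message}"
end
```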

#cleanup_work_directory ⇒ Object

After the Action has finished, this removes the work directory, if it still exists.

# File 'lib/cloud_crowd/action.rb', line 68

def cleanup_work_directory
  FileUtils.rm_r(@work_directory) if File.exist?(@work_directory)
end

#download(url, path) ⇒ Object

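Download a file to the specified path. A file:// URL is copied from the local filesystem; any other URL is fetched with Net::HTTP and streamed to disk in chunks. Returns path.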

# File 'lib/cloud_crowd/action.rb', line 44

def download(url, path)
  if url.match(FILE_URL)
    FileUtils.cp(url.sub(FILE_URL, ''), path)
  else
    File.open(path, 'w+') do |file|
      Net::HTTP.get_response(URI(url)) do |response|
        response.read_body do |chunk|
          file.write chunk
        end
      end
    end
  end
  path
end
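A standalone version of download makes the two branches easy to try outside an Action. It mirrors the method above; the one change is opening the file in 'wb' (binary) mode instead of 'w+', an assumption that mainly matters for non-text downloads on platforms that translate line endings. Like the original, it neither follows HTTP redirects nor checks the response status, so an error page would be written to path as-is. The usage exercises only the file:// branch so it runs offline.

```ruby
require 'fileutils'
require 'net/http'
require 'tmpdir'
require 'uri'

FILE_URL = /\Afile:\/\//

# Copy file:// URLs locally; stream anything else over HTTP into `path`.
# Returns `path`, like Action#download.
def download(url, path)
  if url.match?(FILE_URL)
    FileUtils.cp(url.sub(FILE_URL, ''), path)
  else
    File.open(path, 'wb') do |file|
      Net::HTTP.get_response(URI(url)) do |response|
        # read_body with a block yields the body in chunks as they arrive.
        response.read_body { |chunk| file.write(chunk) }
      end
    end
  end
  path
end

# Usage (offline): exercise the file:// branch.
Dir.mktmpdir do |dir|
  source = File.join(dir, 'source.txt')
  File.write(source, 'payload')
  copy = download("file://#{source}", File.join(dir, 'copy.txt'))
  puts File.read(copy)  # prints payload
end
```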

#download_input ⇒ Object (private)

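If the input is a URL, download the file into the work_directory before processing begins, and record its local input_path and its file_name (the base name without the extension).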

# File 'lib/cloud_crowd/action.rb', line 118

def download_input
  return unless input_is_url?
  Dir.chdir(@work_directory) do
    @input_path = File.join(@work_directory, safe_filename(@input))
    @file_name = File.basename(@input_path, File.extname(@input_path))
    download(@input, @input_path)
  end
end

#input_is_url? ⇒ Boolean (private)

Returns:

  • (Boolean)

# File 'lib/cloud_crowd/action.rb', line 113

def input_is_url?
  !URI.parse(@input).scheme.nil? rescue false
end
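input_is_url? treats anything that URI.parse accepts with a scheme as a URL. Together with parse_input, this is why merge inputs are never downloaded: a JSON array is parsed into an Array first, URI.parse raises on an Array, and the rescue returns false. The standalone function below reproduces the check; the explicit rescue StandardError is the sketch's spelling of the original rescue modifier, which catches the same class of errors.

```ruby
require 'uri'

# Same test as Action#input_is_url?, taking the input as an argument.
def input_is_url?(input)
  !URI.parse(input).scheme.nil?
rescue StandardError
  false
end

puts input_is_url?('http://example.com/a.pdf')  # prints true
puts input_is_url?('file:///tmp/a.pdf')         # prints true
puts input_is_url?('hello')                     # prints false (no scheme)
puts input_is_url?(['http://a/1', 'http://a/2'])  # prints false (not a string)
```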

#local_storage_prefix ⇒ Object (private)

The directory prefix to use for local storage:

[action]/unit_[work_unit_id]

The /unit_[work_unit_id] segment is only present when the action has a work unit ID.

# File 'lib/cloud_crowd/action.rb', line 101

def local_storage_prefix
  @local_storage_prefix ||= Inflector.underscore(self.class) +
    (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
end

#parse_input ⇒ Object (private)

If the input looks like JSON (its first character is [ or {), replace it with the parsed form; input that fails to parse is left unchanged. It would be great if the JSON module had an is_json? method.

# File 'lib/cloud_crowd/action.rb', line 108

def parse_input
  return unless ['[', '{'].include? @input[0..0]
  @input = JSON.parse(@input) rescue @input
end
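To see which inputs parse_input converts, here is the same logic as a standalone function. One deliberate difference: it rescues only JSON::ParserError, whereas the original rescue modifier swallows any StandardError. Narrowing the rescue is an assumption about intent, not the library's code.

```ruby
require 'json'

# Only strings starting with '[' or '{' are tried as JSON; anything that
# fails to parse is returned unchanged.
def parse_input(input)
  return input unless ['[', '{'].include?(input[0..0])
  JSON.parse(input)
rescue JSON::ParserError
  input
end

p parse_input('["http://a/1.txt", "http://a/2.txt"]')  # an Array of two URLs
p parse_input('{"pages": 3}')                          # a Hash
p parse_input('http://example.com/doc.pdf')            # unchanged String
p parse_input('{not json')                             # unchanged String
```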

#process ⇒ Object

Each Action subclass must implement a process method, overriding this.

Raises:

  • (NotImplementedError)

# File 'lib/cloud_crowd/action.rb', line 39

def process
  raise NotImplementedError, "CloudCrowd::Actions must override 'process' with their own processing code."
end

#remote_storage_prefix ⇒ Object (private)

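The directory prefix to use for remote storage:

[action]/job_[job_id]/unit_[work_unit_id]

The /unit_[work_unit_id] segment is only present when the action has a work unit ID.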

# File 'lib/cloud_crowd/action.rb', line 94

def remote_storage_prefix
  @remote_storage_prefix ||= Inflector.underscore(self.class) +
    "/job_#{@job_id}" + (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
end
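The local and remote prefixes differ only in the job_[job_id] segment: local scratch directories are keyed by work unit, while remote paths are grouped by job. The sketch rebuilds both strings as plain functions of the action's class name. underscore is a hypothetical stand-in for CloudCrowd's Inflector.underscore, written along the usual CamelCase-to-snake_case rules, and the IDs are made up.

```ruby
# Hypothetical stand-in for Inflector.underscore:
# "WordCount" -> "word_count", "Foo::BarBaz" -> "foo/bar_baz".
def underscore(name)
  name.to_s
      .gsub('::', '/')
      .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
end

# Local scratch space: [action]/unit_[work_unit_id]
def local_storage_prefix(action_name, work_unit_id)
  underscore(action_name) + (work_unit_id ? "/unit_#{work_unit_id}" : '')
end

# Remote storage: [action]/job_[job_id]/unit_[work_unit_id]
def remote_storage_prefix(action_name, job_id, work_unit_id)
  underscore(action_name) + "/job_#{job_id}" +
    (work_unit_id ? "/unit_#{work_unit_id}" : '')
end

puts local_storage_prefix('WordCount', 7)        # prints word_count/unit_7
puts remote_storage_prefix('WordCount', 3, 7)    # prints word_count/job_3/unit_7
puts remote_storage_prefix('WordCount', 3, nil)  # prints word_count/job_3
```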

#safe_filename(url) ⇒ Object (private)

Convert an unsafe URL into a filesystem-friendly filename.

# File 'lib/cloud_crowd/action.rb', line 85

def safe_filename(url)
  url  = url.sub(/\?.*\Z/, '')
  ext  = File.extname(url)
  name = URI::DEFAULT_PARSER.unescape(File.basename(url)).gsub(/[^a-zA-Z0-9_\-.]/, '-').gsub(/-+/, '-')
  File.basename(name, ext).gsub('.', '-') + ext
end
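safe_filename chains several small string transformations, which are easier to follow one step at a time. URI.unescape no longer exists on Ruby 3.0 and later, so this standalone sketch swaps in URI.decode_www_form_component. That method also turns "+" into a space, but both end up as "-" after sanitizing, so the result is the same here. It raises ArgumentError on malformed escapes such as "100%.pdf", so the sketch falls back to the raw name in that case.

```ruby
require 'uri'

def safe_filename(url)
  url  = url.sub(/\?.*\Z/, '')  # drop any query string
  ext  = File.extname(url)      # keep the extension, e.g. ".pdf"
  base = File.basename(url)
  decoded = begin
    URI.decode_www_form_component(base)
  rescue ArgumentError
    base  # malformed %-escapes: keep the raw name
  end
  # Replace every unsafe character with "-", then collapse runs of dashes.
  name = decoded.gsub(/[^a-zA-Z0-9_\-.]/, '-').gsub(/-+/, '-')
  # Turn remaining dots into dashes so only the extension keeps its dot.
  File.basename(name, ext).gsub('.', '-') + ext
end

puts safe_filename('http://example.com/files/My%20Report%20v1.2.pdf?token=abc')
# prints My-Report-v1-2.pdf
```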

#save(file_path) ⇒ Object

Takes a local filesystem path, saves the file to S3 under remote_storage_prefix, and returns the public (or authenticated) URL on S3 where the file can be accessed.

# File 'lib/cloud_crowd/action.rb', line 61

def save(file_path)
  save_path = File.join(remote_storage_prefix, File.basename(file_path))
  @store.save(file_path, save_path)
end
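save only decides the remote key (the remote storage prefix plus the file's base name) and leaves the upload itself to the store. To show that split without S3 credentials, this sketch uses a hypothetical in-memory MemoryStore with a made-up base URL; save_to_store is a hypothetical standalone version of Action#save that takes the store and prefix as arguments.

```ruby
require 'tmpdir'

# Hypothetical in-memory store standing in for CloudCrowd's storage backend.
# It keeps uploaded bytes in a Hash and returns a made-up URL.
class MemoryStore
  attr_reader :saved

  def initialize(base_url)
    @base_url = base_url
    @saved    = {}
  end

  def save(file_path, save_path)
    @saved[save_path] = File.binread(file_path)
    "#{@base_url}/#{save_path}"
  end
end

# Same path logic as Action#save: keep the file's base name, place it under
# the remote storage prefix, and return whatever URL the store reports.
def save_to_store(store, remote_prefix, file_path)
  save_path = File.join(remote_prefix, File.basename(file_path))
  store.save(file_path, save_path)
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'summary.json')
  File.write(path, '{"words":5}')
  store = MemoryStore.new('https://storage.example.com/bucket')
  puts save_to_store(store, 'word_count/job_3/unit_7', path)
  # prints https://storage.example.com/bucket/word_count/job_3/unit_7/summary.json
end
```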