Class: CloudCrowd::Action

Inherits:
Object
  • Object
show all
Defined in:
lib/cloud_crowd/action.rb

Overview

As you write your custom actions, have them inherit from CloudCrowd::Action. All actions must implement a process method, which should return a JSON-serializable object that will be used as the output for the work unit. See the default actions for examples.

Optionally, actions may define split and merge methods to do mapping and reducing around the input. split should return an array of URLs – to be mapped into WorkUnits and processed in parallel. In the merge step, input will be an array of all the resulting outputs from calling process.

All actions have use of an individual work_directory, for scratch files, and spend their duration inside of it, so relative paths work well.

Note that Actions inherit a backticks (‘) method that raises an Exception if the external command fails.

Constant Summary collapse

FILE_URL =
/\Afile:\/\//

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(status, input, options, store) ⇒ Action

Initializing an Action sets up all of the read-only variables that form the bulk of the API for action subclasses. (Paths to read from and write to). It creates the work_directory and moves into it. If we’re not merging multiple results, it downloads the input file into the work_directory before starting.



29
30
31
32
33
34
35
36
# File 'lib/cloud_crowd/action.rb', line 29

def initialize(status, input, options, store)
  @input, @options, @store = input, options, store
  @job_id, @work_unit_id = options['job_id'], options['work_unit_id']
  @work_directory = File.expand_path(File.join(@store.temp_storage_path, local_storage_prefix))
  FileUtils.mkdir_p(@work_directory) unless File.exists?(@work_directory)
  parse_input
  download_input
end

Instance Attribute Details

#file_nameObject (readonly)

Returns the value of attribute file_name.



22
23
24
# File 'lib/cloud_crowd/action.rb', line 22

def file_name
  @file_name
end

#inputObject (readonly)

Returns the value of attribute input.



22
23
24
# File 'lib/cloud_crowd/action.rb', line 22

def input
  @input
end

#input_pathObject (readonly)

Returns the value of attribute input_path.



22
23
24
# File 'lib/cloud_crowd/action.rb', line 22

def input_path
  @input_path
end

#optionsObject (readonly)

Returns the value of attribute options.



22
23
24
# File 'lib/cloud_crowd/action.rb', line 22

def options
  @options
end

#work_directoryObject (readonly)

Returns the value of attribute work_directory.



22
23
24
# File 'lib/cloud_crowd/action.rb', line 22

def work_directory
  @work_directory
end

Instance Method Details

#`(command) ⇒ Object

Actions have a backticks command that raises a CommandFailed exception on failure, so that processing doesn’t just blithely continue.



74
75
76
77
78
79
# File 'lib/cloud_crowd/action.rb', line 74

def `(command)
  result    = super(command)
  exit_code = $?.to_i
  raise Error::CommandFailed.new(result, exit_code) unless exit_code == 0
  result
end

#cleanup_work_directoryObject

After the Action has finished, we remove the work directory and return to the root directory (where workers run by default).



68
69
70
# File 'lib/cloud_crowd/action.rb', line 68

def cleanup_work_directory
  FileUtils.rm_r(@work_directory) if File.exists?(@work_directory)
end

#download(url, path) ⇒ Object

Download a file to the specified path.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/cloud_crowd/action.rb', line 44

def download(url, path)
  if url.match(FILE_URL)
    FileUtils.cp(url.sub(FILE_URL, ''), path)
  else
    File.open(path, 'w+') do |file|
      Net::HTTP.get_response(URI(url)) do |response|
        response.read_body do |chunk|
          file.write chunk
        end
      end
    end
  end
  path
end

#processObject

Each Action subclass must implement a process method, overriding this.

Raises:

  • (NotImplementedError)


39
40
41
# File 'lib/cloud_crowd/action.rb', line 39

def process
  raise NotImplementedError, "CloudCrowd::Actions must override 'process' with their own processing code."
end

#save(file_path) ⇒ Object

Takes a local filesystem path, saves the file to S3, and returns the public (or authenticated) url on S3 where the file can be accessed.



61
62
63
64
# File 'lib/cloud_crowd/action.rb', line 61

def save(file_path)
  save_path = File.join(remote_storage_prefix, File.basename(file_path))
  @store.save(file_path, save_path)
end