Class: CloudCrowd::Action
- Inherits: Object
  - Object
    - CloudCrowd::Action
- Defined in: lib/cloud_crowd/action.rb
Overview
As you write your custom actions, have them inherit from CloudCrowd::Action. All actions must implement a process method, which should return a JSON-serializable object that will be used as the output for the work unit. See the default actions for examples.
Optionally, actions may define split and merge methods to do mapping and reducing around the input. split should return an array of URLs, to be mapped into WorkUnits and processed in parallel. In the merge step, input will be an array of all the resulting outputs from calling process.
All actions have use of an individual work_directory for scratch files, and spend their duration inside of it, so relative paths work well.
Note that Actions inherit a backticks (`) method that raises a CommandFailed exception if the external command fails.
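The split, process, and merge flow described above can be sketched as follows. This is only an illustration: SketchAction is a hypothetical stand-in for CloudCrowd::Action so the example runs without the gem, WordCountSketch is an invented action, and the units here are plain strings rather than URLs.

```ruby
require 'json'

# Hypothetical stand-in for CloudCrowd::Action so the sketch runs on its own.
# With the gem installed you would subclass CloudCrowd::Action instead.
class SketchAction
  attr_reader :input, :options

  def initialize(input, options = {})
    @input, @options = input, options
  end
end

# Invented example action: counts words across several texts.
class WordCountSketch < SketchAction
  # Return one entry per work unit (an array of URLs in a real job).
  def split
    input
  end

  # Return a JSON-serializable result for a single unit.
  def process
    input.split.length
  end

  # In the merge step, input is the array of outputs from process.
  def merge
    input.inject(0) { |sum, count| sum + count }
  end
end

texts  = ['one two three', 'four five']
units  = WordCountSketch.new(texts).split
counts = units.map { |text| WordCountSketch.new(text).process }
total  = WordCountSketch.new(counts).merge
puts JSON.generate('total' => total)
```

In a real job the server schedules each unit as a WorkUnit; the sequential map here only stands in for that parallel step.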
Constant Summary
- FILE_URL = /\Afile:\/\//
Instance Attribute Summary
- #file_name ⇒ Object (readonly)
  Returns the value of attribute file_name.
- #input ⇒ Object (readonly)
  Returns the value of attribute input.
- #input_path ⇒ Object (readonly)
  Returns the value of attribute input_path.
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
- #work_directory ⇒ Object (readonly)
  Returns the value of attribute work_directory.
Instance Method Summary
- #`(command) ⇒ Object
  Actions have a backticks command that raises a CommandFailed exception on failure, so that processing doesn’t just blithely continue.
- #cleanup_work_directory ⇒ Object
  After the Action has finished, we remove the work directory and return to the root directory (where workers run by default).
- #download(url, path) ⇒ Object
  Download a file to the specified path.
- #download_input ⇒ Object (private)
  If the input is a URL, download the file before beginning processing.
- #initialize(status, input, options, store) ⇒ Action (constructor)
  Initializing an Action sets up all of the read-only variables that form the bulk of the API for action subclasses.
- #input_is_url? ⇒ Boolean (private)
- #local_storage_prefix ⇒ Object (private)
  The directory prefix to use for local storage.
- #parse_input ⇒ Object (private)
  If we think that the input is JSON, replace it with the parsed form.
- #process ⇒ Object
  Each Action subclass must implement a process method, overriding this.
- #remote_storage_prefix ⇒ Object (private)
  The directory prefix to use for remote storage.
- #safe_filename(url) ⇒ Object (private)
  Convert an unsafe URL into a filesystem-friendly filename.
- #save(file_path) ⇒ Object
  Takes a local filesystem path, saves the file to S3, and returns the public (or authenticated) URL on S3 where the file can be accessed.
Constructor Details
#initialize(status, input, options, store) ⇒ Action
Initializing an Action sets up all of the read-only variables that form the bulk of the API for action subclasses (paths to read from and write to). It creates the work_directory and moves into it. If we’re not merging multiple results, it downloads the input file into the work_directory before starting.
# File 'lib/cloud_crowd/action.rb', line 29
def initialize(status, input, options, store)
  @input, @options, @store = input, options, store
  @job_id, @work_unit_id = options['job_id'], options['work_unit_id']
  @work_directory = File.expand_path(File.join(@store.temp_storage_path, local_storage_prefix))
  FileUtils.mkdir_p(@work_directory) unless File.exists?(@work_directory)
  parse_input
  download_input
end
Instance Attribute Details
#file_name ⇒ Object (readonly)
Returns the value of attribute file_name.
# File 'lib/cloud_crowd/action.rb', line 22
def file_name
  @file_name
end
#input ⇒ Object (readonly)
Returns the value of attribute input.
# File 'lib/cloud_crowd/action.rb', line 22
def input
  @input
end
#input_path ⇒ Object (readonly)
Returns the value of attribute input_path.
# File 'lib/cloud_crowd/action.rb', line 22
def input_path
  @input_path
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/cloud_crowd/action.rb', line 22
def options
  @options
end
#work_directory ⇒ Object (readonly)
Returns the value of attribute work_directory.
# File 'lib/cloud_crowd/action.rb', line 22
def work_directory
  @work_directory
end
Instance Method Details
#`(command) ⇒ Object
Actions have a backticks command that raises a CommandFailed exception on failure, so that processing doesn’t just blithely continue.
# File 'lib/cloud_crowd/action.rb', line 74
def `(command)
  result = super(command)
  exit_code = $?.to_i
  raise Error::CommandFailed.new(result, exit_code) unless exit_code == 0
  result
end
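To illustrate how a backticks override changes behaviour inside a class, here is a minimal sketch. CommandFailedSketch and ShellSketch are hypothetical stand-ins for CloudCrowd::Error::CommandFailed and an Action subclass. The sketch assumes a POSIX sh is available, and it reads $?.exitstatus rather than $?.to_i so that the reported value is the command's own exit code.

```ruby
# Hypothetical stand-in for CloudCrowd::Error::CommandFailed.
class CommandFailedSketch < StandardError
  attr_reader :exit_code

  def initialize(output, exit_code)
    super(output)
    @exit_code = exit_code
  end
end

# Hypothetical class standing in for an Action subclass.
class ShellSketch
  # Same shape as the listing above: run the command through Kernel's
  # backticks, then raise if the exit status is non-zero.
  def `(command)
    result = super(command)
    exit_code = $?.exitstatus
    raise CommandFailedSketch.new(result, exit_code) unless exit_code == 0
    result
  end

  # Backtick literals inside instance methods dispatch to the override.
  def run
    `echo hello`.strip
  end

  def run_failing
    `sh -c "exit 3"`
  end
end

sketch   = ShellSketch.new
greeting = sketch.run
failure  = begin
  sketch.run_failing
  nil
rescue CommandFailedSketch => e
  e
end
```

Because the failure surfaces as an exception, a process method that shells out does not need to check exit statuses by hand after every command.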
#cleanup_work_directory ⇒ Object
After the Action has finished, we remove the work directory and return to the root directory (where workers run by default).
# File 'lib/cloud_crowd/action.rb', line 68
def cleanup_work_directory
  FileUtils.rm_r(@work_directory) if File.exists?(@work_directory)
end
#download(url, path) ⇒ Object
Download a file to the specified path.
# File 'lib/cloud_crowd/action.rb', line 44
def download(url, path)
  if url.match(FILE_URL)
    FileUtils.cp(url.sub(FILE_URL, ''), path)
  else
    File.open(path, 'w+') do |file|
      Net::HTTP.get_response(URI(url)) do |response|
        response.read_body do |chunk|
          file.write chunk
        end
      end
    end
  end
  path
end
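The file:// branch of download can be tried in isolation. The sketch below mirrors only that branch and leaves out the HTTP branch so it needs no network access; copy_local_sketch and FILE_URL_SKETCH are hypothetical names, and the temporary file names are invented.

```ruby
require 'fileutils'
require 'tmpdir'

# Same pattern as the FILE_URL constant, under a hypothetical name.
FILE_URL_SKETCH = /\Afile:\/\//

# Copies a file:// URL to path and returns path, like the local branch above.
def copy_local_sketch(url, path)
  raise ArgumentError, "not a file:// URL: #{url}" unless url.match(FILE_URL_SKETCH)
  FileUtils.cp(url.sub(FILE_URL_SKETCH, ''), path)
  path
end

copied_text = nil
Dir.mktmpdir do |dir|
  source = File.join(dir, 'source.txt')
  target = File.join(dir, 'target.txt')
  File.write(source, 'scratch data')
  # An absolute path after file:// yields the familiar file:///... form.
  copy_local_sketch("file://#{source}", target)
  copied_text = File.read(target)
end
```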
#download_input ⇒ Object (private)
If the input is a URL, download the file before beginning processing.
# File 'lib/cloud_crowd/action.rb', line 118
def download_input
  return unless input_is_url?
  Dir.chdir(@work_directory) do
    @input_path = File.join(@work_directory, safe_filename(@input))
    @file_name = File.basename(@input_path, File.extname(@input_path))
    download(@input, @input_path)
  end
end
#input_is_url? ⇒ Boolean (private)
# File 'lib/cloud_crowd/action.rb', line 113
def input_is_url?
  !URI.parse(@input).scheme.nil? rescue false
end
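The scheme check above treats any input that parses as a URI with a scheme as a URL, and anything that fails to parse as not a URL. A standalone sketch of the same test, with the hypothetical name url_like? and invented sample inputs:

```ruby
require 'uri'

# True when the input parses as a URI that carries a scheme (http, file, ...).
# Inputs that cannot be parsed at all are treated as "not a URL".
def url_like?(input)
  !URI.parse(input).scheme.nil?
rescue StandardError
  false
end

http_result  = url_like?('http://example.com/report.pdf')
file_result  = url_like?('file:///tmp/report.pdf')
plain_result = url_like?('report.pdf')
words_result = url_like?('two words')
```

A bare file name parses as a relative URI without a scheme, while text containing a space fails to parse, so neither is downloaded.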
#local_storage_prefix ⇒ Object (private)
The directory prefix to use for local storage: [action]/unit_[work_unit_id]. The unit segment is omitted when there is no work unit ID.
# File 'lib/cloud_crowd/action.rb', line 101
def local_storage_prefix
  @local_storage_prefix ||= Inflector.underscore(self.class) +
    (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
end
#parse_input ⇒ Object (private)
If we think that the input is JSON, replace it with the parsed form. It would be great if the JSON module had an is_json? method.
# File 'lib/cloud_crowd/action.rb', line 108
def parse_input
  return unless ['[', '{'].include? @input[0..0]
  @input = JSON.parse(@input) rescue @input
end
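The heuristic above only attempts a parse when the input starts with [ or {, and falls back to the raw input if parsing fails. A standalone sketch of that behaviour, using the hypothetical name parse_if_json and invented sample inputs:

```ruby
require 'json'

# Returns the parsed value when the input looks like a JSON array or object
# and parses cleanly; otherwise returns the input unchanged.
def parse_if_json(input)
  return input unless input.is_a?(String) && ['[', '{'].include?(input[0..0])
  JSON.parse(input)
rescue JSON::ParserError
  input
end

array_input  = parse_if_json('["a.pdf", "b.pdf"]')
object_input = parse_if_json('{"pages": 3}')
url_input    = parse_if_json('http://example.com/a.pdf')
broken_input = parse_if_json('{not json')
```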
#process ⇒ Object
Each Action subclass must implement a process method, overriding this.
# File 'lib/cloud_crowd/action.rb', line 39
def process
  raise NotImplementedError, "CloudCrowd::Actions must override 'process' with their own processing code."
end
#remote_storage_prefix ⇒ Object (private)
The directory prefix to use for remote storage: [action]/job_[job_id], with /unit_[work_unit_id] appended when a work unit ID is set.
# File 'lib/cloud_crowd/action.rb', line 94
def remote_storage_prefix
  @remote_storage_prefix ||= Inflector.underscore(self.class) +
    "/job_#{@job_id}" + (@work_unit_id ? "/unit_#{@work_unit_id}" : '')
end
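To make the two prefix layouts concrete, the sketch below builds them from plain arguments. underscore_sketch is a hypothetical approximation of CloudCrowd's Inflector.underscore, which may treat namespaces and acronyms differently, and the action name and IDs are invented.

```ruby
# Hypothetical approximation of Inflector.underscore: CamelCase to snake_case.
def underscore_sketch(name)
  name.to_s.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# [action]/unit_[work_unit_id], without the unit segment when there is no ID.
def local_prefix_sketch(action, work_unit_id)
  underscore_sketch(action) + (work_unit_id ? "/unit_#{work_unit_id}" : '')
end

# [action]/job_[job_id], plus the unit segment when there is a work unit ID.
def remote_prefix_sketch(action, job_id, work_unit_id)
  underscore_sketch(action) + "/job_#{job_id}" +
    (work_unit_id ? "/unit_#{work_unit_id}" : '')
end

local_unit  = local_prefix_sketch('WordCount', 12)
remote_unit = remote_prefix_sketch('WordCount', 7, 12)
remote_job  = remote_prefix_sketch('WordCount', 7, nil)
```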
#safe_filename(url) ⇒ Object (private)
Convert an unsafe URL into a filesystem-friendly filename.
# File 'lib/cloud_crowd/action.rb', line 85
def safe_filename(url)
  url = url.sub(/\?.*\Z/, '')
  ext = File.extname(url)
  name = URI.unescape(File.basename(url)).gsub(/[^a-zA-Z0-9_\-.]/, '-').gsub(/-+/, '-')
  File.basename(name, ext).gsub('.', '-') + ext
end
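A worked example helps show what the sanitising steps produce. The sketch below follows the same steps under one stated substitution: URI.unescape is not available in Ruby 3.0 and later, so it uses URI.decode_www_form_component, which additionally decodes "+" as a space. The function name and sample URLs are invented.

```ruby
require 'uri'

def safe_filename_sketch(url)
  url  = url.sub(/\?.*\Z/, '')                          # drop the query string
  ext  = File.extname(url)                              # keep the final extension
  name = URI.decode_www_form_component(File.basename(url))
  name = name.gsub(/[^a-zA-Z0-9_\-.]/, '-').gsub(/-+/, '-')
  # Dots inside the base name become dashes so only one extension remains.
  File.basename(name, ext).gsub('.', '-') + ext
end

escaped = safe_filename_sketch('http://example.com/docs/My%20File.v2.pdf?x=1')
simple  = safe_filename_sketch('http://example.com/a/report.txt')
```

Here the escaped space and the inner dot both end up as dashes, giving My-File-v2.pdf, while report.txt passes through unchanged.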
#save(file_path) ⇒ Object
Takes a local filesystem path, saves the file to S3, and returns the public (or authenticated) URL on S3 where the file can be accessed.
# File 'lib/cloud_crowd/action.rb', line 61
def save(file_path)
  save_path = File.join(remote_storage_prefix, File.basename(file_path))
  @store.save(file_path, save_path)
end
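The path that save hands to the store is the remote prefix joined with the file's base name. The sketch below shows that composition against a hypothetical in-memory store: only the save(local_path, save_path) call shape comes from the listing above, while FakeStoreSketch, the returned URL format, and the sample paths are invented for illustration.

```ruby
# Hypothetical in-memory store; the real store uploads to S3.
class FakeStoreSketch
  attr_reader :saved

  def initialize
    @saved = {}
  end

  # Records the mapping and returns an invented placeholder URL.
  def save(local_path, save_path)
    @saved[save_path] = local_path
    "https://storage.example.invalid/#{save_path}"
  end
end

# Same composition as save above, with the prefix passed in explicitly.
def save_sketch(store, remote_prefix, file_path)
  save_path = File.join(remote_prefix, File.basename(file_path))
  store.save(file_path, save_path)
end

store = FakeStoreSketch.new
url   = save_sketch(store, 'word_count/job_7/unit_12', '/tmp/work/result.json')
```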