Class: Elasticity::EMR
- Inherits: Object
- Defined in: lib/elasticity/emr.rb
Instance Attribute Summary
- #aws_request ⇒ Object (readonly)
  Returns the value of attribute aws_request.
Instance Method Summary
- #==(other) ⇒ Object
  Returns true if other is an EMR with an equal aws_request.
- #add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object
  Adds a new group of instances to the specified jobflow.
- #add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object
  Add a step (or steps) to the specified job flow.
- #describe_jobflow(jobflow_id) {|aws_result| ... } ⇒ Object
  Describe a specific jobflow.
- #describe_jobflow_xml(jobflow_id) ⇒ Object
  This is primarily for debugging purposes, providing insight into how Amazon internally represents jobs.
- #describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object
  Lists all jobflows in all states.
- #direct(params) ⇒ Object
  Pass the specified params hash directly through to the AWS request URL.
- #initialize(aws_access_key_id = nil, aws_secret_access_key = nil, options = {}) ⇒ EMR (constructor)
  Returns a new instance of EMR.
- #modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object
  Set the number of instances in the specified instance groups to the specified counts.
- #run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object
  Start a job flow with the specified configuration.
- #set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object
  Enable or disable “termination protection” on the specified job flows.
- #terminate_jobflows(jobflow_id) {|aws_result| ... } ⇒ Object
  Terminate the specified jobflow.
Constructor Details
#initialize(aws_access_key_id = nil, aws_secret_access_key = nil, options = {}) ⇒ EMR
Returns a new instance of EMR.
# File 'lib/elasticity/emr.rb', line 7
def initialize(aws_access_key_id=nil, aws_secret_access_key=nil, options = {})
  @aws_request = Elasticity::AwsRequest.new(aws_access_key_id, aws_secret_access_key, options)
end
Instance Attribute Details
#aws_request ⇒ Object (readonly)
Returns the value of attribute aws_request.
# File 'lib/elasticity/emr.rb', line 5
def aws_request
  @aws_request
end
Instance Method Details
#==(other) ⇒ Object
# File 'lib/elasticity/emr.rb', line 236
def ==(other)
  return false unless other.is_a? EMR
  return false unless @aws_request == other.aws_request
  true
end
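Because `==` delegates to `aws_request`, whether two EMR objects built with identical arguments compare equal depends on how `Elasticity::AwsRequest` defines `==`, which this page does not show. The sketch below illustrates the pattern with a hypothetical `FakeRequest` Struct (Structs compare by value) and a hypothetical `MiniEMR` that copies the constructor and `==` logic above.

```ruby
# Hypothetical stand-in for Elasticity::AwsRequest; Struct provides value-based ==.
FakeRequest = Struct.new(:access_key, :secret_key, :options)

# Hypothetical class mirroring EMR's constructor and == logic.
class MiniEMR
  attr_reader :aws_request

  def initialize(access_key = nil, secret_key = nil, options = {})
    @aws_request = FakeRequest.new(access_key, secret_key, options)
  end

  def ==(other)
    return false unless other.is_a?(MiniEMR)
    return false unless @aws_request == other.aws_request
    true
  end
end

a = MiniEMR.new("AKID", "SECRET")
b = MiniEMR.new("AKID", "SECRET")
c = MiniEMR.new("OTHER", "SECRET")
puts a == b       # true: the wrapped requests are equal by value
puts a == c       # false: different access key
puts a == "AKID"  # false: not a MiniEMR
```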
#add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object
Adds a new group of instances to the specified jobflow. Elasticity maps a more Ruby-like syntax to the Amazon options. An exhaustive hash follows, although not all of these options are required (or valid!) at once. Please see the EMR docs for details, although even then you’re going to need to experiment :)
instance_group_config = {
:bid_price => 5,
:instance_count => 1,
:instance_role => "TASK",
:market => "SPOT",
:name => "Go Canucks Go!",
:type => "m1.small",
}
add_instance_groups takes an array of these config hashes and returns an array of the IDs of the instance groups created from them:
["ig-2GOVEN6HVJZID", "ig-1DU9M2UQMM051", "ig-3DZRW4Y2X4S", ...]
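Since `add_instance_groups` takes an array of config hashes, a small builder can keep them consistent. The `spot_task_group` helper and its defaults below are hypothetical, not part of Elasticity; it only uses the option keys from the example hash above.

```ruby
# Hypothetical helper: builds one instance group config hash using the keys
# shown in the add_instance_groups example.
def spot_task_group(name, count, type: "m1.small", bid_price: 5)
  {
    :bid_price => bid_price,
    :instance_count => count,
    :instance_role => "TASK",
    :market => "SPOT",
    :name => name,
    :type => type
  }
end

# add_instance_groups expects an array of such hashes.
configs = [
  spot_task_group("Go Canucks Go!", 1),
  spot_task_group("Extra workers", 4, type: "m1.large")
]

configs.each { |c| puts "#{c[:name]}: #{c[:instance_count]} x #{c[:type]}" }
# With a real Elasticity::EMR instance this might then be:
#   ids = emr.add_instance_groups("j-123", configs)
```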
# File 'lib/elasticity/emr.rb', line 70
def add_instance_groups(jobflow_id, instance_group_configs)
  params = {
    :operation => 'AddInstanceGroups',
    :job_flow_id => jobflow_id,
    :instance_groups => instance_group_configs
  }
  aws_result = @aws_request.submit(params)
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  instance_group_ids = []
  xml_doc.xpath('/AddInstanceGroupsResponse/AddInstanceGroupsResult/InstanceGroupIds/member').each do |member|
    instance_group_ids << member.text
  end
  yield aws_result if block_given?
  instance_group_ids
end
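The description above says Elasticity maps Ruby-like option hashes onto Amazon's parameters. The EMR Query API flattens lists as `Name.member.N` and nested structures with dotted keys (for example `InstanceGroups.member.1.InstanceCount`). The sketch below shows one way such a mapping could work; `to_aws_params` and `camelize` are hypothetical helpers, not necessarily what `Elasticity::AwsRequest` does internally.

```ruby
# Hypothetical: convert a snake_case symbol such as :job_flow_id to "JobFlowId".
def camelize(key)
  key.to_s.split("_").map(&:capitalize).join
end

# Hypothetical: flatten a nested Ruby params hash into Query-style keys.
# Arrays become "Key.member.N" (1-based); nested hashes join keys with ".".
def to_aws_params(value, prefix = nil, out = {})
  case value
  when Hash
    value.each do |k, v|
      key = prefix ? "#{prefix}.#{camelize(k)}" : camelize(k)
      to_aws_params(v, key, out)
    end
  when Array
    value.each_with_index do |v, i|
      to_aws_params(v, "#{prefix}.member.#{i + 1}", out)
    end
  else
    out[prefix] = value.to_s
  end
  out
end

params = {
  :operation => "AddInstanceGroups",
  :job_flow_id => "j-123",
  :instance_groups => [{ :instance_count => 1, :instance_role => "TASK" }]
}
flat = to_aws_params(params)
flat.each { |k, v| puts "#{k}=#{v}" }
```

A plain camelization is not enough on its own: a key like `:type` from the example hash would become `Type`, so a real mapping would likely need special cases.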
#add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object
Add a step (or steps) to the specified job flow.
emr.add_jobflow_steps("j-123", {
:steps => [
{
:action_on_failure => "TERMINATE_JOB_FLOW",
:hadoop_jar_step => {
:args => [
"s3://elasticmapreduce/libs/pig/pig-script",
"--base-path",
"s3://elasticmapreduce/libs/pig/",
"--install-pig"
],
:jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
},
:name => "Setup Pig"
}
]
})
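The steps in these examples share the same script-runner shape, so a builder can cut the repetition. `script_runner_step` below is a hypothetical helper, not part of Elasticity. The last lines repeat the merge that `add_jobflow_steps` performs, showing that the caller's `:steps` key ends up alongside `:operation` and `:job_flow_id`.

```ruby
SCRIPT_RUNNER = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"

# Hypothetical helper: builds one step hash in the format used in the example.
def script_runner_step(name, args, action_on_failure: "TERMINATE_JOB_FLOW")
  {
    :action_on_failure => action_on_failure,
    :hadoop_jar_step => { :args => args, :jar => SCRIPT_RUNNER },
    :name => name
  }
end

steps_config = {
  :steps => [
    script_runner_step("Setup Pig", [
      "s3://elasticmapreduce/libs/pig/pig-script",
      "--base-path", "s3://elasticmapreduce/libs/pig/",
      "--install-pig"
    ])
  ]
}

# Same merge that add_jobflow_steps applies before submitting the request.
params = { :operation => "AddJobFlowSteps", :job_flow_id => "j-123" }.merge!(steps_config)
p params.keys
```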
106 107 108 109 110 111 112 113 |
# File 'lib/elasticity/emr.rb', line 106 def add_jobflow_steps(jobflow_id, steps_config) params = { :operation => 'AddJobFlowSteps', :job_flow_id => jobflow_id }.merge!(steps_config) aws_result = @aws_request.submit(params) yield aws_result if block_given? end |
#describe_jobflow(jobflow_id) {|aws_result| ... } ⇒ Object
Describe a specific jobflow.
describe_jobflow("j-3UN6WX5RRO2AG")
Raises ArgumentError if the specified jobflow does not exist.
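Because a missing jobflow surfaces as an `ArgumentError`, a caller that only needs to know whether a jobflow exists can rescue it. The sketch below uses a hypothetical `FakeEMR` whose `describe_jobflow` raises the same way, so it runs without AWS credentials; `jobflow_exists?` is also hypothetical.

```ruby
# Hypothetical stand-in for Elasticity::EMR#describe_jobflow.
class FakeEMR
  def initialize(known_ids)
    @known_ids = known_ids
  end

  def describe_jobflow(jobflow_id)
    raise ArgumentError, "Job flow #{jobflow_id} does not exist" unless @known_ids.include?(jobflow_id)
    { :jobflow_id => jobflow_id } # the real method returns a JobFlowStatus
  end
end

# Returns true if describe_jobflow succeeds, false if it raises ArgumentError.
def jobflow_exists?(emr, jobflow_id)
  emr.describe_jobflow(jobflow_id)
  true
rescue ArgumentError
  false
end

emr = FakeEMR.new(["j-3UN6WX5RRO2AG"])
puts jobflow_exists?(emr, "j-3UN6WX5RRO2AG") # true
puts jobflow_exists?(emr, "j-NOPE")          # false
```

Rescuing `ArgumentError` this broadly would also hide unrelated argument mistakes, so it is best kept to a narrow wrapper like this one.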
# File 'lib/elasticity/emr.rb', line 16
def describe_jobflow(jobflow_id)
  aws_result = @aws_request.submit({
    :operation => 'DescribeJobFlows',
    :job_flow_ids => [jobflow_id]
  })
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlowStatus.from_members_nodeset(xml_doc.xpath('/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member')).first
end
#describe_jobflow_xml(jobflow_id) ⇒ Object
This is primarily for debugging purposes: it returns the raw DescribeJobFlows XML for the jobflow, providing insight into how Amazon internally represents jobs. It’s used to reverse-engineer how API calls construct jobflows.
# File 'lib/elasticity/emr.rb', line 30
def describe_jobflow_xml(jobflow_id)
  describe_jobflow(jobflow_id) do |xml|
    return xml
  end
end
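`describe_jobflow_xml` relies on a Ruby detail: `return` inside a block returns from the method that defined the block. It therefore hands back the raw XML as soon as `describe_jobflow` yields it, and the rest of `describe_jobflow` (building the `JobFlowStatus`) never runs. The self-contained sketch below reproduces that control flow with hypothetical `describe_sketch` and `describe_xml_sketch` methods.

```ruby
STEPS = []

# Hypothetical analogue of describe_jobflow: yields the raw result, then post-processes.
def describe_sketch
  raw_result = "<xml>raw</xml>"
  yield raw_result if block_given?
  STEPS << :parsed # skipped when the block performs a non-local return
  :parsed_object
end

# Hypothetical analogue of describe_jobflow_xml.
def describe_xml_sketch
  describe_sketch do |xml|
    return xml # returns from describe_xml_sketch, not just from the block
  end
end

puts describe_xml_sketch # <xml>raw</xml>
p STEPS                  # []
puts describe_sketch     # parsed_object
```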
#describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object
Lists all jobflows in all states.
To override this behaviour, pass additional filters as specified in the AWS documentation - docs.amazonwebservices.com/ElasticMapReduce/latest/API/index.html?API_DescribeJobFlows.html.
describe_jobflows(:CreatedBefore => "2011-10-04")
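`describe_jobflows` builds its request with `params.merge({:operation => 'DescribeJobFlows'})`, so caller-supplied filters are kept while any `:operation` key passed by mistake is overwritten. The plain-Ruby sketch below shows that precedence; it assumes a date filter is passed as a string, as in the example above.

```ruby
filters = {
  :CreatedBefore => Time.utc(2011, 10, 4).strftime("%Y-%m-%d"),
  :operation => "TerminateJobFlows" # will be overwritten by the merge
}

# Same merge as describe_jobflows: keys from the hash passed to merge win.
params = filters.merge({:operation => "DescribeJobFlows"})

p params[:CreatedBefore] # "2011-10-04"
p params[:operation]     # "DescribeJobFlows"
```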
# File 'lib/elasticity/emr.rb', line 42
def describe_jobflows(params = {})
  aws_result = @aws_request.submit(
    params.merge({:operation => 'DescribeJobFlows'})
  )
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlowStatus.from_members_nodeset(xml_doc.xpath('/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member'))
end
#direct(params) ⇒ Object
Pass the specified params hash directly through to the AWS request URL. Use this if you want to perform an operation that hasn’t yet been wrapped by Elasticity or you just want to see the response XML for yourself :)
# File 'lib/elasticity/emr.rb', line 232
def direct(params)
  @aws_request.submit(params)
end
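`direct` hands its hash straight to `AwsRequest#submit`. To get a feel for what ends up in a Query API URL, the sketch below encodes a hand-written parameter hash with Ruby's standard `URI.encode_www_form`. The AWS-style key names are an assumption for illustration: this page does not say which key format `direct` expects, and a real request also carries signing parameters that are omitted here.

```ruby
require "uri"

# Illustrative Query API parameters (assumed key format, no signature fields).
params = {
  "Operation" => "DescribeJobFlows",
  "JobFlowIds.member.1" => "j-3UN6WX5RRO2AG"
}

query = URI.encode_www_form(params)
puts query # Operation=DescribeJobFlows&JobFlowIds.member.1=j-3UN6WX5RRO2AG
```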
#modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object
Set the number of instances in the specified instance groups to the specified counts. Note that this changes the requested count, which is not the same as the running count: you request instances and then wait for them to be created.
Takes a hash mapping instance group IDs to desired instance counts:
{"ig-1" => 40, "ig-2" => 5, ...}
# File 'lib/elasticity/emr.rb', line 123
def modify_instance_groups(instance_group_config)
  params = {
    :operation => 'ModifyInstanceGroups',
    :instance_groups => instance_group_config.map { |k, v| {:instance_group_id => k, :instance_count => v} }
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
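The `map` in `modify_instance_groups` turns the ID-to-count hash into the list of per-group hashes that is submitted. Running the same expression on its own shows the shape; the IDs below are the placeholder values from the example above.

```ruby
instance_group_config = { "ig-1" => 40, "ig-2" => 5 }

# Same transformation modify_instance_groups applies before submitting.
instance_groups = instance_group_config.map do |k, v|
  { :instance_group_id => k, :instance_count => v }
end

instance_groups.each { |g| puts "#{g[:instance_group_id]} -> #{g[:instance_count]}" }
```

Ruby hashes preserve insertion order, so the groups come out in the order they were written.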
#run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object
Start a job flow with the specified configuration. This is a very thin wrapper around the AWS API, so in order to use it directly you’ll need to have the PDF API reference handy, which can be found here:
awsdocs.s3.amazonaws.com/ElasticMapReduce/20090331/emr-api-20090331.pdf
Here is a sample job flow configuration that should help. This job flow starts by installing Pig, then runs a Pig script. It is based on the Pig demo script from Amazon.
emr.run_job_flow({
:name => "Elasticity Test Flow (EMR Pig Script)",
:instances => {
:ec2_key_name => "sharethrough-dev",
:hadoop_version => "0.20",
:instance_count => 2,
:master_instance_type => "m1.small",
:placement => {
:availability_zone => "us-east-1a"
},
:slave_instance_type => "m1.small",
},
:steps => [
{
:action_on_failure => "TERMINATE_JOB_FLOW",
:hadoop_jar_step => {
:args => [
"s3://elasticmapreduce/libs/pig/pig-script",
"--base-path",
"s3://elasticmapreduce/libs/pig/",
"--install-pig"
],
:jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
},
:name => "Setup Pig"
},
{
:action_on_failure => "TERMINATE_JOB_FLOW",
:hadoop_jar_step => {
:args => [
"s3://elasticmapreduce/libs/pig/pig-script",
"--run-pig-script",
"--args",
"-p",
"INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
"-p",
"OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
"s3n://elasticmapreduce/samples/pig-apache/do-reports.pig"
],
:jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
},
:name => "Run Pig Script"
}
]
})
# File 'lib/elasticity/emr.rb', line 187
def run_job_flow(job_flow_config)
  params = {
    :operation => 'RunJobFlow',
  }.merge!(job_flow_config)
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  xml_doc.xpath('/RunJobFlowResponse/RunJobFlowResult/JobFlowId').text
end
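`run_job_flow` returns the text of `JobFlowId` from the RunJobFlow response. The sketch below performs the equivalent lookup with Ruby's REXML instead of Nokogiri, so it needs no extra gem on a standard Ruby install (REXML ships as a bundled gem since Ruby 3.0). The response body is a hand-written, namespace-free sample assumed for illustration; the real code calls `remove_namespaces!` before its XPath query.

```ruby
require "rexml/document"

# Hand-written sample response (assumed shape, no XML namespace).
sample = <<~XML
  <RunJobFlowResponse>
    <RunJobFlowResult>
      <JobFlowId>j-3UN6WX5RRO2AG</JobFlowId>
    </RunJobFlowResult>
  </RunJobFlowResponse>
XML

doc = REXML::Document.new(sample)
node = REXML::XPath.first(doc, "/RunJobFlowResponse/RunJobFlowResult/JobFlowId")
jobflow_id = node ? node.text : "" # mirror Nokogiri's empty string when nothing matches
puts jobflow_id
```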
#set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object
Enable or disable “termination protection” on the specified job flows. Termination protection prevents a job flow from being terminated by a user-initiated action, although the job flow will still terminate naturally.
Takes an array of job flow IDs:
["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]
# File 'lib/elasticity/emr.rb', line 206
def set_termination_protection(jobflow_ids, protection_enabled=true)
  params = {
    :operation => 'SetTerminationProtection',
    :termination_protected => protection_enabled,
    :job_flow_ids => jobflow_ids
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
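Since a protected job flow cannot be terminated by a user-initiated request, a shutdown script would typically turn protection off first. The sketch below records calls on a hypothetical `RecordingEMR` that accepts the same arguments as `set_termination_protection` and `terminate_jobflows`, so the ordering can be checked without AWS; the `shut_down` helper is also hypothetical.

```ruby
# Hypothetical stand-in that records calls instead of contacting AWS.
class RecordingEMR
  attr_reader :calls

  def initialize
    @calls = []
  end

  def set_termination_protection(jobflow_ids, protection_enabled = true)
    @calls << [:set_termination_protection, jobflow_ids, protection_enabled]
  end

  def terminate_jobflows(jobflow_id)
    @calls << [:terminate_jobflows, jobflow_id]
  end
end

# Hypothetical helper: lift protection, then terminate each job flow.
def shut_down(emr, jobflow_ids)
  emr.set_termination_protection(jobflow_ids, false)
  jobflow_ids.each { |id| emr.terminate_jobflows(id) }
end

emr = RecordingEMR.new
shut_down(emr, ["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5"])
emr.calls.each { |c| p c }
```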
#terminate_jobflows(jobflow_id) {|aws_result| ... } ⇒ Object
Terminate the specified jobflow. Amazon does not define a return value for this operation, so you’ll need to poll #describe_jobflows to see the state of the jobflow. Raises ArgumentError if the specified job flow does not exist.
# File 'lib/elasticity/emr.rb', line 220
def terminate_jobflows(jobflow_id)
  params = {
    :operation => 'TerminateJobFlows',
    :job_flow_ids => [jobflow_id]
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
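Because TerminateJobFlows has no defined return value, callers poll for the final state. The sketch below is a generic polling loop: `wait_for_state` is hypothetical, the terminal state names are an assumption based on EMR's job flow states, and the state is read through a block so the example runs without AWS. With Elasticity, the block might call `describe_jobflow(id)` and read the state from the returned `JobFlowStatus`, whose accessor name this page does not show.

```ruby
# Assumed terminal job flow states.
TERMINAL_STATES = %w[COMPLETED FAILED TERMINATED].freeze

# Hypothetical helper: calls the block until it returns a terminal state,
# sleeping between attempts; raises once max_attempts is exhausted.
def wait_for_state(max_attempts: 60, interval: 30, sleeper: ->(s) { sleep(s) })
  max_attempts.times do
    state = yield
    return state if TERMINAL_STATES.include?(state)
    sleeper.call(interval)
  end
  raise "job flow did not reach a terminal state after #{max_attempts} attempts"
end

# Simulated responses standing in for repeated describe_jobflow calls.
states = %w[RUNNING SHUTTING_DOWN TERMINATED]
naps = []
final = wait_for_state(interval: 5, sleeper: ->(s) { naps << s }) { states.shift }
puts final # TERMINATED
p naps     # [5, 5]
```

Injecting the `sleeper` lambda keeps the loop testable without real delays.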