Class: Elasticity::EMR
- Inherits: Object
- Defined in: lib/elasticity/emr.rb
Instance Attribute Summary
- #aws_request ⇒ Object (readonly)
  Returns the value of attribute aws_request.
Instance Method Summary
- #==(other) ⇒ Object
- #add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object
  Adds a new group of instances to the specified jobflow.
- #add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object
  Add a step (or steps) to the specified job flow.
- #describe_jobflow(jobflow_id) {|aws_result| ... } ⇒ Object
  Describe a specific jobflow.
- #describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object
  Lists all jobflows in all states.
- #direct(params) ⇒ Object
  Pass the specified params hash directly through to the AWS request URL.
- #initialize(aws_access_key_id, aws_secret_access_key, options = {}) ⇒ EMR (constructor)
  A new instance of EMR.
- #modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object
  Set the number of instances in the specified instance groups to the specified counts.
- #run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object
  Start a job flow with the specified configuration.
- #set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object
  Enable or disable “termination protection” on the specified job flows.
- #terminate_jobflows(jobflow_id) {|aws_result| ... } ⇒ Object
  Terminate the specified jobflow.
Constructor Details
#initialize(aws_access_key_id, aws_secret_access_key, options = {}) ⇒ EMR
# File 'lib/elasticity/emr.rb', line 7
def initialize(aws_access_key_id, aws_secret_access_key, options = {})
  @aws_request = Elasticity::AwsRequest.new(aws_access_key_id, aws_secret_access_key, options)
end
Instance Attribute Details
#aws_request ⇒ Object (readonly)
Returns the value of attribute aws_request.
# File 'lib/elasticity/emr.rb', line 5
def aws_request
  @aws_request
end
Instance Method Details
#==(other) ⇒ Object
# File 'lib/elasticity/emr.rb', line 227
def ==(other)
  return false unless other.is_a? EMR
  return false unless @aws_request == other.aws_request
  true
end
#add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object
Adds a new group of instances to the specified jobflow. Elasticity maps a more Ruby-like syntax onto the Amazon options. An exhaustive hash follows, although not all of these options are required (or even valid!) at the same time. See the EMR docs for details, although even then you’re going to need to experiment :)
instance_group_config = {
  :bid_price => 5,
  :instance_count => 1,
  :instance_role => "TASK",
  :market => "SPOT",
  :name => "Go Canucks Go!",
  :type => "m1.small",
}
add_instance_groups takes an array of these hashes and returns an array of the IDs of the instance groups created from the specified configs:
["ig-2GOVEN6HVJZID", "ig-1DU9M2UQMM051", "ig-3DZRW4Y2X4S", ...]
# File 'lib/elasticity/emr.rb', line 61
def add_instance_groups(jobflow_id, instance_group_configs)
  params = {
    :operation => 'AddInstanceGroups',
    :job_flow_id => jobflow_id,
    :instance_groups => instance_group_configs
  }
  aws_result = @aws_request.submit(params)
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  instance_group_ids = []
  xml_doc.xpath('/AddInstanceGroupsResponse/AddInstanceGroupsResult/InstanceGroupIds/member').each do |member|
    instance_group_ids << member.text
  end
  yield aws_result if block_given?
  instance_group_ids
end
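For illustration, the sketch below reproduces only the params hash that add_instance_groups assembles before calling submit, showing how an array of group configs is passed through unchanged. The helper name add_instance_groups_params and the group settings are hypothetical placeholders; request signing and XML parsing are left to the real method.

```ruby
# Hypothetical helper mirroring the params hash built inside
# EMR#add_instance_groups. No AWS call is made here.
def add_instance_groups_params(jobflow_id, instance_group_configs)
  {
    :operation => 'AddInstanceGroups',
    :job_flow_id => jobflow_id,
    :instance_groups => instance_group_configs
  }
end

# Illustrative spot TASK group; values are placeholders, not recommendations.
spot_task_group = {
  :bid_price => 5,
  :instance_count => 4,
  :instance_role => "TASK",
  :market => "SPOT",
  :name => "Spot task group",
  :type => "m1.small"
}

# An on-demand variant: drop the bid price and switch the market.
on_demand_task_group = spot_task_group.reject { |key, _| key == :bid_price }
on_demand_task_group = on_demand_task_group.merge({ :market => "ON_DEMAND", :name => "On-demand task group" })

params = add_instance_groups_params("j-123", [spot_task_group, on_demand_task_group])
```

On a real EMR instance, emr.add_instance_groups("j-123", [spot_task_group, on_demand_task_group]) would submit an equivalent hash and return the new instance group IDs.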
#add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object
Add a step (or steps) to the specified job flow.
emr.add_jobflow_steps("j-123", {
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    }
  ]
})
# File 'lib/elasticity/emr.rb', line 97
def add_jobflow_steps(jobflow_id, steps_config)
  params = {
    :operation => 'AddJobFlowSteps',
    :job_flow_id => jobflow_id
  }.merge!(steps_config)
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
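Because add_jobflow_steps builds its params with merge!, any key in steps_config (including :job_flow_id or :operation) overrides the value the method sets. The sketch below mirrors that construction and adds a hypothetical script_runner_step helper that builds step hashes in the shape used by the example above; both helper names are assumptions, not part of Elasticity.

```ruby
SCRIPT_RUNNER_JAR = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"

# Hypothetical helper: builds one script-runner step hash.
def script_runner_step(name, args, action_on_failure: "TERMINATE_JOB_FLOW")
  {
    :action_on_failure => action_on_failure,
    :hadoop_jar_step => { :args => args, :jar => SCRIPT_RUNNER_JAR },
    :name => name
  }
end

# Mirrors EMR#add_jobflow_steps: merge! means keys in steps_config win
# over :operation and :job_flow_id if they are present.
def add_jobflow_steps_params(jobflow_id, steps_config)
  { :operation => 'AddJobFlowSteps', :job_flow_id => jobflow_id }.merge!(steps_config)
end

setup_pig = script_runner_step("Setup Pig", [
  "s3://elasticmapreduce/libs/pig/pig-script",
  "--base-path", "s3://elasticmapreduce/libs/pig/",
  "--install-pig"
])
params = add_jobflow_steps_params("j-123", { :steps => [setup_pig] })
```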
#describe_jobflow(jobflow_id) {|aws_result| ... } ⇒ Object
Describe a specific jobflow.
describe_jobflow("j-3UN6WX5RRO2AG")
Raises ArgumentError if the specified jobflow does not exist.
# File 'lib/elasticity/emr.rb', line 16
def describe_jobflow(jobflow_id)
  aws_result = @aws_request.submit({
    :operation => 'DescribeJobFlows',
    :job_flow_ids => [jobflow_id]
  })
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlowStatus.from_members_nodeset(xml_doc.xpath('/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member')).first
end
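Since describe_jobflow is documented to raise ArgumentError for a jobflow that does not exist, an existence check can rescue that error. This is a sketch under that assumption: jobflow_exists? and FakeEMR are hypothetical, and FakeEMR only stands in for Elasticity::EMR so the snippet runs without AWS credentials.

```ruby
# Returns true if describe_jobflow succeeds, false if it raises ArgumentError.
# `emr` is anything that responds to describe_jobflow (duck typing).
def jobflow_exists?(emr, jobflow_id)
  emr.describe_jobflow(jobflow_id)
  true
rescue ArgumentError
  false
end

# Hypothetical stand-in for Elasticity::EMR that knows a fixed set of IDs.
class FakeEMR
  def initialize(known_ids)
    @known_ids = known_ids
  end

  def describe_jobflow(jobflow_id)
    raise ArgumentError, "no such jobflow: #{jobflow_id}" unless @known_ids.include?(jobflow_id)
    :status_placeholder
  end
end

emr = FakeEMR.new(["j-3UN6WX5RRO2AG"])
```

Rescuing ArgumentError this broadly would also swallow an ArgumentError raised for some unrelated reason, so treat a false result with some care.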
#describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object
Lists all jobflows in all states.
To override this behaviour, pass additional filters as described in the AWS documentation (docs.amazonwebservices.com/ElasticMapReduce/latest/API/index.html?API_DescribeJobFlows.html), for example:
describe_jobflows(:CreatedBefore => "2011-10-04")
# File 'lib/elasticity/emr.rb', line 33
def describe_jobflows(params = {})
  aws_result = @aws_request.submit(
    params.merge({:operation => 'DescribeJobFlows'})
  )
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlowStatus.from_members_nodeset(xml_doc.xpath('/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member'))
end
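In describe_jobflows the caller's filters are merged first and :operation second, so a filter hash can never replace the operation. The sketch below mirrors that merge and builds the :CreatedBefore filter from a Date. describe_jobflows_params is a hypothetical name, and the YYYY-MM-DD format simply follows the example above.

```ruby
require 'date'

# Mirrors EMR#describe_jobflows: caller filters first, then :operation,
# so :operation is always 'DescribeJobFlows'.
def describe_jobflows_params(filters = {})
  filters.merge({:operation => 'DescribeJobFlows'})
end

# Filter for jobflows created before a fixed cutoff date.
cutoff = Date.new(2011, 10, 4)
params = describe_jobflows_params({ :CreatedBefore => cutoff.iso8601 })
```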
#direct(params) ⇒ Object
Pass the specified params hash directly through to the AWS request URL. Use this if you want to perform an operation that hasn’t yet been wrapped by Elasticity or you just want to see the response XML for yourself :)
# File 'lib/elasticity/emr.rb', line 223
def direct(params)
  @aws_request.submit(params)
end
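Because direct hands params to the request object untouched and returns the raw response, the caller has to supply everything the operation needs. The sketch below assumes (since every wrapped method in this class sets one) that a direct call should also include an :operation key, and checks for it before passing the params through. CannedRequest and direct_with_check are hypothetical; CannedRequest stands in for Elasticity::AwsRequest and returns fixed XML instead of calling AWS.

```ruby
# Hypothetical stand-in for Elasticity::AwsRequest: records submitted params
# and returns canned XML instead of calling AWS.
class CannedRequest
  attr_reader :submitted

  def initialize(response_xml)
    @response_xml = response_xml
    @submitted = []
  end

  def submit(params)
    @submitted << params
    @response_xml
  end
end

# Like EMR#direct, passes params through untouched and returns the raw
# response, but first checks that an :operation key is present.
def direct_with_check(aws_request, params)
  raise ArgumentError, "params must include :operation" unless params.key?(:operation)
  aws_request.submit(params)
end

request = CannedRequest.new("<DescribeJobFlowsResponse/>")
raw = direct_with_check(request, { :operation => 'DescribeJobFlows', :job_flow_ids => ["j-123"] })
```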
#modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object
Set the number of instances in the specified instance groups to the specified counts. Note that this modifies the requested count, which is not the same as the running count: you request instances and then wait for them to be created.
Takes a hash mapping instance group IDs to desired instance counts:
{"ig-1" => 40, "ig-2" => 5, ...}
# File 'lib/elasticity/emr.rb', line 114
def modify_instance_groups(instance_group_config)
  params = {
    :operation => 'ModifyInstanceGroups',
    :instance_groups => instance_group_config.map { |k, v| {:instance_group_id => k, :instance_count => v} }
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
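The sketch below mirrors how modify_instance_groups turns the ID-to-count hash into the per-group array that AWS receives, and pairs it with a hypothetical resize policy (double each requested count, capped at a maximum). The helper names and the cap are assumptions; the counts involved are requested counts, not running counts.

```ruby
# Mirrors EMR#modify_instance_groups: {id => count} becomes an array of
# {:instance_group_id, :instance_count} hashes, in insertion order.
def modify_instance_groups_params(instance_group_config)
  {
    :operation => 'ModifyInstanceGroups',
    :instance_groups => instance_group_config.map { |k, v| {:instance_group_id => k, :instance_count => v} }
  }
end

# Hypothetical resize policy: double each requested count, capped at max_count.
def doubled_counts(current_counts, max_count)
  current_counts.transform_values { |count| [count * 2, max_count].min }
end

new_counts = doubled_counts({"ig-1" => 40, "ig-2" => 5}, 50)
params = modify_instance_groups_params(new_counts)
```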
#run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object
Start a job flow with the specified configuration. This is a very thin wrapper around the AWS API, so to use it you’ll need to have the PDF API reference handy, which can be found here:
awsdocs.s3.amazonaws.com/ElasticMapReduce/20090331/emr-api-20090331.pdf
Here is a sample job flow configuration that should help. It starts by installing Pig and then runs a Pig script, and is based on Amazon’s Pig demo script.
emr.run_job_flow({
  :name => "Elasticity Test Flow (EMR Pig Script)",
  :instances => {
    :ec2_key_name => "sharethrough-dev",
    :hadoop_version => "0.20",
    :instance_count => 2,
    :master_instance_type => "m1.small",
    :placement => {
      :availability_zone => "us-east-1a"
    },
    :slave_instance_type => "m1.small",
  },
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    },
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--run-pig-script",
          "--args",
          "-p",
          "INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
          "-p",
          "OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
          "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Run Pig Script"
    }
  ]
})
# File 'lib/elasticity/emr.rb', line 178
def run_job_flow(job_flow_config)
  params = {
    :operation => 'RunJobFlow',
  }.merge!(job_flow_config)
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  xml_doc.xpath('/RunJobFlowResponse/RunJobFlowResult/JobFlowId').text
end
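The sample configuration above repeats the script-runner JAR and pig-script path in every step. One way to cut that repetition is a small builder like the hypothetical pig_job_flow_config below, which produces the same step structure from a few parameters. The key pair name and output bucket are placeholders, the :placement block is omitted, and run_job_flow itself is not called here.

```ruby
PIG_SCRIPT = "s3://elasticmapreduce/libs/pig/pig-script"
SCRIPT_RUNNER = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"

# Hypothetical helper: one script-runner step that invokes pig-script.
def pig_step(name, args)
  {
    :action_on_failure => "TERMINATE_JOB_FLOW",
    :hadoop_jar_step => { :args => [PIG_SCRIPT] + args, :jar => SCRIPT_RUNNER },
    :name => name
  }
end

# Hypothetical builder for the two-step "install Pig, then run a script" flow.
def pig_job_flow_config(name:, key_name:, script:, input:, output:, instance_count: 2)
  {
    :name => name,
    :instances => {
      :ec2_key_name => key_name,
      :hadoop_version => "0.20",
      :instance_count => instance_count,
      :master_instance_type => "m1.small",
      :slave_instance_type => "m1.small"
    },
    :steps => [
      pig_step("Setup Pig", ["--base-path", "s3://elasticmapreduce/libs/pig/", "--install-pig"]),
      pig_step("Run Pig Script", ["--run-pig-script", "--args", "-p", "INPUT=#{input}", "-p", "OUTPUT=#{output}", script])
    ]
  }
end

config = pig_job_flow_config(
  name: "Elasticity Test Flow (EMR Pig Script)",
  key_name: "my-key-pair",
  script: "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig",
  input: "s3n://elasticmapreduce/samples/pig-apache/input",
  output: "s3n://my-bucket/pig-apache/output/2011-04-19"
)
```

Passing config to emr.run_job_flow(config) would then start the flow and, per the method source, return the new job flow ID.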
#set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object
Enable or disable “termination protection” on the specified job flows. Termination protection prevents a job flow from being terminated by a user-initiated action, although the job flow will still terminate naturally.
Takes an array of job flow IDs:
["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]
# File 'lib/elasticity/emr.rb', line 197
def set_termination_protection(jobflow_ids, protection_enabled=true)
  params = {
    :operation => 'SetTerminationProtection',
    :termination_protected => protection_enabled,
    :job_flow_ids => jobflow_ids
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
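The method passes jobflow_ids straight into the request, so a single ID string would not be wrapped in an array. The sketch below mirrors the params hash and adds a hypothetical normalization step using Kernel#Array, which leaves an array unchanged and wraps a single string; termination_protection_params is an illustrative name, not part of Elasticity.

```ruby
# Mirrors EMR#set_termination_protection, plus Array() so that a single
# ID string becomes a one-element array (hypothetical addition).
def termination_protection_params(jobflow_ids, protection_enabled = true)
  {
    :operation => 'SetTerminationProtection',
    :termination_protected => protection_enabled,
    :job_flow_ids => Array(jobflow_ids)
  }
end

protect = termination_protection_params("j-1B4D1XP0C0A35")
unprotect = termination_protection_params(["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5"], false)
```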
#terminate_jobflows(jobflow_id) {|aws_result| ... } ⇒ Object
Terminate the specified jobflow. Amazon does not define a return value for this operation, so you’ll need to poll #describe_jobflows to see the state of the jobflow. Raises ArgumentError if the specified job flow does not exist.
# File 'lib/elasticity/emr.rb', line 211
def terminate_jobflows(jobflow_id)
  params = {
    :operation => 'TerminateJobFlows',
    :job_flow_ids => [jobflow_id]
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
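Since TerminateJobFlows returns no useful value, a caller typically polls until the job flow reaches a terminal state. The sketch below assumes the terminal states are COMPLETED, FAILED and TERMINATED (the end states of the EMR job flow lifecycle) and accepts any callable that returns the current state string. In real use that callable would wrap a describe_jobflows or describe_jobflow call; how to read the state from the returned JobFlowStatus is not documented on this page, so that wiring is left as an assumption. The state sequence below is simulated.

```ruby
# Assumed terminal job flow states.
TERMINAL_STATES = %w[COMPLETED FAILED TERMINATED].freeze

# Polls fetch_state (any callable returning a state string) until it reports
# a terminal state, calling sleeper between checks. Raises if max_checks is
# exhausted without reaching a terminal state.
def wait_for_terminal_state(fetch_state, interval: 30, max_checks: 60, sleeper: ->(seconds) { sleep(seconds) })
  max_checks.times do
    state = fetch_state.call
    return state if TERMINAL_STATES.include?(state)
    sleeper.call(interval)
  end
  raise "job flow did not reach a terminal state after #{max_checks} checks"
end

# Simulated states and a recording sleeper so the sketch runs instantly.
states = %w[RUNNING SHUTTING_DOWN TERMINATED]
naps = []
final = wait_for_terminal_state(-> { states.shift }, sleeper: ->(seconds) { naps << seconds })
```

Injecting the sleeper keeps the loop testable; in production the default simply calls Kernel#sleep.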