Class: Elasticity::EMR

Inherits:
Object
Defined in:
lib/elasticity/emr.rb

Constructor Details

#initialize(aws_access_key_id = nil, aws_secret_access_key = nil, options = {}) ⇒ EMR

Returns a new instance of EMR. The credentials and options are passed through unchanged to Elasticity::AwsRequest.

# File 'lib/elasticity/emr.rb', line 7

def initialize(aws_access_key_id=nil, aws_secret_access_key=nil, options = {})
  @aws_request = Elasticity::AwsRequest.new(aws_access_key_id, aws_secret_access_key, options)
end

Instance Attribute Details

#aws_request ⇒ Object (readonly)

Returns the value of attribute aws_request.

# File 'lib/elasticity/emr.rb', line 5

def aws_request
  @aws_request
end

Instance Method Details

#==(other) ⇒ Object

Returns true if other is an EMR instance whose aws_request is equal to this one's; otherwise returns false.

# File 'lib/elasticity/emr.rb', line 236

def ==(other)
  return false unless other.is_a? EMR
  return false unless @aws_request == other.aws_request
  true
end

#add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object

Adds a new group of instances to the specified jobflow. Elasticity maps a more Ruby-like syntax onto the Amazon options. An exhaustive hash follows, although not all of these options are required (or even valid!) at once. See the EMR docs for details, although even then you're going to need to experiment :)

instance_group_config = {
  :bid_price => 5,
  :instance_count => 1,
  :instance_role => "TASK",
  :market => "SPOT",
  :name => "Go Canucks Go!",
  :type => "m1.small"
}

add_instance_groups takes an array of these config hashes. It returns an array of the IDs of the instance groups created from them:

["ig-2GOVEN6HVJZID", "ig-1DU9M2UQMM051", "ig-3DZRW4Y2X4S", ...]

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 70

def add_instance_groups(jobflow_id, instance_group_configs)
  params = {
    :operation => 'AddInstanceGroups',
    :job_flow_id => jobflow_id,
    :instance_groups => instance_group_configs
  }
  aws_result = @aws_request.submit(params)
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  instance_group_ids = []
  xml_doc.xpath('/AddInstanceGroupsResponse/AddInstanceGroupsResult/InstanceGroupIds/member').each do |member|
    instance_group_ids << member.text
  end
  yield aws_result if block_given?
  instance_group_ids
end
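The response-parsing step of add_instance_groups can be tried without Nokogiri or AWS access. This minimal sketch uses Ruby's REXML library, which ships with Ruby but has been packaged as a bundled gem since Ruby 3.0.

It makes two assumptions. The sample XML is a hypothetical, trimmed AddInstanceGroups response. The namespace is omitted because the method calls `remove_namespaces!` before querying, which lets the sketch reuse the same XPath.

```ruby
require 'rexml/document'

# Hypothetical, trimmed AddInstanceGroups response (namespace omitted).
SAMPLE_ADD_INSTANCE_GROUPS_XML = <<~XML
  <AddInstanceGroupsResponse>
    <AddInstanceGroupsResult>
      <InstanceGroupIds>
        <member>ig-2GOVEN6HVJZID</member>
        <member>ig-1DU9M2UQMM051</member>
      </InstanceGroupIds>
    </AddInstanceGroupsResult>
  </AddInstanceGroupsResponse>
XML

# Collect the text of each <member> under InstanceGroupIds, using the same
# XPath as add_instance_groups.
def instance_group_ids_from(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '/AddInstanceGroupsResponse/AddInstanceGroupsResult/InstanceGroupIds/member')
    .map(&:text)
end

p instance_group_ids_from(SAMPLE_ADD_INSTANCE_GROUPS_XML)
```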

#add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object

Adds a step (or steps) to the specified job flow. The steps_config hash is merged into the request parameters, so it should contain a :steps array:

emr.add_jobflow_steps("j-123", {
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    }
  ]
})

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 106

def add_jobflow_steps(jobflow_id, steps_config)
  params = {
    :operation => 'AddJobFlowSteps',
    :job_flow_id => jobflow_id
  }.merge!(steps_config)
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
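It helps to see how add_jobflow_steps assembles its request before it calls `submit`. This sketch copies that parameter-building step into a standalone helper; the name `add_jobflow_steps_params` is hypothetical and not part of Elasticity.

Two consequences follow from the use of `merge!`:

- The second argument is expected to be a hash wrapping `:steps`, not a bare array.
- Any `:job_flow_id` key inside it would override the first argument.

```ruby
# Hypothetical helper mirroring add_jobflow_steps' parameter construction.
def add_jobflow_steps_params(jobflow_id, steps_config)
  {
    :operation => 'AddJobFlowSteps',
    :job_flow_id => jobflow_id
  }.merge!(steps_config) # keys from steps_config win on collision
end

steps_config = {
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => ["s3://elasticmapreduce/libs/pig/pig-script", "--base-path",
                  "s3://elasticmapreduce/libs/pig/", "--install-pig"],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    }
  ]
}

params = add_jobflow_steps_params("j-123", steps_config)
p params.keys
```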

#describe_jobflow(jobflow_id) {|aws_result| ... } ⇒ Object

Describes a specific jobflow, returning a JobFlowStatus for it.

describe_jobflow("j-3UN6WX5RRO2AG")

Raises ArgumentError if the specified jobflow does not exist.

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 16

def describe_jobflow(jobflow_id)
  aws_result = @aws_request.submit({
    :operation => 'DescribeJobFlows',
    :job_flow_ids => [jobflow_id]
  })
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlowStatus.from_members_nodeset(xml_doc.xpath('/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member')).first
end
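describe_jobflow builds one status object per `<member>` node and keeps only the first. The following is a rough sketch of that shape using REXML.

Several pieces are assumptions:

- `JobFlowSummary` is a hypothetical two-field stand-in for Elasticity's JobFlowStatus, whose real attributes are not shown here.
- The sample XML is a hypothetical, trimmed DescribeJobFlows response with the namespace omitted.
- The `ExecutionStatusDetail/State` path is an assumption about Amazon's response layout.

```ruby
require 'rexml/document'

# Hypothetical stand-in for Elasticity's JobFlowStatus (two fields only).
JobFlowSummary = Struct.new(:jobflow_id, :state)

# Hypothetical, trimmed DescribeJobFlows response (namespace omitted).
SAMPLE_DESCRIBE_XML = <<~XML
  <DescribeJobFlowsResponse>
    <DescribeJobFlowsResult>
      <JobFlows>
        <member>
          <JobFlowId>j-3UN6WX5RRO2AG</JobFlowId>
          <ExecutionStatusDetail><State>RUNNING</State></ExecutionStatusDetail>
        </member>
      </JobFlows>
    </DescribeJobFlowsResult>
  </DescribeJobFlowsResponse>
XML

# Build one summary per <member>; describe_jobflow would keep only the first.
def jobflow_summaries(xml)
  doc = REXML::Document.new(xml)
  REXML::XPath.match(doc, '/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member').map do |member|
    JobFlowSummary.new(
      member.elements['JobFlowId'].text,
      member.elements['ExecutionStatusDetail/State'].text
    )
  end
end

first = jobflow_summaries(SAMPLE_DESCRIBE_XML).first
puts "#{first.jobflow_id}: #{first.state}"
```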

#describe_jobflow_xml(jobflow_id) ⇒ Object

Returns the raw XML response from DescribeJobFlows for the specified jobflow. This is primarily for debugging: it gives insight into how Amazon internally represents jobs, which helps when reverse-engineering how API calls construct jobflows.

# File 'lib/elasticity/emr.rb', line 30

def describe_jobflow_xml(jobflow_id)
  describe_jobflow(jobflow_id) do |xml|
    return xml
  end
end
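describe_jobflow_xml relies on a Ruby idiom: a `return` inside a block returns from the method that *defined* the block, not just from the block. The raw payload therefore escapes before describe_jobflow reaches its parsing line.

This sketch shows the same pattern with two hypothetical methods, `fetch_and_parse` and `fetch_raw`. They stand in for describe_jobflow and describe_jobflow_xml.

```ruby
# Hypothetical method shaped like describe_jobflow: it yields the raw payload
# to an optional block, then returns a "parsed" value.
def fetch_and_parse
  raw = "<raw response>"
  yield raw if block_given?
  raw.upcase # stand-in for parsing
end

# Shaped like describe_jobflow_xml: `return` in the block exits fetch_raw
# immediately with the raw payload, so fetch_and_parse never finishes.
def fetch_raw
  fetch_and_parse do |raw|
    return raw
  end
end

p fetch_and_parse
p fetch_raw
```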

#describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object

Lists all jobflows in all states, returning a JobFlowStatus for each.

To narrow the results, pass additional filters as described in the AWS documentation: docs.amazonwebservices.com/ElasticMapReduce/latest/API/index.html?API_DescribeJobFlows.html.

describe_jobflows(:CreatedBefore => "2011-10-04")

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 42

def describe_jobflows(params = {})
  aws_result = @aws_request.submit(
    params.merge({:operation => 'DescribeJobFlows'})
  )
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlowStatus.from_members_nodeset(xml_doc.xpath('/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member'))
end
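In describe_jobflows, the operation hash is the *argument* to `merge`, so its value takes precedence over anything the caller passes. Filters are added to the request, but a caller cannot accidentally change the operation.

This sketch isolates that step. The helper name `describe_jobflows_params` is hypothetical.

```ruby
# Hypothetical helper mirroring describe_jobflows' parameter construction:
# caller filters first, then :operation, which wins on a key collision.
def describe_jobflows_params(params = {})
  params.merge({:operation => 'DescribeJobFlows'})
end

p describe_jobflows_params(:CreatedBefore => "2011-10-04")
p describe_jobflows_params(:operation => 'TerminateJobFlows')[:operation]
```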

#direct(params) ⇒ Object

Passes the specified params hash directly through to the AWS request and returns the raw response. Use this to perform an operation that Elasticity hasn't wrapped yet, or just to see the response XML for yourself :)

# File 'lib/elasticity/emr.rb', line 232

def direct(params)
  @aws_request.submit(params)
end

#modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object

Sets the number of instances in the specified instance groups to the specified counts. Note that this modifies the requested count, which is not the same as the running count: you request instances and then wait for them to be created.

Takes a hash mapping instance group IDs to desired instance counts:

{"ig-1" => 40, "ig-2" => 5, ...}

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 123

def modify_instance_groups(instance_group_config)
  params = {
    :operation => 'ModifyInstanceGroups',
    :instance_groups => instance_group_config.map { |k, v| {:instance_group_id => k, :instance_count => v} }
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
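modify_instance_groups turns the `{group_id => count}` hash into an array of per-group hashes before submitting. This sketch reproduces only that transformation so the resulting shape is visible. The helper name `modify_instance_groups_params` is hypothetical.

```ruby
# Hypothetical helper mirroring modify_instance_groups' parameter construction.
def modify_instance_groups_params(instance_group_config)
  {
    :operation => 'ModifyInstanceGroups',
    # One hash per group, preserving the input hash's insertion order.
    :instance_groups => instance_group_config.map { |id, count|
      {:instance_group_id => id, :instance_count => count}
    }
  }
end

params = modify_instance_groups_params("ig-1" => 40, "ig-2" => 5)
params[:instance_groups].each { |group| p group }
```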

#run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object

Starts a job flow with the specified configuration and returns the new job flow ID. This is a very thin wrapper around the AWS API, so to use it directly you'll need the PDF API reference handy, which can be found here:

awsdocs.s3.amazonaws.com/ElasticMapReduce/20090331/emr-api-20090331.pdf

Here is a sample job flow configuration to help you get started. This job flow first installs Pig and then runs a Pig script. It is based on Amazon's Pig demo script.

emr.run_job_flow({
  :name => "Elasticity Test Flow (EMR Pig Script)",
  :instances => {
    :ec2_key_name => "sharethrough-dev",
    :hadoop_version => "0.20",
    :instance_count => 2,
    :master_instance_type => "m1.small",
    :placement => {
      :availability_zone => "us-east-1a"
    },
    :slave_instance_type => "m1.small"
  },
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    },
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--run-pig-script",
          "--args",
          "-p",
          "INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
          "-p",
          "OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
          "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Run Pig Script"
    }
  ]
})

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 187

def run_job_flow(job_flow_config)
  params = {
    :operation => 'RunJobFlow',
  }.merge!(job_flow_config)
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  xml_doc.xpath('/RunJobFlowResponse/RunJobFlowResult/JobFlowId').text
end
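run_job_flow extracts the new job flow ID from the response. This REXML sketch does the same against a hypothetical, trimmed RunJobFlow response, with the namespace omitted as `remove_namespaces!` would leave it.

One difference is deliberate. The sketch returns nil when the element is missing, whereas Nokogiri's `NodeSet#text`, used by the original method, returns an empty string in that case.

```ruby
require 'rexml/document'

# Hypothetical, trimmed RunJobFlow response (namespace omitted).
SAMPLE_RUN_JOB_FLOW_XML = <<~XML
  <RunJobFlowResponse>
    <RunJobFlowResult>
      <JobFlowId>j-1B4D1XP0C0A35</JobFlowId>
    </RunJobFlowResult>
  </RunJobFlowResponse>
XML

# Return the new job flow ID, or nil if the element is absent.
def jobflow_id_from(xml)
  node = REXML::XPath.first(REXML::Document.new(xml), '/RunJobFlowResponse/RunJobFlowResult/JobFlowId')
  node && node.text
end

p jobflow_id_from(SAMPLE_RUN_JOB_FLOW_XML)
```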

#set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object

Enables or disables “termination protection” on the specified job flows. Termination protection prevents a job flow from being terminated by a user-initiated action, although the job flow will still terminate naturally.

Takes an array of job flow IDs:

["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 206

def set_termination_protection(jobflow_ids, protection_enabled=true)
  params = {
    :operation => 'SetTerminationProtection',
    :termination_protected => protection_enabled,
    :job_flow_ids => jobflow_ids
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
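Termination protection blocks user-initiated termination, so protection has to be turned off before a protected job flow can be terminated by request. This sketch checks that ordering without contacting AWS.

Both `RecordingEMR` and `force_terminate` are hypothetical:

- `RecordingEMR` is a duck-typed stand-in that only logs calls. It uses the same method names and arguments as the EMR methods documented here.
- `force_terminate` is an illustrative helper, not part of Elasticity.

```ruby
# Hypothetical stand-in for Elasticity::EMR that records calls instead of
# sending requests.
class RecordingEMR
  attr_reader :calls

  def initialize
    @calls = []
  end

  def set_termination_protection(jobflow_ids, protection_enabled = true)
    @calls << [:set_termination_protection, jobflow_ids, protection_enabled]
  end

  def terminate_jobflows(jobflow_id)
    @calls << [:terminate_jobflows, jobflow_id]
  end
end

# Hypothetical helper: disable protection first, then terminate each flow.
def force_terminate(emr, jobflow_ids)
  emr.set_termination_protection(jobflow_ids, false)
  jobflow_ids.each { |id| emr.terminate_jobflows(id) }
end

emr = RecordingEMR.new
force_terminate(emr, ["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5"])
emr.calls.each { |call| p call }
```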

#terminate_jobflows(jobflow_id) {|aws_result| ... } ⇒ Object

Terminates the specified jobflow. Amazon does not define a return value for this operation, so you'll need to poll #describe_jobflows to see the jobflow's state. Raises ArgumentError if the specified job flow does not exist.

Yields:

  • (aws_result)

# File 'lib/elasticity/emr.rb', line 220

def terminate_jobflows(jobflow_id)
  params = {
    :operation => 'TerminateJobFlows',
    :job_flow_ids => [jobflow_id]
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
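Because TerminateJobFlows returns nothing useful, callers typically poll until the job flow reaches a terminal state. This sketch shows a bounded polling loop with a hypothetical helper, `wait_for_state`.

The state source is an injected lambda, so the loop can be exercised with simulated states. With Elasticity, the lambda might wrap describe_jobflow and read the returned status's state. That is an assumption: check JobFlowStatus for the actual attribute name.

```ruby
# Hypothetical polling helper: call fetch_state until it returns one of the
# target states, sleeping between attempts; raise if attempts run out.
def wait_for_state(fetch_state, targets, interval: 30, max_attempts: 60)
  max_attempts.times do
    state = fetch_state.call
    return state if targets.include?(state)
    sleep interval
  end
  raise "job flow did not reach #{targets.join('/')} after #{max_attempts} attempts"
end

# Simulated sequence of states reported by a terminating job flow.
states = %w[RUNNING SHUTTING_DOWN TERMINATED]
fetch = -> { states.shift }
final = wait_for_state(fetch, %w[TERMINATED FAILED COMPLETED], interval: 0)
puts final
```

Bounding the number of attempts keeps a stuck job flow from blocking the caller forever. The interval should be chosen to stay well within any API request-rate limits.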