Class: Elasticity::JobFlow

Inherits:
Object
Defined in:
lib/elasticity/job_flow.rb

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(access = nil, secret = nil) ⇒ JobFlow

Returns a new instance of JobFlow.



Source lines 25–49.
# File 'lib/elasticity/job_flow.rb', line 25

# Builds a job flow populated with default settings. This constructor makes
# no EMR calls; #run is what submits the job flow.
#
# Defaults set here: action_on_failure 'TERMINATE_JOB_FLOW', hadoop_version
# '1.0.3', ami_version 'latest', placement 'us-east-1a', keep-alive false,
# instance_count 2, and 'm1.small' for both master and slave instance types.
#
# @param access [String, nil] AWS access key, readable via #access_key
# @param secret [String, nil] AWS secret key, readable via #secret_key
def initialize(access=nil, secret=nil)
  @action_on_failure = 'TERMINATE_JOB_FLOW'
  @hadoop_version = '1.0.3'
  @name = 'Elasticity Job Flow'
  @ami_version = 'latest'
  @keep_job_flow_alive_when_no_steps = false
  @placement = 'us-east-1a'

  # Credentials are stored exactly once.
  @access_key = access
  @secret_key = secret

  # Pending bootstrap actions / steps, and the step classes already installed.
  @bootstrap_actions = []
  @jobflow_steps = []
  @installed_steps = []

  # Start with one MASTER and one CORE group; the setters stamp each group's role.
  @instance_groups = {}
  set_master_instance_group(Elasticity::InstanceGroup.new)
  set_core_instance_group(Elasticity::InstanceGroup.new)
  @instance_count = 2
  @master_instance_type = 'm1.small'
  @slave_instance_type = 'm1.small'
end

Instance Attribute Details

#access_key ⇒ Object (readonly)

Returns the value of attribute access_key.



Source lines 22–24.
# File 'lib/elasticity/job_flow.rb', line 22

# Read-only attribute: the AWS access key handed to the constructor.
#
# @return [String, nil]
def access_key; @access_key; end

#action_on_failure ⇒ Object

Returns the value of attribute action_on_failure.



Source lines 9–11.
# File 'lib/elasticity/job_flow.rb', line 9

# Current value of the action_on_failure attribute
# (the constructor initializes it to 'TERMINATE_JOB_FLOW').
#
# @return [String]
def action_on_failure; @action_on_failure; end

#ami_version ⇒ Object

Returns the value of attribute ami_version.



Source lines 17–19.
# File 'lib/elasticity/job_flow.rb', line 17

# Current value of the ami_version attribute (the constructor sets 'latest').
#
# @return [String]
def ami_version; @ami_version; end

#ec2_key_name ⇒ Object

Returns the value of attribute ec2_key_name.



Source lines 10–12.
# File 'lib/elasticity/job_flow.rb', line 10

# Current value of the ec2_key_name attribute; the constructor leaves it
# unset, so it reads as nil until assigned.
#
# @return [String, nil]
def ec2_key_name; @ec2_key_name; end

#ec2_subnet_id ⇒ Object

Returns the value of attribute ec2_subnet_id.



Source lines 19–21.
# File 'lib/elasticity/job_flow.rb', line 19

# Current value of the ec2_subnet_id attribute; the constructor leaves it
# unset, so it reads as nil until assigned.
#
# @return [String, nil]
def ec2_subnet_id; @ec2_subnet_id; end

#hadoop_version ⇒ Object

Returns the value of attribute hadoop_version.



Source lines 12–14.
# File 'lib/elasticity/job_flow.rb', line 12

# Current value of the hadoop_version attribute (the constructor sets '1.0.3').
#
# @return [String]
def hadoop_version; @hadoop_version; end

#instance_count ⇒ Object

Returns the value of attribute instance_count.



Source lines 13–15.
# File 'lib/elasticity/job_flow.rb', line 13

# Current value of the instance_count attribute (the constructor sets 2).
#
# @return [Integer]
def instance_count; @instance_count; end

#keep_job_flow_alive_when_no_steps ⇒ Object

Returns the value of attribute keep_job_flow_alive_when_no_steps.



Source lines 18–20.
# File 'lib/elasticity/job_flow.rb', line 18

# Current value of the keep_job_flow_alive_when_no_steps flag
# (the constructor sets false).
#
# @return [Boolean]
def keep_job_flow_alive_when_no_steps; @keep_job_flow_alive_when_no_steps; end

#log_uri ⇒ Object

Returns the value of attribute log_uri.



Source lines 14–16.
# File 'lib/elasticity/job_flow.rb', line 14

# Current value of the log_uri attribute; the constructor leaves it unset,
# so it reads as nil until assigned.
#
# @return [String, nil]
def log_uri; @log_uri; end

#master_instance_type ⇒ Object

Returns the value of attribute master_instance_type.



Source lines 15–17.
# File 'lib/elasticity/job_flow.rb', line 15

# Current value of the master_instance_type attribute
# (the constructor sets 'm1.small').
#
# @return [String]
def master_instance_type; @master_instance_type; end

#name ⇒ Object

Returns the value of attribute name.



Source lines 11–13.
# File 'lib/elasticity/job_flow.rb', line 11

# Current value of the name attribute (the constructor sets 'Elasticity Job Flow').
#
# @return [String]
def name; @name; end

#placement ⇒ Object

Returns the value of attribute placement.



Source lines 20–22.
# File 'lib/elasticity/job_flow.rb', line 20

# Current value of the placement attribute (the constructor sets 'us-east-1a').
#
# @return [String]
def placement; @placement; end

#secret_key ⇒ Object (readonly)

Returns the value of attribute secret_key.



Source lines 23–25.
# File 'lib/elasticity/job_flow.rb', line 23

# Read-only attribute: the AWS secret key handed to the constructor.
#
# @return [String, nil]
def secret_key; @secret_key; end

#slave_instance_type ⇒ Object

Returns the value of attribute slave_instance_type.



Source lines 16–18.
# File 'lib/elasticity/job_flow.rb', line 16

# Current value of the slave_instance_type attribute
# (the constructor sets 'm1.small').
#
# @return [String]
def slave_instance_type; @slave_instance_type; end

Class Method Details

.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1') ⇒ Object



Source lines 51–57.
# File 'lib/elasticity/job_flow.rb', line 51

# Creates a JobFlow bound to an already-known jobflow_id.
#
# The new instance gets the given region and jobflow_id written directly into
# its instance variables. Its installed steps are then seeded from #status
# (i.e. from the describe_jobflow result's installed_steps).
#
# @param access [String] AWS access key
# @param secret [String] AWS secret key
# @param jobflow_id [String] identifier of the job flow to attach to
# @param region [String] stored in @region; defaults to 'us-east-1'
# @return [JobFlow]
def self.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1')
  jobflow = JobFlow.new(access, secret)
  jobflow.instance_variable_set(:@region, region)
  jobflow.instance_variable_set(:@jobflow_id, jobflow_id)
  # #status needs @jobflow_id, so it must run after the assignment above.
  jobflow.instance_variable_set(:@installed_steps, jobflow.status.installed_steps)
  jobflow
end

Instance Method Details

#add_bootstrap_action(bootstrap_action) ⇒ Object



Source lines 75–78.
# File 'lib/elasticity/job_flow.rb', line 75

# Appends a bootstrap action to the pending list.
#
# Guarded by raise_if, which receives is_jobflow_running? together with
# JobFlowRunningError. This assumes raise_if raises that error when the
# condition is truthy, so bootstrap actions can only be added before #run.
#
# @param bootstrap_action [Object] the action to queue
# @return [Array] the pending bootstrap actions
def add_bootstrap_action(bootstrap_action)
  already_running = is_jobflow_running?
  raise_if already_running, JobFlowRunningError, 'To modify bootstrap actions, please create a new job flow.'
  @bootstrap_actions.push(bootstrap_action)
end

#add_step(jobflow_step) ⇒ Object



Source lines 95–106.
# File 'lib/elasticity/job_flow.rb', line 95

# Adds a step to this job flow.
#
# Before #run, the step is only queued locally. Once the job flow is running,
# the step is submitted to EMR right away. If the step requires installation
# and its class is not yet in @installed_steps, its installation step is
# submitted first, and the class is then recorded as installed.
#
# @param jobflow_step [Object] step responding to requires_installation?,
#   aws_installation_step and to_aws_step
# @return [Array, Object] the queued steps when not yet running; otherwise
#   whatever emr.add_jobflow_steps returns
def add_step(jobflow_step)
  return @jobflow_steps << jobflow_step unless is_jobflow_running?

  needs_installation = jobflow_step.requires_installation? &&
                       !@installed_steps.include?(jobflow_step.class)
  jobflow_steps = []
  jobflow_steps << jobflow_step.aws_installation_step if needs_installation
  jobflow_steps << jobflow_step.to_aws_step(self)
  result = emr.add_jobflow_steps(@jobflow_id, {:steps => jobflow_steps})
  # Recorded only after add_jobflow_steps returned without raising, so a later
  # step of the same class is not preceded by a second installation step.
  @installed_steps << jobflow_step.class if needs_installation
  result
end

#run ⇒ Object



Source lines 108–112.
# File 'lib/elasticity/job_flow.rb', line 108

# Submits the job flow via emr.run_job_flow(jobflow_config) and stores the
# returned id in @jobflow_id. That id is also the return value.
#
# Two raise_if guards run first. The first is tied to JobFlowMissingStepsError
# and fires when no steps have been added. The second is tied to
# JobFlowRunningError and fires when is_jobflow_running? is true. This assumes
# raise_if raises the given error on a truthy condition.
#
# @return [Object] the id returned by emr.run_job_flow
def run
  raise_if(@jobflow_steps.empty?, JobFlowMissingStepsError,
           'Cannot run a job flow without adding steps.  Please use #add_step.')
  raise_if(is_jobflow_running?, JobFlowRunningError,
           'Cannot run a job flow multiple times.  To do more with this job flow, please use #add_step.')
  client = emr
  @jobflow_id = client.run_job_flow(jobflow_config)
end

#set_core_instance_group(instance_group) ⇒ Object



Source lines 85–88.
# File 'lib/elasticity/job_flow.rb', line 85

# Sets the given group's role to 'CORE' and stores it under :core,
# replacing any group previously stored there.
#
# @param instance_group [Object] group object with a writable +role+
# @return [Object] the same instance_group
def set_core_instance_group(instance_group)
  @instance_groups[:core] = instance_group.tap { |group| group.role = 'CORE' }
end

#set_master_instance_group(instance_group) ⇒ Object



Source lines 80–83.
# File 'lib/elasticity/job_flow.rb', line 80

# Sets the given group's role to 'MASTER' and stores it under :master,
# replacing any group previously stored there.
#
# @param instance_group [Object] group object with a writable +role+
# @return [Object] the same instance_group
def set_master_instance_group(instance_group)
  @instance_groups[:master] = instance_group.tap { |group| group.role = 'MASTER' }
end

#set_task_instance_group(instance_group) ⇒ Object



Source lines 90–93.
# File 'lib/elasticity/job_flow.rb', line 90

# Sets the given group's role to 'TASK' and stores it under :task,
# replacing any group previously stored there.
#
# @param instance_group [Object] group object with a writable +role+
# @return [Object] the same instance_group
def set_task_instance_group(instance_group)
  @instance_groups[:task] = instance_group.tap { |group| group.role = 'TASK' }
end

#shutdown ⇒ Object



Source lines 114–117.
# File 'lib/elasticity/job_flow.rb', line 114

# Asks EMR to terminate this job flow via emr.terminate_jobflows(@jobflow_id).
#
# Guarded by raise_unless with JobFlowNotStartedError. This assumes
# raise_unless raises when is_jobflow_running? is falsy, i.e. before #run.
#
# @return [Object] whatever emr.terminate_jobflows returns
def shutdown
  started = is_jobflow_running?
  raise_unless started, JobFlowNotStartedError, 'Cannot #shutdown a job flow that has not yet been #run.'
  jobflow_id = @jobflow_id
  emr.terminate_jobflows(jobflow_id)
end

#status ⇒ Object



Source lines 119–122.
# File 'lib/elasticity/job_flow.rb', line 119

# Fetches this job flow's description via emr.describe_jobflow(@jobflow_id).
#
# Guarded by raise_unless with JobFlowNotStartedError. This assumes
# raise_unless raises when is_jobflow_running? is falsy, i.e. before #run.
#
# @return [Object] whatever emr.describe_jobflow returns
def status
  raise_unless(is_jobflow_running?, JobFlowNotStartedError,
               'Please #run this job flow before attempting to retrieve status.')
  jobflow_id = @jobflow_id
  emr.describe_jobflow(jobflow_id)
end