Class: Elasticity::JobFlow

Inherits:
Object
  • Object
show all
Defined in:
lib/elasticity/job_flow.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JobFlow

Returns a new instance of JobFlow.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/elasticity/job_flow.rb', line 38

# Builds a job flow populated with default settings: a two-instance
# 'm1.small' cluster with a master and a core instance group, no steps,
# applications, configurations or bootstrap actions, and debugging off.
def initialize
  # Job-level defaults.
  @name = 'Elasticity Job Flow'
  @action_on_failure = 'TERMINATE_JOB_FLOW'
  @keep_job_flow_alive_when_no_steps = false

  # Assigned through the placement= writer (not shown here) rather than by
  # setting an instance variable directly.
  self.placement = 'us-east-1a'

  # Flags that are off by default.
  @visible_to_all_users = false
  @enable_debugging = false

  # Collections that are filled in later (see the add_* methods).
  @installed_steps = []
  @jobflow_steps = []
  @aws_configurations = []
  @aws_applications = []
  @bootstrap_actions = []

  # The group hash must exist before the set_*_instance_group calls use it.
  @instance_groups = {}
  set_master_instance_group(Elasticity::InstanceGroup.new)
  set_core_instance_group(Elasticity::InstanceGroup.new)

  # Cluster sizing defaults.
  @slave_instance_type = 'm1.small'
  @master_instance_type = 'm1.small'
  @instance_count = 2

  @timeout = 60
end

Instance Attribute Details

#action_on_failure ⇒ Object

Returns the value of attribute action_on_failure.



11
12
13
# File 'lib/elasticity/job_flow.rb', line 11

# Reader for the action_on_failure attribute.
#
# @return the current value of @action_on_failure, which #initialize
#   defaults to 'TERMINATE_JOB_FLOW'
def action_on_failure
  @action_on_failure
end

#additional_info ⇒ Object

Returns the value of attribute additional_info.



33
34
35
# File 'lib/elasticity/job_flow.rb', line 33

# Reader for the additional_info attribute.
#
# @return the current value of @additional_info (#initialize, as shown on
#   this page, does not assign it)
def additional_info
  @additional_info
end

#additional_master_security_groups ⇒ Object

Returns the value of attribute additional_master_security_groups.



34
35
36
# File 'lib/elasticity/job_flow.rb', line 34

# Reader for the additional_master_security_groups attribute.
#
# @return the current value of @additional_master_security_groups
#   (#initialize, as shown on this page, does not assign it)
def additional_master_security_groups
  @additional_master_security_groups
end

#additional_slave_security_groups ⇒ Object

Returns the value of attribute additional_slave_security_groups.



35
36
37
# File 'lib/elasticity/job_flow.rb', line 35

# Reader for the additional_slave_security_groups attribute.
#
# @return the current value of @additional_slave_security_groups
#   (#initialize, as shown on this page, does not assign it)
def additional_slave_security_groups
  @additional_slave_security_groups
end

#ami_version ⇒ Object

Returns the value of attribute ami_version.



20
21
22
# File 'lib/elasticity/job_flow.rb', line 20

# Reader for the ami_version attribute.
#
# @return the current value of @ami_version (#initialize, as shown on this
#   page, does not assign it)
def ami_version
  @ami_version
end

#aws_applications ⇒ Object

Returns the value of attribute aws_applications.



31
32
33
# File 'lib/elasticity/job_flow.rb', line 31

# Reader for the aws_applications attribute.
#
# @return the @aws_applications collection itself (not a copy); #initialize
#   starts it as an empty array and #add_application appends to it
def aws_applications
  @aws_applications
end

#aws_configurations ⇒ Object

Returns the value of attribute aws_configurations.



32
33
34
# File 'lib/elasticity/job_flow.rb', line 32

# Reader for the aws_configurations attribute.
#
# @return the @aws_configurations collection itself (not a copy);
#   #initialize starts it as an empty array and #add_configuration appends
#   to it
def aws_configurations
  @aws_configurations
end

#ec2_key_name ⇒ Object

Returns the value of attribute ec2_key_name.



12
13
14
# File 'lib/elasticity/job_flow.rb', line 12

# Reader for the ec2_key_name attribute.
#
# @return the current value of @ec2_key_name (#initialize, as shown on this
#   page, does not assign it)
def ec2_key_name
  @ec2_key_name
end

#ec2_subnet_id ⇒ Object

Returns the value of attribute ec2_subnet_id.



23
24
25
# File 'lib/elasticity/job_flow.rb', line 23

# Reader for the ec2_subnet_id attribute.
#
# @return the current value of @ec2_subnet_id (#initialize, as shown on
#   this page, does not assign it directly)
def ec2_subnet_id
  @ec2_subnet_id
end

#enable_debugging ⇒ Object

Returns the value of attribute enable_debugging.



27
28
29
# File 'lib/elasticity/job_flow.rb', line 27

# Reader for the enable_debugging attribute.
#
# @return the current value of @enable_debugging, which #initialize
#   defaults to false
def enable_debugging
  @enable_debugging
end

#instance_count ⇒ Object

Returns the value of attribute instance_count.



15
16
17
# File 'lib/elasticity/job_flow.rb', line 15

# Reader for the instance_count attribute.
#
# @return the current value of @instance_count, which #initialize
#   defaults to 2
def instance_count
  @instance_count
end

#job_flow_role ⇒ Object

Returns the value of attribute job_flow_role.



28
29
30
# File 'lib/elasticity/job_flow.rb', line 28

# Reader for the job_flow_role attribute.
#
# @return the current value of @job_flow_role (#initialize, as shown on
#   this page, does not assign it)
def job_flow_role
  @job_flow_role
end

#jobflow_id ⇒ Object

Returns the value of attribute jobflow_id.



30
31
32
# File 'lib/elasticity/job_flow.rb', line 30

# Reader for the jobflow_id attribute.
#
# @return the current value of @jobflow_id; on this page it is assigned by
#   #run (from emr.run_job_flow) and by .from_jobflow_id
def jobflow_id
  @jobflow_id
end

#keep_job_flow_alive_when_no_steps ⇒ Object

Returns the value of attribute keep_job_flow_alive_when_no_steps.



22
23
24
# File 'lib/elasticity/job_flow.rb', line 22

# Reader for the keep_job_flow_alive_when_no_steps attribute.
#
# @return the current value of @keep_job_flow_alive_when_no_steps, which
#   #initialize defaults to false
def keep_job_flow_alive_when_no_steps
  @keep_job_flow_alive_when_no_steps
end

#log_uri ⇒ Object

Returns the value of attribute log_uri.



16
17
18
# File 'lib/elasticity/job_flow.rb', line 16

# Reader for the log_uri attribute.
#
# @return the current value of @log_uri (#initialize, as shown on this
#   page, does not assign it)
def log_uri
  @log_uri
end

#master_instance_type ⇒ Object

Returns the value of attribute master_instance_type.



18
19
20
# File 'lib/elasticity/job_flow.rb', line 18

# Reader for the master_instance_type attribute.
#
# @return the current value of @master_instance_type, which #initialize
#   defaults to 'm1.small'
def master_instance_type
  @master_instance_type
end

#name ⇒ Object

Returns the value of attribute name.



14
15
16
# File 'lib/elasticity/job_flow.rb', line 14

# Reader for the name attribute.
#
# @return the current value of @name, which #initialize defaults to
#   'Elasticity Job Flow'
def name
  @name
end

#placement ⇒ Object

Returns the value of attribute placement.



24
25
26
# File 'lib/elasticity/job_flow.rb', line 24

# Reader for the placement attribute.
#
# #initialize calls the placement= writer with 'us-east-1a'; that writer is
# not shown on this page, so presumably it stores into @placement — verify
# against the writer.
#
# @return the current value of @placement
def placement
  @placement
end

#region ⇒ Object

Returns the value of attribute region.



25
26
27
# File 'lib/elasticity/job_flow.rb', line 25

# Reader for the region attribute.
#
# @return the current value of @region; on this page it is assigned by
#   .from_jobflow_id (other writers, if any, are not shown)
def region
  @region
end

#release_label ⇒ Object

Returns the value of attribute release_label.



21
22
23
# File 'lib/elasticity/job_flow.rb', line 21

# Reader for the release_label attribute.
#
# @return the current value of @release_label (#initialize, as shown on
#   this page, does not assign it)
def release_label
  @release_label
end

#security_configuration ⇒ Object

Returns the value of attribute security_configuration.



13
14
15
# File 'lib/elasticity/job_flow.rb', line 13

# Reader for the security_configuration attribute.
#
# @return the current value of @security_configuration (#initialize, as
#   shown on this page, does not assign it)
def security_configuration
  @security_configuration
end

#service_role ⇒ Object

Returns the value of attribute service_role.



29
30
31
# File 'lib/elasticity/job_flow.rb', line 29

# Reader for the service_role attribute.
#
# @return the current value of @service_role (#initialize, as shown on this
#   page, does not assign it)
def service_role
  @service_role
end

#slave_instance_type ⇒ Object

Returns the value of attribute slave_instance_type.



19
20
21
# File 'lib/elasticity/job_flow.rb', line 19

# Reader for the slave_instance_type attribute.
#
# @return the current value of @slave_instance_type, which #initialize
#   defaults to 'm1.small'
def slave_instance_type
  @slave_instance_type
end

#tags ⇒ Object

Returns the value of attribute tags.



17
18
19
# File 'lib/elasticity/job_flow.rb', line 17

# Reader for the tags attribute.
#
# @return the current value of @tags (#initialize, as shown on this page,
#   does not assign it)
def tags
  @tags
end

#timeout ⇒ Object

Returns the value of attribute timeout.



36
37
38
# File 'lib/elasticity/job_flow.rb', line 36

# Reader for the timeout attribute.
#
# @return the current value of @timeout, which #initialize defaults to 60
#   (the unit is not stated in the code shown here)
def timeout
  @timeout
end

#visible_to_all_users ⇒ Object

Returns the value of attribute visible_to_all_users.



26
27
28
# File 'lib/elasticity/job_flow.rb', line 26

# Reader for the visible_to_all_users attribute.
#
# @return the current value of @visible_to_all_users, which #initialize
#   defaults to false
def visible_to_all_users
  @visible_to_all_users
end

Class Method Details

.from_jobflow_id(jobflow_id, region = 'us-east-1') ⇒ Object



63
64
65
66
67
68
69
# File 'lib/elasticity/job_flow.rb', line 63

# Builds a JobFlow object that refers to an already-existing job flow.
#
# A fresh JobFlow is created with its defaults, then @region and @jobflow_id
# are overwritten and @installed_steps is replaced with whatever
# ClusterStepStatus.installed_steps derives from the job flow's current
# cluster_step_status.
#
# @param jobflow_id identifier of the existing job flow
# @param region region stored on the new object (defaults to 'us-east-1')
# @return [JobFlow] the configured instance
def self.from_jobflow_id(jobflow_id, region = 'us-east-1')
  job_flow = JobFlow.new

  # Region and id are set first: cluster_step_status needs @jobflow_id.
  job_flow.instance_variable_set(:@region, region)
  job_flow.instance_variable_set(:@jobflow_id, jobflow_id)
  job_flow.instance_variable_set(
    :@installed_steps,
    ClusterStepStatus.installed_steps(job_flow.cluster_step_status)
  )

  job_flow
end

Instance Method Details

#add_application(application) ⇒ Object



120
121
122
123
124
125
# File 'lib/elasticity/job_flow.rb', line 120

# Registers an application on a job flow that has not been started yet.
#
# @param application an Application, or a String that is wrapped as
#   Application.new(name: application)
# @return the @aws_applications collection after the append
# @raise [JobFlowRunningError] if the job flow is already running
# @raise [RuntimeError] if the argument is neither a String nor an Application
def add_application(application)
  if is_jobflow_running?
    raise JobFlowRunningError, 'To add applications, please create a new job flow.'
  end

  # A bare name is promoted to an Application; anything else passes through
  # unchanged and is type-checked below.
  app = application.is_a?(String) ? Application.new(name: application) : application
  raise 'application is not an Elasticity::Application' unless app.is_a?(Application)

  @aws_applications << app
end

#add_bootstrap_action(bootstrap_action) ⇒ Object



113
114
115
116
117
118
# File 'lib/elasticity/job_flow.rb', line 113

# Queues a bootstrap action on a job flow that has not been started yet.
#
# @param bootstrap_action the action to append; it is stored as given
# @return the @bootstrap_actions collection after the append
# @raise [JobFlowRunningError] if the job flow is already running
def add_bootstrap_action(bootstrap_action)
  rejection = 'To modify bootstrap actions, please create a new job flow.'
  raise JobFlowRunningError, rejection if is_jobflow_running?

  @bootstrap_actions << bootstrap_action
end

#add_configuration(configuration) ⇒ Object



127
128
129
130
# File 'lib/elasticity/job_flow.rb', line 127

# Queues a configuration on a job flow that has not been started yet.
#
# @param configuration the configuration to append; it is stored as given
# @return the @aws_configurations collection after the append
# @raise [JobFlowRunningError] if the job flow is already running
def add_configuration(configuration)
  if is_jobflow_running?
    raise JobFlowRunningError, 'To add configurations, please create a new job flow.'
  end

  @aws_configurations << configuration
end

#add_step(jobflow_step) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/elasticity/job_flow.rb', line 159

# Adds a step to this job flow.
#
# Before the job flow has been run, the step is only queued in
# @jobflow_steps. Once it is running, the step is converted with
# to_aws_step and submitted through emr.add_jobflow_steps, preceded by the
# step's installation steps when the step requires installation and its
# class is not yet listed in @installed_steps.
#
# After a successful submission that included installation steps, the
# step's class is recorded in @installed_steps so that later steps of the
# same class do not submit the installation steps again.
#
# @param jobflow_step the step to add; when the job flow is running it must
#   respond to requires_installation?, aws_installation_steps and to_aws_step
# @return the result of emr.add_jobflow_steps when the job flow is running,
#   otherwise the @jobflow_steps collection after the append
def add_step(jobflow_step)
  return @jobflow_steps << jobflow_step unless is_jobflow_running?

  needs_installation =
    jobflow_step.requires_installation? && !@installed_steps.include?(jobflow_step.class)

  aws_steps = []
  aws_steps.concat(jobflow_step.aws_installation_steps) if needs_installation
  aws_steps << jobflow_step.to_aws_step(self)

  result = emr.add_jobflow_steps(@jobflow_id, aws_steps)

  # Recorded only after the call returned: if the submission raises, the
  # installation is attempted again with the next step of this class.
  @installed_steps << jobflow_step.class if needs_installation

  result
end

#cluster_status ⇒ Object



186
187
188
189
190
191
# File 'lib/elasticity/job_flow.rb', line 186

# Fetches the status of the running job flow.
#
# @return the result of ClusterStatus.from_aws_data applied to
#   emr.describe_cluster(@jobflow_id)
# @raise [JobFlowNotStartedError] if the job flow is not running
def cluster_status
  not_started = 'Please #run this job flow before attempting to retrieve status.'
  raise JobFlowNotStartedError, not_started unless is_jobflow_running?

  aws_description = emr.describe_cluster(@jobflow_id)
  ClusterStatus.from_aws_data(aws_description)
end

#cluster_step_status ⇒ Object



193
194
195
196
197
198
# File 'lib/elasticity/job_flow.rb', line 193

# Fetches the step statuses of the running job flow.
#
# @return the result of ClusterStepStatus.from_aws_list_data applied to
#   emr.list_steps(@jobflow_id)
# @raise [JobFlowNotStartedError] if the job flow is not running
def cluster_step_status
  not_started = 'Please #run this job flow before attempting to retrieve status.'
  raise JobFlowNotStartedError, not_started unless is_jobflow_running?

  aws_step_list = emr.list_steps(@jobflow_id)
  ClusterStepStatus.from_aws_list_data(aws_step_list)
end

#run ⇒ Object



172
173
174
175
176
177
# File 'lib/elasticity/job_flow.rb', line 172

# Starts the job flow and remembers the id it was given.
#
# @return the value returned by emr.run_job_flow(jobflow_config), which is
#   also stored in @jobflow_id
# @raise [JobFlowRunningError] if the job flow is already running
def run
  already_running =
    'Cannot run a job flow multiple times.  To do more with this job flow, please use #add_step.'
  raise JobFlowRunningError, already_running if is_jobflow_running?

  @jobflow_id = emr.run_job_flow(jobflow_config)
end

#set_core_ebs_configuration(ebs_configuration) ⇒ Object



151
152
153
# File 'lib/elasticity/job_flow.rb', line 151

# Hands an EBS configuration to the instance group registered under :core.
#
# @param ebs_configuration passed unchanged to the group's
#   set_ebs_configuration
# @return whatever the group's set_ebs_configuration returns
def set_core_ebs_configuration(ebs_configuration)
  core_group = @instance_groups[:core]
  core_group.set_ebs_configuration(ebs_configuration)
end

#set_core_instance_group(instance_group) ⇒ Object



137
138
139
140
# File 'lib/elasticity/job_flow.rb', line 137

# Registers the given instance group as this job flow's core group.
#
# The group's role is overwritten with 'CORE' before it is stored under
# the :core key of @instance_groups, replacing any previous entry.
#
# @param instance_group the group to register; must respond to role=
# @return the instance group that was passed in
def set_core_instance_group(instance_group)
  instance_group.role = 'CORE'
  @instance_groups.store(:core, instance_group)
end

#set_master_ebs_configuration(ebs_configuration) ⇒ Object



147
148
149
# File 'lib/elasticity/job_flow.rb', line 147

# Hands an EBS configuration to the instance group registered under :master.
#
# @param ebs_configuration passed unchanged to the group's
#   set_ebs_configuration
# @return whatever the group's set_ebs_configuration returns
def set_master_ebs_configuration(ebs_configuration)
  master_group = @instance_groups[:master]
  master_group.set_ebs_configuration(ebs_configuration)
end

#set_master_instance_group(instance_group) ⇒ Object



132
133
134
135
# File 'lib/elasticity/job_flow.rb', line 132

# Registers the given instance group as this job flow's master group.
#
# The group's role is overwritten with 'MASTER' before it is stored under
# the :master key of @instance_groups, replacing any previous entry.
#
# @param instance_group the group to register; must respond to role=
# @return the instance group that was passed in
def set_master_instance_group(instance_group)
  instance_group.role = 'MASTER'
  @instance_groups.store(:master, instance_group)
end

#set_task_ebs_configuration(ebs_configuration) ⇒ Object



155
156
157
# File 'lib/elasticity/job_flow.rb', line 155

# Hands an EBS configuration to the instance group registered under :task.
#
# The #initialize shown on this page registers only master and core groups,
# so unless a task group has been stored first (see
# #set_task_instance_group) the lookup yields nil and the call below raises
# NoMethodError.
#
# @param ebs_configuration passed unchanged to the group's
#   set_ebs_configuration
# @return whatever the group's set_ebs_configuration returns
def set_task_ebs_configuration(ebs_configuration)
  task_group = @instance_groups[:task]
  task_group.set_ebs_configuration(ebs_configuration)
end

#set_task_instance_group(instance_group) ⇒ Object



142
143
144
145
# File 'lib/elasticity/job_flow.rb', line 142

# Registers the given instance group as this job flow's task group.
#
# The group's role is overwritten with 'TASK' before it is stored under
# the :task key of @instance_groups, replacing any previous entry.
#
# @param instance_group the group to register; must respond to role=
# @return the instance group that was passed in
def set_task_instance_group(instance_group)
  instance_group.role = 'TASK'
  @instance_groups.store(:task, instance_group)
end

#shutdown ⇒ Object



179
180
181
182
183
184
# File 'lib/elasticity/job_flow.rb', line 179

# Requests termination of this job flow.
#
# @return whatever emr.terminate_jobflows returns for a one-element list
#   holding @jobflow_id
# @raise [JobFlowNotStartedError] if the job flow is not running
def shutdown
  not_started = 'Cannot #shutdown a job flow that has not yet been #run.'
  raise JobFlowNotStartedError, not_started unless is_jobflow_running?

  emr.terminate_jobflows([@jobflow_id])
end

#wait_for_completion(&on_wait) ⇒ Object



200
201
202
203
# File 'lib/elasticity/job_flow.rb', line 200

# Hands control to an Elasticity::Looper until it returns.
#
# The looper is built from this object's retry_check method and the
# optional block, then started with go.
#
# @yield optional block, passed to the looper as a Proc (nil when omitted)
# @return whatever Elasticity::Looper#go returns
def wait_for_completion(&on_wait)
  completion_check = method(:retry_check)
  Elasticity::Looper.new(completion_check, on_wait).go
end