Class: Elasticity::JobFlow

Inherits: Object
  • Object
Defined in: lib/elasticity/job_flow.rb

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(access = nil, secret = nil) ⇒ JobFlow

Returns a new instance of JobFlow.

Source: lines 28–54
# File 'lib/elasticity/job_flow.rb', line 28

# Builds a job flow description populated with default settings.
# This method makes no EMR request itself.
#
# Defaults assigned here: action on failure 'TERMINATE_JOB_FLOW', Hadoop
# '1.0.3', name 'Elasticity Job Flow', AMI 'latest', placement 'us-east-1a',
# debugging off, not kept alive when out of steps, not visible to all users,
# instance count 2, master/slave instance types 'm1.small', fresh master and
# core instance groups, and empty bootstrap-action / step lists.
#
# @param access [String, nil] AWS access key
# @param secret [String, nil] AWS secret key
def initialize(access = nil, secret = nil)
  @action_on_failure = 'TERMINATE_JOB_FLOW'
  @hadoop_version = '1.0.3'
  @name = 'Elasticity Job Flow'
  @ami_version = 'latest'
  @keep_job_flow_alive_when_no_steps = false
  @placement = 'us-east-1a'
  @enable_debugging = false

  # Credentials are assigned exactly once.
  @access_key = access
  @secret_key = secret
  @visible_to_all_users = false

  @bootstrap_actions = []
  @jobflow_steps = []
  @installed_steps = []

  # The setters tag each group with its role and store it in @instance_groups.
  @instance_groups = {}
  set_master_instance_group(Elasticity::InstanceGroup.new)
  set_core_instance_group(Elasticity::InstanceGroup.new)
  @instance_count = 2
  @master_instance_type = 'm1.small'
  @slave_instance_type = 'm1.small'
end

Instance Attribute Details

#access_key ⇒ Object (readonly)

Returns the value of attribute access_key.

Source: lines 25–27
# File 'lib/elasticity/job_flow.rb', line 25

# Reader for the AWS access key handed to the constructor (read-only).
# @return [String, nil] the access key, or nil when none was supplied
def access_key; @access_key; end

#action_on_failure ⇒ Object

Returns the value of attribute action_on_failure.

Source: lines 10–12
# File 'lib/elasticity/job_flow.rb', line 10

# Reader for the action-on-failure setting.
# @return [String] #initialize sets this to 'TERMINATE_JOB_FLOW'
def action_on_failure; @action_on_failure; end

#ami_version ⇒ Object

Returns the value of attribute ami_version.

Source: lines 18–20
# File 'lib/elasticity/job_flow.rb', line 18

# Reader for the AMI version setting.
# @return [String] #initialize sets this to 'latest'
def ami_version; @ami_version; end

#ec2_key_name ⇒ Object

Returns the value of attribute ec2_key_name.

Source: lines 11–13
# File 'lib/elasticity/job_flow.rb', line 11

# Reader for the EC2 key name setting.
# @return [String, nil] #initialize does not assign it, so nil until set
def ec2_key_name; @ec2_key_name; end

#ec2_subnet_id ⇒ Object

Returns the value of attribute ec2_subnet_id.

Source: lines 20–22
# File 'lib/elasticity/job_flow.rb', line 20

# Reader for the EC2 subnet id setting.
# @return [String, nil] #initialize does not assign it, so nil until set
def ec2_subnet_id; @ec2_subnet_id; end

#enable_debugging ⇒ Object

Returns the value of attribute enable_debugging.

Source: lines 23–25
# File 'lib/elasticity/job_flow.rb', line 23

# Reader for the debugging flag.
# @return [Boolean] #initialize sets this to false
def enable_debugging; @enable_debugging; end

#hadoop_version ⇒ Object

Returns the value of attribute hadoop_version.

Source: lines 13–15
# File 'lib/elasticity/job_flow.rb', line 13

# Reader for the Hadoop version setting.
# @return [String] #initialize sets this to '1.0.3'
def hadoop_version; @hadoop_version; end

#instance_count ⇒ Object

Returns the value of attribute instance_count.

Source: lines 14–16
# File 'lib/elasticity/job_flow.rb', line 14

# Reader for the configured instance count.
# @return [Integer] #initialize sets this to 2
def instance_count; @instance_count; end

#keep_job_flow_alive_when_no_steps ⇒ Object

Returns the value of attribute keep_job_flow_alive_when_no_steps.

Source: lines 19–21
# File 'lib/elasticity/job_flow.rb', line 19

# Reader for the keep-alive-when-no-steps flag.
# @return [Boolean] #initialize sets this to false
def keep_job_flow_alive_when_no_steps; @keep_job_flow_alive_when_no_steps; end

#log_uri ⇒ Object

Returns the value of attribute log_uri.

Source: lines 15–17
# File 'lib/elasticity/job_flow.rb', line 15

# Reader for the log URI setting.
# @return [String, nil] #initialize does not assign it, so nil until set
def log_uri; @log_uri; end

#master_instance_type ⇒ Object

Returns the value of attribute master_instance_type.

Source: lines 16–18
# File 'lib/elasticity/job_flow.rb', line 16

# Reader for the master instance type setting.
# @return [String] #initialize sets this to 'm1.small'
def master_instance_type; @master_instance_type; end

#name ⇒ Object

Returns the value of attribute name.

Source: lines 12–14
# File 'lib/elasticity/job_flow.rb', line 12

# Reader for the job flow's name.
# @return [String] #initialize sets this to 'Elasticity Job Flow'
def name; @name; end

#placement ⇒ Object

Returns the value of attribute placement.

Source: lines 21–23
# File 'lib/elasticity/job_flow.rb', line 21

# Reader for the placement setting.
# @return [String] #initialize sets this to 'us-east-1a'
def placement; @placement; end

#secret_key ⇒ Object (readonly)

Returns the value of attribute secret_key.

Source: lines 26–28
# File 'lib/elasticity/job_flow.rb', line 26

# Reader for the AWS secret key handed to the constructor (read-only).
# @return [String, nil] the secret key, or nil when none was supplied
def secret_key; @secret_key; end

#slave_instance_type ⇒ Object

Returns the value of attribute slave_instance_type.

Source: lines 17–19
# File 'lib/elasticity/job_flow.rb', line 17

# Reader for the slave instance type setting.
# @return [String] #initialize sets this to 'm1.small'
def slave_instance_type; @slave_instance_type; end

#visible_to_all_users ⇒ Object

Returns the value of attribute visible_to_all_users.

Source: lines 22–24
# File 'lib/elasticity/job_flow.rb', line 22

# Reader for the visible-to-all-users flag.
# @return [Boolean] #initialize sets this to false
def visible_to_all_users; @visible_to_all_users; end

Class Method Details

.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1') ⇒ Object

Source: lines 56–62
# File 'lib/elasticity/job_flow.rb', line 56

# Wraps an existing job flow, identified by its id, in a JobFlow instance.
#
# @param access [String] AWS access key
# @param secret [String] AWS secret key
# @param jobflow_id [String] id of the job flow to wrap
# @param region [String] region to record on the instance (default 'us-east-1')
# @return [JobFlow] an instance whose installed-step list is taken from #status
def self.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1')
  flow = JobFlow.new(access, secret)
  flow.instance_variable_set(:@region, region)
  # #status looks the job flow up by @jobflow_id, so the id must be set first.
  flow.instance_variable_set(:@jobflow_id, jobflow_id)
  flow.instance_variable_set(:@installed_steps, flow.status.installed_steps)
  flow
end

Instance Method Details

#add_bootstrap_action(bootstrap_action) ⇒ Object

Source: lines 92–97
# File 'lib/elasticity/job_flow.rb', line 92

# Appends a bootstrap action to this job flow's list of bootstrap actions.
# The list cannot change once the job flow is running.
#
# @param bootstrap_action the action to append
# @raise [JobFlowRunningError] if #is_jobflow_running? reports true
# @return [Array] the bootstrap action list, including the new entry
def add_bootstrap_action(bootstrap_action)
  raise JobFlowRunningError, 'To modify bootstrap actions, please create a new job flow.' if is_jobflow_running?

  @bootstrap_actions.push(bootstrap_action)
end

#add_step(jobflow_step) ⇒ Object

Source: lines 114–125
# File 'lib/elasticity/job_flow.rb', line 114

# Adds a step to the job flow.
#
# While the job flow is not running, the step is only appended to
# @jobflow_steps. Once it is running, the step is converted with
# #to_aws_step and submitted to EMR right away. If the step's class requires
# installation and is not yet listed in @installed_steps, its installation
# steps are sent first, in the same request. The class is then recorded so
# that later steps of the same type do not install it again.
#
# @param jobflow_step the step to add
# @return the EMR response when running, otherwise the @jobflow_steps list
def add_step(jobflow_step)
  unless is_jobflow_running?
    return @jobflow_steps << jobflow_step
  end

  step_class = jobflow_step.class
  needs_install = jobflow_step.requires_installation? && !@installed_steps.include?(step_class)

  aws_steps = []
  aws_steps.concat(jobflow_step.aws_installation_steps) if needs_install
  aws_steps << jobflow_step.to_aws_step(self)
  response = emr.add_jobflow_steps(@jobflow_id, {:steps => aws_steps})

  # Record the install only after EMR accepted the request. Rebind instead of
  # mutating, since the array may come from a status object.
  @installed_steps += [step_class] if needs_install
  response
end

#run ⇒ Object

Source: lines 127–132
# File 'lib/elasticity/job_flow.rb', line 127

# Submits this job flow's configuration (#jobflow_config) to EMR and stores
# the id EMR returns in @jobflow_id.
#
# @raise [JobFlowRunningError] if #is_jobflow_running? reports true
# @return the job flow id returned by the EMR call
def run
  if is_jobflow_running?
    raise JobFlowRunningError,
          'Cannot run a job flow multiple times.  To do more with this job flow, please use #add_step.'
  end

  new_jobflow_id = emr.run_job_flow(jobflow_config)
  @jobflow_id = new_jobflow_id
end

#set_core_instance_group(instance_group) ⇒ Object

Source: lines 104–107
# File 'lib/elasticity/job_flow.rb', line 104

# Tags the group with role 'CORE' and stores it under :core in
# @instance_groups, replacing any group already stored there.
#
# @param instance_group [#role=] the group to install
# @return the same instance_group object
def set_core_instance_group(instance_group)
  @instance_groups.store(:core, instance_group.tap { |group| group.role = 'CORE' })
end

#set_master_instance_group(instance_group) ⇒ Object

Source: lines 99–102
# File 'lib/elasticity/job_flow.rb', line 99

# Tags the group with role 'MASTER' and stores it under :master in
# @instance_groups, replacing any group already stored there.
#
# @param instance_group [#role=] the group to install
# @return the same instance_group object
def set_master_instance_group(instance_group)
  @instance_groups.store(:master, instance_group.tap { |group| group.role = 'MASTER' })
end

#set_task_instance_group(instance_group) ⇒ Object

Source: lines 109–112
# File 'lib/elasticity/job_flow.rb', line 109

# Tags the group with role 'TASK' and stores it under :task in
# @instance_groups, replacing any group already stored there.
#
# @param instance_group [#role=] the group to install
# @return the same instance_group object
def set_task_instance_group(instance_group)
  @instance_groups.store(:task, instance_group.tap { |group| group.role = 'TASK' })
end

#shutdown ⇒ Object

Source: lines 134–139
# File 'lib/elasticity/job_flow.rb', line 134

# Asks EMR to terminate this job flow (by @jobflow_id).
#
# @raise [JobFlowNotStartedError] if #is_jobflow_running? reports false
# @return the result of the EMR terminate call
def shutdown
  return emr.terminate_jobflows(@jobflow_id) if is_jobflow_running?

  raise JobFlowNotStartedError, 'Cannot #shutdown a job flow that has not yet been #run.'
end

#status ⇒ Object

Source: lines 141–146
# File 'lib/elasticity/job_flow.rb', line 141

# Fetches this job flow's description from EMR (by @jobflow_id).
#
# @raise [JobFlowNotStartedError] if #is_jobflow_running? reports false
# @return the result of the EMR describe call
def status
  return emr.describe_jobflow(@jobflow_id) if is_jobflow_running?

  raise JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.'
end

#wait_for_completion(&on_wait) ⇒ Object

Source: lines 148–151
# File 'lib/elasticity/job_flow.rb', line 148

# Hands polling off to an Elasticity::Looper. The looper is built from a
# Method object for #retry_check and the caller's block (nil when no block is
# given), and is then started with #go.
#
# @yield optional callback passed to the looper as its second argument
# @return whatever Elasticity::Looper#go returns
def wait_for_completion(&on_wait)
  Elasticity::Looper.new(method(:retry_check), on_wait).go
end