Class: Elasticity::JobFlow
- Inherits:
-
Object
- Object
- Elasticity::JobFlow
- Defined in:
- lib/elasticity/job_flow.rb
Instance Attribute Summary collapse
-
#access_key ⇒ Object
readonly
Returns the value of attribute access_key.
-
#action_on_failure ⇒ Object
Returns the value of attribute action_on_failure.
-
#ami_version ⇒ Object
Returns the value of attribute ami_version.
-
#ec2_key_name ⇒ Object
Returns the value of attribute ec2_key_name.
-
#ec2_subnet_id ⇒ Object
Returns the value of attribute ec2_subnet_id.
-
#enable_debugging ⇒ Object
Returns the value of attribute enable_debugging.
-
#hadoop_version ⇒ Object
Returns the value of attribute hadoop_version.
-
#instance_count ⇒ Object
Returns the value of attribute instance_count.
-
#keep_job_flow_alive_when_no_steps ⇒ Object
Returns the value of attribute keep_job_flow_alive_when_no_steps.
-
#log_uri ⇒ Object
Returns the value of attribute log_uri.
-
#master_instance_type ⇒ Object
Returns the value of attribute master_instance_type.
-
#name ⇒ Object
Returns the value of attribute name.
-
#placement ⇒ Object
Returns the value of attribute placement.
-
#secret_key ⇒ Object
readonly
Returns the value of attribute secret_key.
-
#slave_instance_type ⇒ Object
Returns the value of attribute slave_instance_type.
-
#visible_to_all_users ⇒ Object
Returns the value of attribute visible_to_all_users.
Class Method Summary collapse
Instance Method Summary collapse
- #add_bootstrap_action(bootstrap_action) ⇒ Object
- #add_step(jobflow_step) ⇒ Object
-
#initialize(access = nil, secret = nil) ⇒ JobFlow
constructor
A new instance of JobFlow.
- #run ⇒ Object
- #set_core_instance_group(instance_group) ⇒ Object
- #set_master_instance_group(instance_group) ⇒ Object
- #set_task_instance_group(instance_group) ⇒ Object
- #shutdown ⇒ Object
- #status ⇒ Object
- #wait_for_completion(&on_wait) ⇒ Object
Constructor Details
#initialize(access = nil, secret = nil) ⇒ JobFlow
Returns a new instance of JobFlow.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/elasticity/job_flow.rb', line 28 def initialize(access=nil, secret=nil) @action_on_failure = 'TERMINATE_JOB_FLOW' @hadoop_version = '1.0.3' @name = 'Elasticity Job Flow' @ami_version = 'latest' @keep_job_flow_alive_when_no_steps = false @placement = 'us-east-1a' @enable_debugging = false @access_key = access @secret_key = secret @visible_to_all_users = false @bootstrap_actions = [] @jobflow_steps = [] @installed_steps = [] @instance_groups = {} set_master_instance_group(Elasticity::InstanceGroup.new) set_core_instance_group(Elasticity::InstanceGroup.new) @instance_count = 2 @master_instance_type = 'm1.small' @slave_instance_type = 'm1.small' @access_key = access @secret_key = secret end |
Instance Attribute Details
#access_key ⇒ Object (readonly)
Returns the value of attribute access_key.
25 26 27 |
# File 'lib/elasticity/job_flow.rb', line 25 def access_key @access_key end |
#action_on_failure ⇒ Object
Returns the value of attribute action_on_failure.
10 11 12 |
# File 'lib/elasticity/job_flow.rb', line 10 def action_on_failure @action_on_failure end |
#ami_version ⇒ Object
Returns the value of attribute ami_version.
18 19 20 |
# File 'lib/elasticity/job_flow.rb', line 18 def ami_version @ami_version end |
#ec2_key_name ⇒ Object
Returns the value of attribute ec2_key_name.
11 12 13 |
# File 'lib/elasticity/job_flow.rb', line 11 def ec2_key_name @ec2_key_name end |
#ec2_subnet_id ⇒ Object
Returns the value of attribute ec2_subnet_id.
20 21 22 |
# File 'lib/elasticity/job_flow.rb', line 20 def ec2_subnet_id @ec2_subnet_id end |
#enable_debugging ⇒ Object
Returns the value of attribute enable_debugging.
23 24 25 |
# File 'lib/elasticity/job_flow.rb', line 23 def enable_debugging @enable_debugging end |
#hadoop_version ⇒ Object
Returns the value of attribute hadoop_version.
13 14 15 |
# File 'lib/elasticity/job_flow.rb', line 13 def hadoop_version @hadoop_version end |
#instance_count ⇒ Object
Returns the value of attribute instance_count.
14 15 16 |
# File 'lib/elasticity/job_flow.rb', line 14 def instance_count @instance_count end |
#keep_job_flow_alive_when_no_steps ⇒ Object
Returns the value of attribute keep_job_flow_alive_when_no_steps.
19 20 21 |
# File 'lib/elasticity/job_flow.rb', line 19 def keep_job_flow_alive_when_no_steps @keep_job_flow_alive_when_no_steps end |
#log_uri ⇒ Object
Returns the value of attribute log_uri.
15 16 17 |
# File 'lib/elasticity/job_flow.rb', line 15 def log_uri @log_uri end |
#master_instance_type ⇒ Object
Returns the value of attribute master_instance_type.
16 17 18 |
# File 'lib/elasticity/job_flow.rb', line 16 def master_instance_type @master_instance_type end |
#name ⇒ Object
Returns the value of attribute name.
12 13 14 |
# File 'lib/elasticity/job_flow.rb', line 12 def name @name end |
#placement ⇒ Object
Returns the value of attribute placement.
21 22 23 |
# File 'lib/elasticity/job_flow.rb', line 21 def placement @placement end |
#secret_key ⇒ Object (readonly)
Returns the value of attribute secret_key.
26 27 28 |
# File 'lib/elasticity/job_flow.rb', line 26 def secret_key @secret_key end |
#slave_instance_type ⇒ Object
Returns the value of attribute slave_instance_type.
17 18 19 |
# File 'lib/elasticity/job_flow.rb', line 17 def slave_instance_type @slave_instance_type end |
#visible_to_all_users ⇒ Object
Returns the value of attribute visible_to_all_users.
22 23 24 |
# File 'lib/elasticity/job_flow.rb', line 22 def visible_to_all_users @visible_to_all_users end |
Class Method Details
.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1') ⇒ Object
56 57 58 59 60 61 62 |
# File 'lib/elasticity/job_flow.rb', line 56 def self.from_jobflow_id(access, secret, jobflow_id, region = 'us-east-1') JobFlow.new(access, secret).tap do |j| j.instance_variable_set(:@region, region) j.instance_variable_set(:@jobflow_id, jobflow_id) j.instance_variable_set(:@installed_steps, j.status.installed_steps) end end |
Instance Method Details
#add_bootstrap_action(bootstrap_action) ⇒ Object
87 88 89 90 91 92 |
# File 'lib/elasticity/job_flow.rb', line 87 def add_bootstrap_action(bootstrap_action) if is_jobflow_running? raise JobFlowRunningError, 'To modify bootstrap actions, please create a new job flow.' end @bootstrap_actions << bootstrap_action end |
#add_step(jobflow_step) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/elasticity/job_flow.rb', line 109 def add_step(jobflow_step) if is_jobflow_running? jobflow_steps = [] if jobflow_step.requires_installation? && !@installed_steps.include?(jobflow_step.class) jobflow_steps.concat(jobflow_step.aws_installation_steps) end jobflow_steps << jobflow_step.to_aws_step(self) emr.add_jobflow_steps(@jobflow_id, {:steps => jobflow_steps}) else @jobflow_steps << jobflow_step end end |
#run ⇒ Object
122 123 124 125 126 127 |
# File 'lib/elasticity/job_flow.rb', line 122 def run if is_jobflow_running? raise JobFlowRunningError, 'Cannot run a job flow multiple times. To do more with this job flow, please use #add_step.' end @jobflow_id = emr.run_job_flow(jobflow_config) end |
#set_core_instance_group(instance_group) ⇒ Object
99 100 101 102 |
# File 'lib/elasticity/job_flow.rb', line 99 def set_core_instance_group(instance_group) instance_group.role = 'CORE' @instance_groups[:core] = instance_group end |
#set_master_instance_group(instance_group) ⇒ Object
94 95 96 97 |
# File 'lib/elasticity/job_flow.rb', line 94 def set_master_instance_group(instance_group) instance_group.role = 'MASTER' @instance_groups[:master] = instance_group end |
#set_task_instance_group(instance_group) ⇒ Object
104 105 106 107 |
# File 'lib/elasticity/job_flow.rb', line 104 def set_task_instance_group(instance_group) instance_group.role = 'TASK' @instance_groups[:task] = instance_group end |
#shutdown ⇒ Object
129 130 131 132 133 134 |
# File 'lib/elasticity/job_flow.rb', line 129 def shutdown if !is_jobflow_running? raise JobFlowNotStartedError, 'Cannot #shutdown a job flow that has not yet been #run.' end emr.terminate_jobflows(@jobflow_id) end |
#status ⇒ Object
136 137 138 139 140 141 |
# File 'lib/elasticity/job_flow.rb', line 136 def status if !is_jobflow_running? raise JobFlowNotStartedError, 'Please #run this job flow before attempting to retrieve status.' end emr.describe_jobflow(@jobflow_id) end |
#wait_for_completion(&on_wait) ⇒ Object
143 144 145 146 |
# File 'lib/elasticity/job_flow.rb', line 143 def wait_for_completion(&on_wait) l = Elasticity::Looper.new(method(:retry_check), on_wait) l.go end |