Class: Humboldt::Cli

Inherits: Thor
Includes: Thor::Actions
Defined in: lib/humboldt/cli.rb

Constant Summary

DEFAULTS =
{
  data_path: 'data/completes',
  silent: true,
  skip_package: false,
  extra_hadoop_args: [],
  cleanup_before: false,
  instance_count: 4,
  instance_type: 'c1.xlarge',
  spot_instances: nil,
  bid_price: 0.2,
  poll: false,
  skip_prepare: false,
  aws_region: 'eu-west-1',
  hadoop_version: '1.0.3'
}
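
These values back the corresponding CLI options; #configure writes per-project overrides to .humboldt.yml. A minimal sketch of how an override wins over a default, assuming a plain Hash#merge (the actual lookup helpers are private and not shown in this excerpt):

# Hypothetical overrides, as #configure would write them to .humboldt.yml.
config_file_options = { aws_region: 'us-east-1', instance_count: 8 }

effective_options = DEFAULTS.merge(config_file_options)
effective_options[:aws_region]    # => "us-east-1" (overridden)
effective_options[:instance_type] # => "c1.xlarge" (from DEFAULTS)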

Instance Method Summary

  #configure ⇒ Object
  #emr_job ⇒ Object
  #emr_jobs ⇒ Object
  #package ⇒ Object
  #run_emr ⇒ Object
  #run_local ⇒ Object

Instance Method Details

#configure ⇒ Object



# File 'lib/humboldt/cli.rb', line 129

def configure
  say("Please ensure you are located at the root directory of the project you are configuring.", :yellow)
  configuration = options_from_config_file
  say('EMR configuration', :green)
  configuration[:ec2_key_name] = ask("EC2 key pair name to enable SSH access to EMR master node: [#{config_file_options_with_defaults[:ec2_key_name]}]")
  configuration[:aws_region] = ask("AWS region: [#{config_file_options_with_defaults[:aws_region]}]")
  configuration[:hadoop_version] = ask("Hadoop version: [#{config_file_options_with_defaults[:hadoop_version]}]")
  configuration[:data_bucket] = ask("Input data S3 bucket: [#{config_file_options_with_defaults[:data_bucket]}]")
  configuration[:job_bucket] = ask("Job S3 bucket (where JAR is uploaded, output logs and job output go to): [#{config_file_options_with_defaults[:job_bucket]}]")
  configuration.each do |key, value|
    value = configuration[key] = config_file_options_with_defaults[key] if value.empty?
    configuration.delete(key) if value.empty? || value == DEFAULTS[key]
  end
  File.open('.humboldt.yml', 'w') { |f| YAML.dump(configuration, f) }
  say('Updated .humboldt.yml', :green)
end
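
Blank answers keep the current value, and anything still equal to its DEFAULTS entry is removed before writing, so .humboldt.yml only ends up with non-default settings. For illustration, a hash with hypothetical answers serializes like this (the keys are symbols, hence the leading colons):

require 'yaml'

configuration = {
  ec2_key_name: 'emr-keypair',        # hypothetical values
  data_bucket: 'example-input-data',
  job_bucket: 'example-emr-jobs'
}
puts YAML.dump(configuration)
# ---
# :ec2_key_name: emr-keypair
# :data_bucket: example-input-data
# :job_bucket: example-emr-jobs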

#emr_job ⇒ Object



# File 'lib/humboldt/cli.rb', line 111

def emr_job
  if File.exists?('.humboldtjob')
    job_flow_id = File.read('.humboldtjob').strip
    job_flow = emr.job_flows[job_flow_id]
    print_job_flow_extended_status(job_flow)
  else
    say_status(:warning, 'Could not determine last job flow ID')
  end
end
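
The command only looks for .humboldtjob in the current directory, so it must be run from the project root where #run_emr wrote the file. A standalone sketch of the same lookup, assuming the emr handle is aws-sdk v1's AWS::EMR (which is what the job_flows[] call above suggests):

require 'aws-sdk' # aws-sdk v1 assumed

if File.exist?('.humboldtjob')
  job_flow_id = File.read('.humboldtjob').strip
  # 'eu-west-1' is the DEFAULTS region; use your own if configured differently.
  job_flow = AWS::EMR.new(region: 'eu-west-1').job_flows[job_flow_id]
  puts "#{job_flow_id}: #{job_flow.state}"
else
  puts 'Could not determine last job flow ID'
end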

#emr_jobs ⇒ Object



# File 'lib/humboldt/cli.rb', line 122

def emr_jobs
  emr.job_flows.each do |job_flow|
    print_job_flow_status(job_flow)
  end
end

#package ⇒ Object



# File 'lib/humboldt/cli.rb', line 33

def package
  say_status(:package, relative_path(job_package.jar_path))
  job_package.create!
end
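
#run_local and #run_emr invoke this command themselves unless skip_package is set (see the invoke(:package, ...) calls below). To build the JAR on its own, a sketch like the following should work from the project root:

require 'humboldt/cli'

# Dispatches to Humboldt::Cli#package via Thor.
Humboldt::Cli.start(%w[package])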

#run_emr ⇒ Object



# File 'lib/humboldt/cli.rb', line 84

def run_emr
  check_job!
  invoke(:package, [], {}) unless options.skip_package?
  flow = EmrFlow.new(job_config, options[:input], job_package, emr, data_bucket, job_bucket, options[:output])
  if options.cleanup_before?
    say_status(:remove, flow.output_uri)
    flow.cleanup!
  end
  unless options.skip_prepare?
    say_status(:upload, flow.jar_uri)
    flow.prepare!
  end
  say_status(:warning, "No EC2 key name configured. You will not be able to access the master node via SSH.", :yellow) unless options[:ec2_key_name]
  job_flow = flow.run!(
    bid_price: options[:bid_price],
    instance_count: options[:instance_count],
    instance_type: options[:instance_type],
    spot_instances: options[:spot_instances],
    extra_hadoop_args: options[:extra_hadoop_args],
    ec2_key_name: options[:ec2_key_name],
    hadoop_version: options[:hadoop_version]
  )
  File.open('.humboldtjob', 'w') { |io| io.puts(job_flow.job_flow_id) }
  say_status(:started, %{EMR job flow "#{job_flow.job_flow_id}"})
end
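
Each keyword passed to EmrFlow#run! maps directly onto a CLI option. Assuming the options fall back to DEFAULTS when nothing is given on the command line or in .humboldt.yml (as the constant and #configure suggest), the call effectively receives:

# Hypothetical: the run! arguments with nothing overridden. ec2_key_name has
# no default, so it stays nil and the SSH warning above is printed.
run_arguments = {
  bid_price: 0.2,
  instance_count: 4,
  instance_type: 'c1.xlarge',
  spot_instances: nil,
  extra_hadoop_args: [],
  ec2_key_name: nil,
  hadoop_version: '1.0.3'
}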

#run_local ⇒ Object



# File 'lib/humboldt/cli.rb', line 48

def run_local
  check_job!
  invoke(:package, [], {}) unless options.skip_package?
  output_path = options[:output] || "data/#{job_config}/output"
  output_path_parent = File.dirname(output_path)
  if options.cleanup_before?
    remove_file(output_path)
  else
    check_local_output!(output_path)
  end
  unless File.exists?(output_path_parent)
    empty_directory(output_path_parent)
  end
  input_glob = File.join(options[:data_path], options[:input])
  hadoop_config_path = options[:hadoop_config] || default_hadoop_config_path
  run_command('hadoop', 'jar', project_jar, '-conf', hadoop_config_path, job_config, input_glob, output_path, *options[:extra_hadoop_args])
end
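
With the defaults filled in, the pieces above reduce to a single hadoop invocation. A sketch with hypothetical values (the job name, project JAR and Hadoop config path are made up; only the data_path default is real):

# Hypothetical values, to show how the arguments are assembled.
options = { data_path: 'data/completes', input: 'events/2014-02-*', output: nil }
job_config  = 'word_count'
output_path = options[:output] || "data/#{job_config}/output"  # => "data/word_count/output"
input_glob  = File.join(options[:data_path], options[:input])  # => "data/completes/events/2014-02-*"
# The final run_command call is then roughly equivalent to:
#   hadoop jar build/word_count.jar -conf <hadoop config> \
#     word_count data/completes/events/2014-02-* data/word_count/output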