Class: Humboldt::Cli
- Inherits: Thor (ancestors: Object → Thor → Humboldt::Cli)
- Includes: Thor::Actions
- Defined in: lib/humboldt/cli.rb
Constant Summary
# Default settings for the CLI. `configure` compares each setting against
# this table and leaves out of .humboldt.yml any value equal to its default.
# Frozen so the shared table cannot be modified by accident at runtime.
DEFAULTS = {
  data_path: 'data/completes',
  silent: true,
  skip_package: false,
  extra_hadoop_args: [],
  cleanup_before: false,
  instance_count: 4,
  instance_type: 'c1.xlarge',
  spot_instances: nil,
  bid_price: 0.2,
  poll: false,
  skip_prepare: false,
  aws_region: 'eu-west-1',
  hadoop_version: '1.0.3'
}.freeze
Instance Method Summary
- #configure ⇒ Object
- #emr_job ⇒ Object
- #emr_jobs ⇒ Object
- #package ⇒ Object
- #run_emr ⇒ Object
- #run_local ⇒ Object
Instance Method Details
#configure ⇒ Object
Source lines 129–144
# File 'lib/humboldt/cli.rb', line 129
# Interactively writes the project's EMR settings to .humboldt.yml in the
# current directory.
#
# Each setting is prompted for, with the currently effective value
# (options[key]) shown in brackets. A blank answer, or no answer at all
# (ask returning nil), falls back to that value. A setting is left out of
# the file when it is still blank or equals its entry in DEFAULTS, so the
# default keeps applying.
def configure
  say("Please ensure you are located at the root directory of the project you are configuring.", :yellow)
  say('EMR configuration', :green)
  prompts = {
    ec2_key_name: 'EC2 key pair name to enable SSH access to EMR master node',
    aws_region: 'AWS region',
    hadoop_version: 'Hadoop version',
    data_bucket: 'Input data S3 bucket',
    job_bucket: 'Job S3 bucket (where JAR is uploaded, output logs and job output go to)'
  }
  answers = prompts.map { |key, label| [key, ask("#{label}: [#{options[key]}]")] }.to_h
  # NOTE(review): the scraped listing lost what `configuration` was initialised
  # from. This version starts from an empty Hash, so only the five prompted
  # settings are written. Confirm against lib/humboldt/cli.rb.
  configuration = answers.each_with_object({}) do |(key, answer), kept|
    # to_s guards against nil: ask can return nil, and options[key] is nil
    # for settings with no configured value (e.g. ec2_key_name).
    value = answer.to_s.empty? ? options[key] : answer
    kept[key] = value unless value.to_s.empty? || value == DEFAULTS[key]
  end
  File.open('.humboldt.yml', 'w') { |f| YAML.dump(configuration, f) }
  say('Updated .humboldt.yml', :green)
end
#emr_job ⇒ Object
Source lines 111–119
# File 'lib/humboldt/cli.rb', line 111
# Prints the extended status of the most recently started EMR job flow.
#
# The job flow ID is read from .humboldtjob, which run_emr writes after
# starting a flow. If the file is missing or blank, a warning is printed
# instead.
def emr_job
  # File.exists? was removed in Ruby 3.2; File.exist? is the supported name.
  job_flow_id = File.read('.humboldtjob').strip if File.exist?('.humboldtjob')
  return say_status(:warning, 'Could not determine last job flow ID') if job_flow_id.to_s.empty?
  job_flow = emr.job_flows[job_flow_id]
  print_job_flow_extended_status(job_flow)
end
#emr_jobs ⇒ Object
Source lines 122–126
# File 'lib/humboldt/cli.rb', line 122
# Prints the status of every job flow returned by emr.job_flows, in the
# order the collection yields them.
def emr_jobs
  emr.job_flows.each do |job_flow|
    print_job_flow_status(job_flow)
  end
end
#package ⇒ Object
Source lines 33–36
# File 'lib/humboldt/cli.rb', line 33
# Builds the job package. A :package status line showing the JAR path
# (passed through relative_path) is printed first, then job_package.create!
# is called; its result is the method's return value.
def package
  say_status(:package, relative_path(job_package.jar_path))
  job_package.create!
end
#run_emr ⇒ Object
Source lines 84–108
# File 'lib/humboldt/cli.rb', line 84
# Starts the job as an EMR job flow.
#
# Steps, in order:
# - Validate the job (check_job!).
# - Package the JAR unless options.skip_package?.
# - Remove previous output when options.cleanup_before?.
# - Prepare the flow (reported as an upload of flow.jar_uri) unless
#   options.skip_prepare?.
# - Start the flow with the cluster settings taken from options.
# - Record the new job flow ID in .humboldtjob, where emr_job reads it.
#
# `options` is Thor's parsed-options accessor; the scraped listing had
# dropped this receiver everywhere.
def run_emr
  check_job!
  invoke(:package, [], {}) unless options.skip_package?
  flow = EmrFlow.new(job_config, options[:input], job_package, emr, data_bucket, job_bucket, options[:output])
  if options.cleanup_before?
    say_status(:remove, flow.output_uri)
    flow.cleanup!
  end
  unless options.skip_prepare?
    say_status(:upload, flow.jar_uri)
    flow.prepare!
  end
  say_status(:warning, "No EC2 key name configured. You will not be able to access the master node via SSH.", :yellow) unless options[:ec2_key_name]
  job_flow = flow.run!(
    bid_price: options[:bid_price],
    instance_count: options[:instance_count],
    instance_type: options[:instance_type],
    spot_instances: options[:spot_instances],
    extra_hadoop_args: options[:extra_hadoop_args],
    ec2_key_name: options[:ec2_key_name],
    hadoop_version: options[:hadoop_version]
  )
  File.open('.humboldtjob', 'w') { |io| io.puts(job_flow.job_flow_id) }
  say_status(:started, %{EMR job flow "#{job_flow.job_flow_id}"})
end
#run_local ⇒ Object
Source lines 48–64
# File 'lib/humboldt/cli.rb', line 48
# Runs the job with the local `hadoop` command.
#
# Steps, in order:
# - Validate the job.
# - Package the JAR unless options.skip_package?.
# - Prepare the output location: with options.cleanup_before? the old output
#   is removed, otherwise check_local_output! is called on it. The output
#   parent directory is created if missing.
# - Run `hadoop jar` over the input glob options[:data_path]/options[:input].
#
# Output defaults to data/<job_config>/output. The Hadoop config defaults to
# default_hadoop_config_path. `options` is Thor's parsed-options accessor;
# the scraped listing had dropped this receiver everywhere.
def run_local
  check_job!
  invoke(:package, [], {}) unless options.skip_package?
  output_path = options[:output] || "data/#{job_config}/output"
  output_path_parent = File.dirname(output_path)
  if options.cleanup_before?
    remove_file(output_path)
  else
    check_local_output!(output_path)
  end
  # File.exists? was removed in Ruby 3.2; File.exist? is the supported name.
  empty_directory(output_path_parent) unless File.exist?(output_path_parent)
  input_glob = File.join(options[:data_path], options[:input])
  hadoop_config_path = options[:hadoop_config] || default_hadoop_config_path
  run_command('hadoop', 'jar', project_jar, '-conf', hadoop_config_path, job_config, input_glob, output_path, *options[:extra_hadoop_args])
end