Class: Elasticrawl::CombineJob
Defined in: lib/elasticrawl/combine_job.rb
Overview
Represents an Elastic MapReduce job flow that combines the results of multiple Elasticrawl Parse jobs. Parse jobs write their results per segment. Combine jobs aggregate parse results into a single set of files.
Inherits from Job, which is the ActiveRecord model class.
Instance Method Summary
- #log_uri ⇒ Object
  Returns the S3 location for storing Elastic MapReduce job logs.
- #run ⇒ Object
  Runs the job by calling the Elastic MapReduce API.
- #set_input_jobs(input_jobs) ⇒ Object
  Takes in an array of parse jobs that are to be combined.
Methods inherited from Job
#confirm_message, #history, #result_message
Instance Method Details
#log_uri ⇒ Object
Returns the S3 location for storing Elastic MapReduce job logs.
# File 'lib/elasticrawl/combine_job.rb', line 43
def log_uri
  s3_path = "/logs/2-combine/#{self.job_name}/"
  build_s3_uri(s3_path)
end
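As a rough illustration of the path this method produces, the sketch below reimplements it outside the gem. The `build_s3_uri` stub, the bucket name `example-bucket`, and the sample job name are assumptions made for the example; the real helper is defined elsewhere in Elasticrawl and may behave differently.

```ruby
# Standalone sketch; not the gem's implementation.
class CombineJobSketch
  attr_reader :job_name

  def initialize(job_name)
    @job_name = job_name
  end

  # Hypothetical stand-in for the inherited helper: prefixes the path
  # with an assumed bucket name.
  def build_s3_uri(s3_path, bucket = 'example-bucket')
    "s3://#{bucket}#{s3_path}"
  end

  # Mirrors the documented method body.
  def log_uri
    build_s3_uri("/logs/2-combine/#{job_name}/")
  end
end

puts CombineJobSketch.new('1389789645620').log_uri
# prints "s3://example-bucket/logs/2-combine/1389789645620/"
```

Under these assumptions, each combine job gets its own log prefix keyed by its job name.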
#run ⇒ Object
Runs the job by calling the Elastic MapReduce API.
# File 'lib/elasticrawl/combine_job.rb', line 31
def run
  emr_config = job_config['emr_config']
  job_flow_id = run_job_flow(emr_config)

  if job_flow_id.present?
    self.job_flow_id = job_flow_id
    self.save
    self.
  end
end
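The control flow above (persist the job flow ID only when the API call returned one) can be sketched without ActiveRecord or AWS. Everything here is hypothetical scaffolding: `RunSketch`, its stubbed `run_job_flow`, the `saved` flag, and the boolean return value are assumptions for illustration, not part of Elasticrawl.

```ruby
# Standalone sketch of the control flow; no AWS calls are made.
class RunSketch
  attr_reader :job_flow_id, :saved

  def initialize(api_response)
    @api_response = api_response # what the stubbed API call will return
    @saved = false
  end

  # Hypothetical stub standing in for the Elastic MapReduce request.
  def run_job_flow(_emr_config)
    @api_response
  end

  def run(emr_config = {})
    id = run_job_flow(emr_config)
    # Plain-Ruby equivalent of ActiveSupport's `present?` for strings.
    return false if id.nil? || id.strip.empty?

    @job_flow_id = id
    @saved = true # stands in for persisting the record
    true
  end
end

ok = RunSketch.new('j-EXAMPLE')
puts ok.run         # true: an ID came back, so it is stored
puts RunSketch.new(nil).run # false: nothing to store
```

The guard matters because a failed or empty API response would otherwise overwrite the record with a blank job flow ID.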
#set_input_jobs(input_jobs) ⇒ Object
Takes an array of parse job names to combine and creates a single job step whose input paths are the outputs of those parse jobs. Parse jobs that have no job steps are skipped.
# File 'lib/elasticrawl/combine_job.rb', line 10
def set_input_jobs(input_jobs)
  segment_count = 0
  input_paths = []

  input_jobs.each do |job_name|
    input_job = Job.where(:job_name => job_name,
                          :type => 'Elasticrawl::ParseJob').first_or_initialize
    step_count = input_job.job_steps.count

    if step_count > 0
      segment_count += step_count
      input_paths << set_input_path(input_job)
    end
  end

  self.job_name = set_job_name
  self.job_desc = set_job_desc(segment_count)
  job_steps.push(create_job_step(input_paths.join(',')))
end
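The aggregation in this method can be sketched in plain Ruby, with a Struct standing in for the ActiveRecord model. `ParseJobSketch`, `combine_inputs`, the job names, and the S3 path layout are all assumptions for illustration; the real input path comes from `set_input_path`, which is not shown on this page.

```ruby
# Standalone sketch: a Struct replaces the ActiveRecord model.
ParseJobSketch = Struct.new(:job_name, :step_count)

# Returns [segment_count, comma-joined input paths] for the given job names.
def combine_inputs(job_names, known_jobs)
  segment_count = 0
  input_paths = []

  job_names.each do |name|
    job = known_jobs[name]
    # Unknown or empty jobs contribute nothing, like the step_count > 0 check.
    next if job.nil? || job.step_count.zero?

    segment_count += job.step_count
    # Assumed layout: one wildcard path covering all of a job's segments.
    input_paths << "s3://example-bucket/data/1-parse/#{job.job_name}/segments/*"
  end

  [segment_count, input_paths.join(',')]
end

jobs = {
  'parse-a' => ParseJobSketch.new('parse-a', 2),
  'parse-b' => ParseJobSketch.new('parse-b', 1)
}
count, paths = combine_inputs(%w[parse-a parse-b missing], jobs)
puts count # prints 3
puts paths
```

Joining the paths with commas yields a single input argument, which is why one job step is enough to cover any number of parse jobs.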