Class: Elasticrawl::CombineJob

Inherits: Job
Defined in: lib/elasticrawl/combine_job.rb

Overview

Represents an Elastic MapReduce job flow that combines the results of multiple Elasticrawl parse jobs. Parse jobs write their results per segment; a combine job aggregates those results into a single set of files.

Inherits from Job, which is the ActiveRecord model class.

Instance Method Summary

#log_uri, #run, #set_input_jobs

Methods inherited from Job

#confirm_message, #history, #result_message

Instance Method Details

#log_uri ⇒ Object

Returns the S3 location for storing Elastic MapReduce job logs.

# File 'lib/elasticrawl/combine_job.rb', line 43

def log_uri
  s3_path = "/logs/2-combine/#{self.job_name}/"
  build_s3_uri(s3_path)
end
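build_s3_uri is inherited from Job and is not documented on this page. The sketch below assumes, hypothetically, that it simply prefixes the path with s3:// and a configured bucket name; the LogUriSketch class, bucket_name attribute and example values are illustrative only, not part of Elasticrawl.

```ruby
# Hypothetical stand-in for CombineJob#log_uri. The real build_s3_uri lives
# on Job; here we ASSUME it just prepends "s3://<bucket>" to the path.
class LogUriSketch
  attr_reader :job_name, :bucket_name

  def initialize(job_name, bucket_name)
    @job_name = job_name
    @bucket_name = bucket_name
  end

  # Mirrors the documented method: combine job logs go under /logs/2-combine/.
  def log_uri
    s3_path = "/logs/2-combine/#{job_name}/"
    build_s3_uri(s3_path)
  end

  private

  # Assumed behaviour, not taken from the Elasticrawl source.
  def build_s3_uri(s3_path)
    "s3://#{bucket_name}#{s3_path}"
  end
end

puts LogUriSketch.new('1389789645620', 'my-elasticrawl-bucket').log_uri
# prints s3://my-elasticrawl-bucket/logs/2-combine/1389789645620/
```

Keeping the "2-combine" prefix separate from the job name means logs for every combine job share one S3 folder, with one subfolder per job.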

#run ⇒ Object

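Runs the job by calling the Elastic MapReduce API. If a job flow ID is returned, it is stored on the job, the record is saved and the result message is returned; otherwise the method returns nil.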

# File 'lib/elasticrawl/combine_job.rb', line 31

def run
  emr_config = job_config['emr_config']
  job_flow_id = run_job_flow(emr_config)

  if job_flow_id.present?
    self.job_flow_id = job_flow_id
    self.save
    self.result_message
  end
end
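The control flow of run can be exercised without AWS or a database by stubbing the collaborators. In this minimal sketch, run_job_flow, save, job_config and result_message are hypothetical stubs (the real ones come from Job and the EMR client), and the `present?` check from ActiveSupport is replaced by a plain nil/empty test.

```ruby
# Sketch of CombineJob#run with the EMR call and ActiveRecord save stubbed out.
class RunSketch
  attr_accessor :job_flow_id
  attr_reader :saved

  def initialize(emr_response)
    @emr_response = emr_response # job flow ID the stubbed EMR call returns, or nil
    @saved = false
  end

  # Hypothetical config hash; the real job_config is provided by Job.
  def job_config
    { 'emr_config' => { 'placeholder' => true } }
  end

  def run
    emr_config = job_config['emr_config']
    job_flow_id = run_job_flow(emr_config)

    # Persist and report only when EMR actually returned a job flow ID
    # (plain-Ruby stand-in for ActiveSupport's #present?).
    unless job_flow_id.nil? || job_flow_id.empty?
      self.job_flow_id = job_flow_id
      save
      result_message
    end
  end

  private

  # Stub for the Elastic MapReduce API call.
  def run_job_flow(_emr_config)
    @emr_response
  end

  # Stub for ActiveRecord's save.
  def save
    @saved = true
  end

  # Hypothetical message; the real result_message is inherited from Job.
  def result_message
    "Job: #{job_flow_id} launched"
  end
end

p RunSketch.new('j-ABC123').run # prints "Job: j-ABC123 launched"
p RunSketch.new(nil).run        # prints nil
```

Returning nil when no job flow ID comes back means callers can treat a nil result as "nothing was launched or saved".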

#set_input_jobs(input_jobs) ⇒ Object

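Takes an array of parse job names to combine and creates a single job step whose input paths are the outputs of those parse jobs. Names that match no parse job, or parse jobs with no job steps, are skipped; the total number of steps is used as the segment count in the job description.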



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/elasticrawl/combine_job.rb', line 10

def set_input_jobs(input_jobs)
  segment_count = 0
  input_paths = []

  input_jobs.each do |job_name|
    input_job = Job.where(:job_name => job_name,
                          :type => 'Elasticrawl::ParseJob').first_or_initialize
    step_count = input_job.job_steps.count

    if step_count > 0
      segment_count += step_count
      input_paths << set_input_path(input_job)
    end
  end

  self.job_name = set_job_name
  self.job_desc = set_job_desc(segment_count)
  job_steps.push(create_job_step(input_paths.join(',')))
end
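The aggregation in set_input_jobs can be illustrated with plain Ruby objects in place of ActiveRecord. This is a minimal sketch: ParseJobStub, its output_path field and the combine_inputs helper are hypothetical (the real input path comes from set_input_path, which is not shown here), and, like the original code, it treats each parse job step as one segment.

```ruby
# Hypothetical, ActiveRecord-free sketch of CombineJob#set_input_jobs.
ParseJobStub = Struct.new(:job_name, :step_count, :output_path)

# Looks up each named parse job, skips jobs with no steps, sums the step
# counts as the segment count and joins the output paths into one
# comma-separated input string for a single combine step.
def combine_inputs(job_names, parse_jobs)
  segment_count = 0
  input_paths = []

  job_names.each do |job_name|
    # Stand-in for Job.where(...).first_or_initialize: an unknown name
    # behaves like a new, unsaved job with zero steps.
    input_job = parse_jobs.fetch(job_name) { ParseJobStub.new(job_name, 0, nil) }
    next unless input_job.step_count > 0

    segment_count += input_job.step_count
    input_paths << input_job.output_path
  end

  { segment_count: segment_count, step_input: input_paths.join(',') }
end

jobs = {
  'parse-a' => ParseJobStub.new('parse-a', 2, 's3://example-bucket/parse-a/'),
  'parse-b' => ParseJobStub.new('parse-b', 3, 's3://example-bucket/parse-b/'),
  'empty'   => ParseJobStub.new('empty', 0, 's3://example-bucket/empty/')
}

result = combine_inputs(%w[parse-a parse-b empty missing], jobs)
puts result[:segment_count] # prints 5
puts result[:step_input]    # prints s3://example-bucket/parse-a/,s3://example-bucket/parse-b/
```

Joining the paths into one comma-separated string is what lets a single job step read the outputs of every parse job at once.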