Class: Rubydoop::JobDefinition

Inherits:
Object
  • Object
show all
Defined in:
lib/ext/rubydoop.rb

Instance Method Summary collapse

Instance Method Details

#cache_file(file, options = {}) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/ext/rubydoop.rb', line 48

# Makes +file+ reachable under a link name, either through a plain
# filesystem symlink or through Hadoop's distributed cache.
#
# The symlink route is taken only when the job is in local mode AND
# Hadoop::Mapreduce::Job has no +add_cache_file+ instance method; in every
# other case a "file#link" URI is registered with the distributed cache
# using the job's configuration.
#
# @param file [String] path of the file to expose
# @param options [Hash] +:as+ overrides the link name, which otherwise
#   defaults to the basename of +file+
def cache_file(file, options = {})
  symlink = options.fetch(:as, File.basename(file))
  link_locally = local_mode? && !Hadoop::Mapreduce::Job.instance_methods.include?(:add_cache_file)

  unless link_locally
    # The URI fragment carries the name the cached file is linked as.
    uri = java.net.URI.new("#{file}\##{symlink}")
    return Hadoop::FileCache::DistributedCache.add_cache_file(uri, @job.configuration)
  end

  # Skip the link when one with this name already points at +file+.
  already_linked = File.symlink?(symlink) && File.readlink(symlink) == file
  FileUtils.ln_s file, symlink unless already_linked
end

#enable_compression! ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/ext/rubydoop.rb', line 34

# Switches on gzip compression for both map output and job output.
# Does nothing (and returns nil) when the job runs in local mode.
def enable_compression!
  return if local_mode?

  gzip_codec = 'org.apache.hadoop.io.compress.GzipCodec'

  set('mapred.compress.map.output', true)
  set('mapred.output.compress', true)
  set('mapred.map.output.compression.codec', gzip_codec)
  set('mapred.output.compression.codec', gzip_codec)
  set('mapred.output.compression.type', 'BLOCK')
end

#input(paths, options = {}) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/ext/rubydoop.rb', line 20

# Wraps the original +input+ (kept as +inputtt+) so that a symbolic
# +:format+ can be resolved to a class under Humboldt::JavaLib.
#
# The symbol is camel-cased and suffixed with "InputFormat", e.g.
# +:combined_text+ is looked up as +CombinedTextInputFormat+. A nil format
# or a format that already is a Class is passed through untouched. The
# caller's hash is never modified; a copy is forwarded.
#
# @param paths the input paths, forwarded unchanged
# @param options [Hash] options forwarded to the original +input+
def input(paths, options={})
  options = options.dup
  format = options[:format]

  if format == :combined_text
    STDERR.puts "Warning! Using `format: :combined_text` will not work with remote input paths (e.g. S3) and Hadoop 1.x. Cf. https://issues.apache.org/jira/browse/MAPREDUCE-1806"
  end

  resolvable = !(format.nil? || format.is_a?(Class))
  if resolvable
    camelized = format.to_s.gsub(/^.|_./) { |chunk| chunk[-1, 1].upcase }
    class_name = camelized + "InputFormat"
    begin
      options[:format] = Humboldt::JavaLib.const_get(class_name)
    rescue NameError
      # No such constant: leave :format as given and forward it unchanged.
    end
  end

  inputtt(paths, options)
end

#inputtt ⇒ Object



19
# File 'lib/ext/rubydoop.rb', line 19

# Keeps the pre-existing #input reachable as #inputtt. This alias sits on
# line 19 of lib/ext/rubydoop.rb, directly before #input is redefined on
# line 20, and the redefinition delegates to #inputtt.
alias inputtt input

#local_mode? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/ext/rubydoop.rb', line 44

# Reports whether the job's configuration has 'mapred.job.tracker' set to
# 'local'.
#
# @return [Boolean]
def local_mode?
  configuration = @job.configuration
  job_tracker = configuration.get('mapred.job.tracker')
  job_tracker == 'local'
end

#mapper(cls) ⇒ Object



6
7
8
9
10
# File 'lib/ext/rubydoop.rb', line 6

# Registers +cls+ as the mapper through the original +mapper+ (kept as
# +mapperrr+). Before that, if +cls+ responds to +output_key+ and/or
# +output_value+, the HADOOP constant of each is set as the map output
# key/value type.
#
# @param cls the mapper class
def mapper(cls)
  if cls.respond_to?(:output_key)
    key_type = cls.output_key
    map_output_key(key_type.const_get(:HADOOP))
  end

  if cls.respond_to?(:output_value)
    value_type = cls.output_value
    map_output_value(value_type.const_get(:HADOOP))
  end

  mapperrr(cls)
end

#mapperrr ⇒ Object



5
# File 'lib/ext/rubydoop.rb', line 5

# Keeps the pre-existing #mapper reachable as #mapperrr. This alias sits on
# line 5 of lib/ext/rubydoop.rb, directly before #mapper is redefined on
# line 6, and the redefinition delegates to #mapperrr.
alias mapperrr mapper

#reducer(cls) ⇒ Object



13
14
15
16
17
# File 'lib/ext/rubydoop.rb', line 13

# Registers +cls+ as the reducer through the original +reducer+ (kept as
# +reducerrr+). Before that, if +cls+ responds to +output_key+ and/or
# +output_value+, the HADOOP constant of each is set as the job output
# key/value type.
#
# @param cls the reducer class
def reducer(cls)
  if cls.respond_to?(:output_key)
    key_type = cls.output_key
    output_key(key_type.const_get(:HADOOP))
  end

  if cls.respond_to?(:output_value)
    value_type = cls.output_value
    output_value(value_type.const_get(:HADOOP))
  end

  reducerrr(cls)
end

#reducerrr ⇒ Object



12
# File 'lib/ext/rubydoop.rb', line 12

# Keeps the pre-existing #reducer reachable as #reducerrr. This alias sits
# on line 12 of lib/ext/rubydoop.rb, directly before #reducer is redefined
# on line 13, and the redefinition delegates to #reducerrr.
alias reducerrr reducer