Class: Rubydoop::JobDefinition
- Inherits: Object
- Defined in: lib/ext/rubydoop.rb
Instance Method Summary
- #cache_file(file, options = {}) ⇒ Object
- #enable_compression! ⇒ Object
- #framework ⇒ Object
- #input(paths, options = {}) ⇒ Object
- #inputtt ⇒ Object
- #local_mode? ⇒ Boolean
- #mapper(cls) ⇒ Object
- #mapperrr ⇒ Object
- #reducer(cls) ⇒ Object
- #reducerrr ⇒ Object
- #secondary_sort(start_index, end_index) ⇒ Object
Configures the job for secondary sort on the specified slice of the mapper output key.
Instance Method Details
#cache_file(file, options = {}) ⇒ Object
# File 'lib/ext/rubydoop.rb', line 61

def cache_file(file, options = {})
  symlink = options.fetch(:as, File.basename(file))
  if local_mode? && !Hadoop::Mapreduce::Job.instance_methods.include?(:add_cache_file)
    unless File.symlink?(symlink) && File.readlink(symlink) == file
      FileUtils.ln_s file, symlink
    end
  else
    uri = java.net.URI.new("#{file}\##{symlink}")
    Hadoop::FileCache::DistributedCache.add_cache_file(uri, @job.configuration)
  end
end
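A usage sketch with hypothetical paths and class names. The :as option names the symlink under which tasks see the file, defaulting to the file's basename; on a cluster the file goes through the distributed cache, while in local mode the method creates the symlink itself:

job 'geo lookup' do
  input 'logs'
  output 'located'
  mapper GeoMapper
  reducer GeoReducer
  # distribute the lookup table; tasks see it as 'geoip.dat'
  cache_file 'data/geoip.dat', as: 'geoip.dat'
end

Tasks can then open 'geoip.dat' relative to their working directory regardless of where the original file lives.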
#enable_compression! ⇒ Object
# File 'lib/ext/rubydoop.rb', line 34

def enable_compression!
  unless local_mode?
    if framework == :mapreduce
      set 'mapreduce.map.output.compress', true
      set 'mapreduce.output.fileoutputformat.compress', true
      set 'mapreduce.map.output.compress.codec', 'org.apache.hadoop.io.compress.GzipCodec'
      set 'mapreduce.output.fileoutputformat.compress.codec', 'org.apache.hadoop.io.compress.GzipCodec'
      set 'mapreduce.output.fileoutputformat.compress.type', 'BLOCK'
    else
      set 'mapred.compress.map.output', true
      set 'mapred.output.compress', true
      set 'mapred.map.output.compression.codec', 'org.apache.hadoop.io.compress.GzipCodec'
      set 'mapred.output.compression.codec', 'org.apache.hadoop.io.compress.GzipCodec'
      set 'mapred.output.compression.type', 'BLOCK'
    end
  end
end
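A minimal usage sketch with hypothetical names. As the source shows, the call is a no-op in local mode and picks gzip property names matching the detected framework:

job 'sessionize' do
  input 'logs'
  output 'sessions'
  mapper SessionMapper
  reducer SessionReducer
  # gzip-compress both map output and job output (skipped in local mode)
  enable_compression!
end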
#framework ⇒ Object
# File 'lib/ext/rubydoop.rb', line 52

def framework
  @framework ||= @job.configuration.get('mapreduce.framework.name') ? :mapreduce : :mapred
end
#input(paths, options = {}) ⇒ Object
# File 'lib/ext/rubydoop.rb', line 20

def input(paths, options = {})
  options = options.dup
  format = options[:format]
  STDERR.puts "Warning! Using `format: :combined_text` will not work with remote input paths (e.g. S3) and Hadoop 1.x. Cf. https://issues.apache.org/jira/browse/MAPREDUCE-1806" if format == :combined_text
  unless format.nil? or format.is_a?(Class)
    class_name = format.to_s.gsub(/^.|_./) { |x| x[-1, 1].upcase } + "InputFormat"
    begin
      options[:format] = Humboldt::JavaLib.const_get(class_name)
    rescue NameError
    end
  end
  inputtt(paths, options)
end
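A usage sketch with hypothetical job and class names. A symbol format is camel-cased into a "...InputFormat" class name and resolved against Humboldt::JavaLib (falling through to Rubydoop unchanged if the lookup fails), while a Class is passed through as-is:

job 'word count' do
  # :combined_text resolves to Humboldt::JavaLib::CombinedTextInputFormat, if defined
  input 'data/*.txt', format: :combined_text
  output 'counts'
  mapper TokenMapper
  reducer CountReducer
end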
#inputtt ⇒ Object
# File 'lib/ext/rubydoop.rb', line 19

alias inputtt input
#local_mode? ⇒ Boolean
# File 'lib/ext/rubydoop.rb', line 56

def local_mode?
  property = framework == :mapreduce ? 'mapreduce.framework.name' : 'mapred.job.tracker'
  @job.configuration.get(property) == 'local'
end
#mapper(cls) ⇒ Object
# File 'lib/ext/rubydoop.rb', line 6

def mapper(cls)
  map_output_key cls.output_key.const_get(:HADOOP) if cls.respond_to?(:output_key)
  map_output_value cls.output_value.const_get(:HADOOP) if cls.respond_to?(:output_value)
  mapperrr cls
end
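#mapper looks for output_key and output_value class methods and, when present, reads their HADOOP constant to set the map output types before delegating to the original Rubydoop implementation (aliased as mapperrr). A minimal sketch of a class satisfying that contract; the type holder modules and class name are hypothetical stand-ins, not Humboldt API:

# Anything with a HADOOP constant naming a Hadoop writable type
# satisfies the const_get(:HADOOP) call in #mapper.
module TextType
  HADOOP = Hadoop::Io::Text
end

module LongType
  HADOOP = Hadoop::Io::LongWritable
end

class VisitsMapper
  def self.output_key
    TextType
  end

  def self.output_value
    LongType
  end

  def map(key, value, context)
    # map logic goes here
  end
end

Passing VisitsMapper to #mapper then sets the map output key and value types to Hadoop::Io::Text and Hadoop::Io::LongWritable; #reducer applies the same convention to the job's final output types.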
#mapperrr ⇒ Object
# File 'lib/ext/rubydoop.rb', line 5

alias mapperrr mapper
#reducer(cls) ⇒ Object
# File 'lib/ext/rubydoop.rb', line 13

def reducer(cls)
  output_key cls.output_key.const_get(:HADOOP) if cls.respond_to?(:output_key)
  output_value cls.output_value.const_get(:HADOOP) if cls.respond_to?(:output_value)
  reducerrr cls
end
#reducerrr ⇒ Object
# File 'lib/ext/rubydoop.rb', line 12

alias reducerrr reducer
#secondary_sort(start_index, end_index) ⇒ Object
Configures the job for secondary sort on the specified slice of the mapper output key.
Hadoop comes with a partitioner that can partition the map output based on a slice of the map output key. Humboldt ships with a comparator that uses the same configuration. Together they can be used to implement secondary sort.
Secondary sort is a mapreduce pattern where you emit a key, but partition and group only on a subset of that key. This has the result that each reduce invocation will see values grouped by the subset, but ordered by the whole key. It is used, among other things, to efficiently count distinct values.
Say you want to count the number of distinct visitors to a site, and your input is pairs of site and visitor IDs. The naïve implementation emits the site as key and the visitor ID as value, then collects all IDs into a set in the reducer and emits the site along with the size of the set. This is very memory-inefficient and impractical: for any interesting amount of data you will not be able to keep all the visitor IDs in memory.
What you do, instead, is to concatenate the site and visitor ID and emit that as key, and the visitor ID as value. It might seem wasteful to emit the visitor ID twice, but it’s necessary since Hadoop will only give you the key for the first value in each group.
You then instruct Hadoop to partition and group on just the site part of the key. Hadoop will still sort the values by their full key, so within each group the values will be sorted by visitor ID. In the reducer it’s now trivial to loop over the values and just increment a counter each time the visitor ID changes.
You configure which part of the key to partition and group by specifying the start and end indexes. The reason they are indexes, rather than a start index and a length like Ruby’s `String#slice`, is that you can also use negative indexes to count from the end. Negative indexes are useful, for example, when you don’t know how wide the part of the key you want to use is. In the example above, if you use the domain to identify sites, these can be of different lengths; if your visitor IDs are 20 characters, you can use 0 and -21 as your indexes.
# File 'lib/ext/rubydoop.rb', line 119

def secondary_sort(start_index, end_index)
  @job.set_partitioner_class(Hadoop::Mapreduce::Lib::Partition::BinaryPartitioner)
  Hadoop::Mapreduce::Lib::Partition::BinaryPartitioner.set_offsets(@job.configuration, start_index, end_index)
  @job.set_grouping_comparator_class(Humboldt::JavaLib::BinaryComparator)
  Humboldt::JavaLib::BinaryComparator.set_offsets(@job.configuration, start_index, end_index)
end
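Putting the example above together, a sketch of a distinct-visitors job; the class names and paths are hypothetical, and visitor IDs are assumed to be exactly 20 characters wide:

Rubydoop.configure do |input_path, output_path|
  job 'distinct visitors' do
    input input_path
    output output_path
    mapper VisitsMapper              # emits "#{site}#{visitor_id}" => visitor_id
    reducer DistinctVisitorsReducer  # increments a counter when the visitor ID changes
    # partition and group on the site part only: indexes 0 through -21
    # cover everything except the trailing 20-character visitor ID
    secondary_sort 0, -21
  end
end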