Class: Rubydoop::JobDefinition
- Inherits: Object (ancestry: Object > Rubydoop::JobDefinition)
- Defined in: lib/ext/rubydoop.rb
Instance Method Summary
- #cache_file(file, options = {}) ⇒ Object
- #enable_compression! ⇒ Object
- #input(paths, options = {}) ⇒ Object
- #inputtt ⇒ Object (alias of the #input that existed before this file overrides it)
- #local_mode? ⇒ Boolean
- #mapper(cls) ⇒ Object
- #mapperrr ⇒ Object (alias of the #mapper that existed before this file overrides it)
- #reducer(cls) ⇒ Object
- #reducerrr ⇒ Object (alias of the #reducer that existed before this file overrides it)
Instance Method Details
#cache_file(file, options = {}) ⇒ Object
Source: lib/ext/rubydoop.rb, lines 48-58
# File 'lib/ext/rubydoop.rb', line 48

# Makes +file+ available to tasks under the link name given by
# +options[:as]+ (default: the basename of +file+).
#
# When running in local mode on a Hadoop whose Job class has no
# #add_cache_file method, the cache is emulated by symlinking +file+ into
# the current working directory. Otherwise the file is registered with
# Hadoop's DistributedCache as a "file#linkname" URI.
#
# @param file [String] path/URI of the file to cache
# @param options [Hash] :as => name of the link the task will see
# @raise [Errno::EEXIST] (emulated branch only) if a regular file or
#   directory already occupies the link name; it is never overwritten
def cache_file(file, options = {})
  symlink = options.fetch(:as, File.basename(file))
  if local_mode? && !Hadoop::Mapreduce::Job.instance_methods.include?(:add_cache_file)
    # Already linked to this very file: repeated calls are a no-op.
    return if File.symlink?(symlink) && File.readlink(symlink) == file

    # A symlink left over from an earlier call/run that points elsewhere
    # would make ln_s raise EEXIST, so drop it first. Only symlinks are
    # removed; real files/directories are left alone so no data is lost.
    FileUtils.rm(symlink) if File.symlink?(symlink)
    FileUtils.ln_s(file, symlink)
  else
    uri = java.net.URI.new("#{file}\##{symlink}")
    Hadoop::FileCache::DistributedCache.add_cache_file(uri, @job.configuration)
  end
end
#enable_compression! ⇒ Object
Source: lib/ext/rubydoop.rb, lines 34-42
# File 'lib/ext/rubydoop.rb', line 34

# Enables gzip compression of map output and of job output by setting the
# (Hadoop 1.x-style) "mapred.*" properties below via #set, and sets the
# output compression type to BLOCK. In local mode nothing is set and nil
# is returned.
def enable_compression!
  return if local_mode?

  set('mapred.compress.map.output', true)
  set('mapred.output.compress', true)
  set('mapred.map.output.compression.codec', 'org.apache.hadoop.io.compress.GzipCodec')
  set('mapred.output.compression.codec', 'org.apache.hadoop.io.compress.GzipCodec')
  set('mapred.output.compression.type', 'BLOCK')
end
#input(paths, options = {}) ⇒ Object
Source: lib/ext/rubydoop.rb, lines 20-32
# File 'lib/ext/rubydoop.rb', line 20

# Adds input +paths+ to the job. A +:format+ option given as a Symbol or
# String is translated into a Humboldt input format class when one exists.
#
# The format name is camelized and suffixed with "InputFormat"
# (e.g. +:combined_text+ => "CombinedTextInputFormat") and looked up in
# Humboldt::JavaLib. If that lookup raises NameError, +:format+ is passed
# on unchanged, presumably so the previous #input (kept as #inputtt) can
# resolve it itself — confirm against Rubydoop. A Class value or a missing
# +:format+ is passed through untouched.
#
# @param paths input path(s), forwarded unchanged to #inputtt
# @param options [Hash] input options; the caller's hash is not mutated
# @return whatever #inputtt returns
def input(paths, options = {})
  options = options.dup
  format = options[:format]
  STDERR.puts "Warning! Using `format: :combined_text` will not work with remote input paths (e.g. S3) and Hadoop 1.x. Cf. https://issues.apache.org/jira/browse/MAPREDUCE-1806" if format == :combined_text
  unless format.nil? || format.is_a?(Class)
    # \A (not ^) so only the very first character is upcased, even if the
    # name were to contain a newline.
    class_name = format.to_s.gsub(/\A.|_./) { |x| x[-1, 1].upcase } + 'InputFormat'
    begin
      options[:format] = Humboldt::JavaLib.const_get(class_name)
    rescue NameError
      # Deliberate fallback: no Humboldt class of that name (or not a valid
      # constant name) — leave options[:format] as given.
    end
  end
  inputtt(paths, options)
end
#inputtt ⇒ Object
Source: lib/ext/rubydoop.rb, line 19
# File 'lib/ext/rubydoop.rb', line 19

# Keeps the previously defined #input (presumably Rubydoop's own) reachable
# as #inputtt. In the source file this precedes the redefinition of #input
# (line 20), which delegates to it.
alias_method :inputtt, :input
#local_mode? ⇒ Boolean
Source: lib/ext/rubydoop.rb, lines 44-46
# File 'lib/ext/rubydoop.rb', line 44

# Whether the job's configuration has "mapred.job.tracker" set to exactly
# "local".
# NOTE(review): this is the Hadoop 1.x property; on YARN-based Hadoop the
# local runner may be selected via "mapreduce.framework.name" instead — confirm.
#
# @return [Boolean]
def local_mode?
  job_tracker = @job.configuration.get('mapred.job.tracker')
  job_tracker == 'local'
end
#mapper(cls) ⇒ Object
Source: lib/ext/rubydoop.rb, lines 6-10
# File 'lib/ext/rubydoop.rb', line 6

# Wraps the previous #mapper (kept as #mapperrr). If +cls+ responds to
# +output_key+ / +output_value+, the +HADOOP+ constant of the returned
# type is first passed to #map_output_key / #map_output_value.
#
# @param cls the mapper class
# @return whatever #mapperrr returns
def mapper(cls)
  if cls.respond_to?(:output_key)
    key_type = cls.output_key
    map_output_key(key_type.const_get(:HADOOP))
  end
  if cls.respond_to?(:output_value)
    value_type = cls.output_value
    map_output_value(value_type.const_get(:HADOOP))
  end
  mapperrr(cls)
end
#mapperrr ⇒ Object
Source: lib/ext/rubydoop.rb, line 5
# File 'lib/ext/rubydoop.rb', line 5

# Keeps the previously defined #mapper (presumably Rubydoop's own) reachable
# as #mapperrr. In the source file this precedes the redefinition of #mapper
# (line 6), which delegates to it.
alias_method :mapperrr, :mapper
#reducer(cls) ⇒ Object
Source: lib/ext/rubydoop.rb, lines 13-17
# File 'lib/ext/rubydoop.rb', line 13

# Wraps the previous #reducer (kept as #reducerrr). If +cls+ responds to
# +output_key+ / +output_value+, the +HADOOP+ constant of the returned
# type is first passed to #output_key / #output_value.
#
# @param cls the reducer class
# @return whatever #reducerrr returns
def reducer(cls)
  if cls.respond_to?(:output_key)
    key_type = cls.output_key
    output_key(key_type.const_get(:HADOOP))
  end
  if cls.respond_to?(:output_value)
    value_type = cls.output_value
    output_value(value_type.const_get(:HADOOP))
  end
  reducerrr(cls)
end
#reducerrr ⇒ Object
Source: lib/ext/rubydoop.rb, line 12
# File 'lib/ext/rubydoop.rb', line 12

# Keeps the previously defined #reducer (presumably Rubydoop's own)
# reachable as #reducerrr. In the source file this precedes the
# redefinition of #reducer (line 13), which delegates to it.
alias_method :reducerrr, :reducer