Class: Wukong::Streamer::Base
Direct Known Subclasses
AccumulatingReducer, LineStreamer, RankAndBinReducer, RecordStreamer, Streamer, StructStreamer
Instance Attribute Summary
- #own_options ⇒ Object (readonly)
  Options, initially set from the command-line args; see Script#process_argv!.
Class Method Summary
- .mapper(*args, &block) ⇒ Object
  Creates a new object of this class and injects the given block as the process method.
- .run(options = {}) ⇒ Object
  Creates a new object of this class and runs it.
Instance Method Summary
- #after_stream ⇒ Object
  Called exactly once, after streaming completes.
- #bad_record!(key, *args) ⇒ Object
  To track processing errors inline, pass the offending record back to bad_record!.
- #before_stream ⇒ Object
  Called exactly once, before streaming begins.
- #each_record(&block) ⇒ Object
- #emit(record) ⇒ Object
  Serializes the record to output.
- #initialize(options = {}) ⇒ Base (constructor)
  Accepts an options hash from the script runner.
- #mapper(&mapper_block) ⇒ Object
  Defines a process method on the fly to execute the given mapper.
- #monitor ⇒ Object
  A periodic logger to track progress.
- #options ⇒ Object
- #process(*args, &block) ⇒ Object
  Processes one record, yielding the records to emit.
- #recordize(line) ⇒ Object
  Default recordizer: returns an array of fields by splitting the line at tabs.
- #run(options = {}) ⇒ Object
  Delegates back to Wukong to run this instance as a mapper.
- #stream ⇒ Object
  Passes each record to #process.
- #track(record) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Base
Accepts an options hash from the script runner.
# File 'lib/wukong/streamer/base.rb', line 12
def initialize options={}
  @own_options = options
end
Instance Attribute Details
#own_options ⇒ Object (readonly)
Options, initially set from the command-line args; see Script#process_argv!.
# File 'lib/wukong/streamer/base.rb', line 7
def own_options
  @own_options
end
Class Method Details
.mapper(*args, &block) ⇒ Object
Creates a new object of this class and injects the given block as the process method.
# File 'lib/wukong/streamer/base.rb', line 109
def self.mapper *args, &block
  self.new.mapper(*args, &block)
end
Instance Method Details
#after_stream ⇒ Object
Called exactly once, after streaming completes.
# File 'lib/wukong/streamer/base.rb', line 49
def after_stream
end
#bad_record!(key, *args) ⇒ Object
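To track processing errors inline, pass the offending record back to bad_record!. It writes a warning containing the inspected arguments (truncated to 401 characters) to standard error, and emits the arguments as a tab-separated line whose first field is "bad_record-" followed by key, so bad records appear in the regular output tagged by key. key must be a String, since "bad_record-" + key raises a TypeError for a Symbol.
# File 'lib/wukong/streamer/base.rb', line 82
def bad_record! key, *args
  warn "Bad record #{args.inspect[0..400]}"
  puts ["bad_record-"+key, *args].join("\t")
end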
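A minimal sketch of reporting a parse failure through bad_record! from inside #process. ParsingStreamer is a hypothetical stand-in, not part of Wukong: it copies the documented body of bad_record! but writes to injected IO objects instead of $stderr and $stdout so the result can be inspected. The "parse" key and the name/count record format are assumptions made for illustration.

```ruby
require 'stringio'

# Hypothetical stand-in (not Wukong's class). bad_record! mirrors the
# documented body, but writes to injected IOs instead of $stderr/$stdout.
class ParsingStreamer
  def initialize(out, err)
    @out = out
    @err = err
  end

  # Same logic as Base#bad_record!; key must be a String.
  def bad_record!(key, *args)
    @err.puts "Bad record #{args.inspect[0..400]}"
    @out.puts ["bad_record-" + key, *args].join("\t")
  end

  # Example #process: expects a name and an integer count. Records whose
  # count does not parse are reported instead of aborting the stream.
  def process(name, count)
    parsed = Integer(count)
  rescue ArgumentError
    bad_record!("parse", name, count)
  else
    yield [name, parsed]
  end
end

out = StringIO.new
err = StringIO.new
streamer = ParsingStreamer.new(out, err)
[%w[alice 3], %w[bob x]].each do |fields|
  # Emit whatever process yields, as Base#stream does.
  streamer.process(*fields) { |record| out.puts record.join("\t") }
end
print out.string
```

The good record is emitted normally, while the bad one becomes a bad_record-parse line in the same output stream plus a warning on the error stream.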
#before_stream ⇒ Object
Called exactly once, before streaming begins.
# File 'lib/wukong/streamer/base.rb', line 45
def before_stream
end
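Because the two hooks bracket the whole stream, they are a natural place to set up and flush per-run state. The sketch below assumes a hypothetical, stripped-down HookedStreamer that follows the same before_stream, process, after_stream order as the documented #stream but reads from an injected IO; CountingStreamer is an illustrative subclass that counts records and writes one total at the end.

```ruby
require 'stringio'

# Hypothetical stand-in that keeps only the hook order of Base#stream.
class HookedStreamer
  def initialize(input, output)
    @input = input
    @output = output
  end

  def stream
    before_stream
    @input.each_line do |line|
      process(*line.chomp.split("\t")) { |rec| @output.puts(rec.join("\t")) }
    end
    after_stream
  end

  def before_stream; end
  def after_stream; end
  def process(*_fields); end
end

# Illustrative subclass: initializes a counter, counts every record,
# and emits a single summary line once the input is exhausted.
class CountingStreamer < HookedStreamer
  def before_stream
    @count = 0
  end

  def process(*_fields)
    @count += 1
  end

  def after_stream
    @output.puts(["total", @count].join("\t"))
  end
end

out = StringIO.new
CountingStreamer.new(StringIO.new("a\t1\nb\t2\nc\t3\n"), out).stream
print out.string
```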
#each_record(&block) ⇒ Object
Yields each line read from standard input to the block.
# File 'lib/wukong/streamer/base.rb', line 40
def each_record &block
  $stdin.each(&block)
end
#emit(record) ⇒ Object
Serializes the record to output.
Emits a single line of tab-separated fields, created by calling #to_flat on the record and joining the result with "\t".
Does no escaping or processing of the record; that is to_flat's job, or yours if you override this method.
# File 'lib/wukong/streamer/base.rb', line 68
def emit record
  puts record.to_flat.join("\t")
end
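A sketch of the division of labour between #emit and #to_flat. Visit and its to_flat method are hypothetical (Wukong provides its own to_flat for the types it emits); here to_flat replaces embedded tabs and newlines with spaces, which is just one possible escaping policy, so that the joined line keeps its field count.

```ruby
require 'stringio'

# Hypothetical record type; to_flat is defined here only so the sketch runs.
Visit = Struct.new(:user, :url, :hits) do
  # Flatten to strings, replacing tabs and newlines so the joined
  # line cannot gain extra fields or extra lines.
  def to_flat
    to_a.map { |field| field.to_s.gsub(/[\t\n]/, " ") }
  end
end

# Same body as the documented #emit, but writing to an explicit IO.
def emit(record, io = $stdout)
  io.puts record.to_flat.join("\t")
end

out = StringIO.new
emit(Visit.new("alice", "http://example.com/a\tb", 3), out)
print out.string
```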
#mapper(&mapper_block) ⇒ Object
Defines a process method on the fly to execute the given mapper.
This is still experimental. Among other limitations, the block cannot use yield; it has to call emit directly.
# File 'lib/wukong/streamer/base.rb', line 97
def mapper &mapper_block
  @mapper_block = mapper_block.to_proc
  self.instance_eval do
    def process *args, &block
      instance_exec(*args, &@mapper_block)
    end
  end
  self
end
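The instance_eval / instance_exec mechanism can be reproduced in a few lines of plain Ruby. MiniMapper is a hypothetical class, not Wukong's: its #mapper body mirrors the listing above, and its class-level .mapper shortcut mirrors Base.mapper without the *args pass-through. Because the block runs through instance_exec, it sees the streamer as self and can call emit, but the block given to process is never forwarded to it, which is why yield is unavailable.

```ruby
# Hypothetical minimal class reproducing the documented #mapper trick.
class MiniMapper
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  # Collects records instead of printing them.
  def emit(record)
    @emitted << record
  end

  def mapper(&mapper_block)
    @mapper_block = mapper_block.to_proc
    # Inside instance_eval, `def` defines a singleton method on self.
    instance_eval do
      def process(*args, &block)
        # The stored block runs with self == this streamer; `block` is ignored.
        instance_exec(*args, &@mapper_block)
      end
    end
    self
  end

  # Class-level shortcut in the spirit of Base.mapper.
  def self.mapper(&block)
    new.mapper(&block)
  end
end

# The block calls emit directly rather than yielding.
m = MiniMapper.mapper { |word, count| emit [word.upcase, count.to_i * 2] }
m.process("hi", "21")
p m.emitted # => [["HI", 42]]
```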
#monitor ⇒ Object
A periodic logger to track progress, created on first use and memoized.
# File 'lib/wukong/streamer/base.rb', line 88
def monitor
  @monitor ||= PeriodicMonitor.new
end
#options ⇒ Object
Returns the global Settings deep-merged with own_options.
# File 'lib/wukong/streamer/base.rb', line 16
def options
  Settings.deep_merge own_options
end
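#options overlays the streamer's own options on the global Settings. The sketch assumes that deep_merge gives the argument precedence and merges nested hashes recursively, as the common Hash#deep_merge extensions do; deep_merge here is a hypothetical plain-Ruby helper, and the option names are made up.

```ruby
# Hypothetical recursive merge: values from `overrides` win, and nested
# hashes are merged key by key instead of being replaced wholesale.
# Neither input hash is modified.
def deep_merge(base, overrides)
  base.merge(overrides) do |_key, old, new|
    old.is_a?(Hash) && new.is_a?(Hash) ? deep_merge(old, new) : new
  end
end

settings    = { log: { level: :info, interval: 10_000 }, sep: "\t" } # stand-in for Settings
own_options = { log: { interval: 100 } }                             # e.g. from the command line

merged = deep_merge(settings, own_options)
p merged
```

Only log.interval is overridden; log.level and sep fall through from the global settings.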
#process(*args, &block) ⇒ Object
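Processes a single record. #stream calls it once per input record, passing the record's fields as arguments, and emits every record it yields. The base implementation does nothing; subclasses override it.
# File 'lib/wukong/streamer/base.rb', line 75
def process *args, &block
end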
#recordize(line) ⇒ Object
Default recordizer: returns an array of fields by splitting the line at tabs, or nil if splitting raises an error. #stream skips any line for which recordize returns nil.
# File 'lib/wukong/streamer/base.rb', line 55
def recordize line
  line.split("\t") rescue nil
end
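Because the default recordizer calls String#split without a limit, trailing empty fields are dropped, which can matter when the last columns are optional. The sketch below mirrors the documented recordize and adds a hypothetical recordize_keep_empty variant that passes a limit of -1 to keep them; overriding recordize in a subclass would be the natural place for such a change.

```ruby
# Mirrors the documented default: split at tabs, nil if anything raises.
def recordize(line)
  line.split("\t") rescue nil
end

# Hypothetical variant: a negative limit keeps trailing empty fields.
def recordize_keep_empty(line)
  line.split("\t", -1)
end

line = "alice\t42\t\t"
p recordize(line)            # => ["alice", "42"]
p recordize_keep_empty(line) # => ["alice", "42", "", ""]
p recordize(nil)             # => nil (the NoMethodError is rescued)
```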
#run(options = {}) ⇒ Object
Delegates back to Wukong to run this instance as a mapper, with no reducer.
# File 'lib/wukong/streamer/base.rb', line 114
def run options={}
  Wukong.run(self, nil, options)
end
#stream ⇒ Object
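Passes each record to #process. Logs the input file name (if one is set) and calls before_stream; then, for each input line, chomps it, converts it with #recordize (skipping the line if that returns nil), passes the record's fields to #process, emits every record that #process yields, and hands the input record to #track. Finally calls after_stream.
# File 'lib/wukong/streamer/base.rb', line 23
def stream
  Log.info("Streaming on:\t%s" % [Script.input_file]) unless Script.input_file.blank?
  before_stream
  each_record do |line|
    record = recordize(line.chomp) or next
    process(*record) do |output_record|
      emit output_record
    end
    track(record)
  end
  after_stream
end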
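The control flow of #stream can be exercised without Hadoop or Wukong. The sketch uses a hypothetical MiniStreamer that keeps the same loop but reads from and writes to injected IO objects; logging, #track, and to_flat are omitted, and emit joins arrays directly. SwapStreamer is an illustrative subclass whose process yields one output record per input.

```ruby
require 'stringio'

# Hypothetical stand-in for Base: same loop as #stream, injected IOs.
class MiniStreamer
  def initialize(input, output)
    @input = input
    @output = output
  end

  def stream
    before_stream
    each_record do |line|
      record = recordize(line.chomp) or next # skip lines recordize rejects
      process(*record) { |output_record| emit(output_record) }
    end
    after_stream
  end

  def each_record(&block)
    @input.each_line(&block)
  end

  def recordize(line)
    line.split("\t") rescue nil
  end

  # Arrays have no to_flat in plain Ruby, so join them directly.
  def emit(record)
    @output.puts(record.join("\t"))
  end

  def before_stream; end
  def after_stream; end
  def process(*_args); end
end

# Illustrative subclass: swaps the first two fields of every record.
class SwapStreamer < MiniStreamer
  def process(a, b, *rest)
    yield [b, a, *rest]
  end
end

out = StringIO.new
SwapStreamer.new(StringIO.new("k1\tv1\nk2\tv2\textra\n"), out).stream
print out.string
```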
#track(record) ⇒ Object
Passes the first 1001 characters of the record's string form to the periodically method of #monitor.
# File 'lib/wukong/streamer/base.rb', line 36
def track record
  monitor.periodically(record.to_s[0..1000])
end
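#track hands a truncated string form of each record to the monitor. PeriodicMonitor's actual interval and output format are not shown in this page, so the sketch uses a hypothetical CountingMonitor that logs every Nth call, purely to show how the periodically call and the 1001-character truncation interact.

```ruby
# Hypothetical stand-in for PeriodicMonitor: logs every `every`-th call.
# The real class's trigger (count or time) and format may differ.
class CountingMonitor
  attr_reader :log

  def initialize(every)
    @every = every
    @count = 0
    @log = []
  end

  def periodically(summary)
    @count += 1
    @log << "#{@count}\t#{summary}" if (@count % @every).zero?
  end
end

monitor = CountingMonitor.new(2)
records = [%w[a 1], %w[b 2], %w[c 3], ["d", "4" * 2000]]
records.each do |record|
  # Same truncation as the documented #track: at most 1001 characters.
  monitor.periodically(record.to_s[0..1000])
end
p monitor.log.size # => 2
```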