Class: Wukong::Streamer::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/wukong/streamer/base.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Base

Accepts option hash from script runner



12
13
14
# File 'lib/wukong/streamer/base.rb', line 12

# Builds a streamer from the option hash supplied by the script runner.
#
# @param options [Hash] per-instance options, stored unchanged as @own_options
def initialize(options = {})
  @own_options = options
end

Instance Attribute Details

#own_options ⇒ Object (readonly)

Options, initially set from the command-line args – see Script#process_argv!



7
8
9
# File 'lib/wukong/streamer/base.rb', line 7

# Read-only accessor for the option hash given to the constructor.
# @return [Hash, nil] the value of @own_options
def own_options; @own_options; end

Class Method Details

.mapper(*args, &block) ⇒ Object

Creates a new object of this class and injects the given block as the process method



109
110
111
# File 'lib/wukong/streamer/base.rb', line 109

# Creates a new instance of this class and installs the given block as its
# #process method via the instance-level #mapper.
#
# Positional arguments are handed to the constructor (the options hash that
# #initialize accepts). The instance-level #mapper takes only a block, so
# forwarding the arguments there would raise ArgumentError.
#
# @param args [Array] constructor arguments (typically a single options Hash)
# @return the configured instance (the instance #mapper returns self)
def self.mapper(*args, &block)
  new(*args).mapper(&block)
end

.run(options = {}) ⇒ Object

Creates a new object of this class and runs it



119
120
121
# File 'lib/wukong/streamer/base.rb', line 119

# Convenience entry point: builds a fresh instance of this class (with no
# constructor options) and hands it to Wukong.run together with +options+.
#
# @param options [Hash] forwarded unchanged as Wukong.run's third argument
def self.run(options = {})
  Wukong.run(new, nil, options)
end

Instance Method Details

#after_stream ⇒ Object

Called exactly once, after streaming completes



49
50
# File 'lib/wukong/streamer/base.rb', line 49

# Hook run by #stream once the input loop has finished.
# Does nothing by default; subclasses override it for teardown work.
def after_stream; end

#bad_record!(key, *args) ⇒ Object

To track processing errors inline, pass the line back to bad_record!



82
83
84
85
# File 'lib/wukong/streamer/base.rb', line 82

# Reports a record that could not be processed.
#
# Writes a truncated inspection of +args+ (at most 401 characters) to stderr
# via +warn+. It then prints a tab-separated line to stdout whose first field
# is "bad_record-<key>", so the failure appears inline with normal output.
#
# @param key [#to_s] label for the kind of failure. It is interpolated rather
#   than concatenated, so Symbols (or any object) work as well as Strings.
# @param args [Array] the offending record's fields
def bad_record!(key, *args)
  warn "Bad record #{args.inspect[0..400]}"
  puts ["bad_record-#{key}", *args].join("\t")
end

#before_stream ⇒ Object

Called exactly once, before streaming begins



45
46
# File 'lib/wukong/streamer/base.rb', line 45

# Hook run by #stream before the first input line is read.
# Does nothing by default; subclasses override it for setup work.
def before_stream; end

#each_record(&block) ⇒ Object



40
41
42
# File 'lib/wukong/streamer/base.rb', line 40

# Source of raw input: yields every line of standard input, trailing newline
# included, to the given block. Without a block, $stdin's enumerator is returned.
def each_record(&block)
  $stdin.each_line(&block)
end

#emit(record) ⇒ Object

Serializes the record to output.

Emits a single line of tab-separated fields, created by calling #to_flat on the record and joining the results with a tab ("\t").

Does no escaping or processing of the record – that’s to_flat’s job, or yours if you override this method.



68
69
70
# File 'lib/wukong/streamer/base.rb', line 68

# Writes one output line to stdout: the record's #to_flat fields joined by
# tabs. No escaping happens here; that is #to_flat's responsibility.
#
# @param record [#to_flat] anything whose #to_flat returns an Array of fields
def emit(record)
  flat_fields = record.to_flat
  puts flat_fields.join("\t")
end

#mapper(&mapper_block) ⇒ Object

Defines a process method on the fly to execute the given mapper.

This is still experimental. Among other limitations, you can’t use yield – you have to call emit() directly.



97
98
99
100
101
102
103
104
105
# File 'lib/wukong/streamer/base.rb', line 97

# Defines a #process method on this instance, on the fly, that runs the given
# block with +self+ bound to this streamer.
#
# The arguments passed to #process become the block's arguments. The block
# that #process itself receives is not forwarded to the mapper block, so the
# mapper must call +emit+ directly instead of using +yield+.
#
# @yield [*args] the mapper body, evaluated with instance_exec
# @return [self] so calls can be chained
# @raise [ArgumentError] if no block is given. Without this check, the
#   failure would be an opaque NoMethodError (or a later nil-block error).
def mapper(&mapper_block)
  raise ArgumentError, "mapper requires a block" unless mapper_block

  @mapper_block = mapper_block
  # A singleton method overrides #process for this instance only.
  define_singleton_method(:process) do |*args, &_block|
    instance_exec(*args, &@mapper_block)
  end
  self
end

#monitor ⇒ Object

A periodic logger to track progress



88
89
90
# File 'lib/wukong/streamer/base.rb', line 88

# Lazily builds the PeriodicMonitor used for progress logging and reuses it
# on every later call.
def monitor
  return @monitor if @monitor
  @monitor = PeriodicMonitor.new
end

#options ⇒ Object



16
17
18
# File 'lib/wukong/streamer/base.rb', line 16

# Effective configuration: the result of Settings.deep_merge applied to this
# instance's own_options. It is recomputed on every call.
def options
  Settings.deep_merge(own_options)
end

#process(*args, &block) ⇒ Object

Process each record in turn, yielding the records to emit. The base implementation does nothing; subclasses override it.



75
76
# File 'lib/wukong/streamer/base.rb', line 75

# Per-record hook invoked by #stream with a record's fields as arguments.
# Overrides yield each output record, and #stream hands every yielded value
# to #emit. This base version ignores its input and yields nothing.
def process(*args, &block); end

#recordize(line) ⇒ Object

Default recordizer: returns array of fields by splitting at tabs



55
56
57
# File 'lib/wukong/streamer/base.rb', line 55

# Default recordizer: splits a line into its tab-separated fields.
#
# Any StandardError raised while splitting (for example when +line+ is nil)
# yields nil instead. #stream treats nil as "skip this line".
#
# @param line [String] one input line, already chomped
# @return [Array<String>, nil]
def recordize(line)
  line.split("\t")
rescue StandardError
  nil
end

#run(options = {}) ⇒ Object

Delegates back to Wukong to run this instance as a mapper



114
115
116
# File 'lib/wukong/streamer/base.rb', line 114

# Hands this already-built instance to Wukong.run, forwarding +options+ as
# its third argument.
def run(options = {})
  Wukong.run(self, nil, options)
end

#stream ⇒ Object

Pass each record to #process



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/wukong/streamer/base.rb', line 23

# Main loop. It logs the input file (when one is set) and runs #before_stream.
# For every line from #each_record it then:
#   1. chomps the line and converts it with #recordize, skipping lines that
#      recordize to nil;
#   2. splats the fields into #process, passing each yielded value to #emit;
#   3. hands the record to #track for progress logging.
# It finishes with #after_stream and returns that hook's value.
def stream
  unless Script.input_file.blank?
    Log.info("Streaming on:\t%s" % [Script.input_file])
  end
  before_stream
  each_record do |raw_line|
    fields = recordize(raw_line.chomp)
    next unless fields

    process(*fields) { |out_record| emit(out_record) }
    track(fields)
  end
  after_stream
end

#track(record) ⇒ Object



36
37
38
# File 'lib/wukong/streamer/base.rb', line 36

# Reports progress by passing the first 1001 characters of the record's
# string form to the monitor's #periodically. Presumably the monitor logs
# only at intervals (TODO confirm against PeriodicMonitor).
def track(record)
  progress_monitor = monitor
  progress_monitor.periodically(record.to_s[0..1000])
end