Class: Wukong::Processor

Inherits:
Hanuman::Stage
Includes:
Vayacondios::Notifications, Logging
Defined in:
lib/wukong/processor.rb,
lib/wukong/widget/filters.rb,
lib/wukong/widget/processors.rb,
lib/wukong/widget/serializers.rb,
lib/wukong/widget/reducers/bin.rb,
lib/wukong/widget/reducers/sort.rb,
lib/wukong/widget/reducers/uniq.rb,
lib/wukong/widget/reducers/count.rb,
lib/wukong/widget/reducers/group.rb,
lib/wukong/widget/reducers/moments.rb,
lib/wukong/widget/reducers/accumulator.rb,
lib/wukong/widget/reducers/group_concat.rb

Overview

The Processor is the basic unit of computation in Wukong. A processor can be thought of as an arbitrary function that takes certain inputs and produces certain (or no) outputs.

A Processor can be written and tested purely in Ruby, on your local machine. Processors can also be glued together.
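To make the "processor as a function" idea concrete, here is a minimal plain-Ruby sketch. `Upcaser`, `BlankDropper` and `run_chain` are hypothetical stand-ins, not Wukong classes or APIs. Each object takes one record and yields zero or more records, and `run_chain` glues them together by feeding whatever one yields into the next.

```ruby
# Hypothetical stand-ins for illustration only; these are NOT Wukong classes.
# Each one takes a record and yields zero or more records.
class Upcaser
  def process(record)
    yield record.upcase
  end
end

class BlankDropper
  def process(record)
    yield record unless record.strip.empty? # yields nothing for blank lines
  end
end

# Glue processors together: whatever one yields is fed to the next one,
# and whatever the last one yields is handed to the sink block.
def run_chain(processors, record, &sink)
  return sink.call(record) if processors.empty?

  head, *rest = processors
  head.process(record) { |out| run_chain(rest, out, &sink) }
end

chain  = [BlankDropper.new, Upcaser.new]
output = []
["hello", "  ", "world"].each do |line|
  run_chain(chain, line) { |result| output << result }
end
p output # prints ["HELLO", "WORLD"]
```

Because each stage is an ordinary Ruby object, it can be exercised on its own in a test before being chained.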

Direct Known Subclasses

Accumulator, Extract, Filter, Flatten, Logger, Map, Serializer, Topic

Defined Under Namespace

Modules: BufferedProcessor, StdoutProcessor
Classes: Accumulator, Bin, Count, Extract, Filter, Flatten, FromCsv, FromDelimited, FromJson, FromTsv, Group, GroupConcat, Head, Identity, Limit, Logger, Map, Moments, NotRegexpFilter, Null, Recordize, RegexpFilter, Reject, Sample, Select, Serializer, Sort, Tail, ToCsv, ToDelimited, ToInspect, ToJson, ToTsv, Topic, Uniq

Constant Summary

SerializerError = Class.new(Error)

Class Method Summary

Instance Method Summary

Methods included from Logging

included

Methods included from Hanuman::StageClassMethods

#builder, #label, #register, #set_builder

Class Method Details

.configure(settings) ⇒ Object



# File 'lib/wukong/processor.rb', line 47

def configure(settings)
  settings.description = description if description
  fields.each_pair do |name, field|
    next if field.doc == false || field.doc.to_s == 'false'
    next if [:log, :notifier].include?(name)
    field_props = {}.tap do |props|
      props[:description] = field.doc unless field.doc == "#{name} field"
      field_type = (field.type.respond_to?(:product) ? field.type.product : field.type)
      configliere_type = case field_type
      when String                then nil
      when TrueClass, FalseClass then :boolean
      else field_type
      end
      
      props[:type]        = configliere_type if configliere_type
      props[:default]     = field.default if field.default
    end
    existing_value = settings[name]
    settings.define(name, field_props)
    settings[name] = existing_value unless existing_value.nil?
  end
end
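The sketch below mirrors the steps of `configure` with hypothetical stand-ins: `FakeField` and `FakeSettings` only imitate the shape of the field and settings objects and are not the Gorillib or Configliere classes. `FakeSettings#define` deliberately overwrites any stored value with the default, to show why `configure` saves and restores `existing_value`; whether Configliere's own `define` behaves that way is not stated on this page. The helper compares class objects with `==`, because `when String` in a `case` tests whether the value is an instance of `String` (`Module#===`), not whether it is the `String` class itself.

```ruby
# Hypothetical stand-ins for illustration only: FakeField and FakeSettings
# mimic the shape of the objects configure works with; they are not the
# Gorillib field or Configliere settings classes.
FakeField = Struct.new(:doc, :type, :default)

class FakeSettings
  attr_reader :definitions

  def initialize
    @values      = {}
    @definitions = {}
  end

  def [](name)
    @values[name]
  end

  def []=(name, value)
    @values[name] = value
  end

  # This stand-in's define overwrites any stored value with the default.
  def define(name, props)
    @definitions[name] = props
    @values[name]      = props[:default]
  end
end

# Class objects are compared with ==, because `when String` in a case
# statement tests whether the value is an instance of String (Module#===).
def settings_type_for(type)
  if type == String
    nil
  elsif type == TrueClass || type == FalseClass
    :boolean
  else
    type
  end
end

def configure_from_fields(settings, fields)
  fields.each_pair do |name, field|
    next if field.doc == false || field.doc.to_s == 'false' # hidden field
    next if [:log, :notifier].include?(name)

    props = {}
    props[:description] = field.doc unless field.doc == "#{name} field"
    type = settings_type_for(field.type)
    props[:type]    = type if type
    props[:default] = field.default if field.default

    existing = settings[name]                      # save a value set earlier,
    settings.define(name, props)                   # define the setting,
    settings[name] = existing unless existing.nil? # then restore the value
  end
end

settings = FakeSettings.new
settings[:limit] = 5 # a value supplied before configure runs
configure_from_fields(settings, {
  limit:   FakeField.new("max records", Integer, 10),
  verbose: FakeField.new("verbose field", TrueClass, nil),
  secret:  FakeField.new(false, String, nil)
})
p settings[:limit] # prints 5
```

Here `:verbose` gets no description (its doc is the auto-generated "verbose field" text), `:secret` is skipped entirely, and the value 5 set before configuration survives the call to `define`.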

.consumes(*args) ⇒ Object



# File 'lib/wukong/processor.rb', line 27

def consumes(*args)
  options   = args.extract_options!
  @consumes = options[:as]
  validate_and_set_serialization(:from, args.first)
end

.description(desc = nil) ⇒ Object



# File 'lib/wukong/processor.rb', line 22

def description(desc = nil)
  @description = desc if desc
  @description
end
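`description` is a combined reader/writer: called with an argument it stores the text, called without one it returns what was stored. A minimal sketch of the same pattern, using the hypothetical classes `ExampleBase` and `Tokenizer`:

```ruby
# Sketch of the combined reader/writer pattern used by `description`:
# with an argument it stores the value, without one it reads it back.
# ExampleBase and Tokenizer are hypothetical classes for illustration.
class ExampleBase
  def self.description(desc = nil)
    @description = desc if desc # store only when a truthy argument is given
    @description
  end
end

class Tokenizer < ExampleBase
  description "Splits each line into words."
end

p Tokenizer.description # prints "Splits each line into words."
```

One consequence of the `if desc` guard is that `description(nil)` reads the current value rather than clearing it.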

.produces(*args) ⇒ Object



# File 'lib/wukong/processor.rb', line 33

def produces(*args)
  options   = args.extract_options!
  @produces = options[:as]
  validate_and_set_serialization(:to, args.first)
end

.valid_serializer?(label) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wukong/processor.rb', line 39

def valid_serializer?(label)
  label
end

.validate_and_set_serialization(direction, label) ⇒ Object



# File 'lib/wukong/processor.rb', line 43

def validate_and_set_serialization(direction, label)
  instance_variable_set("@serialization_#{direction}", label) if %w[ tsv json xml ].include?(label.to_s)
end
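Taken together, `consumes`, `produces` and `validate_and_set_serialization` record an expected record type (the `:as` option) and, if the first argument is one of `tsv`, `json` or `xml`, a serialization label. The plain-Ruby sketch below reproduces that logic in a hypothetical `SketchProcessor` class. One assumption to note: the original calls `args.extract_options!`, an Array extension in the style of ActiveSupport's; the sketch pops the trailing options hash by hand so it runs without extra libraries.

```ruby
# A plain-Ruby sketch of the consumes/produces class macros. The trailing
# options hash is popped by hand instead of using `extract_options!`.
# SketchProcessor is a hypothetical stand-in, not Wukong::Processor.
class SketchProcessor
  SERIALIZATIONS = %w[tsv json xml].freeze

  def self.consumes(*args)
    options   = args.last.is_a?(Hash) ? args.pop : {}
    @consumes = options[:as]
    validate_and_set_serialization(:from, args.first)
  end

  def self.produces(*args)
    options   = args.last.is_a?(Hash) ? args.pop : {}
    @produces = options[:as]
    validate_and_set_serialization(:to, args.first)
  end

  # Unrecognized labels are silently ignored, leaving the variable unset.
  def self.validate_and_set_serialization(direction, label)
    return unless SERIALIZATIONS.include?(label.to_s)
    instance_variable_set("@serialization_#{direction}", label)
  end
end

class TweetParser < SketchProcessor
  consumes :json, as: Hash
  produces :csv,  as: Array # :csv is not in the list, so no serialization is stored
end

p TweetParser.instance_variable_get(:@serialization_from) # prints :json
p TweetParser.instance_variable_get(:@serialization_to)   # prints nil
```

Because the label check accepts anything whose `to_s` matches, both `:json` and `"json"` are recorded, while an unsupported label such as `:csv` is dropped without an error.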

Instance Method Details

#expected_record_type(type) ⇒ Object



# File 'lib/wukong/processor.rb', line 72

def expected_record_type(type)
  self.class.instance_variable_get("@#{type}")
end

#expected_serialization(direction) ⇒ Object



# File 'lib/wukong/processor.rb', line 76

def expected_serialization(direction)
  self.class.instance_variable_get("@serialization_#{direction.to_s}")
end
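Both readers look up instance variables on the processor's class object, not on the processor instance. A short sketch with hypothetical classes `Parent` and `Child` shows the lookup and one Ruby consequence of it: class-level instance variables belong to the class that set them, so a subclass does not inherit its parent's values.

```ruby
# Sketch of an instance reading values that its class stored in
# class-level instance variables. Parent and Child are hypothetical.
class Parent
  @serialization_from = "json" # class-level instance variable
  @consumes           = Hash

  def expected_record_type(type)
    self.class.instance_variable_get("@#{type}")
  end

  def expected_serialization(direction)
    self.class.instance_variable_get("@serialization_#{direction}")
  end
end

class Child < Parent; end

p Parent.new.expected_serialization(:from)   # prints "json"
p Parent.new.expected_record_type(:consumes) # prints Hash
p Child.new.expected_serialization(:from)    # prints nil
```

In this sketch `Child` would have to repeat the declaration to report the same serialization as `Parent`.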

#finalize {|record| ... } ⇒ Object

This method is called to signal that the last record has been received, but that further processing may still be done, records may still be yielded, and so on.

This can be used within an aggregating processor (like a reducer in a map/reduce job) to start processing the final aggregate of records since the "last record" has already been received.

Override this method in your subclass.

Yields:

  • record the record you want to yield

Yield Parameters:

  • record (Object)

    the yielded record



# File 'lib/wukong/processor.rb', line 128

def finalize
end
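As a sketch of the aggregating case described above, the hypothetical `WordCounter` below yields nothing from `process` and only accumulates counts; `finalize` then yields one record per word once the last line has arrived. The calling code stands in for whatever drives the processor and is likewise an assumption.

```ruby
# A sketch of an aggregating processor: process only accumulates state,
# and finalize yields the aggregate after the last record has arrived.
# WordCounter is a hypothetical class, not a Wukong widget.
class WordCounter
  def setup
    @counts = Hash.new(0)
  end

  def process(record)
    record.split.each { |word| @counts[word] += 1 } # yields nothing per record
  end

  def finalize
    @counts.sort.each { |word, count| yield [word, count] }
  end
end

counter = WordCounter.new
counter.setup
["a b", "b c b"].each { |line| counter.process(line) }

results = []
counter.finalize { |pair| results << pair }
p results # prints [["a", 1], ["b", 3], ["c", 1]]
```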

#perform_action(*args) {|record| ... } ⇒ Object

When instantiated with a block, the block will replace this method.

Parameters:

  • args (Array<Object>)

Yields:

  • record a record that might be yielded by the block

Yield Parameters:

  • record (Object)

    the yielded record



# File 'lib/wukong/processor.rb', line 86

def perform_action(*args)
end

#process(record) {|record| ... } ⇒ Object

This method is called once per record.

Override this method in your subclass.

Parameters:

  • record (Object)

Yields:

  • record the record you want to yield

Yield Parameters:

  • record (Object)

    the yielded record



# File 'lib/wukong/processor.rb', line 112

def process(record, &emit)
  yield record
end
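Because `process` yields its results instead of returning them, one input record can produce zero, one, or many outputs. A minimal sketch of an override, using a hypothetical `SentenceSplitter` class that is not one of the widgets listed above:

```ruby
# A minimal sketch of a process override. Results are yielded rather than
# returned, so one input can produce zero, one, or many outputs.
# SentenceSplitter is a hypothetical class, not a Wukong widget.
class SentenceSplitter
  def process(record)
    record.split(/(?<=[.!?])\s+/).each do |sentence|
      yield sentence unless sentence.empty?
    end
  end
end

out      = []
splitter = SentenceSplitter.new
["One. Two!", "", "Three?"].each { |line| splitter.process(line) { |s| out << s } }
p out # prints ["One.", "Two!", "Three?"]
```

Here the first line yields two records, the empty line yields none, and the last yields one.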

#receive_action(action) ⇒ Object

The action attribute is turned into the perform_action method.

Parameters:

  • action (Proc)


# File 'lib/wukong/processor.rb', line 94

def receive_action(action)
  self.define_singleton_method(:perform_action, &action)
end
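`define_singleton_method` installs the given Proc as `perform_action` on that one object only, so other instances keep the default no-op. The sketch below uses a hypothetical `ActionHolder` class. One Ruby detail worth noting: a Proc turned into a method cannot use `yield` to reach the method's block, so the action declares an explicit block parameter (`&emit`) instead.

```ruby
# Sketch of the mechanism receive_action relies on. ActionHolder is a
# hypothetical class used only for illustration.
class ActionHolder
  def perform_action(*args); end # default: do nothing

  def receive_action(action)
    define_singleton_method(:perform_action, &action) # this object only
  end
end

doubler = ActionHolder.new
# The action takes an explicit &emit block parameter, since `yield` inside
# the proc would not refer to the block passed to perform_action.
doubler.receive_action(proc { |x, &emit| emit.call(x * 2) })

results = []
doubler.perform_action(21) { |r| results << r }
p results                             # prints [42]
p ActionHolder.new.perform_action(21) # prints nil
```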

#setup ⇒ Object

This method is called after the processor class has been instantiated but before any records are given to it to process.

Override this method in your subclass.



# File 'lib/wukong/processor.rb', line 102

def setup
end

#stop ⇒ Object

This method is called after all records have been passed to the processor. It signals that processing should stop.

Override this method in your subclass.



# File 'lib/wukong/processor.rb', line 135

def stop
end
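The hook descriptions above imply an order: `setup` once, `process` once per record, `finalize` after the last record, then `stop`. The sketch below records that order. `HookRecorder` and `drive` are hypothetical; `drive` stands in for whatever invokes these hooks, which this page does not specify.

```ruby
# A sketch of the lifecycle order the hooks describe. HookRecorder and
# drive are hypothetical stand-ins, not Wukong classes or runners.
class HookRecorder
  attr_reader :events

  def initialize
    @events = []
  end

  def setup
    @events << :setup    # e.g. open files or connections here
  end

  def process(record)
    @events << [:process, record]
    yield record
  end

  def finalize
    @events << :finalize # records may still be yielded here
  end

  def stop
    @events << :stop     # e.g. release resources here
  end
end

def drive(processor, records)
  output = []
  processor.setup
  records.each { |r| processor.process(r) { |out| output << out } }
  processor.finalize { |out| output << out }
  processor.stop
  output
end

recorder = HookRecorder.new
drive(recorder, [1, 2])
p recorder.events # prints [:setup, [:process, 1], [:process, 2], :finalize, :stop]
```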