Class: Wukong::Processor

Inherits:
Hanuman::Stage
Includes:
Logging
Defined in:
lib/wukong/processor.rb,
lib/wukong/widget/echo.rb,
lib/wukong/widget/logger.rb,
lib/wukong/widget/extract.rb,
lib/wukong/widget/filters.rb,
lib/wukong/widget/operators.rb,
lib/wukong/widget/serializers.rb,
lib/wukong/widget/reducers/bin.rb,
lib/wukong/widget/reducers/sort.rb,
lib/wukong/widget/reducers/uniq.rb,
lib/wukong/widget/reducers/count.rb,
lib/wukong/widget/reducers/group.rb,
lib/wukong/widget/reducers/moments.rb,
lib/wukong/widget/reducers/improver.rb,
lib/wukong/widget/reducers/join_xml.rb,
lib/wukong/widget/reducers/accumulator.rb,
lib/wukong/widget/reducers/group_concat.rb

Overview

The Processor is the basic unit of computation in Wukong. A processor can be thought of as an arbitrary function that takes input records and produces zero or more output records.

A Processor can be written and tested purely in Ruby on your local machine. You can glue processors together, linking them as stages of a larger graph.
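Because a processor is an ordinary Ruby object, its lifecycle can be driven by hand in a plain script or unit test. The sketch below is illustrative only: `SketchProcessor` is a hypothetical stand-in that copies the default hooks documented on this page (`setup`, `process`, `finalize`, `stop`) so it runs without the wukong gem, `run_locally` is a hypothetical helper rather than part of Wukong's API, and the call order it uses is an assumption.

```ruby
# Hypothetical stand-in for Wukong::Processor with the same default hooks,
# so this sketch runs without the wukong gem installed.
class SketchProcessor
  def setup; end

  def process(record)
    yield record          # default behavior: pass the record through unchanged
  end

  def finalize; end

  def stop; end
end

# Hypothetical helper: feed records through one processor and collect output.
# The setup -> process (per record) -> finalize -> stop order is assumed here.
def run_locally(processor, records)
  output = []
  processor.setup
  records.each { |record| processor.process(record) { |out| output << out } }
  processor.finalize { |out| output << out }
  processor.stop
  output
end

# A processor that emits only even numbers (zero or one output per input).
class EvensOnly < SketchProcessor
  def process(number)
    yield number if number.even?
  end
end

p run_locally(SketchProcessor.new, [1, 2, 3])  # prints [1, 2, 3]
p run_locally(EvensOnly.new, [1, 2, 3, 4])     # prints [2, 4]
```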

Defined Under Namespace

Classes: Accumulator, Bin, Compact, Count, Echo, Extract, Filter, Flatten, FromCsv, FromDelimited, FromJson, FromTsv, Group, GroupConcat, Head, Identity, Improver, JoinXML, Limit, Logger, Map, Moments, NotRegexpFilter, Null, Recordize, RegexpFilter, Reject, Sample, Select, Serializer, Sort, Tail, ToCsv, ToDelimited, ToInspect, ToJson, ToTsv, Uniq

Constant Summary

SerializerError = Class.new(Error)
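`Class.new(Error)` creates an anonymous subclass of `Error`, and the class takes the name `SerializerError` once it is assigned to that constant. A minimal sketch of the same idiom follows; the `Demo` module and its `Error` base class are hypothetical stand-ins for the gem's own error hierarchy.

```ruby
module Demo
  # Hypothetical base error, standing in for the library's Error class.
  Error = Class.new(StandardError)

  # Same idiom as SerializerError: a subclass defined without a class body.
  SerializerError = Class.new(Error)
end

begin
  raise Demo::SerializerError, "could not parse record"
rescue Demo::Error => e
  # Rescuing the base class also catches the more specific subclass.
  puts "#{e.class}: #{e.message}"  # prints "Demo::SerializerError: could not parse record"
end
```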

Instance Attribute Summary

Attributes included from Hanuman::StageInstanceMethods

#graph

Class Method Summary

Instance Method Summary

Methods included from Logging

included

Methods inherited from Hanuman::Stage

#clone

Methods included from Hanuman::StageClassMethods

#builder, #label, #register, #set_builder

Methods included from Hanuman::StageInstanceMethods

#add_link, #linkable_name, #root

Class Method Details

.configure(settings) ⇒ Object

# File 'lib/wukong/processor.rb', line 26

def configure(settings)
  settings.description = description if description
  fields.each_pair do |name, field|
    next if field.doc == false || field.doc.to_s == 'false'
    next if [:log].include?(name)
    field_props = {}.tap do |props|
      props[:description] = field.doc unless field.doc == "#{name} field"
      field_type = (field.type.respond_to?(:product) ? field.type.product : field.type)
      configliere_type = case field_type
      when String                then nil
      when TrueClass, FalseClass then :boolean
      else field_type
      end
      
      props[:type]        = configliere_type if configliere_type
      props[:default]     = field.default if field.default
    end
    existing_value = settings[name]
    settings.define(name, field_props)
    settings[name] = existing_value unless existing_value.nil?
  end
end
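Reading the code above, `configure` turns each documented field of the processor into a setting, carrying over the field's doc string and default, while keeping any value that was already set. The sketch below mirrors that bookkeeping with hypothetical stand-ins (`Field`, `FakeSettings`, `configure_sketch`) instead of the real field and settings objects, and it omits the type mapping. `FakeSettings#define` deliberately resets the value to the default so the save-and-restore step is visible; whether the real `settings.define` behaves that way is an assumption.

```ruby
# Hypothetical stand-in for a field: just a doc string and a default.
Field = Struct.new(:doc, :default)

# Hypothetical stand-in for the settings object passed to configure.
class FakeSettings
  attr_reader :definitions

  def initialize(values = {})
    @values = values
    @definitions = {}
  end

  def [](name)
    @values[name]
  end

  def []=(name, value)
    @values[name] = value
  end

  # Assumption: defining a setting resets its value to the default.
  def define(name, props)
    @definitions[name] = props
    @values[name] = props[:default]
  end
end

# Mirrors the bookkeeping in Processor.configure (type mapping omitted).
def configure_sketch(settings, fields)
  fields.each_pair do |name, field|
    next if field.doc == false || field.doc.to_s == 'false'  # hidden field
    next if name == :log                                    # always skipped
    props = {}
    props[:description] = field.doc unless field.doc == "#{name} field"  # skip auto-generated docs
    props[:default]     = field.default if field.default
    existing_value = settings[name]
    settings.define(name, props)
    settings[name] = existing_value unless existing_value.nil?  # keep pre-set values
  end
  settings
end

fields = {
  limit:  Field.new("Maximum records to emit", 10),
  label:  Field.new("label field", nil),
  secret: Field.new(false, nil),
  log:    Field.new("Logger options", nil)
}
settings = configure_sketch(FakeSettings.new({ limit: 3 }), fields)
p settings.definitions.keys  # prints [:limit, :label]
p settings[:limit]           # prints 3 (the pre-existing value survives define)
```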

.description(desc = nil) ⇒ Object

# File 'lib/wukong/processor.rb', line 21

def description desc=nil
  @description = desc if desc
  @description
end
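`description` acts as a setter when given an argument and as a getter when called without one; `configure` copies the result into `settings.description`. Because the value lives in an instance variable of the class object itself, a subclass does not inherit its parent's description. The classes below (`DescribedSketch`, `Upcaser`) are hypothetical and simply copy the method shown above.

```ruby
class DescribedSketch
  # Same reader/writer pattern as Processor.description.
  def self.description(desc = nil)
    @description = desc if desc
    @description
  end
end

class Upcaser < DescribedSketch
  description "Upcases every record"
end

p Upcaser.description             # prints "Upcases every record"
p DescribedSketch.description     # prints nil
p Class.new(Upcaser).description  # prints nil: the value is not inherited
```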

Instance Method Details

#finalize {|record| ... } ⇒ Object

This method is called to signal that the last record has been received, but that further processing may still be done and further records may still be yielded.

This can be used within an aggregating processor (like a reducer in a map/reduce job) to process the final aggregate of records, since the last record has already been received.

Override this method in your subclass.

Yields:

  • record the record you want to yield

Yield Parameters:

  • record (Object)

    the yielded record

# File 'lib/wukong/processor.rb', line 99

def finalize
end
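As an illustration of the aggregating pattern described above, the sketch below accumulates a word count in `process` without yielding anything, then yields the single total from `finalize`. `SketchProcessor` and `WordTotal` are hypothetical, and the way the lifecycle is driven at the bottom is an assumption about what a runner does.

```ruby
# Hypothetical stand-in base class with the default hooks from this page.
class SketchProcessor
  def setup; end

  def process(record)
    yield record
  end

  def finalize; end

  def stop; end
end

# An aggregating processor: nothing is emitted per record; the total is
# yielded once finalize signals that the last record has arrived.
class WordTotal < SketchProcessor
  def setup
    @total = 0
  end

  def process(line)
    @total += line.split.size   # accumulate only, yield nothing
  end

  def finalize
    yield @total
  end
end

counter = WordTotal.new
out = []
counter.setup
["a b c", "d e"].each { |line| counter.process(line) { |r| out << r } }
counter.finalize { |r| out << r }
counter.stop
p out  # prints [5]
```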

#perform_action(*args) {|record| ... } ⇒ Object

When the processor is instantiated with a block, that block replaces this method (see #receive_action).

Parameters:

  • args (Array<Object>)

Yields:

  • record a record that might be yielded by the block

Yield Parameters:

  • record (Object)

    the yielded record

# File 'lib/wukong/processor.rb', line 57

def perform_action(*args)
end
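The default `perform_action` is a no-op. The sketch below uses a hypothetical `ActionSketch` class that reuses the `receive_action` body from this page to show a constructor block taking over `perform_action` for one instance. How Wukong itself routes a constructor block into the action attribute is not shown here, so this `initialize` is an assumption.

```ruby
# Hypothetical stand-in: a block given at construction time replaces
# perform_action on that one instance.
class ActionSketch
  def initialize(&action)
    receive_action(action) if action
  end

  def receive_action(action)
    define_singleton_method(:perform_action, &action)
  end

  # Default: does nothing.
  def perform_action(*args)
  end
end

# A method body defined from a block cannot `yield` to the caller's block,
# so the block declares an explicit &emit parameter to receive it.
doubler = ActionSketch.new { |n, &emit| emit.call(n * 2) }

results = []
doubler.perform_action(21) { |r| results << r }
p results                              # prints [42]
p ActionSketch.new.perform_action(21)  # prints nil (default no-op)
```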

#process(record) {|record| ... } ⇒ Object

This method is called once per record.

Override this method in your subclass.

Parameters:

  • record (Object)

Yields:

  • record the record you want to yield

Yield Parameters:

  • record (Object)

    the yielded record

# File 'lib/wukong/processor.rb', line 83

def process(record, &emit)
  yield record
end
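The default `process` shown above is an identity: one record in, the same record out. A subclass can instead yield once, many times, or not at all for each input. The sketch below is hypothetical (`SketchProcessor` copies the default; `Tokenizer` is an invented example) and shows a one-to-many override that emits nothing for blank lines.

```ruby
# Hypothetical stand-in for the default process shown above.
class SketchProcessor
  def process(record)
    yield record   # identity: one record in, the same record out
  end
end

# Emits one record per word and nothing for blank lines.
class Tokenizer < SketchProcessor
  def process(line)
    line.split.each { |word| yield word.downcase }
  end
end

tokenizer = Tokenizer.new
words = []
["Hello World", "   ", "Ruby"].each do |line|
  tokenizer.process(line) { |w| words << w }
end
p words  # prints ["hello", "world", "ruby"]

echoed = []
SketchProcessor.new.process("raw") { |r| echoed << r }
p echoed  # prints ["raw"]
```

Since the original signature is `process(record, &emit)`, an override could equally call `emit.call(word)` instead of `yield word`.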

#receive_action(action) ⇒ Object

The action attribute is turned into the #perform_action method: the given Proc is defined as a singleton method on this processor instance.

Parameters:

  • action (Proc)

# File 'lib/wukong/processor.rb', line 65

def receive_action(action)
  self.define_singleton_method(:perform_action, &action)
end
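Because `receive_action` uses `define_singleton_method`, the replacement only affects the instance it is called on, and `self` inside the Proc is that instance. The hypothetical `TaggedSketch` class below copies the method body to show both points.

```ruby
class TaggedSketch
  def initialize(tag)
    @tag = tag
  end

  # Same body as Processor#receive_action.
  def receive_action(action)
    define_singleton_method(:perform_action, &action)
  end

  def perform_action(*args)
    :default
  end
end

a = TaggedSketch.new("a")
b = TaggedSketch.new("b")
a.receive_action(proc { |x| "#{@tag}:#{x}" })

p a.perform_action(1)  # prints "a:1" (self inside the proc is the instance)
p b.perform_action(1)  # prints :default (other instances are untouched)
```

Note that a method defined from a Proc this way checks its argument count strictly, like a lambda, so `a.perform_action` with no arguments raises `ArgumentError`.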

#setup ⇒ Object

This method is called after the processor class has been instantiated but before any records are given to it to process.

Override this method in your subclass.

# File 'lib/wukong/processor.rb', line 73

def setup
end
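Work that should happen once per run, such as compiling a pattern, fits naturally in `setup`. In the hypothetical sketch below, field-based configuration is replaced by a plain constructor argument for brevity, and `PatternFilter` is invented for illustration; Wukong's own `RegexpFilter` may be implemented differently.

```ruby
class SketchProcessor
  def setup; end

  def process(record)
    yield record
  end
end

# Hypothetical processor that prepares its state once, in setup, rather
# than on every call to process.
class PatternFilter < SketchProcessor
  def initialize(pattern)
    @pattern = pattern
  end

  def setup
    @regexp = Regexp.new(@pattern)  # compiled once, before any record arrives
  end

  def process(line)
    yield line if @regexp.match?(line)
  end
end

filter = PatternFilter.new('\Aerr')
filter.setup
kept = []
["error: disk", "ok", "err: net"].each { |l| filter.process(l) { |r| kept << r } }
p kept  # prints ["error: disk", "err: net"]
```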

#stop ⇒ Object

This method is called after all records have been passed to the processor, to signal that processing should stop.

Override this method in your subclass.


def stop
end
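One plausible use of `stop` is releasing a resource acquired in `setup`. In the hypothetical sketch below, `AuditedEcho` writes an audit line per record to an IO it owns and closes that IO in `stop`; a `StringIO` stands in for a real file or socket.

```ruby
require 'stringio'

class SketchProcessor
  def setup; end

  def process(record)
    yield record
  end

  def stop; end
end

# Hypothetical processor that writes an audit line per record to an IO it
# owns, and releases that IO in stop.
class AuditedEcho < SketchProcessor
  attr_reader :audit

  def setup
    @audit = StringIO.new  # stands in for a file or socket
  end

  def process(record)
    @audit.puts("saw #{record}")
    yield record
  end

  def stop
    @audit.close unless @audit.closed?  # release the resource exactly once
  end
end

echo = AuditedEcho.new
echo.setup
out = []
%w[a b].each { |r| echo.process(r) { |x| out << x } }
echo.stop
p out                 # prints ["a", "b"]
p echo.audit.string   # prints "saw a\nsaw b\n"
p echo.audit.closed?  # prints true
```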